Data Injector

Overview

Data Injector improves testing and validation of streaming pipelines by providing a steady flow of sample/test sensor data. Many applications developed on the HERE platform rely on sensor data as their input, and a steady flow of such data is essential for both developing and testing them.

This customizable pipeline template simulates that flow: Data Injector is a Stream Pipeline that reads in a sample of messages, refreshes them with new timestamps and IDs, and keeps sending them to a Stream layer in random order at a configured frequency for as long as the deployed pipeline is running (a minimal sketch of this loop follows the format list below). Data Injector can use data provided by the user; if the user doesn't have their own data, a sample catalog provided by the HERE platform is used instead. Data Injector supports the following data formats:

  1. SDIIMessage
  2. SDIIMessageList
  3. Sensoris
  4. GeoJSON
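
As a rough mental model, the overall behavior can be sketched as a simple send loop. The sketch below is illustrative only; the real pipeline is a Flink-based Stream Pipeline, and all names and the in-memory setup here are hypothetical:

    // Illustrative sketch of the Data Injector send loop; not the real
    // Flink implementation. The sample list stands in for data read once
    // from the input Versioned layer.
    import java.util.List;
    import java.util.Random;

    public class SendLoopSketch {
        public static void main(String[] args) throws InterruptedException {
            List<String> samples = List.of("msg-a", "msg-b", "msg-c");
            int messageFrequency = 5; // configured messages per second
            Random random = new Random();
            while (true) { // runs for as long as the pipeline is deployed
                String sample = samples.get(random.nextInt(samples.size())); // random order
                String refreshed = sample + "@" + System.currentTimeMillis(); // stands in for new timestamps/IDs
                System.out.println(refreshed); // stands in for publishing to the Stream layer
                Thread.sleep(1000L / messageFrequency); // naive throttling
            }
        }
    }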

Data Injector provides the user with the following options:

  • Send existing data as is: Data Injector will not alter the existing data and will simply keep sending messages in random order.
  • Update position and event timestamps: If this option is selected, Data Injector will update all timestamps in the message. The user has two options here (see the sketch after this list):
    • Now - Data Injector will update timestamps to be as close to the current time as possible. The latest timestamp in each message becomes a NOW timestamp, and all other timestamps are shifted accordingly to preserve the difference and order of events and positions. This option lets users simulate data being sent by cars in real time.
    • Specific time - Users may want to update their data so that events happen around a specific date/time. For this, the user needs to provide a UTC timestamp in milliseconds. If provided, this timestamp becomes the latest timestamp found in each message, and all other timestamps are shifted accordingly, preserving the time difference between positions and events.
  • Update IDs: Users may want a unique ID for each message sent, which is often important for testing latency and performance in stream applications.
  • Select frequency of sending: Users can specify the number of messages per second they wish Data Injector to produce, or use the "unlimited" option, in which case Data Injector keeps sending messages without throttling. Note that Data Injector is not guaranteed to produce precisely the requested number of messages per second; the "messageFrequency" value provided during deployment is used to calculate the cluster size needed to get as close as possible to the desired rate. A highly precise message rate cannot be guaranteed, since processing time depends on multiple factors: the size of the input data files, the data type, and the requested data transformations.
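
To make the timestamp updates concrete, here is a minimal sketch of the "Now" option, assuming a message's timestamps have been collected into a list of UTC milliseconds. This is an illustrative example with hypothetical names, not the actual Data Injector code:

    // Illustrative sketch: shift every timestamp so that the latest one becomes
    // "now" while preserving the order and relative spacing of events and
    // positions. For the "Specific time" option, the user-provided UTC
    // timestamp would replace System.currentTimeMillis().
    import java.util.List;
    import java.util.stream.Collectors;

    public class TimestampShiftSketch {
        static List<Long> shiftToNow(List<Long> timestampsUtcMs) {
            long latest = timestampsUtcMs.stream().mapToLong(Long::longValue).max().orElseThrow();
            long offset = System.currentTimeMillis() - latest;
            // Adding the same offset to every timestamp preserves the time
            // difference between positions and events.
            return timestampsUtcMs.stream().map(t -> t + offset).collect(Collectors.toList());
        }

        public static void main(String[] args) {
            // Two positions one second apart; after shifting, the later one is "now".
            System.out.println(shiftToNow(List.of(1600000000000L, 1600000001000L)));
        }
    }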

SDII and Sensoris both have well-defined formats, so Data Injector knows which fields to look at for timestamp and ID updates:

  • SDII: timeStampUTCMs and transientVehicleUUID
  • Sensoris: posixTime and messageId

GeoJSON doesn't have a predefined timestamp field, so users have to provide the exact name of the field containing time information if they wish to update timestamps. As for the ID, a unique "id" field will be generated for each feature.
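
For example, assuming the user specified "timestamp" as the name of the time field (a made-up field name for illustration), Data Injector would rewrite properties.timestamp and generate a fresh top-level "id" in a feature like this hypothetical one:

    {
      "type": "Feature",
      "id": "generated-unique-id",
      "geometry": { "type": "Point", "coordinates": [13.38, 52.51] },
      "properties": { "timestamp": 1600000000000 }
    }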

NOTE: When the user provides an existing Versioned layer with input data, Data Injector will pull in all data available in that layer and keep looping through it randomly. However, the maximum amount of data pulled from the layer is currently limited to 4 GB.

Structure

Figure 1. Application Flow Diagram

Legend: Legend Diagram

Prerequisites

  • This pipeline template expects the user to provide a Versioned layer with the input data that they want to keep sending to a Stream layer. The partitioning scheme of the data doesn't matter, but the data has to be of one of the supported types: SDIIMessage, SDIIMessageList, GeoJSON, or Sensoris.
  • If you wish to provide an existing catalog with the input data, please make sure to share it with the same GROUP that will be used for deployment of this pipeline template.
  • Confirm that your local credentials (~/.here/credentials.properties) are added to the same GROUP.

Execution

To deploy and run this pipeline template, you will need the Wizard Deployer. The Wizard runs interactively, asking questions about the application and expecting the user to provide the needed answers. Assuming you have followed the Wizard's documentation and set up the needed parameters beforehand, follow these steps:

  1. Execute the script as ./wizard.sh
  2. Follow the prompts and provide the required answers

NOTE:

  • One important thing to consider before answering the Wizard's questions is the configuration of the output Stream layer. We recommend setting the outbound throughput to at least the expected number of consumers (users and pipelines) times the inbound throughput; for example, with an inbound throughput of 2 MB/s and three consumers, the outbound throughput should be at least 6 MB/s. The output rate can be higher still if some consumers "replay" recent data. The inbound throughput must not exceed the outbound throughput; if it does, consumers cannot read all the data that the producer provides.

You can use your existing output layer or let the Wizard create a new catalog/layer for you. If using an existing catalog, make sure it is shared with the GROUP_ID that will be used for the deployment of this pipeline template.

Verification

In the Platform Portal, select the Pipelines tab, where you should see your pipeline deployed and running. The Flink Dashboard provides important details about your running pipeline and lets you monitor metrics such as the number of messages or the amount of data sent by Data Injector:

  1. Select the job from the "Running Jobs" panel on the Overview page.
  2. Click on the "Subtasks" tab on the lower panel.

The easiest way to verify that your data is flowing is to use the OLP CLI to read data from your output Stream layer:

olp catalog layer stream get <your-catalog-hrn> <your-output-stream-layer-id>

Cost Estimation

Executing this pipeline template will incur the following costs:

Storage-Blob

Cost will depend on the amount of test data being stored in a Versioned layer.

Metadata

Cost will depend on the amount and size of partitions (metadata) stored in the Versioned layer.

Storage-Stream and Stream TTL

Cost will depend on the configuration of the output Stream layer selected by the user, such as Throughput IN, Throughput OUT, and TTL.

Data Transfer IO

Cost will depend on the amount of:

  • input data read from the Versioned layer at start-up of this pipeline template (done once per deployment)
  • data published to the output Stream layer

Compute Core and Compute RAM

Cost will depend on the frequency configuration selected by the user. If high frequency is needed, more workers will be used for deployment.

Log Search IO

Cost will depend on the log level set for the execution of this pipeline template. To minimize this cost, the user can set the log level to WARN.

Support

If you need support with this pipeline template, please contact us.
