Stream Processing Best Practices

Overview

Apache Flink is used by the Pipeline Service to implement Stream data processing. The sections below examine the best practices for developers creating stream processing pipelines for the HERE platform using Flink.

When you install the HERE platform SDK, you also install the runtime libraries for Flink v1.13.5. This is done through the provided Maven archetype, which sets up a Flink project on the developer's local machine and downloads the correct Flink dependencies from Apache. Building the project produces a Fat JAR (also known as an "Uber JAR") that packs all project class files and resources together with all of the project's dependencies, including the Flink libraries. The developer must deploy the Fat JAR and cannot omit the Flink library dependencies, since those dependencies are not predeployed on the pipeline clusters.

The following data libraries are designed to be used with Flink.

Data Client Library

The Data Client Library is a multi-module library that provides both lower-level building blocks and higher-level APIs offering asynchronous access, streaming, backpressure, parallelism support, built-in retries/resumes, and Akka Streams connectors. To run an application that uses the Data Client Library within a stream (Flink) pipeline, add the flink-support module as a dependency to your project. Do not add any other Data Client Library modules to your project, because doing so causes dependency conflicts that will make your stream pipeline fail. For additional information, see the Data Client Library Developer Guide.

Data Archiving Library

This library assists with archiving messages ingested via a stream layer. It provides a data agnostic workflow that you can customize to produce and archive stream data in different formats including Protobuf, Avro and Parquet. For more information, see the Data Archiving Library Developer Guide.

Location Library

This is a set of algorithms for location-based analysis, such as navigating a road network, accessing road attributes, and geospatial queries. It is used to create location based programs that run in batch or stream pipelines. For more information, see the Location Library Developer Guide.

Sensor Data Ingestion Platform

While not a library, the Sensor Data Ingestion Platform is a multi-functional web service for collecting and validating data messages from various sensors. The Platform's API allows you to POST messages with sensor data to the Sensor Data Ingestion Platform. That data is then available for processing by a stream pipeline. For additional information, see the Sensor Data Ingestion Interface (SDII) Data Specification.

Enable Checkpointing

Flink Checkpointing is required to recover the state of a Flink (Stream) pipeline. When enabled, Flink takes consistent snapshots of the pipeline (or, in Flink terms, the Job graph) at specified intervals. Checkpointing is disabled by default. To enable Checkpointing, use the method enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds. For more information, see the Apache Flink documentation on Checkpoints.
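
For example, a minimal sketch of enabling checkpointing on the StreamExecutionEnvironment (the 60-second interval is illustrative only, not a recommendation):

// obtain the stream execution environment and take a consistent snapshot every 60 seconds
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);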

It is important to enable checkpointing so that Flink can create Savepoints when pausing or upgrading the Stream pipeline. A Savepoint is a collection of related Checkpoints, and if there are no checkpoints, the savepoint is empty. In addition, it is important to set stable UIDs for each operator. Flink uses these UIDs to match the stored state with the operators upon restart. If the operators do not match, Flink is not able to restore the state. For more information, see the Apache Flink documentation on Savepoints.
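
For example, a minimal sketch of assigning a stable UID (and a readable name) to an operator; the input stream and EnrichmentFunction are hypothetical placeholders:

DataStream<String> enriched = input
    .map(new EnrichmentFunction()) // placeholder MapFunction<String, String>
    .uid("enrichment-map")         // stable UID used to match stored state on restart
    .name("Enrichment Map");       // readable name shown in the web UI and in metrics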

Info: Additional Cost

Flink pipelines that use checkpointing read from and write to the underlying storage. This traffic is billed as Pipeline IO.

Frequency of Checkpoints

When storing stateful data in operators, consider how quickly the checkpoints can be created and how often they should be taken. For more information on how checkpoints work, see the Apache Flink documentation on Data Streaming Fault Tolerance.
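
For example, a minimal sketch of tuning how often and how long checkpoints may run; all values are illustrative, and env is the StreamExecutionEnvironment shown earlier:

CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(30000); // leave at least 30 s between two checkpoints
config.setCheckpointTimeout(600000);         // abort a checkpoint that takes longer than 10 minutes
config.setMaxConcurrentCheckpoints(1);       // never run more than one checkpoint at a time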

You can also configure checkpointing/savepointing so that, in case of a failure, you can select which backup to use when re-activating the pipeline.

Restoring a Stream Pipeline from a Snapshot

Stream pipelines using Stream-5.0 and newer runtimes can be activated from a previously taken snapshot (or savepoint, in the Flink documentation). During the 'activate' operation, the snapshot ID can be provided to run the pipeline from the state captured by the snapshot.

Snapshots are taken by the system for stream pipelines during pause, upgrade, and restart operations. The list of snapshots can be retrieved to identify the snapshot ID to use for activating the desired Pipeline version. Both Pipeline API and CLI provide the capability to list the snapshots and provide the snapshot ID during the 'activate' operation.

Note

As Stream Pipelines use Apache Flink for runtime, the snapshots are essentially savepoints in Flink.

Externalized Checkpoint Feature

When you develop your Stream 5.0 or later pipeline, you can enable externalized checkpointing within the code and configure the checkpoints to be retained. When activating a Stream 5.0 or later pipeline version, you can request to activate it with an externalized checkpoint created in an earlier run of the same pipeline. When you make such a request, the system starts the pipeline and supplies the checkpoint location of the latest available checkpoint of the same pipeline.

To use externalized checkpoints to activate your pipeline, first you need to add the following code to your pipeline to create and retain externalized checkpoints.

// enable externalized checkpoints which are retained
// after job cancellation
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

For more details, see the Apache Flink documentation on checkpointing.

Both Pipeline API and CLI allow you to provide a flag when activating the pipeline to indicate that you want to use the latest checkpoint available.

Setting Supervisors and Workers

When a pipeline JAR file is developed, an internal parameter sets the anticipated resource levels for each Flink task. This parameter is effectively used as a multiplier, together with the runtime configuration specified in the pipeline template, to determine the processing resources used by a Pipeline Version based on that template. This allows each Pipeline Version to be optimized for its own runtime environment.

Every stream pipeline executes two types of processes:

  • Supervisors (also called JobManagers) coordinate the distributed execution. There is always one supervisor.
  • Workers (also called TaskManagers) execute the tasks of a dataflow, and buffer and exchange the data streams. There must always be at least one worker, but no more than 9999.

In the Pipeline Template, you should specify the number of supervisor-units, worker-units, and the number of workers. These parameters are used to determine default runtime resource allocations, but may be overwritten by values specified when a Pipeline Version is created. For detailed information on the meaning of these values, see the Pipeline User Guide article Quotas and Limits.

Templates can be created in the platform portal, by the CLI, or at the API level. A good place to start is in the OLP CLI guide under Templates. The CLI command pipeline-template create lists these runtime parameters under optional. In the API reference, you will find the same information contained in the default cluster configuration of the PipelineTemplate parameter. For more information, see the API Reference entry for the createPipelineTemplate operator.

Setting Parallelism

From Flink's documentation:

"A Flink program consists of multiple tasks (transformations/operators, data sources, and sinks). A task is split into several parallel instances for execution and each parallel instance processes a subset of the task’s input data. The number of parallel instances of a task is called its parallelism."

You can control the parallelism of the tasks by setting it either at the operator level or at the runtime environment level.
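
For example, a minimal sketch showing both levels; the values, source stream, and MyMapFunction are illustrative placeholders:

// environment level: default parallelism for every operator in this job
env.setParallelism(3);

// operator level: override the default for a single operator
DataStream<String> mapped = source
    .map(new MyMapFunction())
    .setParallelism(8);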

Cluster Configuration and Parallelism

Configure enough cluster resources for your Flink pipeline to support the maximum parallelism set in the pipeline code. Conversely, the maximum parallelism that can be set in the pipeline code is limited by the total number of Task Slots in the pipeline cluster. The maximum parallelism that a Flink pipeline cluster can support is governed by the worker parameters. By default, in the HERE platform, each worker gets one (1) Task Slot. The number of Task Slots per worker can be set in the stream configuration via the CLI.

Note

For more information about TaskManager and slots, see the Apache Flink documentation on Distributed Runtime Environment.

Consider the following example of a Flink Pipeline:

  • The Pipeline has 3 operators: a source, a map, and a sink operator with parallelism of 3, 8, and 3 respectively.
  • The highest parallelism set in the stream pipeline code is 8.

In this example, to support the maximum parallelism of 8, the pipeline cluster should have at least 8 Task Slots. This is achieved by specifying a combination of workers and Task Slots whose product equals 8; for example, 4 workers with 2 Task Slots each, or 8 workers with 1 Task Slot each.
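
A minimal sketch of this example topology; the function classes are placeholders, and the source is assumed to implement ParallelSourceFunction so that its parallelism can exceed 1:

env.addSource(new MyParallelSource()).setParallelism(3)  // source: parallelism 3
    .map(new MyMapFunction()).setParallelism(8)          // map: parallelism 8
    .addSink(new MySink()).setParallelism(3);            // sink: parallelism 3
// the highest operator parallelism is 8, so the cluster needs at least 8 Task Slots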

The parameter workerUnits specifies the resources allocated to each worker. Depending on the resource profile selected for the pipeline, each worker unit contains a pre-defined number of CPUs, amount of RAM, and disk space (in the default resource profile for stream pipelines, one worker unit has 1 CPU, 7 GB of RAM, and 8 GB of disk space). Our recommendation is to use higher parallelism and fewer resources per TaskManager, rather than fewer TaskManagers, each with a large amount of resources. The maximum number of workerUnits that can be allocated per worker is 15. It is also recommended to limit the number of Task Slots per worker to the number of CPUs allocated to the worker.

Stream Configuration

The stream configuration allows you to configure certain Flink configuration properties. For example, set taskmanager.numberOfTaskSlots=2 to configure two Task Slots per worker. The following is the list of Flink configuration properties supported by Stream Configuration:


      taskmanager.numberOfTaskSlots,
      cluster.evenly-spread-out-slots,
      taskmanager.heap.size,
      taskmanager.memory.flink.size,
      taskmanager.memory.jvm-metaspace.size,
      taskmanager.memory.framework.heap.size,
      taskmanager.memory.task.heap.size,
      taskmanager.memory.managed.size,
      taskmanager.memory.managed.fraction,
      taskmanager.memory.framework.off-heap.size,
      taskmanager.memory.task.off-heap.size,
      taskmanager.memory.network.min,
      taskmanager.memory.network.max,
      taskmanager.memory.network.fraction,
      taskmanager.memory.jvm-overhead.min,
      taskmanager.memory.jvm-overhead.max,
      taskmanager.memory.jvm-overhead.fraction,
      statefun.message.serializer,
      classloader.parent-first-patterns.additional,
      statefun.feedback.memory.size,
      akka.framesize,
      akka.ask.timeout,
      akka.lookup.timeout,
      akka.tcp.timeout,
      akka.client.timeout,
      heartbeat.interval,
      heartbeat.timeout,
      state.storage.fs.memory-threshold,
      jobmanager.memory.heap.size,
      jobmanager.memory.off-heap.size,
      jobmanager.memory.jvm-metaspace.size,
      jobmanager.memory.jvm-overhead.min,
      jobmanager.memory.jvm-overhead.max,
      jobmanager.memory.jvm-overhead.fraction

For more information, see the Apache Flink documentation on Configuration and the Stateful Functions Configuration.

See the Pipeline CLI command reference to learn about setting the value of Task Slots.

Enable Notification and Recovery of Stream Pipelines

A planned outage can affect a Stream pipeline. To prevent unexpected interruptions in data processing, it is recommended to keep the email address associated with the pipeline up to date so that a planned outage notification can be sent with clear details about the incident and the expected user actions, such as:

  • Incident Summary
  • Realm
  • Pipeline ID
  • Pipeline Name
  • Pipeline Version ID
  • Requested Action with supporting instructions
  • Due Date and Time
  • System Operation to be performed by HERE platform if the Requested Action is not performed

If the Requested Action is not performed by the due date and time, the System Operation listed in the outage email will be performed. Once the System Operation has been initiated on the affected Stream Pipeline, a second email will be sent with the following details:

  • Realm
  • Pipeline ID
  • Pipeline Name
  • Pipeline Version ID
  • Incident Summary
  • System Operation Date and Time
  • System Operation Being Performed

During the System Operation, an attempt will be made to save the current state and start a new job with that saved state; the state can be recovered only for Stream Pipelines that utilize checkpointing. If the Requested Action is performed by the due date and time, the System Operation will be dropped. Otherwise, if the Requested Action has not been performed by the due date and time, the pipeline job will be Canceled and the pipeline will be Activated again. For this reason, it is recommended to use checkpointing in Stream Pipelines.

Note: Time to Complete a Savepoint

While attempting to take a savepoint, we will wait 120 seconds for the pipeline to complete the savepoint. If the pipeline is not able to complete the savepoint within that time, we will cancel and then activate the pipeline.

If an affected Stream Pipeline does not have an email address associated with it, a notification can't be sent and the System Operation will be performed as outlined in the planned outage notification.

Restart Strategies

The restart strategy used by default is the "No Restart" strategy. To use a different restart strategy, it must be set in the pipeline code. For example, the following code sets a "Failure Rate" restart strategy:


import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    3,                             // max failures per interval
    Time.of(60, TimeUnit.MINUTES), // time interval for measuring the failure rate
    Time.of(30, TimeUnit.SECONDS)  // delay between restart attempts
));

Caution

No performance claims are made for this code example. It is only intended to illustrate a technique.

Flink supports different restart strategies. For more information on Flink restart strategies, see the Apache Flink documentation on Restart Strategies.

The Job Manager coordinates the scheduling and resource management for every Flink pipeline. By default, we create a single Job Manager for each pipeline's compute cluster. This creates a single point of failure: if the Job Manager crashes, the running pipeline fails.

Note: Job Manager High Availability Configuration

  • For Stream-2.0.0 (deprecated) pipelines, one Job Manager is used. Zookeeper is used to save the state of the Job Manager so that it can be restored after a failure. A bug in Flink 1.7 prevented the use of more than one Job Manager in Stream-2.0.0.
  • For Stream-3.0.0, Stream 4.0, and Stream 5.0 pipelines, two (2) Job Managers are used: one active and one standby. These Job Managers are managed via Zookeeper, which coordinates leader election and the pipeline's state. If the primary Job Manager crashes, the standby Job Manager quickly takes over and the pipeline continues to run. The failed primary Job Manager is then restarted and becomes the new standby Job Manager, re-establishing high availability against future failures.

The option to enable High Availability is available during Activate, Resume, and Upgrade operations.

Caution: Additional Cost

The Flink Job Manager High Availability option increases the cost of running a Stream pipeline. The following resources are required to run a Stream pipeline's Job Manager with high availability:

  • Resources for the Zookeeper: 1.5 CPU and 1.5 GB of RAM
  • Resources for the extra Job Manager (same as the primary Job Manager)

The cost of these extra resources is added to the pipeline's original cost.

More information about Flink Job Manager High Availability is available in the Apache Flink documentation.

Flink documentation includes a production readiness checklist, which lists some useful recommendations for developers. In brief, it includes the following:

  • Maximum parallelism configuration parameter for operators
  • Setting UUIDs for operators for better savepointing and restoring of state
  • Choosing a state backend, which can affect your checkpointing configuration
  • Configuring JobManager High Availability (HA)

The full checklist is available in the Apache Flink documentation.
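
As a brief illustration of the first items in the checklist, the following sketch sets an explicit maximum parallelism and an explicit state backend; the values and the choice of HashMapStateBackend are illustrative only, env is the pipeline's StreamExecutionEnvironment, and operator UIDs are shown earlier in this document:

env.setMaxParallelism(128);                     // bounds how far keyed state can be rescaled later
env.setStateBackend(new HashMapStateBackend()); // explicit state backend; keeps state on the JVM heap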

Multiple Pipeline Use

There are two use cases where it is desirable to use multiple stream pipelines with a single data source. However, care must be taken in how these pipelines are configured in order to get the desired results.

Info

Apache Kafka is used by the pipeline service as a data stream messaging system to enable real-time processing of streams. This allows multiple pipelines to use a common input catalog data source, where Kafka functions as a queuing model or as a publish-subscribe model. For additional information, see Kafka as a Messaging System.

Use Case 1: Shared Processing

Figure 1. Shared Processing

In this use case, the idea is to have multiple pipelines share the processing load of a single streaming data source. Each pipeline processes data routed to it by the Kafka Stream Processor, so each pipeline only has to process a portion of the input catalog and layer data stream. This allows for faster processing of new stream data because each pipeline only has to handle a portion of the overall input data load.

Use Case 2: Shared Data Source

Figure 2. Shared Data Source

In this use case, the multiple pipelines each apply a different data processing workflow to the same source data. HERE platform Stream Services see the pipelines as unrelated except that they are using the same streaming data source. Each pipeline consumes the entire input catalog data stream.

Each use case is executable in the HERE platform, but the configuration is different for each use case.

The key configuration parameters in both use cases are the Group ID and the Kafka Consumer Group ID. The Group ID is normally assigned during deployment and is one of the parameters included in the Template. Every Pipeline Version has an associated Group ID, and multiple Pipeline Versions can be created that differ only in their Group ID. A Kafka Consumer Group ID is assigned automatically to associate the Kafka Cluster with the Data Consumer (Pipeline). While Kafka Consumer Group IDs can be overridden programmatically, doing so is not recommended because it can result in unexpected behavior.

Configuration Details

Consider a setup where there are two stream pipelines and a single input data catalog.

Figure 3. Possible Configurations

Scenario 1: Shared Processing - Both pipelines have the same Group ID and the Kafka Consumer Group ID is not overridden.

Scenario 2: Shared Data Source - Each pipeline has a different Group ID and the Kafka Consumer Group ID is not overridden.

Scenario 3: Shared Data Source - Both pipelines have the same Group ID and the Kafka Consumer Group ID is overridden to be different for each pipeline. Not Recommended.

Use Unique Metric Operator Naming

In earlier versions of Flink, metric operators were automatically named based on their associated task names, but this approach proved failure-prone. It has been replaced by explicitly named metric operators that are stored in the respective streamConfig. You must make your metric operator names unique to avoid naming conflicts, which cause duplicate metrics not to be reported.

Also, it is a good idea to limit the operatorName component of a metric name to 80 characters or fewer. This avoids unwieldy metric names in the logs and avoids the known risk of metrics being ignored by the Graphite Reporter when their names are too long.
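
A minimal sketch of giving an operator an explicit, short, unique name; the input stream, function, and names are illustrative placeholders:

input
    .map(new DeduplicationFunction())
    .name("dedup-probe-messages")  // unique, short name used for reported metrics
    .uid("dedup-probe-messages");  // stable UID for savepoint/restore (see above)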

For more information about Operator IDs and Naming, see the Apache Flink documentation on Savepoints.

Use a Unique Application ID

Info: Application ID

Every pipeline application has to be registered with the HERE platform and receives an application ID. This is used for authorization purposes and is normally accessed from the credentials.properties file created when the application is registered. For an example, see the article Configuration File Reference.

A potential problem exists when the same group ID is used for a given combination of application ID, layer ID, and catalog ID. Consequently, when running two pipelines that share the same application ID and consume the same streaming layer, any given message will be consumed by only one pipeline. This can be a problem when each pipeline is expected to work in isolation and consume all incoming messages.

The best way to avoid this issue is to create a different Group (HERE Account Group) for every pipeline, thus ensuring that each pipeline uses a unique application ID. This strategy guarantees that pipelines consuming data from stream layers consume all messages, as they should. Sharing application IDs across pipelines leads to partial data consumption per pipeline instead of every pipeline consuming all messages from the stream. For more information, see the Data Client Library documentation.

Use Same Catalog for Input and Output

Unlike batch pipelines, stream pipelines can use the same catalog for input and output. However, the catalog must be configured with Stream Layers to be valid.

Flink runtime libraries are automatically included in the HERE platform pipeline. A Maven archetype is available to provide a preconfigured project structure for creating a new stream pipeline. For further information, see the Archetypes documentation.
