Run a Stream Application Locally
Objectives: Set up and run a stream application locally that reads streaming messages and writes them to a versioned layer.
Complexity: Easy
Time to complete: 30 min
Depends on: Organize your work in projects
Source code: Download
This example demonstrates how to set up and run a local Flink application that reads streaming messages from an input catalog. The application writes the message contents into the versioned layer of the catalog that you created in the Organize your work in projects example.
Set up the Maven Project
Create the following folder structure for the project:
archive-stream
└── src
└── main
├── java
└── resources
└── scala
You can do this with a single bash command:
mkdir -p archive-stream/src/main/{java,resources,scala}
Create a file named pipeline-config.conf and populate it with the contents below, replacing {{YOUR_CATALOG_HRN}} with the HRN of the catalog you created in Organize your work in projects:
pipeline.config {
// Replace this with the HRN to your catalog.
output-catalog {hrn = "{{YOUR_CATALOG_HRN}}"}
input-catalogs {
// In the China environment, use hrn:here-cn:data::olp-cn-here:sample-data
sensorData {hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2"}
}
}
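At runtime, the application resolves the catalogs configured above through the PipelineContext (from the pipeline-interface dependency in the POM). The following minimal Scala sketch shows that lookup; it uses the same calls as the full example further below:
import com.here.platform.pipeline.PipelineContext

// Reads pipeline-config.conf (passed at launch via -Dpipeline-config.file)
// and exposes the catalogs configured above as HRNs.
val pipelineContext = new PipelineContext
val sensorDataHRN = pipelineContext.config.inputCatalogs("sensorData") // streaming input catalog
val outputHRN = pipelineContext.config.outputCatalog // your versioned output catalog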
We will use the SDII Sensor Data Sample Catalog as the input catalog, so we need to link it to the project. To do this, replace {{YOUR_PROJECT_HRN}} with the HRN of your project in the following command and execute it:
olp project resources link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:olp-sdii-sample-berlin-2
The CLI should return the following message:
Project resource hrn:here:data::olp-here:olp-sdii-sample-berlin-2 has been linked.
The POM for this example is identical to that in the first Maven example, except for its parent and dependencies sections:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-stream-bom</artifactId>
<version>2.24.8</version>
<relativePath/>
</parent>
Scala:
<dependencies>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>flink-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
Java:
<dependencies>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>flink-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
</dependencies>
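If you do not have the first Maven example at hand, the following is a minimal sketch of how the full pom.xml could look. The groupId, artifactId, Scala version, and scala-maven-plugin configuration shown here are assumptions; take the actual values from the first Maven example, and include only the Scala or the Java dependencies block above, depending on the implementation you use:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-stream-bom</artifactId>
    <version>2.24.8</version>
    <relativePath/>
  </parent>

  <!-- Placeholder coordinates; use the ones from the first Maven example. -->
  <groupId>com.example</groupId>
  <artifactId>archive-stream</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <properties>
    <!-- Assumed Scala binary version; align it with the SDK BOM you use. -->
    <scala.compat.version>2.12</scala.compat.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Insert the Scala or Java dependencies block shown above. -->
  </dependencies>

  <build>
    <plugins>
      <!-- Compiles the Scala sources under src/main/scala. -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>4.4.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>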
Implement the Stream Reading Application
The respective Scala and Java implementations use the flink-support module of the Data Client Library to receive streaming messages from a sample streaming layer and copy their contents into an output versioned layer:
Scala:
import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.FlinkReadEngine
import com.here.platform.data.client.flink.scaladsl.{
FlinkDataClient,
FlinkPublishApi,
FlinkWriteEngine
}
import com.here.platform.data.client.model.VersionDependency
import com.here.platform.data.client.scaladsl.{CommitPartition, NewPartition, Partition}
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object ArchiveStreamScala extends App {
class UploadDataFunction(val inputHRN: HRN, val outputHRN: HRN)
extends RichMapFunction[Partition, CommitPartition]
with Serializable {
@transient
private lazy val flinkDataClient: FlinkDataClient =
new FlinkDataClient()
override def close(): Unit =
flinkDataClient.terminate()
@transient
private lazy val readEngine: FlinkReadEngine =
flinkDataClient.readEngine(inputHRN)
@transient
private lazy val writeApi: FlinkWriteEngine =
flinkDataClient.writeEngine(outputHRN)
def map(partition: Partition): CommitPartition = {
val data = readEngine.getDataAsBytes(partition)
val newPartition = NewPartition(
partition = partition.partition,
layer = "sdii-message-archive",
data = NewPartition.ByteArrayData(data)
)
writeApi.put(newPartition)
}
}
class PublishBatchWindowFunction(hrn: HRN)
extends RichAllWindowFunction[CommitPartition, String, TimeWindow]
with Serializable {
@transient
private lazy val flinkDataClient: FlinkDataClient =
new FlinkDataClient()
@transient
private lazy val publishApi: FlinkPublishApi =
flinkDataClient.publishApi(hrn)
override def close(): Unit =
flinkDataClient.terminate()
override def apply(window: TimeWindow,
partitions: Iterable[CommitPartition],
out: Collector[String]): Unit = {
val baseVersion = publishApi.getBaseVersion()
val layerId = partitions.map(p => p.layer).toSeq
publishApi.publishBatch2(baseVersion,
Some(layerId),
Seq.empty[VersionDependency],
partitions.iterator)
out.collect(s"commit on $baseVersion success")
}
}
override def main(args: Array[String]): Unit = {
val pipelineContext = new PipelineContext
val sensorDataHRN = pipelineContext.config.inputCatalogs("sensorData")
val outputHRN = pipelineContext.config.outputCatalog
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val flinkDataClient = new FlinkDataClient
val queryApi = flinkDataClient.queryApi(sensorDataHRN)
val streamingLayer = "sample-streaming-layer"
val subscriptionFunction = queryApi.subscribe(
streamingLayer,
ConsumerSettings(groupName = "archive-stream-consumer", offset = LatestOffset))
val partitions: DataStream[Partition] = env.addSource(subscriptionFunction)
try {
partitions
.map(new UploadDataFunction(sensorDataHRN, outputHRN))
.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
.apply(new PublishBatchWindowFunction(outputHRN))
.addSink(_ => {
throw new InterruptedException("Halting stream processing")
})
env.execute()
} catch {
case ex: Exception => ()
}
subscriptionFunction.cancel()
flinkDataClient.terminate()
}
}
Java:
import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.*;
import com.here.platform.data.client.javadsl.CommitPartition;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.*;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
class UploadDataFunction extends RichMapFunction<Partition, CommitPartition>
implements Serializable {
private HRN sensorDataHRN;
private HRN outputHRN;
private transient FlinkDataClient dataClient;
private transient FlinkReadEngine readEngine;
private transient FlinkWriteEngine writeEngine;
public UploadDataFunction(HRN sensorDataHRN, HRN outputHRN) {
this.sensorDataHRN = sensorDataHRN;
this.outputHRN = outputHRN;
}
@Override
public void open(Configuration parameters) throws Exception {
dataClient = new FlinkDataClient();
readEngine = dataClient.readEngine(sensorDataHRN);
writeEngine = dataClient.writeEngine(outputHRN);
}
@Override
public void close() throws Exception {
dataClient.terminate();
}
@Override
public CommitPartition map(Partition partition) throws Exception {
byte[] data = readEngine.getDataAsBytes(partition);
NewPartition newPartition =
new NewPartition.Builder()
.withPartition(partition.getPartition())
.withLayer("sdii-message-archive")
.withData(data)
.build();
return writeEngine.put(newPartition);
}
}
class PublishBatchWindowFunction extends RichAllWindowFunction<CommitPartition, String, TimeWindow>
implements Serializable {
private HRN hrn;
private transient FlinkDataClient dataClient;
private transient FlinkPublishApi publishApi;
public PublishBatchWindowFunction(HRN hrn) {
this.hrn = hrn;
}
@Override
public void open(Configuration parameters) throws Exception {
dataClient = new FlinkDataClient();
publishApi = dataClient.publishApi(hrn);
}
@Override
public void close() throws Exception {
dataClient.terminate();
}
@Override
public void apply(
TimeWindow window, Iterable<CommitPartition> commitPartitions, Collector<String> out)
throws Exception {
OptionalLong baseVersion = publishApi.getBaseVersion();
Set<String> layersId = new HashSet<>();
commitPartitions.forEach(partition -> layersId.add(partition.getLayer()));
publishApi.publishBatch2(
baseVersion,
Optional.of(new ArrayList<>(layersId)),
Collections.emptyList(),
commitPartitions.iterator());
out.collect("commit on " + baseVersion + " success");
}
}
public class ArchiveStreamJava {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
PipelineContext pipelineContext = new PipelineContext();
HRN sensorDataHRN = pipelineContext.getConfig().getInputCatalogs().get("sensorData");
HRN outputHRN = pipelineContext.getConfig().getOutputCatalog();
String streamingLayer = "sample-streaming-layer";
FlinkDataClient flinkDataClient = new FlinkDataClient();
FlinkQueryApi queryApi = flinkDataClient.queryApi(sensorDataHRN);
ConsumerSettings consumerSettings =
new ConsumerSettings.Builder()
.withLatestOffset()
.withGroupName("archive-stream-consumer")
.build();
SourceFunction<Partition> subscriptionFunction =
queryApi.subscribe(streamingLayer, consumerSettings);
DataStream<Partition> partitions = env.addSource(subscriptionFunction);
try {
partitions
.map(new UploadDataFunction(sensorDataHRN, outputHRN))
.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
.apply(new PublishBatchWindowFunction(outputHRN))
.addSink(
new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
throw new InterruptedException("Halting stream processing");
}
});
env.execute();
} catch (Exception ex) {
}
subscriptionFunction.cancel();
flinkDataClient.terminate();
}
}
You can place your configuration files in the resources folder to control, for example, the logging level of the application's output and the size of the request queues for the Data Services.
This resources/application.conf file forwards logging events from akka to an slf4j logger and increases the size of the request queues for the config and streaming services:
akka {
loglevel = "DEBUG"
loggers = ["akka.event.slf4j.Slf4jLogger"]
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
logger-startup-timeout = 1m
}
here.platform.data-client {
config {
request-executor {
akka.http.host-connection-pool {
max-open-requests = 16
max-connections = 16
}
}
}
stream {
request-executor {
akka.http.host-connection-pool {
max-open-requests = 256
max-connections = 256
}
}
connector {
consumer = "kafka-connector"
}
}
}
You can configure the logging levels in this resources/log4j.properties file:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{1} - %m%n
log4j.logger.akka.http=DEBUG
log4j.logger.com.here.platform.data.client=INFO
Compile and Run Locally
To run the application locally, execute one of the following commands, depending on the implementation you use:
Scala:
mvn compile exec:java -Dexec.mainClass=ArchiveStreamScala -Dpipeline-config.file=pipeline-config.conf \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
Java:
mvn compile exec:java -Dexec.mainClass=ArchiveStreamJava -Dpipeline-config.file=pipeline-config.conf \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
The application publishes a new version of the output catalog after uploading partitions to it for one minute. You can inspect the output in the platform portal: find {{YOUR_CATALOG_HRN}} in the catalog list and open it, then select the sdii-message-archive layer to inspect its data. Manually refresh the page to update the Versions field once the app has published the new version. Next, go to the Inspect tab and select a tile to visualize it on the map and view its decoded data in the right-hand panel.
Note
To keep this tutorial focused on the raw mechanics of interacting with the Flink Data Client, the sample code here is a deliberately simplified implementation and is not illustrative of a real-world use case.
A production application would typically also decode the content of the streaming messages in order to bucket them into useful partitions. For example, the coordinates of an event within a message could be correlated to a specific HERE tile on a map.
Further, a typical application to archive streaming messages would use the Data Archiving Library to process their content and write the results to an index layer. See "Further Information" below for more on archiving.
As implemented, the application deliberately throws an exception after successfully publishing the new version to prevent it from processing streaming input indefinitely. To see how this application behaves in a real production case, comment out the line that throws the exception. The application then continually publishes a new version of the output catalog once per minute until you manually cancel the pipeline to stop the stream processing.
Scala:
throw new InterruptedException("Halting stream processing")
Java:
throw new InterruptedException("Halting stream processing");
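For example, in the Scala version you could replace the halting sink with Flink's built-in print sink, so that each publish confirmation is simply logged and the stream keeps running. This is a minimal sketch that reuses the names from the Scala example above:
partitions
  .map(new UploadDataFunction(sensorDataHRN, outputHRN))
  .windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
  .apply(new PublishBatchWindowFunction(outputHRN))
  // .addSink(_ => throw new InterruptedException("Halting stream processing"))
  .print() // logs each "commit on <version> success" message to stdout instead of halting

env.execute()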
Further Information
You can learn how to run this streaming application with the Pipeline API in Run a Stream Application with Pipeline API.
You can learn how to archive streaming messages into index layers with the Data Archiving Library.