Use Flink Connector to read and write data
Objectives: Understand how to use the Flink Connector to read and write data across different layer types and data formats in a catalog.
Complexity: Beginner
Time to complete: 40 min
Prerequisites: Organize your work in projects
Source code: Download
The examples in this tutorial demonstrate how to use the Flink Connector provided by the Data Client Library. The connector integrates with Flink for stream processing workloads, allowing you to use all standard Flink APIs and functions to read, write, and delete data. For batch processing workloads, use the provided Spark Connector instead.
In the main part of the tutorial, we will cover the following usages:
- Subscribe to a streaming layer in protobuf format
- Transform data from the Table API to the DataStream API and change its structure
- Print data from a stream and use an index layer as the destination for a stream
- Transfer data between layers of different types and data formats
As a preparation step, you will create your data destination catalog with the appropriate layers, so that they are in place for the main part of this tutorial. The input dataset is sourced from the HERE Sample SDII Messages catalog and contains simulated streaming sensor data in the form of SDII messages.
Set up the Maven Project
Create the following folder structure for the project:
flink-connector
└── src
└── main
├── java
└── resources
└── scala
You can do this with a single bash command:
mkdir -p flink-connector/src/main/{java,resources,scala}
Create the output catalog
Create a file named pipeline-config.conf and populate it with the contents below, replacing {{YOUR_OUTPUT_CATALOG_HRN}} with the HRN of the catalog you created in Organize your work in projects.
pipeline.config {
output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }
input-catalogs {
// Be sure to use hrn:here-cn:data::olp-cn-here:sample-data on the China Environment.
sensorData { hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2" }
}
}
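The pipelines in this tutorial read this file through the PipelineContext class, which picks up the file location from the -Dpipeline-config.file system property used in the run commands at the end of this tutorial. As a minimal sketch of how the entries resolve at runtime (the object name ConfigCheck is only for illustration):
import com.here.platform.pipeline.PipelineContext

object ConfigCheck extends App {
  private val pipelineContext = new PipelineContext
  // "sensorData" matches the key under input-catalogs in pipeline-config.conf
  val sensorDataHrn = pipelineContext.config.inputCatalogs("sensorData")
  // The HRN you substituted for {{YOUR_OUTPUT_CATALOG_HRN}}
  val outputHrn = pipelineContext.config.outputCatalog
  println(s"Input: $sensorDataHrn, output: $outputHrn")
}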
The Maven POM file is similar to the file in the Verify Maven Settings example. The parent POM and dependencies sections are updated as follows:
Parent POM:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-stream-bom_2.12</artifactId>
<version>2.51.5</version>
<relativePath/>
</parent>
Dependencies:
<dependencies>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>flink-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.compat.version}</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
<exclusion>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
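The dependencies above reference the scala.compat.version property. If your POM does not already define it (for example, copied over from the Verify Maven Settings example), add it to the properties section; a minimal sketch, assuming Scala 2.12 to match the sdk-stream-bom_2.12 parent:
<properties>
  <scala.compat.version>2.12</scala.compat.version>
</properties>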
You will need to create the output catalog. You can perform these steps using the OLP Command Line Interface (CLI).
Use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-flink-connector-output.
For the output catalog, create a file named flink-connector-output.json with the contents below.
{
"id": "flink-connector-output",
"name": "Simulated sensor data archive (from tutorial) flink-connector-output",
"summary": "Archive of simulated sensor data for the FlinkConnector tutorial",
"description": "Archive of simulated sensor data",
"tags": [
"Tutorial",
"Simulated"
],
"layers": [
{
"id": "volatile-layer-avro-data",
"name": "volatile-layer-avro-data",
"summary": "Simulated sensor data for the FlinkConnector tutorial",
"description": "Simulated sensor data for the FlinkConnector tutorial",
"contentType": "application/x-avro-binary",
"layerType": "volatile",
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
},
{
"id": "index-layer-parquet-data",
"name": "index-layer-parquet-data",
"summary": "Simulated sensor data for the FlinkConnector tutorial",
"description": "Simulated sensor data for the FlinkConnector tutorial",
"contentType": "application/x-parquet",
"layerType": "index",
"indexProperties": {
"indexDefinitions": [
{
"name": "tile_id",
"type": "int"
},
{
"name": "time_window",
"duration": 600000,
"type": "timewindow"
}
],
"ttl": "unlimited"
},
"volume": {
"volumeType": "durable"
},
"partitioning": {
"scheme": "generic"
}
}
]
}
Replace {{YOUR_CATALOG_ID}} below with your own identifier and then run the following command (on Windows, this works in Cygwin and Git Bash; otherwise, run olp.bat):
olp catalog create {{YOUR_CATALOG_ID}} \
"Simulated sensor data output from tutorial ({{YOUR_USERNAME}})" \
--config flink-connector-output.json \
--scope {{YOUR_PROJECT_HRN}}
Note
If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to each layer section.
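For example, the first layer definition would gain one additional property; an abbreviated sketch, with YOUR_BILLING_TAG as a placeholder and all other layer properties kept as shown above:
{
  "id": "volatile-layer-avro-data",
  "layerType": "volatile",
  "billingTags": ["YOUR_BILLING_TAG"]
}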
Implement the Flink Connector application
This application reads protobuf data from the stream layer of the public input catalog, performs some transformations on the received data, and writes the results to the volatile layer of the output catalog created above. Both a Scala and a Java implementation are shown below.
import com.here.olp.util.quad.factory.HereQuadFactory
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.{Timer, TimerTask}
object StreamToVolatileLayerScalaPipeline extends App {
private val logger = LoggerFactory.getLogger(StreamToVolatileLayerScalaPipeline.getClass)
private val pipelineContext = new PipelineContext
val sensorDataCatalogHrn = pipelineContext.config.inputCatalogs("sensorData")
val outputCatalogHrn = pipelineContext.config.outputCatalog
val streamingLayer = "sample-streaming-layer"
val outputVolatileLayer = "volatile-layer-avro-data"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(5000)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))
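// Source properties: the Kafka consumer group name and the offset to start reading from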
val properties = Map(
"olp.kafka.group-name" -> "protobuf-streaming",
"olp.kafka.offset" -> "earliest",
)
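// The helper builds the table schema and connector options for the protobuf stream layer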
val helper = OlpStreamConnectorHelper(sensorDataCatalogHrn, streamingLayer, properties)
val tEnv = StreamTableEnvironment.create(env)
val schema = helper.prebuiltSchema(tEnv).build()
tEnv.executeSql(s"CREATE TABLE SensorDataTable $schema WITH ${helper.options}")
tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction())
tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction())
val observationsTable: Table =
tEnv.sqlQuery("""SELECT
| assignUuid(envelope.version) AS eventId,
| timeStampUTC_ms AS timestampUtc,
| computeHereTile(latitude_deg, longitude_deg) AS tile
|FROM
| SensorDataTable CROSS JOIN UNNEST(positionEstimate)
|""".stripMargin)
case class PositionEvent(eventId: String, timestampUtc: Long, tile: Long)
case class PositionStatistics(timestampUtc: Long, tile: Long, totalObservations: Int)
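// Event-time watermarks: tolerate events that arrive up to 10 seconds out of order, using the SDII timestamp as event time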
val watermarkStrategy: WatermarkStrategy[PositionEvent] = WatermarkStrategy
.forBoundedOutOfOrderness[PositionEvent](Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner[PositionEvent] {
override def extractTimestamp(event: PositionEvent, recordTimestamp: Long): Long =
event.timestampUtc
})
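// Convert the table to a DataStream and count observations per tile in 15-second windows that slide every 5 seconds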
val outputStream: DataStream[PositionStatistics] = tEnv
.toAppendStream[PositionEvent](observationsTable)
.assignTimestampsAndWatermarks(watermarkStrategy)
.map(v => (v.timestampUtc, v.tile, 1))
.keyBy(_._2)
.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
.sum(2)
.map(PositionStatistics.tupled(_))
outputStream.print()
tEnv.createTemporaryView("PositionStatsTable", outputStream)
val outputAvroSchema =
"""{
| "type" : "record",
| "name" : "Event",
| "namespace" : "my.flink.tutorial",
| "fields" : [
| {"name" : "city", "type" : "string"},
| {"name" : "total", "type" : "int"},
| {"name" : "timestampUtc", "type" : "long"}
| ]
|}
|""".stripMargin
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(outputCatalogHrn,
outputVolatileLayer,
Map("olp.catalog.layer-schema" -> outputAvroSchema))
tEnv.executeSql(
s"CREATE TABLE OutputIndexTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
tEnv.executeSql("""INSERT INTO
| OutputIndexTable
|SELECT
| 'Berlin' AS city,
| totalObservations AS total,
| timestampUtc,
| CAST(tile AS STRING) AS mt_partition
|FROM
| PositionStatsTable
|""".stripMargin)
try {
env.executeAsync()
logger.info(s"Stream to $outputCatalogHrn executed")
new Timer(true).schedule(new TimerTask {
override def run(): Unit = {
logger.info(s"Stream to $outputCatalogHrn canceled")
System.exit(0)
}
}, 30000)
} catch {
case ex: Exception =>
ex.printStackTrace()
}
}
class ComputeTileFunction() extends ScalarFunction {
private val tileLevel = 14
def eval(latitude: java.lang.Double, longitude: java.lang.Double): Long =
HereQuadFactory.INSTANCE
.getMapQuadByLocation(latitude, longitude, tileLevel)
.getLongKey
}
class AssignUuidFunction() extends ScalarFunction {
def eval(input: String): String =
java.util.UUID.randomUUID.toString
}
The same application implemented in Java:
import com.here.hrn.HRN;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.pipeline.PipelineContext;
import java.time.Duration;
import java.util.*;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamToVolatileLayerPipeline {
private static final PipelineContext pipelineContext = new PipelineContext();
private static final HRN sensorDataCatalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("sensorData");
private static final HRN outputCatalogHrn = pipelineContext.getConfig().getOutputCatalog();
private static final String streamingLayer = "sample-streaming-layer";
private static final String outputVolatileLayer = "volatile-layer-avro-data";
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(StreamToVolatileLayerPipeline.class);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));
Map<String, String> properties = new HashMap<>();
properties.put("olp.kafka.group-name", "protobuf-streaming");
properties.put("olp.kafka.offset", "earliest");
OlpStreamConnectorHelper helper =
OlpStreamConnectorHelper.create(sensorDataCatalogHrn, streamingLayer, properties);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Schema schema = helper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE SensorDataTable %s WITH %s", schema, helper.options()));
tEnv.createTemporarySystemFunction("computeHereTile", new ComputeTileFunction());
tEnv.createTemporarySystemFunction("assignUuid", new AssignUuidFunction());
Table observationsTable =
tEnv.sqlQuery(
"SELECT"
+ " assignUuid(envelope.version) AS eventId, "
+ " timeStampUTC_ms AS timestampUtc, "
+ " computeHereTile(latitude_deg, longitude_deg) AS tile "
+ "FROM SensorDataTable "
+ "CROSS JOIN UNNEST(positionEstimate)");
WatermarkStrategy<Tuple3<String, Long, Long>> watermarkStrategy =
WatermarkStrategy.<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(
Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp) -> event.f1);
DataStream<PositionStatistics> outputStream =
tEnv.toAppendStream(
observationsTable,
new TupleTypeInfo<Tuple3<String, Long, Long>>(Types.STRING, Types.LONG, Types.LONG))
.assignTimestampsAndWatermarks(watermarkStrategy)
.map(
new MapFunction<Tuple3<String, Long, Long>, Tuple3<Long, Long, Integer>>() {
@Override
public Tuple3<Long, Long, Integer> map(Tuple3<String, Long, Long> observation) {
return new Tuple3<>(observation.f1, observation.f2, 1);
}
})
.keyBy(selector -> selector.f1)
.window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)))
.sum(2)
.map(
new MapFunction<Tuple3<Long, Long, Integer>, PositionStatistics>() {
@Override
public PositionStatistics map(Tuple3<Long, Long, Integer> result) {
return new PositionStatistics(result.f0, result.f1, result.f2);
}
});
outputStream.print();
tEnv.createTemporaryView("PositionStatsTable", outputStream);
String outputAvroSchema =
" {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"Event\",\n"
+ " \"namespace\" : \"my.flink.tutorial\",\n"
+ " \"fields\" : [\n"
+ " {\"name\" : \"city\", \"type\" : \"string\"},\n"
+ " {\"name\" : \"total\", \"type\" : \"int\"},\n"
+ " {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
+ " ]\n"
+ " }\n";
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputAvroSchema);
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(outputCatalogHrn, outputVolatileLayer, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format(
"CREATE TABLE OutputIndexTable %s WITH %s", sinkSchema, sinkHelper.options()));
tEnv.executeSql(
"INSERT INTO"
+ " OutputIndexTable "
+ "SELECT"
+ " 'Berlin' AS city,"
+ " totalObservations AS total,"
+ " timestampUtc,"
+ " CAST(tile AS STRING) AS mt_partition "
+ "FROM"
+ " PositionStatsTable");
try {
env.executeAsync();
logger.info("Stream to {} executed", outputCatalogHrn);
new Timer(true)
.schedule(
new TimerTask() {
@Override
public void run() {
logger.info("Stream to {} canceled", outputCatalogHrn);
System.exit(0);
}
},
30000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static class ComputeTileFunction extends ScalarFunction {
private final int tileLevel = 14;
public long eval(Double latitude, Double longitude) {
return HereQuadFactory.INSTANCE
.getMapQuadByLocation(latitude, longitude, tileLevel)
.getLongKey();
}
}
public static class AssignUuidFunction extends ScalarFunction {
public String eval(String input) {
return UUID.randomUUID().toString();
}
}
public static class PositionStatistics {
public long tile;
public long timestampUtc;
public int totalObservations;
public PositionStatistics() {}
public PositionStatistics(long timestampUtc, long tile, int totalObservations) {
this.tile = tile;
this.timestampUtc = timestampUtc;
this.totalObservations = totalObservations;
}
@Override
public String toString() {
return tile + " " + timestampUtc + " " + totalObservations;
}
}
}
The second example reads Avro data from the volatile layer populated in the previous step, performs some transformations on the received data, and writes the results in parquet format to the index layer of the same output catalog. Again, both a Scala and a Java implementation are shown below.
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{tableConversions, StreamTableEnvironment}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._
import org.slf4j.LoggerFactory
import java.util.{Timer, TimerTask}
object VolatileToIndexLayerScalaPipeline extends App {
private val logger = LoggerFactory.getLogger(VolatileToIndexLayerScalaPipeline.getClass)
private val pipelineContext = new PipelineContext
val catalogHrn = pipelineContext.config.outputCatalog
val inputVolatileLayer = "volatile-layer-avro-data"
val outputIndexLayer = "index-layer-parquet-data"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(5000)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))
val inputSchema =
"""{
| "type" : "record",
| "name" : "Event",
| "namespace" : "my.flink.tutorial",
| "fields" : [
| {"name" : "city", "type" : "string"},
| {"name" : "total", "type" : "int"},
| {"name" : "timestampUtc", "type" : "long"}
| ]
|}
|""".stripMargin
val tiles = List("377894440", "377894441", "377894442")
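// Query only the listed tiles; the refresh interval of -1 disables periodic re-reading of the volatile layer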
val properties =
Map(
"olp.layer.query" -> s"mt_partition=in=${tiles.mkString("(", ", ", ")")}",
"olp.catalog.layer-schema" -> inputSchema,
"olp.connector-refresh-interval" -> "-1"
)
val helper = OlpStreamConnectorHelper(catalogHrn, inputVolatileLayer, properties)
val tEnv = StreamTableEnvironment.create(env)
val schema = helper.prebuiltSchema(tEnv).build()
tEnv.executeSql(s"CREATE TABLE TableSource $schema WITH ${helper.options}")
tEnv.from("TableSource").toAppendStream[Row].print()
val outputSchema =
"""{
| "type" : "record",
| "name" : "Event",
| "namespace" : "my.flink.tutorial",
| "fields" : [
| {"name" : "city", "type" : "string"},
| {"name" : "total", "type" : "int"}
| ]
|}
|""".stripMargin
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(catalogHrn,
outputIndexLayer,
Map("olp.catalog.layer-schema" -> outputSchema))
tEnv.executeSql(
s"CREATE TABLE Sink ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
tEnv.executeSql("""INSERT INTO Sink
|SELECT
| city,
| total,
| CAST(mt_partition as BIGINT) as idx_tile_id,
| timestampUtc as idx_time_window
|FROM TableSource
|""".stripMargin)
try {
env.executeAsync()
logger.info(s"Stream to $catalogHrn executed")
new Timer(true).schedule(new TimerTask {
override def run(): Unit = {
logger.info(s"Stream to $catalogHrn canceled")
System.exit(0)
}
}, 30000)
} catch {
case ex: Exception =>
ex.printStackTrace()
}
}
The Java implementation of the same pipeline:
import com.here.hrn.HRN;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
import com.here.platform.pipeline.PipelineContext;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VolatileToIndexLayerPipeline {
private static final PipelineContext pipelineContext = new PipelineContext();
private static final HRN catalogHrn = pipelineContext.config().getOutputCatalog();
private static final String inputVolatileLayer = "volatile-layer-avro-data";
private static final String outputIndexLayer = "index-layer-parquet-data";
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(VolatileToIndexLayerPipeline.class);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));
String inputAvroSchema =
" {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"Event\",\n"
+ " \"namespace\" : \"my.flink.tutorial\",\n"
+ " \"fields\" : [\n"
+ " {\"name\" : \"city\", \"type\" : \"string\"},\n"
+ " {\"name\" : \"total\", \"type\" : \"int\"},\n"
+ " {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
+ " ]\n"
+ " }\n";
String[] tiles = (new String[] {"377894440", "377894441", "377894442"});
String tilesQueryString = Arrays.stream(tiles).collect(Collectors.joining(", ", "(", ")"));
Map<String, String> properties = new HashMap<>();
properties.put("olp.layer.query", "mt_partition=in=" + tilesQueryString);
properties.put("olp.catalog.layer-schema", inputAvroSchema);
properties.put("olp.connector-refresh-interval", "-1");
OlpStreamConnectorHelper helper =
OlpStreamConnectorHelper.create(catalogHrn, inputVolatileLayer, properties);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Schema schema = helper.prebuiltSchema(tEnv).build();
tEnv.executeSql(String.format("CREATE TABLE TableSource %s WITH %s", schema, helper.options()));
tEnv.toAppendStream(tEnv.from("TableSource"), Row.class).print();
String outputParquetSchema =
" {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"Event\",\n"
+ " \"namespace\" : \"my.flink.tutorial\",\n"
+ " \"fields\" : [\n"
+ " {\"name\" : \"city\", \"type\" : \"string\"},\n"
+ " {\"name\" : \"total\", \"type\" : \"int\"}\n"
+ " ]\n"
+ " }\n";
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputParquetSchema);
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(catalogHrn, outputIndexLayer, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE Sink %s WITH %s", sinkSchema, sinkHelper.options()));
tEnv.executeSql(
"INSERT INTO Sink "
+ "SELECT"
+ " city,"
+ " total,"
+ " CAST(mt_partition as BIGINT) as idx_tile_id,"
+ " timestampUtc as idx_time_window "
+ "FROM TableSource");
try {
env.executeAsync();
logger.info("Stream to {} executed", catalogHrn);
new Timer(true)
.schedule(
new TimerTask() {
@Override
public void run() {
logger.info("Stream to {} canceled", catalogHrn);
System.exit(0);
}
},
30000);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
Compile and run locally
To run the first application locally, execute one of the following commands (Scala and Java versions, respectively):
mvn compile exec:java \
-Dexec.mainClass=StreamToVolatileLayerScalaPipeline \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf
mvn compile exec:java \
-Dexec.mainClass=StreamToVolatileLayerPipeline \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf
To run the second application, execute one of the following commands (Scala and Java versions, respectively):
mvn compile exec:java \
-Dexec.mainClass=VolatileToIndexLayerScalaPipeline \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf
mvn compile exec:java \
-Dexec.mainClass=VolatileToIndexLayerPipeline \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf
For more details on the topics covered in this tutorial, you can refer to the following sources: