Flink Connector Integration with Stream Layers
Create Table Sink and Table Source for Stream Layer
The main entry point of the Flink Connector API is OlpStreamConnectorHelper.
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;
An instance of OlpStreamConnectorHelper is used to create a flink.table.api.Schema and build an SQL statement. The following code snippet shows how to create an instance of OlpStreamConnectorHelper, build a flink.table.api.Schema, and create a table with the given schema and options:
val properties = Map(
"olp.kafka.group-name" -> "protobuf-streaming-test",
"olp.kafka.offset" -> "earliest"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-streaming-layer", properties)
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
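The statement above appends the helper's options after WITH, which in Flink SQL takes the form ('key' = 'value', ...). The sketch below shows one way such a clause can be rendered from a plain Map, assuming options() produces this standard form. WithClauseSketch, render, and createTable are hypothetical names; this is not how OlpStreamConnectorHelper.options() is implemented.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper: renders connector options as a Flink SQL WITH clause.
// Not the connector's own implementation of options().
final class WithClauseSketch {

    // Quotes a SQL string literal by doubling any embedded single quotes.
    static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    // Produces ('k1' = 'v1', 'k2' = 'v2'); keys are sorted for stable output.
    static String render(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, String> e : new TreeMap<>(options).entrySet()) {
            joiner.add(quote(e.getKey()) + " = " + quote(e.getValue()));
        }
        return joiner.toString();
    }

    // Combines a table name, a schema string, and options into a DDL statement.
    static String createTable(String table, String schema, Map<String, String> options) {
        return String.format("CREATE TABLE %s %s WITH %s", table, schema, render(options));
    }
}
```

Sorting the keys is only a choice to make the output deterministic; Flink does not require any particular option order.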
The source factory supports the following properties for Stream layers:
- olp.kafka.group-name: required; used to derive/compose the group ID settings of the Kafka consumer config.
- olp.kafka.offset: can take either the "earliest" or the "latest" value; the value is translated to the Kafka auto.offset.reset consumer config.
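A minimal sketch of this property translation follows. It assumes the group name is used verbatim as the Kafka group.id; the text above only says the group ID is derived from the group name, so the connector may compose it differently. KafkaPropsSketch and toConsumerConfig are hypothetical names; group.id and auto.offset.reset are standard Kafka consumer config keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of translating olp.kafka.* properties into Kafka consumer config.
// Assumption: group.id is taken verbatim from olp.kafka.group-name.
final class KafkaPropsSketch {

    static Map<String, String> toConsumerConfig(Map<String, String> olpProps) {
        String groupName = olpProps.get("olp.kafka.group-name");
        if (groupName == null || groupName.isEmpty()) {
            // olp.kafka.group-name is documented as required.
            throw new IllegalArgumentException("olp.kafka.group-name is required");
        }
        Map<String, String> consumer = new HashMap<>();
        consumer.put("group.id", groupName);

        String offset = olpProps.get("olp.kafka.offset");
        if (offset != null) {
            // Only "earliest" and "latest" are accepted for olp.kafka.offset.
            if (!offset.equals("earliest") && !offset.equals("latest")) {
                throw new IllegalArgumentException("olp.kafka.offset must be earliest or latest: " + offset);
            }
            consumer.put("auto.offset.reset", offset);
        }
        return consumer;
    }
}
```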
Before creating a table schema, the helper fetches the catalog configuration using the passed HRN. Then it checks the data format and, if one exists, the schema for the passed layerId. As the last step, Flink Connector automatically translates the layer schema into a flink.table.api.Schema.
The following section describes how the schema translation works.
You create a Table Sink the same way as a Table Source, with OlpStreamConnectorHelper:
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(outputCatalogHrn), "protobuf-stream-in", Map.empty)
tEnv.executeSql(
s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
There are no properties that apply to Stream layer Sinks, so an empty Map is passed in the above code snippet.
Flink Connector supports the following data formats for Stream layer payloads:
- Raw. The decoding and encoding logic is not applied, and you get your data payload as an array of bytes. Your Table schema appears as follows:
root
|-- data: Array[Byte]
|-- mt_partition: String
|-- mt_timestamp: Long
|-- mt_checksum: String
|-- mt_crc: String
|-- mt_dataSize: Long
|-- mt_compressedDataSize: Long
The column with the payload data is called data. The metadata columns follow the data column and have the mt_ prefix. Among the metadata columns, only mt_partition is required. The rest are optional, and you can use null as their value.
This format is used if your layer content type is configured as application/octet-stream.
- Protobuf. Flink uses the attached Protobuf schema (which you specify in your layer configuration) to derive a Flink Table schema. Your Table schema appears as follows:
root
|-- protobuf_field_1: String
|-- protobuf_field_2: String
|-- protobuf_field_3.nested_column: Long
|-- ...
|-- mt_partition: String
|-- mt_timestamp: Long
|-- mt_checksum: String
|-- mt_crc: String
|-- mt_dataSize: Long
|-- mt_compressedDataSize: Long
The Flink Connector maps the top-level Protobuf fields to top-level Row columns, followed by the metadata columns.
This format is used if your layer content type is configured as application/x-protobuf and you have specified a schema. If no schema is specified, an error is thrown.
Note
Self-referencing protobuf fields are not supported because there is no way to represent them in the Flink TypeInformation-based schema.
- Other formats. If your layer uses a format other than the two described above, an error is thrown.
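The format rules above can be summarized as a function from the layer content type to the resulting column layout. The sketch below is a hypothetical illustration (ColumnLayoutSketch and columns are invented names), not the connector's code; it works on column names only and ignores types and nested Protobuf structure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the format rules above; not the connector's actual code.
final class ColumnLayoutSketch {

    // Metadata columns in the order shown in the schema listings above.
    static final List<String> METADATA_COLUMNS = List.of(
        "mt_partition", "mt_timestamp", "mt_checksum",
        "mt_crc", "mt_dataSize", "mt_compressedDataSize");

    // protobufTopLevelFields is null when the layer has no attached schema.
    static List<String> columns(String contentType, List<String> protobufTopLevelFields) {
        List<String> result = new ArrayList<>();
        switch (contentType) {
            case "application/octet-stream":
                // Raw: a single payload column followed by metadata columns.
                result.add("data");
                break;
            case "application/x-protobuf":
                if (protobufTopLevelFields == null) {
                    throw new IllegalStateException("Protobuf layer without a schema");
                }
                // Protobuf: top-level message fields become top-level columns.
                result.addAll(protobufTopLevelFields);
                break;
            default:
                throw new IllegalStateException("Unsupported content type: " + contentType);
        }
        result.addAll(METADATA_COLUMNS);
        return result;
    }
}
```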
Table Source and Sink have the same schema for the same layer.
You can always print your Table schema using the standard Flink API:
tEnv.from("InputTable").printSchema()
tEnv.from("OutputTable").printSchema();
Read and Write Raw Data
Using SQL:
val tEnv = StreamTableEnvironment.create(env)
val sourceProperties =
Map(
"olp.kafka.group-name" -> "raw-streaming-group",
"olp.kafka.offset" -> "earliest",
"olp.connector.metadata-columns" -> "true"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "raw-stream-in", sourceProperties)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(outputCatalogHrn),
"raw-stream-out",
Map("olp.connector.metadata-columns" -> "true"))
tEnv.executeSql(
s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
tEnv.executeSql(
"INSERT INTO OutputTable SELECT data, mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable"
)
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.metadata-columns", "true");
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
tEnv.executeSql(
"INSERT INTO OutputTable SELECT data, mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");
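In the Raw examples above, each row carries the payload plus all six metadata columns, and only mt_partition must be non-null. The sketch below models that contract as a plain Java class. RawRowSketch is hypothetical, and mapping the Flink columns to byte[], String, and Long is an assumption made for illustration.

```java
import java.util.Objects;

// Hypothetical model of one Raw-format row; only mtPartition is mandatory.
final class RawRowSketch {
    final byte[] data;
    final String mtPartition;         // required
    final Long mtTimestamp;           // optional metadata below: may be null
    final String mtChecksum;
    final String mtCrc;
    final Long mtDataSize;
    final Long mtCompressedDataSize;

    RawRowSketch(byte[] data, String mtPartition, Long mtTimestamp, String mtChecksum,
                 String mtCrc, Long mtDataSize, Long mtCompressedDataSize) {
        this.data = data;
        // Reject rows without a partition, mirroring the "only mt_partition is required" rule.
        this.mtPartition = Objects.requireNonNull(mtPartition, "mt_partition is required");
        this.mtTimestamp = mtTimestamp;
        this.mtChecksum = mtChecksum;
        this.mtCrc = mtCrc;
        this.mtDataSize = mtDataSize;
        this.mtCompressedDataSize = mtCompressedDataSize;
    }

    // Convenience factory for the common case where optional metadata is left null.
    static RawRowSketch of(byte[] data, String partition) {
        return new RawRowSketch(data, partition, null, null, null, null, null);
    }
}
```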
Read and Write Protobuf Data
Using SQL:
val properties = Map(
"olp.kafka.group-name" -> "protobuf-streaming-test",
"olp.kafka.offset" -> "earliest"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-streaming-layer", properties)
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(outputCatalogHrn), "protobuf-stream-in", Map.empty)
tEnv.executeSql(
s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
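INSERT INTO OutputTable SELECT * FROM InputTable works only when both tables have compatible columns: without an explicit column list, Flink matches query columns to sink columns by position, not by name. The sketch below illustrates such a positional check with hypothetical names (InsertCompatSketch, Column) and type names as plain strings. It simplifies compatibility to exact type equality; Flink performs its own, more detailed validation when planning the query.

```java
import java.util.List;

// Hypothetical positional compatibility check for INSERT INTO ... SELECT *.
final class InsertCompatSketch {

    static final class Column {
        final String name;
        final String type;
        Column(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    // Returns null when compatible, otherwise a description of the first mismatch.
    static String checkPositional(List<Column> query, List<Column> sink) {
        if (query.size() != sink.size()) {
            return "column count differs: " + query.size() + " vs " + sink.size();
        }
        for (int i = 0; i < query.size(); i++) {
            // Names are ignored; only the type at each position is compared (exact match).
            if (!query.get(i).type.equals(sink.get(i).type)) {
                return "type mismatch at position " + (i + 1) + ": "
                    + query.get(i).type + " vs " + sink.get(i).type;
            }
        }
        return null;
    }
}
```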
Read Protobuf Data and Write Raw Data
You can extract coordinates from Protobuf data that has the SDII schema and write each coordinate pair as a separate message into a Stream layer with the Raw format. First, you have to define how the coordinates are encoded as an array of bytes (and decoded back). You can use the following simple approach:
import java.nio.ByteBuffer
import org.apache.flink.table.functions.ScalarFunction
class DecodeCoordinates extends ScalarFunction {
private def byteArrayToDouble(array: Array[Byte]): Double = ByteBuffer.wrap(array).getDouble
def eval(bytes: Array[Byte]): Array[Double] =
Array(byteArrayToDouble(bytes.take(8)), byteArrayToDouble(bytes.slice(8, 16)))
}
class EncodeCoordinates extends ScalarFunction {
private def doubleToByteArray(value: Double): Array[Byte] = {
val longBits = java.lang.Double.doubleToLongBits(value)
ByteBuffer.allocate(8).putLong(longBits).array()
}
def eval(longitude: Double, latitude: Double): Array[Byte] =
doubleToByteArray(longitude) ++ doubleToByteArray(latitude)
}
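import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.flink.table.functions.ScalarFunction;
@SuppressWarnings("serial")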
public class EncodeCoordinatesJava extends ScalarFunction {
private byte[] doubleToByteArray(Double value) {
long longBits = Double.doubleToLongBits(value);
return ByteBuffer.allocate(8).putLong(longBits).array();
}
public byte[] eval(Double longitude, Double latitude) {
byte[] result = Arrays.copyOf(doubleToByteArray(longitude), 16);
System.arraycopy(doubleToByteArray(latitude), 0, result, 8, 8);
return result;
}
}
@SuppressWarnings("serial")
public class DecodeCoordinatesJava extends ScalarFunction {
private Double byteArrayToDouble(byte[] array) {
return ByteBuffer.wrap(array).getDouble();
}
public Double[] eval(byte[] bytes) {
byte[] firstDoubleArray = Arrays.copyOfRange(bytes, 0, 8);
byte[] secondDoubleArray = Arrays.copyOfRange(bytes, 8, 16);
return new Double[] {byteArrayToDouble(firstDoubleArray), byteArrayToDouble(secondDoubleArray)};
}
}
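To check that the encoding and decoding functions are inverses, the same byte layout can be exercised without Flink. This sketch uses a hypothetical class, CoordinateCodecSketch. Its encode uses ByteBuffer.putDouble, which for non-NaN values writes the same big-endian bytes as putLong(Double.doubleToLongBits(value)) in the functions above (ByteBuffer defaults to big-endian order). encodeLikeUdf reproduces the UDF approach for comparison.

```java
import java.nio.ByteBuffer;

// Hypothetical plain-Java version of the coordinate byte layout used by the UDFs above.
final class CoordinateCodecSketch {

    // Packs longitude then latitude into 16 big-endian bytes (ByteBuffer's default order).
    static byte[] encode(double longitude, double latitude) {
        return ByteBuffer.allocate(16).putDouble(longitude).putDouble(latitude).array();
    }

    // Reads the two doubles back in the same order: index 0 = longitude, 1 = latitude.
    static double[] decode(byte[] bytes) {
        if (bytes.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + bytes.length);
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return new double[] {buffer.getDouble(), buffer.getDouble()};
    }

    // Same layout as the UDFs above: doubleToLongBits + putLong per value, then concatenate.
    static byte[] encodeLikeUdf(double longitude, double latitude) {
        byte[] lon = ByteBuffer.allocate(8).putLong(Double.doubleToLongBits(longitude)).array();
        byte[] lat = ByteBuffer.allocate(8).putLong(Double.doubleToLongBits(latitude)).array();
        byte[] out = new byte[16];
        System.arraycopy(lon, 0, out, 0, 8);
        System.arraycopy(lat, 0, out, 8, 8);
        return out;
    }
}
```

Java array index 0 (longitude) corresponds to coordinates[1] in the SQL queries that use DECODE_COORDINATES, because Flink SQL arrays are 1-based.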
Then you use the above encoding function to convert a pair of coordinates into an array of bytes, and then write that array as a payload into the Stream Layer with the Raw format:
val sourceProperties =
Map(
"olp.kafka.group-name" -> "my-consumer-group",
"olp.kafka.offset" -> "latest",
"olp.connector.metadata-columns" -> "true"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), "sample-streaming-layer", sourceProperties)
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
val sinkHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(outputCatalogHrn),
outputLayerId,
Map("olp.connector.metadata-columns" -> "true"))
tEnv.executeSql(
s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sinkHelper.options}")
tEnv.createTemporarySystemFunction("ENCODE_COORDINATES", new EncodeCoordinates)
tEnv.from("InputTable").printSchema()
tEnv.executeSql(
"""
INSERT INTO
OutputTable
SELECT
ENCODE_COORDINATES(longitude_deg, latitude_deg) as data,
mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize
FROM
InputTable CROSS JOIN UNNEST(path.positionEstimate) AS p
"""
)
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.metadata-columns", "true");
OlpStreamConnectorHelper sinkHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);
Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));
tEnv.createTemporaryFunction("ENCODE_COORDINATES", EncodeCoordinatesJava.class);
tEnv.executeSql(
"INSERT INTO "
+ " OutputTable "
+ "SELECT "
+ " ENCODE_COORDINATES(longitude_deg, latitude_deg) as data, "
+ " mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize "
+ "FROM"
+ " InputTable CROSS JOIN UNNEST(path.positionEstimate) AS p");
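The CROSS JOIN UNNEST in the queries above emits one output row per element of the positionEstimate array, and each output row keeps the metadata of the message it came from; a message with an empty array produces no rows. The sketch below reproduces that fan-out in plain Java. The types Message, Position, and OutputRecord are hypothetical stand-ins for the SDII rows and the Raw output rows; the sketch only illustrates the row multiplication, not how Flink executes the query.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of CROSS JOIN UNNEST fan-out with coordinate encoding.
final class UnnestSketch {

    static final class Position {
        final double longitudeDeg;
        final double latitudeDeg;
        Position(double longitudeDeg, double latitudeDeg) {
            this.longitudeDeg = longitudeDeg;
            this.latitudeDeg = latitudeDeg;
        }
    }

    static final class Message {
        final String partition;
        final List<Position> positionEstimate;
        Message(String partition, List<Position> positionEstimate) {
            this.partition = partition;
            this.positionEstimate = positionEstimate;
        }
    }

    static final class OutputRecord {
        final byte[] data;
        final String partition;
        OutputRecord(byte[] data, String partition) {
            this.data = data;
            this.partition = partition;
        }
    }

    // One output record per position; messages with no positions contribute nothing,
    // matching the inner-join semantics of CROSS JOIN UNNEST.
    static List<OutputRecord> fanOut(List<Message> messages) {
        List<OutputRecord> out = new ArrayList<>();
        for (Message m : messages) {
            for (Position p : m.positionEstimate) {
                byte[] payload = ByteBuffer.allocate(16)
                    .putDouble(p.longitudeDeg).putDouble(p.latitudeDeg).array();
                out.add(new OutputRecord(payload, m.partition));
            }
        }
        return out;
    }
}
```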
If you want to read this data from the Raw Stream layer, you can use the decoding function introduced above to convert an array of bytes into a pair of coordinates:
val properties = Map(
GROUP_NAME_PROPERTY.key() -> "my-consumer-group",
OFFSET_PROPERTY.key() -> "latest"
)
val sourceHelper: OlpStreamConnectorHelper =
OlpStreamConnectorHelper(HRN(inputCatalogHrn), inputLayerId, properties)
val tEnv = StreamTableEnvironment.create(env)
tEnv.executeSql(
s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
s"WITH ${sourceHelper.options}")
tEnv.createTemporarySystemFunction("DECODE_COORDINATES", new DecodeCoordinates)
val result: Table = tEnv.sqlQuery(
"""
SELECT
coordinates[1] AS longitude,
coordinates[2] as latitude
FROM (SELECT DECODE_COORDINATES(data) AS coordinates FROM InputTable)"""
)
val stream = tEnv.toAppendStream[(Double, Double)](result)
stream.print()
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
OlpStreamConnectorHelper sourceHelper =
OlpStreamConnectorHelper.create(
HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));
tEnv.createTemporarySystemFunction("DECODE_COORDINATES", new DecodeCoordinatesJava());
Table result =
tEnv.sqlQuery(
"SELECT coordinates[1] AS longitude, "
+ "coordinates[2] as latitude "
+ "FROM (SELECT DECODE_COORDINATES(data) AS coordinates "
+ "FROM InputTable)");
TupleTypeInfo<Tuple2<Double, Double>> tupleType =
new TupleTypeInfo<>(Types.DOUBLE, Types.DOUBLE);
DataStream<Tuple2<Double, Double>> stream = tEnv.toAppendStream(result, tupleType);
stream.print();
How data is actually encoded/decoded in Kafka messages
The mt_partition field is encoded/decoded as the message key. The data field, as described in the Read Protobuf Data and Write Raw Data section, is encoded/decoded as the message value. The remaining metadata columns are encoded/decoded as message headers.
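The key/value/header mapping above can be sketched as follows. KafkaMappingSketch and KafkaRecord are hypothetical stand-ins with no Kafka client dependency. Using the column names as header names, encoding header values as UTF-8 strings, and omitting null metadata are all assumptions; the connector's actual header serialization is not described here.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical mapping of a row onto a Kafka-style message: key, value, headers.
final class KafkaMappingSketch {

    static final class KafkaRecord {
        final byte[] key;
        final byte[] value;
        final Map<String, byte[]> headers;
        KafkaRecord(byte[] key, byte[] value, Map<String, byte[]> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // metadata: the mt_* columns other than mt_partition, keyed by column name.
    static KafkaRecord toRecord(String mtPartition, byte[] data, Map<String, Object> metadata) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : metadata.entrySet()) {
            // Assumption: null optional columns are simply not written as headers.
            if (e.getValue() != null) {
                headers.put(e.getKey(), String.valueOf(e.getValue()).getBytes(StandardCharsets.UTF_8));
            }
        }
        // mt_partition becomes the message key; the payload becomes the message value.
        return new KafkaRecord(mtPartition.getBytes(StandardCharsets.UTF_8), data, headers);
    }
}
```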
application.conf configuration
You can use the application.conf configuration to specify the connector type (http-connector or kafka-connector) and the Kafka consumer and producer properties. For more information on http-connector and kafka-connector, see Select a Streaming Connector: Kafka or HTTP.