Flink Connector Integration with Stream Layers

Create Table Sink and Table Source for Stream Layer

The main entry point of the Flink Connector API is OlpStreamConnectorDescriptorFactory.

Scala
Java
import com.here.platform.data.client.flink.scaladsl.{
  OlpStreamConnection,
  OlpStreamConnectorDescriptorFactory
}
import com.here.platform.data.client.flink.javadsl.OlpStreamConnection;
import com.here.platform.data.client.flink.javadsl.OlpStreamConnectorDescriptorFactory;

An instance of OlpStreamConnection, consisting of a ConnectorDescriptor and a Schema, is required to register the table source in the TableEnvironment's catalog. The following code snippet shows how to create an instance of OlpStreamConnection using OlpStreamConnectorDescriptorFactory and register it in the TableEnvironment's catalog:

Scala
Java
// define the properties
val properties = Map(
  "olp.kafka.group-name" -> "protobuf-streaming-test",
  "olp.kafka.offset" -> "earliest"
).asJava

// create the Table Connector Descriptor Source
val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "sample-streaming-layer")
    .createConnectorDescriptorWithSchema(properties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")
// define the properties
Map<String, String> properties = new HashMap<>();
properties.put("olp.kafka.group-name", "protobuf-streaming-test");
properties.put("olp.kafka.offset", "earliest");

// create the Table Connector Descriptor Source
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(inputCatalogHrn), "sample-streaming-layer")
        .createConnectorDescriptorWithSchema(properties);

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

The source factory supports the following properties for Stream layers:

  • olp.kafka.group-name: required; used to derive the group ID settings of the Kafka consumer configuration.
  • olp.kafka.offset: accepts either "earliest" or "latest"; the value is translated into the Kafka auto.offset.reset consumer configuration.
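
For illustration, a minimal sketch of how these properties relate to the underlying Kafka consumer configuration (the exact composition of the group ID is an internal detail of the connector):

// hypothetical source properties
val sourceProperties = Map(
  "olp.kafka.group-name" -> "my-consumer-group", // contributes to the Kafka consumer group.id
  "olp.kafka.offset" -> "earliest" // translated into auto.offset.reset = earliest
).asJava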

Before creating a Table, the factory fetches the catalog configuration using the passed HRN. It then checks the data format of the layer with the passed layerId and retrieves the layer schema if one exists. As the last step, the Flink Connector automatically translates the layer schema into a Flink Table schema.

The following section describes how the schema translation works.

You create a Table Sink using OlpStreamConnectorDescriptorFactory and register it in the TableEnvironment's catalog as follows:

Scala
Java
val streamSink: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(outputCatalogHrn), "protobuf-stream-in")
    .createConnectorDescriptorWithSchema(Map.empty[String, String].asJava)

tEnv
  .connect(streamSink.connectorDescriptor)
  .withSchema(streamSink.schema)
  .inAppendMode()
  .createTemporaryTable("OutputTable")
OlpStreamConnection streamSink =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(outputCatalogHrn), "protobuf-stream-in")
        .createConnectorDescriptorWithSchema(new HashMap<>());

tEnv.connect(streamSink.connectorDescriptor())
    .withSchema(streamSink.schema())
    .inAppendMode()
    .createTemporaryTable("OutputTable");

There are no properties that apply to Stream layer sinks, so an empty Map is passed in the code snippet above.

Data Formats

Flink Connector supports the following data formats for Stream layer payload:

  • Raw. No decoding or encoding logic is applied, and you get your data payload as an array of bytes. Your Table schema appears as follows:

    root
      |-- data: Array[Byte]
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    The column with the payload data is called data. The metadata columns follow the data column and have the mt_ prefix. Among the metadata columns, only mt_partition is required; the rest are optional and you can use null as a value for them (see the example at the end of the Read and Write Raw Data section below).

    This format is used if your layer content type is configured as application/octet-stream.

  • Protobuf. Flink uses the attached Protobuf schema (that you specify in your layer configuration) to derive a Flink Table schema.

    root
      |-- protobuf_field_1: String
      |-- protobuf_field_2: String
      |-- protobuf_field_3.nested_column: Long
      |-- ...
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    The Flink Connector maps the top-level Protobuf fields to top-level Row columns; the metadata columns follow.

    This format is used if your layer content type is configured as application/x-protobuf and a schema is specified in the layer configuration. If no schema is specified, an error will be thrown.

    Note:

    Self-referencing Protobuf fields are not supported because there is no way to represent them in the Flink TypeInformation-based schema (see the illustration after this list).

  • Other formats

    If your layer uses a format other than the two described above, an error will be thrown.
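
As an illustration of the note on self-referencing fields above, a hypothetical message definition like the following cannot be translated, because the type refers to itself:

message TreeNode {
  string name = 1;
  // self-reference: not representable in a Flink TypeInformation-based schema
  repeated TreeNode children = 2;
}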

Table Source and Sink have the same schema for the same layer.

You can always print your Table schema using the standard Flink API:

Scala
Java
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema()
// imagine that we have already registered SDII_TABLE
tEnv.from("SDII_TABLE").printSchema();

Read and Write Raw Data

Using SQL:

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

val sourceProperties =
  Map(
    "olp.kafka.group-name" -> "raw-streaming-group",
    "olp.kafka.offset" -> "earliest"
  ).asJava

val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "raw-stream-in")
    .createConnectorDescriptorWithSchema(sourceProperties)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

val streamSink: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(outputCatalogHrn), "raw-stream-out")
    .createConnectorDescriptorWithSchema(
      Map.empty[String, String].asJava
    )

tEnv
  .connect(streamSink.connectorDescriptor)
  .withSchema(streamSink.schema)
  .inAppendMode()
  .createTemporaryTable("OutputTable")

tEnv.sqlUpdate(
  "INSERT INTO OutputTable SELECT data, mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable"
)
// define the properties
Map<String, String> properties = new HashMap<>();
properties.put("olp.kafka.group-name", "raw-streaming-group");
properties.put("olp.kafka.offset", "earliest");

// create the Table Connector Descriptor Source
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(HRN.fromString(inputCatalogHrn), "raw-stream-in")
        .createConnectorDescriptorWithSchema(properties);

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

OlpStreamConnection streamSink =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(outputCatalogHrn), "raw-stream-out")
        .createConnectorDescriptorWithSchema(new HashMap<>());

tEnv.connect(streamSink.connectorDescriptor())
    .withSchema(streamSink.schema())
    .inAppendMode()
    .createTemporaryTable("OutputTable");

tEnv.sqlUpdate(
    "INSERT INTO OutputTable SELECT data, mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");

Read and Write Protobuf Data

Using SQL:

Scala
Java
// define the properties
val properties = Map(
  "olp.kafka.group-name" -> "protobuf-streaming-test",
  "olp.kafka.offset" -> "earliest"
).asJava

// create the Table Connector Descriptor Source
val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "sample-streaming-layer")
    .createConnectorDescriptorWithSchema(properties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

val streamSink: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(outputCatalogHrn), "protobuf-stream-in")
    .createConnectorDescriptorWithSchema(Map.empty[String, String].asJava)

tEnv
  .connect(streamSink.connectorDescriptor)
  .withSchema(streamSink.schema)
  .inAppendMode()
  .createTemporaryTable("OutputTable")

// we assume that the input and output tables have the same schema
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM InputTable")
// define the properties
Map<String, String> properties = new HashMap<>();
properties.put("olp.kafka.group-name", "protobuf-streaming-test");
properties.put("olp.kafka.offset", "earliest");

// create the Table Connector Descriptor Source
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(inputCatalogHrn), "sample-streaming-layer")
        .createConnectorDescriptorWithSchema(properties);

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

OlpStreamConnection streamSink =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(outputCatalogHrn), "protobuf-stream-in")
        .createConnectorDescriptorWithSchema(new HashMap<>());

tEnv.connect(streamSink.connectorDescriptor())
    .withSchema(streamSink.schema())
    .inAppendMode()
    .createTemporaryTable("OutputTable");

// we assume that the input and output tables have the same schema
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM InputTable");

Read Protobuf Data and Write Raw Data

You can extract coordinates from the Protobuf data that has the SDII schema and write each coordinate pair as a separate message into a Stream layer with the Raw format. First, you have to define how the coordinates are encoded as an array of bytes. You can use the following simple approach:

Scala
Java
/** Decodes an array of bytes of size 16 into an array of doubles of size 2 */
class DecodeCoordinates extends ScalarFunction {
  private def byteArrayToDouble(array: Array[Byte]): Double = ByteBuffer.wrap(array).getDouble

  def eval(bytes: Array[Byte]): Array[Double] =
    Array(byteArrayToDouble(bytes.take(8)), byteArrayToDouble(bytes.slice(8, 16)))

}

/** Encodes two Double values into a byte array of size 16 */
class EncodeCoordinates extends ScalarFunction {
  private def doubleToByteArray(value: Double): Array[Byte] = {
    val longBits = java.lang.Double.doubleToLongBits(value)
    ByteBuffer.allocate(8).putLong(longBits).array()
  }

  def eval(longitude: Double, latitude: Double): Array[Byte] =
    doubleToByteArray(longitude) ++ doubleToByteArray(latitude)

}
/** Encodes two Double values into a byte array of size 16 */
public class EncodeCoordinatesJava extends ScalarFunction {
  private byte[] doubleToByteArray(Double value) {
    long longBits = Double.doubleToLongBits(value);
    return ByteBuffer.allocate(8).putLong(longBits).array();
  }

  public byte[] eval(Double longitude, Double latitude) {
    byte[] result = Arrays.copyOf(doubleToByteArray(longitude), 16);
    System.arraycopy(doubleToByteArray(latitude), 0, result, 8, 8);
    return result;
  }
}
/** Decodes an array of bytes of size 16 into an array of doubles of size 2 */
public class DecodeCoordinatesJava extends ScalarFunction {
  private Double byteArrayToDouble(byte[] array) {
    return ByteBuffer.wrap(array).getDouble();
  }

  public Double[] eval(byte[] bytes) {
    byte[] firstDoubleArray = Arrays.copyOfRange(bytes, 0, 8);
    byte[] secondDoubleArray = Arrays.copyOfRange(bytes, 8, 16);
    return new Double[] {byteArrayToDouble(firstDoubleArray), byteArrayToDouble(secondDoubleArray)};
  }
}

Then you use the encoding function above to convert a pair of coordinates into an array of bytes and write that array as the payload into the Stream layer with the Raw format:

Scala
Java
val sourceProperties =
  Map(
    "olp.kafka.group-name" -> "my-consumer-group",
    "olp.kafka.offset" -> "latest"
  ).asJava

val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "sample-streaming-layer")
    .createConnectorDescriptorWithSchema(sourceProperties)

val tEnv = StreamTableEnvironment.create(env)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

val streamSink: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(outputCatalogHrn), outputLayerId)
    .createConnectorDescriptorWithSchema(
      Map.empty[String, String].asJava
    )

tEnv
  .connect(streamSink.connectorDescriptor)
  .withSchema(streamSink.schema)
  .inAppendMode()
  .createTemporaryTable("OutputTable")

tEnv.registerFunction("ENCODE_COORDINATES", new EncodeCoordinates)
tEnv.from("InputTable").printSchema()

tEnv.sqlUpdate(
  """
    INSERT INTO
        OutputTable
    SELECT
        ENCODE_COORDINATES(longitude_deg, latitude_deg) as data,
        mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize
    FROM
        InputTable CROSS JOIN UNNEST(path.positionEstimate) AS p
  """
)
Map<String, String> properties = new HashMap<>();
properties.put("olp.kafka.group-name", "my-consumer-group");
properties.put("olp.kafka.offset", "latest");

OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(inputCatalogHrn), "sample-streaming-layer")
        .createConnectorDescriptorWithSchema(properties);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

OlpStreamConnection streamSink =
    OlpStreamConnectorDescriptorFactory.create(HRN.fromString(outputCatalogHrn), outputLayerId)
        .createConnectorDescriptorWithSchema(new HashMap<>());

tEnv.registerFunction("ENCODE_COORDINATES", new EncodeCoordinatesJava());
tEnv.connect(streamSink.connectorDescriptor())
    .withSchema(streamSink.schema())
    .inAppendMode()
    .createTemporaryTable("OutputTable");

tEnv.sqlUpdate(
    "INSERT INTO "
        + "    OutputTable "
        + "SELECT "
        + "    ENCODE_COORDINATES(longitude_deg, latitude_deg) as data, "
        + "    mt_partition, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize "
        + "FROM"
        + "    InputTable CROSS JOIN UNNEST(path.positionEstimate) AS p");

If you want to read this data from the Raw Stream layer, you can use the decoding function introduced above to convert an array of bytes into a pair of coordinates:

Scala
Java
val properties = Map(
  GROUP_NAME_PROPERTY -> "my-consumer-group",
  OFFSET_PROPERTY -> "latest"
).asJava

val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), inputLayerId)
    .createConnectorDescriptorWithSchema(properties)

val tEnv = StreamTableEnvironment.create(env)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

tEnv.registerFunction("DECODE_COORDINATES", new DecodeCoordinates)

val result: Table = tEnv.sqlQuery(
  """
    SELECT
        coordinates[1] AS longitude,
        coordinates[2] as latitude
    FROM (SELECT DECODE_COORDINATES(data) AS coordinates FROM InputTable)"""
)

val stream = tEnv.toAppendStream[(Double, Double)](result)
stream.print()
Map<String, String> properties = new HashMap<>();
properties.put("olp.kafka.group-name", "my-consumer-group");
properties.put("olp.kafka.offset", "latest");

OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(HRN.fromString(inputCatalogHrn), inputLayerId)
        .createConnectorDescriptorWithSchema(properties);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

tEnv.registerFunction("DECODE_COORDINATES", new DecodeCoordinatesJava());

Table result =
    tEnv.sqlQuery(
        "SELECT coordinates[1] AS longitude, coordinates[2] as latitude FROM (SELECT DECODE_COORDINATES(data) AS coordinates FROM InputTable)");

TupleTypeInfo<Tuple2<Double, Double>> tupleType =
    new TupleTypeInfo<>(Types.DOUBLE, Types.DOUBLE);

DataStream<Tuple2<Double, Double>> stream = tEnv.toAppendStream(result, tupleType);
stream.print();

How data is actually encoded/decoded in Kafka messages

The mt_partition field is encoded/decoded as the message key. The data field, as described in the Read Protobuf Data and Write Raw Data section, is encoded/decoded as the message value. The remaining metadata columns are encoded/decoded as message headers.
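
For illustration, a minimal sketch of how a plain Kafka consumer sees such a message, assuming a String key deserializer and a byte-array value deserializer (the function name is hypothetical):

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerRecord

def describeMessage(record: ConsumerRecord[String, Array[Byte]]): Unit = {
  val partitionId = record.key() // corresponds to the mt_partition column
  val payload = record.value() // corresponds to the data column
  // the remaining metadata columns (mt_timestamp, mt_checksum, ...) arrive as message headers
  record.headers().asScala.foreach { header =>
    println(s"header ${header.key()}: ${header.value().length} bytes")
  }
  println(s"key=$partitionId, payload size=${payload.length} bytes")
}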

application.conf configuration

You can use the application.conf configuration file to specify the connector type (http-connector or kafka-connector) and the Kafka consumer and producer properties. For more information on http-connector and kafka-connector, see Select a Streaming Connector: Kafka or HTTP.
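
A minimal sketch of such a configuration; the exact keys are documented on the Select a Streaming Connector: Kafka or HTTP page, so verify them there:

here.platform.data-client {
  // choose "kafka-connector" or "http-connector" for reading and writing Stream layers
  stream.connector.consumer = "kafka-connector"
  stream.connector.producer = "kafka-connector"
}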
