Flink Connector Integration with Version Layers

Create Table Source for Version Layer

The main entry point of the Flink Connector API is OlpStreamConnectorDescriptorFactory.

Scala
Java
import com.here.platform.data.client.flink.scaladsl.{
  OlpStreamConnection,
  OlpStreamConnectorDescriptorFactory
}
import com.here.platform.data.client.flink.javadsl.OlpStreamConnection;
import com.here.platform.data.client.flink.javadsl.OlpStreamConnectorDescriptorFactory;

An instance of OlpStreamConnection, consisting of a ConnectorDescriptor and a SchemaDescriptor, is required to register the table source in the TableEnvironment's catalog. The following code snippet shows how to create an instance of OlpStreamConnection using OlpStreamConnectorDescriptorFactory and register it in the TableEnvironment's catalog:

Scala
Java
// define the properties
val properties = Map(
  "olp.layer.query" -> "mt_version==LATEST"
).asJava

// create the Table Source
val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "sample-versioned-layer")
    .createConnectorDescriptorWithSchema(properties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")
// define the properties
Map<String, String> properties = new HashMap<>();
properties.put("olp.layer.query", "mt_version==LATEST");

// create the Table Source
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(inputCatalogHrn), "sample-versioned-layer")
        .createConnectorDescriptorWithSchema(properties);

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");
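The snippets above assume an existing StreamExecutionEnvironment named env. For context, a minimal Scala job skeleton might look as follows (a sketch; the job name is arbitrary and the exact StreamTableEnvironment import depends on your Flink version):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment

// set up the streaming and table environments
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// ... register "InputTable" as shown above and define your queries ...

// the pipeline starts reading only once the job is submitted
env.execute("read-version-layer")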

The source factory supports the following properties for Version layers (a combined example follows this list):

  • olp.layer.query: a string written in the RSQL query language that selects what to read from the version layer. Before creating a Table, the factory fetches the catalog configuration using the passed HRN. It then checks the data format and, if one exists, the schema for the passed layerId. As the last step, the Flink Connector automatically translates the layer schema into a Flink Table schema.
  • olp.blob.batch.size: the maximum number of blobs that are read in parallel in one Flink task. The number of tasks corresponds to the configured parallelism, so the number of blobs that your pipeline can read in parallel equals the parallelism level times the value of this property; for example, with parallelism 4 and a batch size of 20, up to 80 blobs are read in parallel. The default value is 10.
  • olp.read.blob.timeout: the overall timeout in milliseconds applied when reading a blob from the Blob API. The default value is 300000 milliseconds (5 minutes).
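For example, all three properties can be combined in one map before creating the Table Source. A minimal Scala sketch; the specific version in the query and the tuning values are illustrative assumptions:

import scala.collection.JavaConverters._

val properties = Map(
  // read a fixed catalog version instead of LATEST (illustrative)
  "olp.layer.query" -> "mt_version==5",
  // with parallelism 4, up to 4 * 20 = 80 blobs are read in parallel
  "olp.blob.batch.size" -> "20",
  // allow up to 10 minutes for each blob read
  "olp.read.blob.timeout" -> "600000"
).asJava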

Data Formats

The Flink Connector supports the following data formats for Version layer payload:

  • Raw. No decoding or encoding logic is applied; you get your data payload as an array of bytes. Your Table schema appears as follows:

    root
      |-- data: Array[Byte]
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_version: Long
      |-- mt_compressedDataSize: Long
    

    The column with the payload data is called data. The metadata columns follow the data column and have the mt_ prefix. In the list of metadata columns, only mt_partition is a required column. The rest are optional and you can use null as a value for them.

    This format is used if your layer content type is configured as application/octet-stream.

  • Protobuf. Flink uses the attached Protobuf schema (that you specify in your layer configuration) to derive a Flink Table schema.

    root
      |-- protobuf_field_1: String
      |-- protobuf_field_2: String
      |-- protobuf_field_3.nested_column: Long
      |-- ...
      |-- mt_partition: String
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_version: Long
      |-- mt_compressedDataSize: Long
    

    The Flink Connector maps the top-level Protobuf fields to top-level Row columns; the metadata columns follow.

    This format is used if your layer content type is configured as application/x-protobuf and a Protobuf schema is attached to the layer. If the schema is not specified, an error is thrown.

    Note:

    Self-referencing protobuf fields are not supported because there is no way to represent them in the Flink TypeInformation-based schema.

  • Avro. Flink uses the passed Avro schema (that you specify in the factory Map) to derive a Flink Table schema.

    root
      |-- avro_field_1: String
      |-- avro_field_2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    The Flink Connector maps the top-level Avro fields to top-level Row columns; the metadata columns follow.

    This format is used if your layer content type is configured as application/x-avro-binary and you specify a schema via the olp.catalog.layer-schema property. If the schema is not specified, an error is thrown.

  • Parquet. Flink uses the passed Avro schema (that you specify in the factory Map) to derive a Flink Table schema.

    root
      |-- parquet_field_1: String
      |-- parquet_field_2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    The Flink Connector maps the top-level Parquet fields to top-level Row columns; the metadata columns follow.

    This format is used if your layer content type is configured as application/x-parquet and you specify a schema via the olp.catalog.layer-schema property. If the schema is not specified, an error is thrown.

    The Apache parquet-avro module expects the Hadoop client to be available on the class path.

    The Hadoop client is currently not provided by the streaming environment. As a result, if you want to use the Parquet format, you have to include the Hadoop client dependency in your fat JAR:

Maven
sbt
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.htrace</groupId>
                <artifactId>htrace-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.7.3" exclude ("org.apache.htrace", "htrace-core")
)
  • Other formats

    If your layer uses a format other than those described above, an error is thrown.

The Table Source and Table Sink have the same schema for the same layer.

You can always print your Table schema using the standard Flink API:

Scala
Java
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema()
// imagine that we have already registered SDII_TABLE
tEnv.from("SDII_TABLE").printSchema();

Read Raw Data

Using SQL:

Scala
Java
// define the properties
val properties = Map(
  "olp.layer.query" -> "mt_version==LATEST"
).asJava

// create the Table Source
val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "sample-versioned-layer")
    .createConnectorDescriptorWithSchema(properties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

// define the properties
Map<String, String> properties = new HashMap<>();
properties.put("olp.layer.query", "mt_version==LATEST");

// create the Table Source
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(inputCatalogHrn), "sample-versioned-layer")
        .createConnectorDescriptorWithSchema(properties);

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");
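The registered table can then be queried like any other Flink table. The following Scala sketch selects the raw payload plus some metadata columns; the data column is escaped with backticks because it is a reserved keyword in Flink SQL:

// select the raw payload and metadata from the table registered above
val result: Table = tEnv.sqlQuery("""
SELECT
    `data`,
    mt_partition,
    mt_version
FROM InputTable
  """)

tEnv
  .toAppendStream[Row](result)
  .print()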

Read Protobuf Data

Using SQL:

Scala
Java
val sourceProperties = Map[String, String]("olp.layer.query" -> "mt_version==LATEST").asJava

val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "sample-versioned-layer")
    .createConnectorDescriptorWithSchema(sourceProperties)

val tEnv = StreamTableEnvironment.create(env)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

tEnv.from("InputTable").printSchema()

val result: Table = tEnv.sqlQuery("""
SELECT
    tileId,
    mt_timestamp,
    mt_checksum,
    mt_dataSize,
    mt_crc
FROM InputTable
  """)

tEnv
  .toAppendStream[Row](result)
  .print()
Map<String, String> properties = new HashMap<>();
properties.put("olp.layer.query", "mt_version==LATEST");

OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(inputCatalogHrn), "sample-version-layer")
        .createConnectorDescriptorWithSchema(properties);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

Table result =
    tEnv.sqlQuery(
        "SELECT tileId, mt_timestamp, mt_checksum, mt_dataSize, mt_crc FROM InputTable");

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
stream.print();

Read Avro Data

Using SQL:

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

val layerSchema =
  """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "city", "type" : "string"},
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
  """

val sourceProperties = Map(
  "olp.catalog.layer-schema" -> layerSchema,
  "olp.layer.query" -> "mt_version==LATEST"
).asJava

val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "version-layer-avro-output")
    .createConnectorDescriptorWithSchema(sourceProperties)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

val result: Table = tEnv.sqlQuery("""
SELECT
    city,
    event_timestamp,
    latitude,
    longitude,
    mt_timestamp,
    mt_checksum,
    mt_dataSize,
    mt_crc
FROM InputTable
  """)

tEnv
  .toAppendStream[Row](result)
  .print()
Map<String, String> properties = new HashMap<>();
String inputLayerSchema =
    "{\"type\" : \"record\", \"name\" : \"Event\", \"namespace\" : \"my.example\", \"fields\" : [ {\"name\" : \"event_timestamp\", \"type\" : \"long\"}, {\"name\" : \"latitude\", \"type\" : \"double\"}, {\"name\" : \"longitude\", \"type\" : \"double\"} ] }";

properties.put("olp.layer.query", "mt_version==LATEST");
properties.put("olp.catalog.layer-schema", inputLayerSchema);
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(inputCatalogHrn), "sample-version-layer")
        .createConnectorDescriptorWithSchema(properties);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

Table result =
    tEnv.sqlQuery(
        "SELECT 'Berlin', event_timestamp, latitude, longitude, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
stream.print();

Read Parquet Data

Using SQL:

Scala
Java
val tEnv = StreamTableEnvironment.create(env)

val inputLayerSchema =
  """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
  """

val sourceProperties = Map(
  "olp.catalog.layer-schema" -> inputLayerSchema,
  "olp.layer.query" -> "mt_version==LATEST"
).asJava

val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), "version-layer-parquet-input")
    .createConnectorDescriptorWithSchema(sourceProperties)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

// the schema above declares no "city" field, so the query selects only the
// declared columns and metadata
val result: Table = tEnv.sqlQuery("""
SELECT
    event_timestamp,
    latitude,
    longitude,
    mt_timestamp,
    mt_checksum,
    mt_dataSize,
    mt_crc
FROM InputTable
  """)

tEnv
  .toAppendStream[Row](result)
  .print()
Map<String, String> properties = new HashMap<>();
String inputLayerSchema =
    "{\"type\" : \"record\", \"name\" : \"Event\", \"namespace\" : \"my.example\", \"fields\" : [ {\"name\" : \"event_timestamp\", \"type\" : \"long\"}, {\"name\" : \"latitude\", \"type\" : \"double\"}, {\"name\" : \"longitude\", \"type\" : \"double\"} ] }";

properties.put("olp.layer.query", "mt_version==LATEST");
properties.put("olp.catalog.layer-schema", inputLayerSchema);
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(
            HRN.fromString(inputCatalogHrn), "sample-version-layer")
        .createConnectorDescriptorWithSchema(properties);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

Table result =
    tEnv.sqlQuery(
        "SELECT 'Berlin', event_timestamp, latitude, longitude, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
stream.print();
