Flink Connector Integration with Index Layers

Create Table Sink and Table Source for Index Layer

The main entry point of the Flink Connector API is OlpStreamConnectorHelper.

Scala
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper

Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

An instance of OlpStreamConnectorHelper is used to create a flink.table.api.Schema and to build the SQL statement. The following code snippet shows how to create an instance of OlpStreamConnectorHelper, build a flink.table.api.Schema, and create a table with the given schema and options:

Scala
// define the properties
val sourceProperties =
  Map(
    "olp.layer.query" -> "event_id=in=(1,2,3)",
    "olp.connector.query-parallelism" -> "100"
  )

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-protobuf-input", sourceProperties)

val tEnv = StreamTableEnvironment.create(env)

// register the Table Source
tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

Java
// define the properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

The source factory supports the following properties for Index layers:

  • olp.layer.query: specifies an RSQL query that is used to query the index layer. If it is not defined, the default value "timestamp=ge=0" is used, which means that all partitions are read.
  • olp.connector.query-parallelism: specifies the number of sub-queries and so indirectly sets the level of parallelism used for querying the metadata. The default value is 20.
  • olp.catalog.layer-schema: applicable only for the parquet and avro data formats. It is an Avro schema string that uses JSON format.
  • olp.connector.download-parallelism: the maximum number of blobs that are read in parallel in one Flink task. The number of tasks corresponds to the set parallelism. As a result, the number of blobs that your pipeline can read in parallel equals the parallelism level times the value of this property. The default value is 10.
  • olp.connector.download-timeout: the overall timeout in milliseconds that is applied for reading a blob from the Blob API. The default value is 300000 milliseconds.
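
As an illustration of the properties above, the following Java sketch assembles a source property map and works out how many blobs a pipeline could read in parallel. The class SourcePropertiesSketch, its helper methods, and the job parallelism of 4 are hypothetical and not part of the connector API; only the property keys and default values are taken from the list above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the Flink Connector API.
class SourcePropertiesSketch {

    // Default for olp.connector.download-parallelism, as documented above.
    static final int DEFAULT_DOWNLOAD_PARALLELISM = 10;

    // Builds an RSQL "in" expression such as event_id=in=(1,2,3).
    static String inQuery(String field, List<Integer> values) {
        String joined = values.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(","));
        return field + "=in=(" + joined + ")";
    }

    // Upper bound on blobs read at once: Flink parallelism times the per-task limit.
    static int maxParallelBlobReads(int flinkParallelism, Map<String, String> properties) {
        String configured = properties.get("olp.connector.download-parallelism");
        int perTask = configured == null
                ? DEFAULT_DOWNLOAD_PARALLELISM
                : Integer.parseInt(configured);
        return flinkParallelism * perTask;
    }

    // The same source properties that the examples in this section use.
    static Map<String, String> exampleProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("olp.layer.query", inQuery("event_id", Arrays.asList(1, 2, 3)));
        properties.put("olp.connector.query-parallelism", "100");
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> properties = exampleProperties();
        System.out.println(properties.get("olp.layer.query"));
        // Assumed job parallelism of 4; download-parallelism is unset, so the default applies.
        System.out.println(maxParallelBlobReads(4, properties));
    }
}
```

With the assumed parallelism of 4 and the default download parallelism of 10, the sketch prints the query string event_id=in=(1,2,3) followed by 40.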

To create the table schema, the helper fetches the catalog configuration using the passed HRN. It then checks the data format and, if one exists, the schema for the passed layerId. As the last step, the Flink Connector automatically translates the layer schema into a flink.table.api.Schema.

You create a Table Sink the same way as a Table Source, with OlpStreamConnectorHelper:

Scala
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "index-layer-protobuf-output", Map.empty)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

Java
OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

The sink factory supports the following properties for Index layers:

  • olp.catalog.layer-schema: applicable only for the parquet and avro data formats. It is an Avro schema string that uses JSON format.
  • olp.connector.aggregation-window: an interval in milliseconds that defines how often the sink aggregates rows with the same index columns together. The default value is 10000 milliseconds. The property applies only to the avro and parquet formats.
  • olp.connector.publication-window: defines how often metadata is published to the Publish API. The default value is 1000 milliseconds. If the value is set to -1, metadata is not published.
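
To make the idea behind olp.connector.aggregation-window more concrete, here is a conceptual Java sketch that groups the rows collected during one window by their index values. It only illustrates the grouping described above and is not the connector's sink implementation; the class AggregationWindowSketch, the Row type, and the string form of the index key are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual illustration only; the connector's real sink logic is not shown in this document.
class AggregationWindowSketch {

    // Hypothetical stand-in for a table row: its index column values plus a payload.
    static final class Row {
        final String indexKey; // e.g. the idx_ column values joined into one string
        final String payload;

        Row(String indexKey, String payload) {
            this.indexKey = indexKey;
            this.payload = payload;
        }
    }

    // Groups the rows collected during one aggregation window by their index key.
    static Map<String, List<String>> aggregate(List<Row> rowsInWindow) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Row row : rowsInWindow) {
            groups.computeIfAbsent(row.indexKey, key -> new ArrayList<>()).add(row.payload);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> window = new ArrayList<>();
        window.add(new Row("event_id=1", "a"));
        window.add(new Row("event_id=2", "b"));
        window.add(new Row("event_id=1", "c"));
        // Rows "a" and "c" share an index key, so they end up in the same group.
        System.out.println(aggregate(window));
    }
}
```

A longer window gives rows with the same index values more time to arrive and be grouped together, at the cost of a longer delay before they are written.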

Data Formats

The Flink Connector supports the following data formats for Index layer payload:

  • Raw. The decoding and encoding logic is not applied, and you get your data payload as an array of bytes. Your Table schema appears as follows:

    root
      |-- data: Array[Byte]
      |-- idx_index_field1: Long
      |-- idx_index_field2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    The column with the payload data is called data. The user-defined index fields follow the data column and have the idx_ prefix. The metadata columns follow the index columns and have the mt_ prefix.

    This format is used if your layer content type is configured as application/octet-stream.

  • Protobuf. Flink uses the attached Protobuf schema (that you specify in your layer configuration) to derive a Flink Table schema.

    root
      |-- protobuf_field_1: String
      |-- protobuf_field_2: String
      |-- protobuf_field_3.nested_column: Long
      |-- ...
      |-- idx_index_field1: Long
      |-- idx_index_field2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    The Flink Connector puts the top level protobuf fields as the top level Row columns, then the index and metadata columns follow.

    This format is used if your layer content type is configured as application/x-protobuf and you have a specified schema. If the schema is not specified, an error will be thrown.

    Note

    Self-referencing protobuf fields are not supported because there is no way to represent them in the Flink TypeInformation-based schema.

  • Avro. Flink uses the passed Avro schema (that you specify in the factory Map) to derive a Flink Table schema.

    root
      |-- avro_field_1: String
      |-- avro_field_2: String
      |-- ...
      |-- idx_index_field1: Long
      |-- idx_index_field2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    The Flink Connector puts the top level Avro fields as the top level Row columns, then the index and metadata columns follow.

    This format is used if your layer content type is configured as application/x-avro-binary and you have a specified schema. If the schema is not specified, an error will be thrown.

    WARNING: The new version of the connector does not support metadata columns for the avro data type.

  • Parquet. Flink uses the passed Avro schema (that you specify in the factory Map) to derive a Flink Table schema.

    root
      |-- parquet_field_1: String
      |-- parquet_field_2: String
      |-- ...
      |-- idx_index_field1: Long
      |-- idx_index_field2: String
      |-- ...
      |-- mt_metadata: Map[String, String]
      |-- mt_timestamp: Long
      |-- mt_checksum: String
      |-- mt_crc: String
      |-- mt_dataSize: Long
      |-- mt_compressedDataSize: Long
    

    The Flink Connector puts the top level parquet fields as the top level Row columns, then the index and metadata columns follow.

    This format is used if your layer content type is configured as application/x-parquet and you have a specified schema. If the schema is not specified, an error will be thrown.

    WARNING: The new version of the connector does not support metadata columns for the parquet data type.

    The hadoop client is currently not provided by the streaming environment. As a result, if you want to use the parquet format, you have to include the hadoop client dependency in your fat jar:

Maven
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.htrace</groupId>
                <artifactId>htrace-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>xerces</groupId>
                <artifactId>xercesImpl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

sbt
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.7.3" exclude ("org.apache.htrace", "htrace-core") exclude ("xerces", "xercesImpl")
)

  • Other formats

    If your layer uses a format other than the described formats, an error will be thrown.
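
The mapping from layer content type to data format described in the list above can be summarized in a short Java sketch. The class LayerFormatSketch, the Format enum, and the use of IllegalArgumentException are assumptions made for illustration; the text only states that an error is thrown and does not name the error type the connector uses.

```java
// Hypothetical sketch of the format selection rules described above; not connector code.
class LayerFormatSketch {

    enum Format { RAW, PROTOBUF, AVRO, PARQUET }

    // Picks the data format from the layer content type.
    // The schema argument may be null when no schema is specified.
    static Format resolve(String contentType, String schema) {
        switch (contentType) {
            case "application/octet-stream":
                return Format.RAW; // payload stays an array of bytes, no schema needed
            case "application/x-protobuf":
                return requireSchema(Format.PROTOBUF, schema);
            case "application/x-avro-binary":
                return requireSchema(Format.AVRO, schema);
            case "application/x-parquet":
                return requireSchema(Format.PARQUET, schema);
            default:
                // Assumed error type; the document only says that an error is thrown.
                throw new IllegalArgumentException("Unsupported content type: " + contentType);
        }
    }

    // Protobuf, Avro, and Parquet all need a schema to derive the table schema.
    private static Format requireSchema(Format format, String schema) {
        if (schema == null || schema.isEmpty()) {
            throw new IllegalArgumentException(format + " requires a schema");
        }
        return format;
    }

    public static void main(String[] args) {
        System.out.println(resolve("application/octet-stream", null));
        System.out.println(resolve("application/x-avro-binary", "{\"type\" : \"record\"}"));
    }
}
```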

Table Source and Sink have the same schema for the same layer.
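
The column order that the schema listings above share (payload columns first, then the idx_ index columns, then the mt_ metadata columns) can be sketched in Java as follows. The class ColumnLayoutSketch and its columns method are hypothetical, and the boolean switch for metadata columns is an assumption based on the warnings above and on the olp.connector.metadata-columns property used in the raw data example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the column order shown in the schema listings; not connector code.
class ColumnLayoutSketch {

    // Metadata column names as they appear in the schema listings above.
    static final List<String> METADATA_COLUMNS = Arrays.asList(
            "mt_metadata", "mt_timestamp", "mt_checksum",
            "mt_crc", "mt_dataSize", "mt_compressedDataSize");

    // Payload columns first, then idx_-prefixed index fields, then optional mt_ columns.
    static List<String> columns(
            List<String> payloadColumns, List<String> indexFields, boolean withMetadata) {
        List<String> result = new ArrayList<>(payloadColumns);
        for (String field : indexFields) {
            result.add("idx_" + field);
        }
        if (withMetadata) {
            result.addAll(METADATA_COLUMNS);
        }
        return result;
    }

    public static void main(String[] args) {
        // Raw format: a single payload column called data.
        List<String> raw = columns(
                Arrays.asList("data"),
                Arrays.asList("ingestion_time", "event_id", "event_type"),
                true);
        System.out.println(String.join(", ", raw));
    }
}
```

For the index fields ingestion_time, event_id, and event_type, the resulting list matches the column list selected in the raw data example in the next section.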

You can always print your Table schema using the standard Flink API:

Scala
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema()

Java
// imagine that we have already registered InputTable
tEnv.from("InputTable").printSchema();

Read and Write Raw Data

Using SQL:

Scala
val tEnv = StreamTableEnvironment.create(env)

val sourceProperties =
  Map(
    "olp.layer.query" -> "event_id=in=(1,2,3)",
    "olp.connector.query-parallelism" -> "100",
    "olp.connector.metadata-columns" -> "true"
  )

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-raw-input", sourceProperties)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                           "index-layer-raw-output",
                           Map("olp.connector.metadata-columns" -> "true"))

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

tEnv.executeSql(
  """
INSERT INTO
    OutputTable
SELECT
    data,
    idx_ingestion_time,
    idx_event_id,
    idx_event_type,
    mt_metadata,
    mt_timestamp,
    mt_checksum,
    mt_crc,
    mt_dataSize,
    mt_compressedDataSize
FROM InputTable"""
)

Java
// define the properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");
sourceProperties.put("olp.connector.metadata-columns", "true");

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.metadata-columns", "true");

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT data, idx_ingestion_time, idx_event_id, idx_event_type, mt_metadata, mt_timestamp, mt_checksum, mt_crc, mt_dataSize, mt_compressedDataSize FROM InputTable");

Read and Write Protobuf Data

Using SQL:

Scala
// define the properties
val sourceProperties =
  Map(
    "olp.layer.query" -> "event_id=in=(1,2,3)",
    "olp.connector.query-parallelism" -> "100"
  )

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-protobuf-input", sourceProperties)

val tEnv = StreamTableEnvironment.create(env)

// register the Table Source
tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "index-layer-protobuf-output", Map.empty)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

tEnv.executeSql(
  "INSERT INTO OutputTable SELECT * FROM InputTable"
)

Java
// define the properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, new HashMap<>());

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

// we assume that the input and output tables have the same schema
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");

Read and Write Avro Data

Using SQL:

Scala
val tEnv = StreamTableEnvironment.create(env)

val inputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
"""

val outputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "city", "type" : "string"},
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
"""

val sourceProperties =
  Map(
    "olp.catalog.layer-schema" -> inputLayerSchema,
    "olp.layer.query" -> "event_id=in=(1,2,3)",
    "olp.connector.query-parallelism" -> "100"
  )

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-avro-input", sourceProperties)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

val sinkProperties =
  Map(
    "olp.catalog.layer-schema" -> outputLayerSchema
  )

val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), "index-layer-avro-output", sinkProperties)

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

tEnv.executeSql(
  """
INSERT INTO OutputTable
    SELECT
        'Berlin',
        event_timestamp,
        latitude,
        longitude,
        idx_ingestion_time,
        idx_event_id,
        idx_event_type
    FROM InputTable"""
)

Java
// inputLayerSchema and outputLayerSchema are Strings that hold the same
// Avro schema JSON as in the Scala example

// define source properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.catalog.layer-schema", inputLayerSchema);
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

// define sink properties
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputLayerSchema);

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT 'Berlin', event_timestamp, latitude, longitude, idx_ingestion_time, idx_event_id, idx_event_type FROM InputTable");

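The Java example above passes inputLayerSchema and outputLayerSchema without showing how they are defined. One possible way to define them, sketched here with plain Java strings, is to reuse the JSON from the Scala example. The holder class AvroLayerSchemas and its constant names are hypothetical; only the schema content is taken from this section.

```java
// Hypothetical holder class; only the JSON content comes from the Scala example above.
class AvroLayerSchemas {

    // Avro schema (JSON) of the input layer payload.
    static final String INPUT_LAYER_SCHEMA = String.join("\n",
            "{",
            "  \"type\" : \"record\",",
            "  \"name\" : \"Event\",",
            "  \"namespace\" : \"my.example\",",
            "  \"fields\" : [",
            "    {\"name\" : \"event_timestamp\", \"type\" : \"long\"},",
            "    {\"name\" : \"latitude\", \"type\" : \"double\"},",
            "    {\"name\" : \"longitude\", \"type\" : \"double\"}",
            "  ]",
            "}");

    // Avro schema (JSON) of the output layer payload: the input fields plus city.
    static final String OUTPUT_LAYER_SCHEMA = String.join("\n",
            "{",
            "  \"type\" : \"record\",",
            "  \"name\" : \"Event\",",
            "  \"namespace\" : \"my.example\",",
            "  \"fields\" : [",
            "    {\"name\" : \"city\", \"type\" : \"string\"},",
            "    {\"name\" : \"event_timestamp\", \"type\" : \"long\"},",
            "    {\"name\" : \"latitude\", \"type\" : \"double\"},",
            "    {\"name\" : \"longitude\", \"type\" : \"double\"}",
            "  ]",
            "}");

    public static void main(String[] args) {
        System.out.println(INPUT_LAYER_SCHEMA);
        System.out.println(OUTPUT_LAYER_SCHEMA);
    }
}
```

The two constants can then be used wherever the Java examples in this section reference inputLayerSchema and outputLayerSchema, for example as the value of olp.catalog.layer-schema.
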
Read and Write Parquet Data

Using SQL:

Scala
val tEnv = StreamTableEnvironment.create(env)

val inputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
"""

val outputLayerSchema = """
{
  "type" : "record",
  "name" : "Event",
  "namespace" : "my.example",
  "fields" : [
    {"name" : "city", "type" : "string"},
    {"name" : "event_timestamp", "type" : "long"},
    {"name" : "latitude", "type" : "double"},
    {"name" : "longitude", "type" : "double"}
  ]
}
"""

val sourceProperties =
  Map(
    "olp.catalog.layer-schema" -> inputLayerSchema,
    "olp.layer.query" -> "event_id=in=(1,2,3)",
    "olp.connector.query-parallelism" -> "100"
  )

val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), "index-layer-parquet-input", sourceProperties)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn),
                           "index-layer-parquet-output",
                           Map("olp.catalog.layer-schema" -> outputLayerSchema))

tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

tEnv.executeSql(
  """
INSERT INTO OutputTable
    SELECT
        'Berlin',
        event_timestamp,
        latitude,
        longitude,
        idx_ingestion_time,
        idx_event_id,
        idx_event_type
    FROM InputTable"""
)

Java
// inputLayerSchema and outputLayerSchema are Strings that hold the same
// Avro schema JSON as in the Scala example

// define source properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.catalog.layer-schema", inputLayerSchema);
sourceProperties.put("olp.layer.query", "event_id=in=(1,2,3)");
sourceProperties.put("olp.connector.query-parallelism", "100");

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), "index-layer-parquet-input", sourceProperties);
Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

// define sink properties
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.catalog.layer-schema", outputLayerSchema);

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), "index-layer-parquet-output", sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

tEnv.executeSql(
    "INSERT INTO OutputTable SELECT 'Berlin', event_timestamp, latitude, longitude, idx_ingestion_time, idx_event_id, idx_event_type FROM InputTable");
