Flink Connector Integration with Interactive Map Layers

Create Table Sink and Table Source for Interactive Map Layer

The main entry point of the Flink Connector API is OlpStreamConnectorHelper.

Scala

import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper

Java

import com.here.platform.data.client.flink.javadsl.OlpStreamConnectorHelper;

An instance of OlpStreamConnectorHelper is used to create a flink.table.api.Schema and to build the CREATE TABLE SQL statement. The following code snippet shows how to create an instance of OlpStreamConnectorHelper, build a flink.table.api.Schema, and create a table with the given schema and options:

Scala

// define the properties
val sourceProperties =
  Map(
    "olp.connector.mode" -> "read",
    "olp.connector-refresh-interval" -> "-1", // no continuous reading
    "olp.connector.metadata-columns" -> "true"
  )

// create the Table Connector Descriptor Source
val sourceHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(inputCatalogHrn), inputLayerId, sourceProperties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)

tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sourceHelper.options}")

Java

OlpStreamConnectorHelper sourceHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(inputCatalogHrn), inputLayerId, sourceProperties);

Schema sourceSchema = sourceHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE InputTable %s WITH %s", sourceSchema, sourceHelper.options()));

The source factory supports the following properties for interactive map sources (a combined example follows the list):

  • olp.connector.mode: read
  • olp.layer.query: specifies an RSQL query used to filter the features read from the interactive map layer. If it is not defined or empty, all features within Web Mercator bounds (latitude between -85.05° and +85.05°) are read.
  • olp.connector-refresh-interval: the interval in milliseconds for detecting changes in the layer. Use a value of -1 to disable continuous reading. The default value is 60000.
  • olp.connector.max-features-per-request: limits the number of features the connector requests from the interactive map layer in a single call. Adjust this value if the layer contains very large features. The default value is 10000.
  • olp.connector.ignore-invalid-partitions: if set to true, the connector skips loading features at locations where, at the highest zoom level, more features exist at the same spot than allowed via the olp.connector.max-features-per-request property. The default value is false.
  • olp.connector.download-parallelism: the maximum number of read requests executed in parallel in one Flink task. As the number of tasks corresponds to the configured parallelism, the number of read requests the pipeline can execute in parallel is the value of this property multiplied by the task parallelism. The default value is 5.
  • olp.connector.download-timeout: the overall timeout in milliseconds applied for reading from the interactive map layer. The default value is 300000.
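
As an illustration, the following sketch configures a source that performs a single filtered read; the RSQL expression p.type==restaurant and the reduced request size are hypothetical values, not connector defaults or recommendations:

val filteredSourceProperties =
  Map(
    "olp.connector.mode" -> "read",
    "olp.connector-refresh-interval" -> "-1", // read once, no change detection
    "olp.layer.query" -> "p.type==restaurant", // hypothetical RSQL filter on a feature property
    "olp.connector.max-features-per-request" -> "1000" // smaller batches, e.g. for very large features
  )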

You create a Table Sink the same way as a Table Source, using OlpStreamConnectorHelper:

Scala

// define the properties
val sinkProperties =
  Map(
    "olp.connector.mode" -> "write",
    "olp.connector.metadata-columns" -> "true"
  )

// create the Table Connector Descriptor Sink
val sinkHelper: OlpStreamConnectorHelper =
  OlpStreamConnectorHelper(HRN(outputCatalogHrn), outputLayerId, sinkProperties)

// register the Table Sink
tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} " +
    s"WITH ${sinkHelper.options}")

Java

OlpStreamConnectorHelper sinkHelper =
    OlpStreamConnectorHelper.create(
        HRN.fromString(outputCatalogHrn), outputLayerId, sinkProperties);

Schema sinkSchema = sinkHelper.prebuiltSchema(tEnv).build();
tEnv.executeSql(
    String.format("CREATE TABLE OutputTable %s WITH %s", sinkSchema, sinkHelper.options()));

The sink factory supports the following properties for interactive map sinks (an example follows the list):

  • olp.connector.mode: write
  • olp.connector.aggregation-window: defines the time interval in milliseconds for updating the interactive map layer with features modified by the pipeline. The default value is 1000 milliseconds.
  • olp.connector.upload-parallelism: the maximum number of write requests executed in parallel in one Flink task. As the number of tasks corresponds to the configured parallelism, the number of write requests the pipeline can execute in parallel is the value of this property multiplied by the task parallelism. The default value is 5.
  • olp.connector.upload-timeout: the overall timeout in milliseconds that is applied for writing to the interactive map layer. The default value is 300000.
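
A minimal sink configuration sketch that batches feature updates into five-second windows and raises the per-task write concurrency; the values are illustrative, not recommendations:

val batchedSinkProperties =
  Map(
    "olp.connector.mode" -> "write",
    "olp.connector.aggregation-window" -> "5000", // flush modified features every 5 seconds
    "olp.connector.upload-parallelism" -> "10" // up to 10 concurrent write requests per task
  )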

For a general description of the Flink Connector configuration, see here.

Data Schemas

Before creating a table schema, the helper fetches the catalog configuration using the passed HRN and checks the data format of the passed layerId. As the last step, the Flink Connector automatically derives the flink.table.api.Schema.

Table Source and Sink have different schemas for the same interactive map layer:

The Table Source schema contains all columns:

root
|-- geometry: ROW<`type` STRING, `coordinates` STRING>
|-- properties: MAP<STRING, STRING>
|-- customMembers: MAP<STRING, STRING>
|-- mt_id: STRING
|-- mt_tags: ARRAY<STRING>
|-- mt_datahub: ROW<`mt_updatedAt` BIGINT, `mt_createdAt` BIGINT>

The Table Sink schema contains writable columns only:

root
|-- geometry: ROW<`type` STRING, `coordinates` STRING>
|-- properties: MAP<STRING, STRING>
|-- customMembers: MAP<STRING, STRING>
|-- mt_id: STRING
|-- mt_tags: ARRAY<STRING>

The columns are derived from the fields of a GeoJSON feature:

  • geometry.type: Point, LineString, Polygon, MultiPoint, MultiLineString, or MultiPolygon
  • geometry.coordinates: stringified GeoJSON coordinates. For the meaning of GeoJSON coordinates, see the chapter Geo-coordinates.
  • properties: map of the GeoJSON feature's properties, each value as stringified JSON
  • customMembers: map of members not described in the GeoJSON specification, each value as stringified JSON
  • mt_id: the feature ID
  • mt_tags: array of tags associated with the feature
  • mt_datahub.mt_updatedAt: Unix timestamp (in milliseconds) of the last update in the interactive map layer
  • mt_datahub.mt_createdAt: Unix timestamp (in milliseconds) of creation in the interactive map layer
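
Because properties and customMembers are exposed as MAP<STRING, STRING>, individual values can be read with Flink SQL's map access syntax; a short sketch, where 'name' is a hypothetical property key:

// select the feature ID and one (hypothetical) property value per feature
val featureNames = tEnv.sqlQuery("SELECT mt_id, properties['name'] FROM InputTable")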

SQL functions to encode or decode JSON text

Stringified JSON can be converted to typed data (and back) with user-defined functions:

Scala

tEnv.executeSql(
  "CREATE FUNCTION fromJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.FromJsonString'")
tEnv.executeSql(
  "CREATE FUNCTION toJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.ToJsonString'")

Java

tEnv.createTemporaryFunction("fromJsonString", FromJsonString.class);
tEnv.createTemporaryFunction("toJsonString", ToJsonString.class);

Read Interactive Map Data

Scala

val table = tEnv.sqlQuery("SELECT fromJsonString(geometry.coordinates) FROM InputTable")

Java

tEnv.sqlQuery("SELECT fromJsonString(geometry.coordinates) FROM InputTable");

Write Interactive Map Data

Scala

// column order is geometry, properties, customMembers, mt_id, mt_tags
tEnv.executeSql("""INSERT OVERWRITE OutputTable VALUES
                  |(('Point','[8.0, 50.0]'),
                  | MAP[
                  |  'p1', toJsonString(1),
                  |  'p2', toJsonString('value2'),
                  |  'p3', '{
                  |    "a": false,
                  |    "b": 3.2,
                  |    "c": "p3.3"
                  |   }'
                  | ],
                  | MAP['r1', toJsonString(true)],
                  | 'id1',
                  | ARRAY['tagA','tagB']
                  |)""".stripMargin)

Java

// column order is geometry, properties, customMembers, mt_id, mt_tags
tEnv.executeSql(
    "INSERT OVERWRITE OutputTable VALUES "
        + "(('Point','[8.0, 50.0]'), "
        + " MAP[ "
        + "  'p1', toJsonString(1), "
        + "  'p2', toJsonString('value2'), "
        + "  'p3', '{ "
        + "    \"a\": false, "
        + "    \"b\": 3.2, "
        + "    \"c\": \"p3.3\" "
        + "   }' "
        + " ], "
        + " MAP['r1', toJsonString(true)], "
        + " 'id1', "
        + " ARRAY['tagA','tagB'] "
        + ")");

Read and Write Interactive Map Data

Scala

tEnv.executeSql(
  "INSERT INTO OutputTable " +
    "SELECT geometry, properties, customMembers, mt_id, mt_tags FROM InputTable")

Java

tEnv.executeSql(
    "INSERT INTO OutputTable "
        + "SELECT geometry, properties, customMembers, mt_id, mt_tags FROM InputTable");
