Flink Connector Integration with Interactive Map Layers

Create Table Sink and Table Source for Interactive Map Layer

The main entry point of the Flink Connector API is OlpStreamConnectorDescriptorFactory.

Scala
import com.here.platform.data.client.flink.scaladsl.{
  OlpStreamConnection,
  OlpStreamConnectorDescriptorFactory
}
import scala.collection.JavaConverters._ // required for the .asJava calls in the snippets below

Java

import com.here.platform.data.client.flink.javadsl.OlpStreamConnectorDescriptorFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnection;

An instance of OlpStreamConnection, consisting of a ConnectorDescriptor and a SchemaDescriptor, is required to register the Table Source in the TableEnvironment. The following code snippet shows how to create an instance of OlpStreamConnection using OlpStreamConnectorDescriptorFactory and register it in the TableEnvironment:

Scala
// define the properties
val sourceProperties =
  Map(
    "olp.connector.mode" -> "read",
    "olp.connector-refresh-interval" -> "-1" // no continuous reading
  ).asJava

// create the Table Connector Descriptor Source
val streamSource: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), inputLayerId)
    .createConnectorDescriptorWithSchema(sourceProperties)

// register the Table Source
val tEnv = StreamTableEnvironment.create(env)
tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")

Java

// define the properties
Map<String, String> sourceProperties = new HashMap<>();
sourceProperties.put("olp.connector.mode", "read");
sourceProperties.put("olp.connector-refresh-interval", "-1"); // no continuous reading

// create the Table Connector Descriptor Source
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(HRN.fromString(inputCatalogHrn), inputLayerId)
        .createConnectorDescriptorWithSchema(sourceProperties);

// register the Table Source
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

The connector descriptor factory supports the following properties for interactive map sources (a combined example follows the list):

  • olp.connector.mode: read
  • olp.layer.query: specifies an RSQL query that is used to query the interactive map layer. If it is not defined or empty, all features within web mercator bounds (latitude between -85.05° and +85.05°) are read.
  • olp.connector-refresh-interval: interval in milliseconds for detecting changes in the layer. Use a value of -1 to disable continuous reading. The default value is 60000.
  • olp.connector.max-features-per-request: limits the number of features the connector requests from the interactive map layer in a single call. Lower this value if the layer contains very big features. The default value is 10000.
  • olp.connector.download-parallelism: the maximum number of read requests executed in parallel in one Flink task. As the number of tasks corresponds to the configured parallelism, the number of read requests the pipeline can execute in parallel is the value of this property multiplied by the task parallelism. The default value is 5.
  • olp.connector.download-timeout: the overall timeout in milliseconds that is applied for reading from the interactive map layer. The default value is 300000.
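
For example, the following Scala snippet configures a source that reads only matching features and polls the layer for changes every five seconds. This is a minimal sketch: the property key p.speed in the RSQL query is a hypothetical feature property, and the chosen values are illustrative only.

import scala.collection.JavaConverters._

// hypothetical example: read only features whose 'speed' property exceeds 100,
// detect changes every 5 seconds, and allow 2 parallel read requests per task
val filteringSourceProperties =
  Map(
    "olp.connector.mode" -> "read",
    "olp.layer.query" -> "p.speed>100",
    "olp.connector-refresh-interval" -> "5000",
    "olp.connector.download-parallelism" -> "2"
  ).asJava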

You create a Table Sink using OlpStreamConnectorDescriptorFactory and register it in the TableEnvironment as follows:

Scala
// define the properties
val sinkProperties =
  Map(
    "olp.connector.mode" -> "write"
  ).asJava

// create the Table Connector Descriptor Sink
val streamSink: OlpStreamConnection =
  OlpStreamConnectorDescriptorFactory(HRN(outputCatalogHrn), outputLayerId)
    .createConnectorDescriptorWithSchema(sinkProperties)

// register the Table Sink
tEnv
  .connect(streamSink.connectorDescriptor)
  .withSchema(streamSink.schema)
  .inUpsertMode()
  .createTemporaryTable("OutputTable")

Java

// define the properties
Map<String, String> sinkProperties = new HashMap<>();
sinkProperties.put("olp.connector.mode", "write");

// create the Table Connector Descriptor Sink
OlpStreamConnection streamSink =
    OlpStreamConnectorDescriptorFactory.create(HRN.fromString(outputCatalogHrn), outputLayerId)
        .createConnectorDescriptorWithSchema(sinkProperties);

// register the Table Sink
tEnv.connect(streamSink.connectorDescriptor())
    .withSchema(streamSink.schema())
    .inUpsertMode()
    .createTemporaryTable("OutputTable");

The connector descriptor factory supports the following properties for interactive map sinks (a combined example follows the list):

  • olp.connector.mode: write
  • olp.connector.aggregation-window: defines the time interval in milliseconds for updating the interactive map layer with features modified by the pipeline. The default value is 1000 milliseconds.
  • olp.connector.upload-parallelism: the maximum number of write requests executed in parallel in one Flink task. As the number of tasks corresponds to the configured parallelism, the number of write requests the pipeline can execute in parallel is the value of this property multiplied by the task parallelism. The default value is 5.
  • olp.connector.upload-timeout: the overall timeout in milliseconds that is applied for writing to the interactive map layer. The default value is 300000.
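
For illustration, the following Scala snippet configures a sink that batches feature updates into ten-second windows and allows more parallel uploads. The values are examples only, not recommendations:

import scala.collection.JavaConverters._

// illustrative values: flush modified features every 10 seconds
// and allow up to 10 parallel write requests per task
val tunedSinkProperties =
  Map(
    "olp.connector.mode" -> "write",
    "olp.connector.aggregation-window" -> "10000",
    "olp.connector.upload-parallelism" -> "10"
  ).asJava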

For a general description of the Flink connector configuration, see here.

Data Schemas

Before creating a Table, the factory fetches the catalog configuration using the passed HRN and checks the data format of the passed layerId. As the last step, the Flink Connector automatically derives a Flink Table schema.
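
To inspect the derived schema, you can print it from a registered table. This is a minimal sketch, assuming the source table was registered as InputTable as shown above:

// prints the derived schema of the registered table to the console;
// the output corresponds to the schema trees shown below
tEnv.sqlQuery("SELECT * FROM InputTable").printSchema()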

Table Source and Sink have different schemas for the same interactive map layer:

The Table Source schema contains all columns:

root
|-- geometry: ROW<`type` STRING, `coordinates` STRING>
|-- properties: MAP<STRING, STRING>
|-- customMembers: MAP<STRING, STRING>
|-- mt_id: STRING
|-- mt_tags: ARRAY<STRING>
|-- mt_datahub: ROW<`mt_updatedAt` BIGINT, `mt_createdAt` BIGINT>

The Table Sink schema contains writable columns only:

root
|-- geometry: ROW<`type` STRING, `coordinates` STRING>
|-- properties: MAP<STRING, STRING>
|-- customMembers: MAP<STRING, STRING>
|-- mt_id: STRING
|-- mt_tags: ARRAY<STRING>

The columns are derived from GeoJSON feature fields (a query example follows the table):

Column                    Description
geometry.type             Point, LineString, Polygon, MultiPoint, MultiLineString or MultiPolygon
geometry.coordinates      Stringified GeoJSON coordinates. For the meaning of GeoJSON coordinates, see the Geo-coordinates chapter.
properties                Map of the GeoJSON feature's properties, each value as stringified JSON
customMembers             Map of members not described in the GeoJSON specification, each value as stringified JSON
mt_id                     Feature id
mt_tags                   Array of tags associated with the feature
mt_datahub.mt_updatedAt   Unix timestamp (in milliseconds) of the last update in the interactive map layer
mt_datahub.mt_createdAt   Unix timestamp (in milliseconds) of creation in the interactive map layer
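
The metadata columns can be queried like any other column. For example, the following sketch (assuming the InputTable registration from above) selects each feature's id together with its creation and last-update timestamps:

// select feature ids with their DataHub timestamps;
// nested ROW fields are accessed with dot notation
val timestamps = tEnv.sqlQuery(
  "SELECT mt_id, mt_datahub.mt_createdAt, mt_datahub.mt_updatedAt FROM InputTable")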

SQL functions to encode or decode JSON text

Stringified JSON can be converted to typed data with the following user-defined functions:

Scala
tEnv.sqlUpdate(
  "CREATE FUNCTION fromJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.FromJsonString'")
tEnv.sqlUpdate(
  "CREATE FUNCTION toJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.ToJsonString'")

Java

tEnv.sqlUpdate(
    "CREATE FUNCTION fromJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.FromJsonString'");
tEnv.sqlUpdate(
    "CREATE FUNCTION toJsonString AS 'com.here.platform.data.client.flink.common.sqlfunctions.ToJsonString'");

Read Interactive Map Data

Scala
val table = tEnv.sqlQuery("SELECT fromJsonString(geometry.coordinates) FROM InputTable")

Java

Table table = tEnv.sqlQuery("SELECT fromJsonString(geometry.coordinates) FROM InputTable");
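
The same function can decode individual property values. The following Scala sketch assumes a hypothetical property key color; substitute a key that exists in your layer:

// 'color' is a hypothetical property key used for illustration;
// map values are accessed with bracket syntax
val colors = tEnv.sqlQuery(
  "SELECT fromJsonString(properties['color']) FROM InputTable")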

Write Interactive Map Data

Scala
// column order is geometry, properties, customMembers, mt_id, mt_tags
tEnv.sqlUpdate("""INSERT OVERWRITE OutputTable VALUES
                 |(('Point','[8.0, 50.0]'),
                 | MAP[
                 |  'p1', toJsonString(1),
                 |  'p2', toJsonString('value2'),
                 |  'p3', '{
                 |    "a": false,
                 |    "b": 3.2,
                 |    "c": "p3.3"
                 |   }'
                 | ],
                 | MAP['r1', toJsonString(true)],
                 | 'id1',
                 | ARRAY['tagA','tagB']
                 |)""".stripMargin)

Java

// column order is geometry, properties, customMembers, mt_id, mt_tags
tEnv.sqlUpdate(
    "INSERT OVERWRITE OutputTable VALUES "
        + "(('Point','[8.0, 50.0]'), "
        + " MAP[ "
        + "  'p1', toJsonString(1), "
        + "  'p2', toJsonString('value2'), "
        + "  'p3', '{ "
        + "    \"a\": false, "
        + "    \"b\": 3.2, "
        + "    \"c\": \"p3.3\" "
        + "   }' "
        + " ], "
        + " MAP['r1', toJsonString(true)], "
        + " 'id1', "
        + " ARRAY['tagA','tagB'] "
        + ")");

Read and Write Interactive Map Data

Scala
tEnv.sqlUpdate(
  "INSERT INTO OutputTable " +
    "SELECT geometry, properties, customMembers, mt_id, mt_tags FROM InputTable")

Java

tEnv.sqlUpdate(
    "INSERT INTO OutputTable "
        + "SELECT geometry, properties, customMembers, mt_id, mt_tags FROM InputTable");
