Flink Connector Tutorial

Objectives: Understand how to use the Flink Connector to read and write data from different layers and data formats in a catalog.

Complexity: Easy

Time to complete: 40 min

Dependencies: Organize your work in projects

Source code: Download

The examples in this tutorial demonstrate how to use the Flink Connector provided by the Data Client Library. The connector integrates with Flink for stream processing workloads, letting you use all of Flink's standard APIs and functions to read, write, and delete data. For batch processing workloads, use the provided Spark Connector instead.

In the main part of the tutorial, we will cover the following usages:

  • Subscribe to a streaming layer in protobuf format
  • Transform data from the Table API to the DataStream API and change its structure (see the sketch after this list)
  • Print data from the stream and use an index layer as the stream's destination
  • Transfer data from one layer to another layer of a different type
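
The core pattern behind these steps is the round trip between Flink's Table API and DataStream API that both applications below rely on: a source table registered from a catalog layer is queried with SQL, converted to a typed DataStream for windowing, and then exposed again as a table for the SQL INSERT into the sink. The following minimal, self-contained sketch shows only that round trip; the Reading case class and its values are illustrative and not part of the tutorial's data model:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.StreamTableEnvironment

// Hypothetical record type, used here only to illustrate the conversions
case class Reading(city: String, total: Int)

object TableStreamRoundTripSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)

  // DataStream -> Table: register a stream as a temporary view
  val readings: DataStream[Reading] =
    env.fromElements(Reading("Berlin", 1), Reading("Berlin", 2))
  tEnv.createTemporaryView("Readings", readings)

  // Table -> DataStream: run a SQL query and convert the result back to a typed stream
  val doubled: Table = tEnv.sqlQuery("SELECT city, total * 2 AS total FROM Readings")
  tEnv.toAppendStream[Reading](doubled).print()

  env.execute("table-stream-round-trip-sketch")
}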

As a preparation step, you will need to create your data destination catalog with the appropriate layers, so that they are in place for the main part of this tutorial. The dataset used is sourced from the HERE Sample SDII Messages catalog and contains simulated streaming sensor data in the form of SDII messages.

Set up the Maven Project

Create the following folder structure for the project:

flink-connector
└── src
    └── main
        ├── java
        └── resources
        └── scala

You can do this with a single bash command:

mkdir -p flink-connector/src/main/{java,resources,scala}

Create the Output Catalog

Create a file named pipeline-config.conf and populate it with the contents below, replacing {{YOUR_OUTPUT_CATALOG_HRN}} with the HRN of the catalog you created in Organize your work in projects.

pipeline.config {

  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }

  input-catalogs {
    // On the China environment, use hrn:here-cn:data:::sample-data instead
    sensorData { hrn = "hrn:here:data:::olp-sdii-sample-berlin-2" }
  }
}
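
At runtime, the applications read this file through the PipelineContext class from the pipeline-interface dependency listed in the POM below; the file is selected via the -Dpipeline-config.file system property used in the run commands at the end of this tutorial. Here is a minimal sketch of how the catalogs are resolved (the object name is illustrative):

import com.here.platform.pipeline.PipelineContext

object ShowPipelineConfigSketch extends App {
  private val pipelineContext = new PipelineContext

  // "sensorData" is the key declared under input-catalogs above
  val sensorDataCatalogHrn = pipelineContext.config.inputCatalogs("sensorData")
  val outputCatalogHrn = pipelineContext.config.outputCatalog

  println(s"Input catalog:  $sensorDataCatalogHrn")
  println(s"Output catalog: $outputCatalogHrn")
}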

The Maven POM file is similar to that in the Verify Maven Settings example, with the parent POM and dependencies sections updated:

Parent POM:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-stream-bom</artifactId>
    <version>2.29.23</version>
    <relativePath/>
</parent>

Dependencies:

<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>flink-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.compat.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.htrace</groupId>
                <artifactId>htrace-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>xerces</groupId>
                <artifactId>xercesImpl</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

You will need to create the output catalog. To do so, follow the steps outlined in the Create a Catalog Tutorial, using the OLP Command Line Interface (CLI).

You should use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-flink-connector-output.

For the output catalog, create a file named flink-connector-output.json with the contents below.

{
  "id": "flink-connector-output",
  "name": "Simulated sensor data archive (From tutorial) flink-connector-output",
  "summary": "Archive of simulated sensor data for the FlinkConnector tutorial",
  "description": "Archive of simulated sensor data",
  "tags": [
    "Tutorial",
    "Simulated"
  ],
  "layers": [
    {
      "id": "volatile-layer-avro-data",
      "name": "volatile-layer-avro-data",
      "summary": "Simulated sensor data for the FlinkConnector tutorial",
      "description": "Simulated sensor data for the FlinkConnector tutorial",
      "contentType": "application/x-avro-binary",
      "layerType": "volatile",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    },
    {
      "id": "index-layer-parquet-data",
      "name": "index-layer-parquet-data",
      "summary": "Simulated sensor data for the FlinkConnector tutorial",
      "description": "Simulated sensor data for the FlinkConnector tutorial",
      "contentType": "application/x-parquet",
      "layerType": "index",
      "indexProperties": {
        "indexDefinitions": [
          {
            "name": "tile_id",
            "type": "int"
          },
          {
            "name": "time_window",
            "duration": 600000,
            "type": "timewindow"
          }
        ],
        "ttl": "unlimited"
      },
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      }
    }
  ]
}

Replace {{YOUR_CATALOG_ID}} below with your own identifier, then run the following command (on Windows, this works in Cygwin and Git Bash; otherwise, run olp.bat):

olp catalog create {{YOUR_CATALOG_ID}} \
    "Simulated sensor data output from tutorial ({{YOUR_USERNAME}})" \
    --config flink-connector-output.json
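
If the command succeeds, the CLI prints the HRN of the new catalog; this is the value to use for {{YOUR_OUTPUT_CATALOG_HRN}} in pipeline-config.conf. Assuming your CLI version provides the olp catalog show command, you can check that both layers were created:

olp catalog show {{YOUR_OUTPUT_CATALOG_HRN}}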

The first application uses the public data source to read from a stream layer in protobuf format, performs some transformations on the received data, and writes the results to the volatile layer of the output catalog created above.

Scala

/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import StreamToVolatileLayerScalaPipeline.PositionEvent
import com.here.platform.data.client.flink.scaladsl.{
  OlpStreamConnection,
  OlpStreamConnectorDescriptorFactory
}
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.Table
import com.here.olp.util.quad.factory.HereQuadFactory
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.slf4j.LoggerFactory

import java.util.{Timer, TimerTask}
import scala.collection.JavaConverters._

object StreamToVolatileLayerScalaPipeline extends App {

  private val logger = LoggerFactory.getLogger(StreamToVolatileLayerScalaPipeline.getClass)

  private val pipelineContext = new PipelineContext

  // Source and output catalogs / layers
  val sensorDataCatalogHrn = pipelineContext.config.inputCatalogs("sensorData")
  val outputCatalogHrn = pipelineContext.config.outputCatalog

  val streamingLayer = "sample-streaming-layer"
  val outputVolatileLayer = "volatile-layer-avro-data"

  // Configure stream execution environment settings
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(5000)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))

  // Define the properties
  val properties = Map(
    "olp.kafka.group-name" -> "protobuf-streaming",
    "olp.kafka.offset" -> "earliest",
    "olp.connector.flink.v2" -> "true"
  ).asJava

  // Create a TableSource
  val streamSource: OlpStreamConnection =
    OlpStreamConnectorDescriptorFactory(sensorDataCatalogHrn, streamingLayer)
      .createConnectorDescriptorWithSchema(properties)

  // Register the TableSource
  val tEnv = StreamTableEnvironment.create(env)
  tEnv
    .connect(streamSource.connectorDescriptor)
    .withSchema(streamSource.schema)
    .inAppendMode()
    .createTemporaryTable("SensorDataTable")

  // Register the user-defined functions to be used in the SQL query
  tEnv.registerFunction("computeHereTile", new ComputeTileFunction())
  tEnv.registerFunction("assignUuid", new AssignUuidFunction())

  // Define a Table containing the fields to be used in computing results
  val observationsTable: Table =
    tEnv.sqlQuery("""SELECT
                    |  assignUuid(envelope.version) AS eventId,
                    |  timeStampUTC_ms AS timestampUtc,
                    |  computeHereTile(latitude_deg, longitude_deg) AS tile
                    |FROM
                    |  SensorDataTable CROSS JOIN UNNEST(positionEstimate)
                    |""".stripMargin)

  // Domain case classes
  case class PositionEvent(eventId: String, timestampUtc: Long, tile: Long)
  case class PositionStatistics(timestampUtc: Long, tile: Long, totalObservations: Int)

  // Compute values for the number of observations by tile and time window
  val outputStream: DataStream[PositionStatistics] = tEnv
    .toAppendStream[PositionEvent](observationsTable) // Convert the records to a stream of PositionEvent
    .assignTimestampsAndWatermarks(new PositionTimestampAssigner) // Define how the event time is assigned
    .map(v => (v.timestampUtc, v.tile, 1)) // Map fields to a tuple
    .keyBy(_._2) // Key by tile id
    .timeWindow(Time.seconds(15), Time.seconds(5)) // Define the time window to use
    .sum(2)
    .map(PositionStatistics.tupled(_))
  outputStream.print()

  tEnv.createTemporaryView("PositionStatsTable", outputStream)

  // Define the avro schema in which the output data will be written
  val outputAvroSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"},
      |    {"name" : "timestampUtc", "type" : "long"}
      |  ]
      |}
      |""".stripMargin

  // Create TableSink for the output
  val streamSink =
    OlpStreamConnectorDescriptorFactory(outputCatalogHrn, outputVolatileLayer)
      .createConnectorDescriptorWithSchema(
        Map[String, String](
          "olp.catalog.layer-schema" -> outputAvroSchema,
          "olp.connector.flink.v2" -> "true"
        ).asJava
      )
  tEnv
    .connect(streamSink.connectorDescriptor)
    .withSchema(streamSink.schema)
    .inAppendMode()
    .createTemporaryTable("OutputIndexTable")

  // Write the result into the output table, indexed by timestamp and HERE tile id
  tEnv.sqlUpdate("""INSERT INTO
                   |  OutputIndexTable
                   |SELECT
                   |  'Berlin' AS city,
                   |  totalObservations AS total,
                   |  timestampUtc,
                   |  CAST(tile AS STRING) AS mt_partition
                   |FROM
                   |  PositionStatsTable
                   |""".stripMargin)

  try {
    val job = env.executeAsync()
    logger.info(s"Stream to $outputCatalogHrn executed")

    // remove this part to run as infinite pipeline
    new Timer(true).schedule(new TimerTask {
      override def run(): Unit = {
        job.cancel()
        logger.info(s"Stream to $outputCatalogHrn canceled")
      }
    }, 30000)

  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }
}

class ComputeTileFunction() extends ScalarFunction {
  private val tileLevel = 14

  def eval(latitude: Double, longitude: Double): Long =
    HereQuadFactory.INSTANCE
      .getMapQuadByLocation(latitude, longitude, tileLevel)
      .getLongKey
}

class AssignUuidFunction() extends ScalarFunction {

  def eval(input: String): String =
    java.util.UUID.randomUUID.toString
}

class PositionTimestampAssigner
    extends BoundedOutOfOrdernessTimestampExtractor[PositionEvent](
      Time.seconds(10) // events are out of order by max 10 seconds
    ) {

  override def extractTimestamp(event: PositionEvent): Long =
    event.timestampUtc
}



Java

/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.olp.util.quad.factory.HereQuadFactory;
import com.here.platform.data.client.flink.javadsl.OlpStreamConnectorDescriptorFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnection;
import com.here.platform.pipeline.PipelineContext;
import java.util.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamToVolatileLayerPipeline {

  private static PipelineContext pipelineContext = new PipelineContext();

  // Source for the sensor data to be used as input
  private static HRN sensorDataCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("sensorData");
  private static HRN outputCatalogHrn = pipelineContext.getConfig().getOutputCatalog();

  private static String streamingLayer = "sample-streaming-layer";
  private static String outputIndexLayer = "volatile-layer-avro-data";

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(StreamToVolatileLayerPipeline.class);

    // Configure stream execution environment settings
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));

    // Define the properties
    Map<String, String> properties = new HashMap<>();
    properties.put("olp.kafka.group-name", "protobuf-streaming");
    properties.put("olp.kafka.offset", "earliest");
    properties.put("olp.connector.flink.v2", "true");

    // Create the TableSource
    OlpStreamConnection streamSource =
        OlpStreamConnectorDescriptorFactory.create(sensorDataCatalogHrn, streamingLayer)
            .createConnectorDescriptorWithSchema(properties);

    // Register the TableSource
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    tEnv.connect(streamSource.connectorDescriptor())
        .withSchema(streamSource.schema())
        .inAppendMode()
        .createTemporaryTable("SensorDataTable");

    // Register the user-defined functions to be used in the SQL query
    tEnv.registerFunction("computeHereTile", new ComputeTileFunction());
    tEnv.registerFunction("assignUuid", new AssignUuidFunction());

    // Define a Table containing the fields to be used in computing results
    Table observationsTable =
        tEnv.sqlQuery(
            "SELECT"
                + "  assignUuid(envelope.version) AS eventId, "
                + "  timeStampUTC_ms AS timestampUtc, "
                + "  computeHereTile(latitude_deg, longitude_deg) AS tile "
                + "FROM SensorDataTable "
                + "CROSS JOIN UNNEST(positionEstimate)");

    TupleTypeInfo<Tuple3<String, Long, Long>> tupleType =
        new TupleTypeInfo<>(Types.STRING(), Types.LONG(), Types.LONG());

    // Compute values for the number of observations by tile and time window
    DataStream<PositionStatistics> outputStream =
        tEnv.toAppendStream(observationsTable, tupleType)
            .assignTimestampsAndWatermarks(new PositionTimestampAssigner(Time.seconds(10)))
            .map(
                new MapFunction<Tuple3<String, Long, Long>, Tuple3<Long, Long, Integer>>() {
                  @Override
                  public Tuple3<Long, Long, Integer> map(Tuple3<String, Long, Long> observation)
                      throws Exception {
                    return new Tuple3<>(observation.f1, observation.f2, 1);
                  }
                })
            .keyBy(1)
            .timeWindow(Time.seconds(15), Time.seconds(5))
            .sum(2)
            .map(
                new MapFunction<Tuple3<Long, Long, Integer>, PositionStatistics>() {
                  @Override
                  public PositionStatistics map(Tuple3<Long, Long, Integer> result)
                      throws Exception {
                    return new PositionStatistics(result.f0, result.f1, result.f2);
                  }
                });

    outputStream.print();

    tEnv.createTemporaryView("PositionStatsTable", outputStream);

    // Define the avro schema in which the output data will be written
    String outputAvroSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "      {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "      {\"name\" : \"total\", \"type\" : \"int\"},\n"
            + "      {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
            + "    ]\n"
            + "  }\n";

    Map<String, String> sinkProperties = new HashMap<>();
    sinkProperties.put("olp.catalog.layer-schema", outputAvroSchema);
    sinkProperties.put("olp.connector.flink.v2", "true");

    // Create TableSink for the output
    OlpStreamConnection streamSink =
        OlpStreamConnectorDescriptorFactory.create(outputCatalogHrn, outputIndexLayer)
            .createConnectorDescriptorWithSchema(sinkProperties);
    tEnv.connect(streamSink.connectorDescriptor())
        .withSchema(streamSink.schema())
        .inAppendMode()
        .createTemporaryTable("OutputIndexTable");

    // Write the result into the output table, indexed by timestamp and HERE tile id
    tEnv.sqlUpdate(
        "INSERT INTO"
            + "  OutputIndexTable "
            + "SELECT"
            + "  'Berlin' AS city,"
            + "  totalObservations AS total,"
            + "  timestampUtc,"
            + "  CAST(tile AS STRING) AS mt_partition "
            + "FROM"
            + "  PositionStatsTable");

    try {
      JobClient job = env.executeAsync();
      logger.info(String.format("Stream to %s executed", outputCatalogHrn));

      // remove this part to run as infinite pipeline
      new Timer(true)
          .schedule(
              new TimerTask() {
                @Override
                public void run() {
                  job.cancel();
                  logger.info(String.format("Stream to %s canceled", outputCatalogHrn));
                }
              },
              30000);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  public static class ComputeTileFunction extends ScalarFunction {
    private int tileLevel = 14;

    public long eval(Double latitude, Double longitude) {
      return HereQuadFactory.INSTANCE
          .getMapQuadByLocation(latitude, longitude, tileLevel)
          .getLongKey();
    }
  }

  public static class AssignUuidFunction extends ScalarFunction {

    public String eval(String input) {
      return UUID.randomUUID().toString();
    }
  }

  public static class PositionStatistics {
    public long tile;
    public long timestampUtc;
    public int totalObservations;

    public PositionStatistics() {};

    public PositionStatistics(long timestampUtc, long tile, int totalObservations) {
      this.tile = tile;
      this.timestampUtc = timestampUtc;
      this.totalObservations = totalObservations;
    }

    @Override
    public String toString() {
      return tile + " " + timestampUtc + " " + totalObservations;
    }
  }

  public static class PositionTimestampAssigner
      extends BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Long, Long>> {

    public PositionTimestampAssigner(Time maxOutOfOrderness) {
      super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Tuple3<String, Long, Long> tuple) {
      return tuple.f1;
    }
  }
}


The second application reads the data written to the volatile layer in the previous step in Avro format, performs some transformations on the received data, and writes the results to the index layer of the output catalog created above.

Scala

/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.data.client.flink.scaladsl.{
  OlpStreamConnection,
  OlpStreamConnectorDescriptorFactory
}
import com.here.platform.pipeline.PipelineContext
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.{tableConversions, StreamTableEnvironment}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._
import org.slf4j.LoggerFactory

import java.util.{Timer, TimerTask}
import scala.collection.JavaConverters._

object VolatileToIndexLayerScalaPipeline extends App {
  private val logger = LoggerFactory.getLogger(VolatileToIndexLayerScalaPipeline.getClass)

  private val pipelineContext = new PipelineContext

  // Source and output catalogs / layers
  val catalogHrn = pipelineContext.config.outputCatalog

  val inputVolatileLayer = "volatile-layer-avro-data"
  val outputIndexLayer = "index-layer-parquet-data"

  // Configure stream execution environment settings
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  env.enableCheckpointing(5000)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L))

  val tEnv = StreamTableEnvironment.create(env)

  val inputSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"},
      |    {"name" : "timestampUtc", "type" : "long"}
      |  ]
      |}
      |""".stripMargin

  // For China environment change to `List("389695390", "389695391", "389695392")`
  val tiles = List("377894440", "377894441", "377894442")
  // Define the properties
  val properties =
    Map(
      "olp.layer.query" -> s"mt_partition=in=${tiles.mkString("(", ", ", ")")}",
      "olp.catalog.layer-schema" -> inputSchema,
      "olp.connector-refresh-interval" -> "-1",
      "olp.connector.flink.v2" -> "true"
    )

  // Create a TableSource
  val volatileSource: OlpStreamConnection =
    OlpStreamConnectorDescriptorFactory(catalogHrn, inputVolatileLayer)
      .createConnectorDescriptorWithSchema(properties.asJava)

  // Register the TableSource
  tEnv
    .connect(volatileSource.connectorDescriptor)
    .withSchema(volatileSource.schema)
    .inAppendMode()
    .createTemporaryTable("TableSource")

  tEnv.from("TableSource").toAppendStream[Row].print()

  val outputSchema =
    """{
      |  "type" : "record",
      |  "name" : "Event",
      |  "namespace" : "my.flink.tutorial",
      |  "fields" : [
      |    {"name" : "city", "type" : "string"},
      |    {"name" : "total", "type" : "int"}
      |  ]
      |}
      |""".stripMargin

  val indexSink: OlpStreamConnection =
    OlpStreamConnectorDescriptorFactory(catalogHrn, outputIndexLayer)
      .createConnectorDescriptorWithSchema(
        Map("olp.catalog.layer-schema" -> outputSchema, "olp.connector.flink.v2" -> "true").asJava)

  tEnv
    .connect(indexSink.connectorDescriptor)
    .withSchema(indexSink.schema)
    .inAppendMode()
    .createTemporaryTable("Sink")

  tEnv.sqlUpdate("""INSERT INTO Sink
                   |SELECT
                   |  city,
                   |  total,
                   |  CAST(mt_partition as BIGINT) as idx_tile_id,
                   |  timestampUtc as idx_time_window
                   |FROM TableSource
                   |""".stripMargin)

  try {
    val job = env.executeAsync()
    logger.info(s"Stream to $catalogHrn executed")

    // remove this part to run as infinite pipeline
    new Timer(true).schedule(new TimerTask {
      override def run(): Unit = {
        job.cancel()
        logger.info(s"Stream to $catalogHrn canceled")
      }
    }, 30000)
  } catch {
    case ex: Exception =>
      ex.printStackTrace()
  }
}



Java

/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.OlpStreamConnectorDescriptorFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnection;
import com.here.platform.pipeline.PipelineContext;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VolatileToIndexLayerPipeline {
  private static final PipelineContext pipelineContext = new PipelineContext();

  // Source and output catalogs / layers
  private static final HRN catalogHrn = pipelineContext.config().getOutputCatalog();

  private static final String inputVolatileLayer = "volatile-layer-avro-data";
  private static final String outputIndexLayer = "index-layer-parquet-data";

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(VolatileToIndexLayerPipeline.class);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 10000L));

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    String inputAvroSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "      {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "      {\"name\" : \"total\", \"type\" : \"int\"},\n"
            + "      {\"name\" : \"timestampUtc\", \"type\" : \"long\"}\n"
            + "    ]\n"
            + "  }\n";

    // For China environment change to `(new String[] {"389695390", "389695391", "389695392"})`
    String[] tiles = (new String[] {"377894440", "377894441", "377894442"});
    String tilesQueryString = Arrays.stream(tiles).collect(Collectors.joining(", ", "(", ")"));

    Map<String, String> properties = new HashMap<>();
    properties.put("olp.layer.query", "mt_partition=in=" + tilesQueryString);
    properties.put("olp.catalog.layer-schema", inputAvroSchema);
    properties.put("olp.connector-refresh-interval", "-1");
    properties.put("olp.connector.flink.v2", "true");

    OlpStreamConnection volatileSource =
        OlpStreamConnectorDescriptorFactory.create(catalogHrn, inputVolatileLayer)
            .createConnectorDescriptorWithSchema(properties);

    tEnv.connect(volatileSource.connectorDescriptor())
        .withSchema(volatileSource.schema())
        .inAppendMode()
        .createTemporaryTable("TableSource");

    tEnv.toAppendStream(tEnv.from("TableSource"), Row.class).print();
    // Define the parquet schema in which the output data will be written
    String outputParquetSchema =
        "  {\n"
            + "    \"type\" : \"record\",\n"
            + "    \"name\" : \"Event\",\n"
            + "    \"namespace\" : \"my.flink.tutorial\",\n"
            + "    \"fields\" : [\n"
            + "       {\"name\" : \"city\", \"type\" : \"string\"},\n"
            + "       {\"name\" : \"total\", \"type\" : \"int\"}\n"
            + "    ]\n"
            + "  }\n";

    Map<String, String> sinkProperties = new HashMap<>();
    sinkProperties.put("olp.catalog.layer-schema", outputParquetSchema);
    sinkProperties.put("olp.connector.flink.v2", "true");

    OlpStreamConnection indexSink =
        OlpStreamConnectorDescriptorFactory.create(catalogHrn, outputIndexLayer)
            .createConnectorDescriptorWithSchema(sinkProperties);

    tEnv.connect(indexSink.connectorDescriptor())
        .withSchema(indexSink.schema())
        .inAppendMode()
        .createTemporaryTable("Sink");

    tEnv.sqlUpdate(
        "INSERT INTO Sink "
            + "SELECT"
            + "  city,"
            + "  total,"
            + "  CAST(mt_partition as BIGINT) as idx_tile_id,"
            + "  timestampUtc as idx_time_window "
            + "FROM TableSource");

    try {
      JobClient job = env.executeAsync();
      logger.info("Stream to {} executed", catalogHrn);

      // remove this part to run as infinite pipeline
      new Timer(true)
          .schedule(
              new TimerTask() {
                @Override
                public void run() {
                  job.cancel();
                  logger.info("Stream to {} canceled", catalogHrn);
                }
              },
              30000);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }
}


Compile and Run Locally

To run the first application locally, execute the following command:

Scala

mvn compile exec:java \
    -Dexec.mainClass=StreamToVolatileLayerScalaPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf


Java

mvn compile exec:java \
    -Dexec.mainClass=StreamToVolatileLayerPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf

To run the second application, execute the following command:

Scala

mvn compile exec:java \
    -Dexec.mainClass=VolatileToIndexLayerScalaPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf


Java

mvn compile exec:java \
    -Dexec.mainClass=VolatileToIndexLayerPipeline \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf
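
Both applications print the computed records to the console and, as noted in the code comments, cancel their Flink job after 30 seconds via the Timer task; remove that block if you want to run them as long-lived streaming pipelines.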

Further Information

For more details on the topics covered in this tutorial, you can refer to the following sources:
