Flink Connector Migration Guide (Flink 1.10 -> 1.13)

After the Flink upgrade from 1.10 to 1.13, there are some changes in the Flink Connector API.

Dependencies

Instead of "flink-table-planner", add two new dependencies: "flink-clients" and "flink-table-planner-blink".

before:

Maven
sbt
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.3</version>
    <scope>provided</scope>
</dependency>
"org.apache.flink" %% "flink-table-planner" % "1.10.3" % "provided"

current:

Maven
sbt
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>
"org.apache.flink" %% "flink-table-planner" % "1.13.5" % "provided"

Upgrade imports

In the new version, the main entry point of the Flink Connector API is OlpStreamConnectorHelper, so instead of

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnection
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorDescriptorFactory
import com.here.platform.data.client.flink.javadsl.OlpStreamConnectorDescriptorFactory;
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnection;

you should import

Scala
Java
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper;

Table creation

In the previous version of the Flink Connector, you created an instance of OlpStreamConnection, connected using its ConnectorDescriptor, and created a temporary table with the schema provided by OlpStreamConnection:

Scala
Java
val streamSource: OlpStreamConnection =
    OlpStreamConnectorDescriptorFactory(HRN(inputCatalogHrn), inputLayerId)
    .createConnectorDescriptorWithSchema(sourceProperties)

tEnv
  .connect(streamSource.connectorDescriptor)
  .withSchema(streamSource.schema)
  .inAppendMode()
  .createTemporaryTable("InputTable")
OlpStreamConnection streamSource =
    OlpStreamConnectorDescriptorFactory.create(HRN.fromString(inputCatalogHrn), inputLayerId)
    .createConnectorDescriptorWithSchema(sourceProperties);

tEnv.connect(streamSource.connectorDescriptor())
    .withSchema(streamSource.schema())
    .inAppendMode()
    .createTemporaryTable("InputTable");

Since the connect method is deprecated in Flink 1.13, the Flink Connector provides a new API:

Scala
Java
val helper = OlpStreamConnectorHelper(
    HRN(catalogHrn),
    layerId,
    properties
)

val schema = helper.prebuiltSchema(tEnv).build()

tEnv.executeSql(s"CREATE TABLE OutputTable $schema WITH ${helper.options}")
OlpStreamConnectorHelper helper = OlpStreamConnectorHelper.create(
    HRN.fromString(catalogHrn),
    layerId,
    properties
);

Schema schema = helper.prebuiltSchema(tEnv).build();

tEnv.executeSql(String.format("CREATE TABLE InputTable %s WITH %s", schema, helper.options()));

Here, you first create an OlpStreamConnectorHelper with the catalog HRN, layer ID, and properties; then build an org.apache.flink.table.api.Schema; and finally create the table by calling tEnv.executeSql with the schema and the connector options formatted by OlpStreamConnectorHelper. Source and sink tables are created the same way.
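Putting the pieces above together, a job that reads from one layer and writes to another creates both tables the same way. The following Scala sketch assumes the new API described above; the HRNs, layer IDs, and empty property maps are placeholders, since real jobs pass layer-specific properties:

```scala
import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.OlpStreamConnectorHelper
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Hypothetical HRNs, layer IDs, and property maps; replace with your own.
val sourceHelper = OlpStreamConnectorHelper(
  HRN("hrn:here:data::realm:input-catalog"), "input-layer", Map.empty[String, String])
val sinkHelper = OlpStreamConnectorHelper(
  HRN("hrn:here:data::realm:output-catalog"), "output-layer", Map.empty[String, String])

// Each helper renders its table's schema and connector options.
tEnv.executeSql(
  s"CREATE TABLE InputTable ${sourceHelper.prebuiltSchema(tEnv).build()} WITH ${sourceHelper.options}")
tEnv.executeSql(
  s"CREATE TABLE OutputTable ${sinkHelper.prebuiltSchema(tEnv).build()} WITH ${sinkHelper.options}")
```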

SQL

The sqlUpdate method is now deprecated, but you can use executeSql in the same way. So instead of:

Scala
Java
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM InputTable")
tEnv.sqlUpdate("INSERT INTO OutputTable SELECT * FROM InputTable");

you can use

Scala
Java
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable");
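Note one behavioral difference in Flink itself: sqlUpdate only buffered the INSERT statement until the environment's execute call, whereas executeSql submits the job immediately and returns a TableResult. A short sketch (assuming tEnv is a TableEnvironment from the examples above):

```scala
// executeSql submits the INSERT job right away; no separate execute() call is needed.
val result = tEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")

// Optionally block until the job finishes (useful in tests or batch-style jobs):
result.await()
```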
