Flink Entry Point

FlinkDataClient is the main entry point for the API. It is a heavyweight object that should be created once, reused, and terminated when you are done with it. If you do not terminate it, the job may never finish and may fail with exceptions such as ClassNotFoundException.

In the driver, you need to terminate the client after execution, as shown in the snippets below.

Scala
Java
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

val client = new FlinkDataClient()

// create sources
// apply functions
// add sinks
// ...

// block until the job finishes
env.execute()

// terminate the client on finish
client.terminate()
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkDataClient client = new FlinkDataClient();

// create sources
// apply functions
// add sinks
// ...

// block until the job finishes
env.execute();

// terminate the client on finish
client.terminate();
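
If the job fails, env.execute() throws before the call to terminate() is reached. One way to make sure the client is always terminated is to wrap the call in try/finally; the sketch below shows this pattern in Scala and uses only the calls from the snippet above.

val client = new FlinkDataClient()

try {
  // block until the job finishes
  env.execute()
} finally {
  // terminate the client even if the job fails
  client.terminate()
}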

Use the close() callback provided by the Rich classes in Flink (RichSinkFunction, RichMapFunction, RichSourceFunction, and others) to terminate the client inside Flink functions.

Scala
Java
/** Flink function with access to DataClient. */
abstract class CustomFunction extends RichFunction with Serializable {
  // initialize DataClient
  @transient
  private lazy val flinkDataClient: FlinkDataClient =
    new FlinkDataClient()

  // terminate DataClient
  override def close(): Unit =
    flinkDataClient.terminate()
}
/** Flink function with access to DataClient. */
abstract class CustomFunction implements RichFunction, Serializable {
  private transient FlinkDataClient flinkDataClient;

  // initialize DataClient when the task starts
  @Override
  public void open(Configuration parameters) throws Exception {
    flinkDataClient = new FlinkDataClient();
  }

  // terminate DataClient when the task shuts down
  @Override
  public void close() throws Exception {
    flinkDataClient.terminate();
  }
}
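
As a usage sketch, a concrete function can also extend one of Flink's Rich classes directly and follow the same pattern. Everything except the FlinkDataClient construction and the terminate() call is illustrative: the class name and the map body are hypothetical.

import org.apache.flink.api.common.functions.RichMapFunction

/** Example map function that owns its own FlinkDataClient. */
class EnrichMapFunction extends RichMapFunction[String, String] {
  // created lazily on the task manager; not serialized with the job graph
  @transient
  private lazy val flinkDataClient: FlinkDataClient = new FlinkDataClient()

  override def map(value: String): String = {
    // use flinkDataClient here to enrich the record (hypothetical logic)
    value
  }

  // terminate the client when the task shuts down
  override def close(): Unit =
    flinkDataClient.terminate()
}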
