Read and Write Stream Data

Reading Partitions from a Stream Layer

The snippet below shows how to subscribe to a stream layer in a catalog.

Scala
Java
// Create the Flink data client and obtain its query API for the catalog HRN.
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// Subscribe to the stream layer under the "my-job" consumer group; the
// resulting source emits one Partition per incoming stream message.
val settings = ConsumerSettings(groupName = "my-job")
val partitions: DataStream[Partition] = env.addSource(queryApi.subscribe(streamLayer, settings))
// Create the Flink data client and obtain its query API for the catalog HRN.
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// Subscribe to the stream layer under the "my-job" consumer group; the
// resulting source emits one Partition per incoming stream message.
ConsumerSettings consumerSettings = new ConsumerSettings.Builder()
    .withGroupName("my-job")
    .build();
DataStream<Partition> partitions = env.addSource(queryApi.subscribe(streamLayer, consumerSettings));

Reading Data from Partitions

Use FlinkReadEngine.asDataFlow or a custom map function with FlinkReadEngine.getDataAsBytes to fetch data.

Scala
Java
// Create the Flink data client and obtain its read engine for the catalog HRN.
val client = new FlinkDataClient()
val readEngine = client.readEngine(hrn)

// Given a stream of partitions...
val partitions: DataStream[Partition] = getPartitionsStream()

// ...map every partition to its payload bytes using the built-in read flow.
val data: DataStream[Array[Byte]] = partitions.map(readEngine.asDataFlow())
// Create the Flink data client and obtain its read engine for the catalog HRN.
FlinkDataClient client = new FlinkDataClient();
FlinkReadEngine readEngine = client.readEngine(hrn);

// Given a stream of partitions...
DataStream<Partition> partitions = getPartitionsStream();

// ...map every partition to its payload bytes using the built-in read flow.
DataStream<byte[]> data = partitions.map(readEngine.asDataFlow());

Custom Reading Map Functions

The snippet below shows how to implement a custom map function that returns a tuple of each partition and its data.

Scala
Java
/** Rich map function that fetches each partition's payload and emits a
  * `(partition, bytes)` tuple. Each task instance creates its own
  * [[FlinkDataClient]] on first use and terminates it in `close()`.
  *
  * @param hrn catalog HRN the read engine is bound to
  */
class ReadDataMapFunction(hrn: HRN)
    extends RichMapFunction[Partition, (Partition, Array[Byte])]
    with Serializable {
  // Lazily create the client inside the task after deserialization;
  // @transient keeps the non-serializable client out of the shipped closure.
  @transient
  private lazy val dataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val readEngine: FlinkReadEngine =
    dataClient.readEngine(hrn)

  // Release client resources when the task shuts down.
  // NOTE(review): terminate() forces the lazy val, so a client is created
  // here even if map() was never invoked — confirm this is intended.
  override def close(): Unit =
    dataClient.terminate()

  // Fetch the payload bytes and pair them with the source partition.
  override def map(partition: Partition): (Partition, Array[Byte]) =
    partition -> readEngine.getDataAsBytes(partition)
}

// Given a stream of partitions...
val partitions: DataStream[Partition] = getPartitionsStream()

// ...apply the custom map function that reads each partition via DataClient.
val data: DataStream[(Partition, Array[Byte])] = partitions.map(new ReadDataMapFunction(hrn))
/**
 * Rich map function that fetches each partition's payload and emits a
 * {@code Tuple2<Partition, byte[]>}. Each task instance creates its own
 * {@link FlinkDataClient} in {@code open()} and terminates it in
 * {@code close()}.
 */
class ReadDataMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>>
    implements Serializable {
  /** Catalog HRN the read engine is bound to. */
  private final HRN hrn;

  // Created in open(); transient so the non-serializable client is not
  // part of the shipped closure.
  private transient FlinkDataClient dataClient;

  private transient FlinkReadEngine readEngine;

  public ReadDataMapFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    readEngine = dataClient.readEngine(hrn);
  }

  @Override
  public void close() throws Exception {
    // Flink calls close() even when open() failed; guard against a null
    // client so the original failure is not masked by an NPE here.
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  @Override
  public Tuple2<Partition, byte[]> map(Partition partition) throws Exception {
    byte[] data = readEngine.getDataAsBytes(partition);
    return new Tuple2<>(partition, data);
  }
}
// Given a stream of partitions...
DataStream<Partition> partitions = getPartitionsStream();

// ...apply the custom map function that reads each partition via DataClient.
DataStream<Tuple2<Partition, byte[]>> data = partitions.map(new ReadDataMapFunction(hrn));

Writing Partitions to a Stream Layer

The snippet below shows how to publish to a stream layer in a catalog.

Scala
Java
// Create the Flink data client and obtain its write engine for the catalog HRN.
val client = new FlinkDataClient()
val writeEngine = client.writeEngine(hrn)

// Given a stream of partitions waiting to be published...
val pendingPartitions: DataStream[PendingPartition] = getPendingPartitionsStream()

// ...attach the publishing sink so every pending partition is written out.
pendingPartitions.addSink(writeEngine.publish())
// Create the Flink data client and obtain its write engine for the catalog HRN.
FlinkDataClient client = new FlinkDataClient();
FlinkWriteEngine writeEngine = client.writeEngine(hrn);

// Given a stream of partitions waiting to be published...
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// ...attach the publishing sink so every pending partition is written out.
pendingPartitions.addSink(writeEngine.publish());

results matching ""

    No results matching ""