Write Data to a Stream Layer through FlinkKafkaPublisher

Use as a one-line call

The snippet below shows how to publish to a stream layer in a single line.

Scala
Java
// Build an iterator of PendingPartition instances to upload.
val pendingPartitions: Iterator[NewPartition] =
  Iterator.single(
    NewPartition("partition", "layer-id", NewPartition.ByteArrayData(Array(1.toByte))))

// Publish the partitions into the stream layer with a single call.
FlinkKafkaPublisher(hrn, pendingPartitions)
// Build an iterator of PendingPartition instances to upload.
Iterator<PendingPartition> pendingPartitions = getPendingPartitions();

// Publish the partitions into the stream layer with a single call.
FlinkKafkaPublisher.apply(hrn, pendingPartitions);

Note: Usage specifics

The FlinkKafkaPublisher class is a non-standard Flink solution and will reduce the performance of a processing stream. We recommend using the general approach for writing partitions to a stream layer: writeEngine.publish()

Use when you need to publish multiple times with better performance

The snippet below shows how to publish to a stream layer with better performance, for cases where you need to publish multiple times and terminate the publisher when finished.

Scala
Java
val someRichMapFunction = new RichMapFunction[String, String]() {

  // lazy val: the publisher is created on first use, once per task instance,
  // and then reused across map() invocations instead of per published batch.
  lazy val publisher = new FlinkKafkaPublisher(hrn)

  override def map(value: String): String = {

    // Build an iterator of PendingPartition instances to upload.
    val pendingPartitions: Iterator[NewPartition] =
      Iterator.single(
        NewPartition("partition", "layer-id", NewPartition.ByteArrayData(Array(1.toByte))))

    // Publish the partitions into the stream layer through the shared publisher.
    publisher.publish(pendingPartitions)

    "some data"
  }

  // Terminate the publisher when the operator is closed to release resources.
  override def close(): Unit = publisher.terminate()
}
class SomeMapFunction extends RichMapFunction<Partition, Tuple2<Partition, byte[]>>
    implements Serializable {
  private final HRN hrn;

  // transient: the publisher is not serializable operator state; it is created
  // in open() on each task instance and reused across map() invocations.
  private transient FlinkKafkaPublisher publisher;

  public SomeMapFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    // Initialize the publisher once per task so map() can reuse it.
    publisher = new FlinkKafkaPublisher(hrn);
  }

  @Override
  public void close() throws Exception {
    // Terminate the publisher when the operator shuts down to release resources.
    publisher.terminate();
  }

  @Override
  public Tuple2<Partition, byte[]> map(Partition partition) throws Exception {
    byte[] data = "some data".getBytes();
    Iterator<PendingPartition> pendingPartitions = getPendingPartitions();

    // Publish through the shared publisher created in open(). Calling the
    // one-shot FlinkKafkaPublisher.apply(hrn, pendingPartitions) here would
    // create and terminate a new publisher per element, defeating the
    // performance benefit this example is meant to demonstrate.
    publisher.publish(pendingPartitions);

    return new Tuple2<Partition, byte[]>(partition, data);
  }
}

results matching ""

    No results matching ""