Read and Write Volatile Data

Read Partitions from a Volatile Layer into DataStream

The snippets below show how to retrieve the partitions of a volatile layer in a catalog.

Scala
Java
// create data client
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// subscribe to a volatile layer
import com.here.platform.data.client.common.VolatilePartitionsFilter._

// specific timestamp
val timestamp = 0L
val partition1 = "somePartition1"
val partition2 = "somePartition2"

val volatilePartitions: DataStream[Partition] =
  env.addSource(
    queryApi.getVolatilePartitions(volatileLayer,
                                   since(timestamp) and byIds(Set(partition1, partition2))))
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

Long timestamp = 0L;
String partition1 = "somePartition1";
String partition2 = "somePartition2";

Set<String> ids = new HashSet<>(Arrays.asList(partition1, partition2));
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder().withIds(ids).withSinceTimestamp(timestamp).build();

// retrieve partitions of a volatile layer
DataStream<Partition> partitions =
    env.addSource(
        queryApi.getVolatilePartitions(volatileLayer, filter, Collections.emptySet()));
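
The stream above carries partition metadata only. The sketch below is not part of the original example; it shows one way to download the corresponding payload bytes, assuming the FlinkReadEngine returned by dataClient.readEngine(hrn) and its getDataAsBytes method, with imports omitted as in the snippets above.

Scala
/** Map function with access to DataClient.
  *
  * Sketch: downloads the payload bytes of each retrieved volatile partition. */
class DownloadVolatileDataMap(hrn: HRN) extends RichMapFunction[Partition, Array[Byte]] {
  // initialize DataClient lazily on the task manager
  @transient
  private lazy val dataClient: FlinkDataClient =
    new FlinkDataClient()

  // assumption: readEngine/getDataAsBytes as used in the Data Client Library read examples
  @transient
  private lazy val readEngine: FlinkReadEngine =
    dataClient.readEngine(hrn)

  // terminate DataClient
  override def close(): Unit =
    dataClient.terminate()

  override def map(partition: Partition): Array[Byte] =
    readEngine.getDataAsBytes(partition)
}

// map each retrieved partition to its payload bytes
val payloads: DataStream[Array[Byte]] =
  volatilePartitions.map(new DownloadVolatileDataMap(hrn))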

Publish Volatile Data

Use a BlobIdGenerator with a deterministic function so that the same partition always resolves to the same data handle, which lets you rewrite volatile data in place. Use WriteEngine.put() to write the data; you can discard the returned metadata.

Scala
Java
/** Sink function with access to DataClient.
  *
  * This example writes a pending partition into the data engine and discards its metadata. */
class UploadVolatileDataSink(hrn: HRN, blobIdGenerator: BlobIdGenerator)
    extends RichSinkFunction[PendingPartition]
    with Serializable {
  // initialize DataClient
  @transient
  private lazy val dataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val writeEngine: FlinkWriteEngine =
    dataClient.writeEngine(hrn, blobIdGenerator)

  @transient
  private lazy val publishApi: FlinkPublishApi =
    dataClient.publishApi(hrn)

  // terminate DataClient
  override def close(): Unit =
    dataClient.terminate()

  override def invoke(partition: PendingPartition, context: SinkFunction.Context[_]): Unit =
    writeEngine.put(partition)

}

// deterministic function that always generates the same dataHandle for the same partition
val idGenerator: BlobIdGenerator =
  new StableBlobIdGenerator(version = 0L)

// given a stream of PendingPartitions to be uploaded
val pendingPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream()

// add our custom sink to upload data
pendingPartitions.addSink(new UploadVolatileDataSink(hrn, idGenerator))
/**
 * Sink function with access to DataClient.
 *
 * <p>This example writes a pending partition into the data engine and discards its metadata.
 */
class UploadVolatileDataSink extends RichSinkFunction<PendingPartition> implements Serializable {
  private HRN hrn;

  private BlobIdGenerator blobIdGenerator;

  private transient FlinkDataClient dataClient;

  private transient FlinkWriteEngine writeEngine;

  public UploadVolatileDataSink(HRN hrn, BlobIdGenerator blobIdGenerator) {
    this.hrn = hrn;
    this.blobIdGenerator = blobIdGenerator;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    // initialize DataClient
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn, blobIdGenerator);
  }

  @Override
  public void close() {
    // terminate DataClient
    dataClient.terminate();
  }

  @Override
  public void invoke(PendingPartition partition, Context context) {
    writeEngine.put(partition);
  }
}

// deterministic function that always generates the same dataHandle for the same partition
BlobIdGenerator idGenerator = new StableBlobIdGenerator.Builder().withVersion(0L).build();

// given a stream of PendingPartitions to be uploaded
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// add our custom sink to upload data
pendingPartitions.addSink(new UploadVolatileDataSink(hrn, idGenerator));
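
The examples above assume a stream of PendingPartition elements provided by getPendingPartitionsStream(). The sketch below shows one possible way to build such a stream, assuming the NewPartition case class (a PendingPartition) with its ByteArrayData payload as used in the Data Client Library publish examples; payloadsById and volatileLayer are hypothetical placeholders for your own source stream and layer ID.

Scala
// hypothetical source of (partitionId, payload bytes) pairs
val payloadsById: DataStream[(String, Array[Byte])] = ???

// sketch only: wrap each payload in a NewPartition for the volatile layer
def getPendingPartitionsStream(): DataStream[PendingPartition] =
  payloadsById.map {
    case (partitionId: String, bytes: Array[Byte]) =>
      NewPartition(
        partition = partitionId,
        layer = volatileLayer, // hypothetical volatile layer ID
        data = NewPartition.ByteArrayData(bytes)
      ): PendingPartition
  }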
