Read and Write to the Objectstore layer

Read data from the Objectstore Layer into DataStream

The snippet below shows how to retrieve data from the Objectstore layer in a catalog.

// create dataclient
val client = new FlinkDataClient()
val readEngine = client.readEngine(hrn)

// keys in Objectstore
val key1 = "someKey1"
val key2 = "someKey2"

// Read objects as DataStream
val objects: DataStream[Array[Byte]] = {
      readEngine.getObjectDataAsBytes(objectStoreLayer, key1),
      readEngine.getObjectDataAsBytes(objectStoreLayer, key2)
FlinkDataClient client = new FlinkDataClient();
FlinkReadEngine readEngine = client.readEngine(hrn);

// keys in Objectstore
String key1 = "someKey1";
String key2 = "someKey2";

// Read objects as DataStream
List<byte[]> dataCollection = new ArrayList<>();
dataCollection.add(readEngine.getObjectDataAsBytes(objectStoreLayer, key1, ByteRange.all()));
dataCollection.add(readEngine.getObjectDataAsBytes(objectStoreLayer, key2, ByteRange.all()));
DataStream<byte[]> objects = env.fromCollection(dataCollection);

Upload data to Objectstore layer

The snipper below shows how to upload data to the Objectstore layer.

// given a stream of objects to be uploaded
val objects: DataStream[ObjectStoreMessage] =

// add our custom sink to upload data
objects.addSink(new UploadObjectStoreDataSink(hrn, objectStoreLayer))
/** Sink function with access to DataClient.
  * This example writes an object into the Objectstore layer*/
class UploadObjectStoreDataSink(hrn: HRN, layerId: String)
    extends RichSinkFunction[ObjectStoreMessage]
    with Serializable {
  // initialize DataClient
  private lazy val dataClient: FlinkDataClient =
    new FlinkDataClient()

  private lazy val writeEngine: FlinkWriteEngine =

  // terminate DataClient
  override def close(): Unit =

  override def invoke(value: ObjectStoreMessage, context: SinkFunction.Context[_]): Unit = {
    val key = value.key
    val data =
    writeEngine.uploadObject(layerId, key, data)


case class ObjectStoreMessage(key: String, data: NewPartition.Blob)
 * Sink function with access to DataClient.
 * <p>This example writes an object into the Objectstore layer
class UploadObjectStoreDataSink extends RichSinkFunction<JObjectStoreMessage>
    implements Serializable {
  private HRN hrn;
  private String layerId;

  private transient FlinkDataClient dataClient;

  private transient FlinkWriteEngine writeEngine;

  public UploadObjectStoreDataSink(HRN hrn, String layerId) {
    this.hrn = hrn;
    this.layerId = layerId;

  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);

  public void close() {
    // terminate DataClient

  public void invoke(JObjectStoreMessage message, Context context) {
        layerId, message.getKey(), message.getData(), Optional.empty(), Optional.empty());

// given a stream of PendingPartitions to be uploaded
DataStream<JObjectStoreMessage> objects = getObjectStreamMessages();

// add our custom sink to upload data
objects.addSink(new UploadObjectStoreDataSink(hrn, objectStoreLayer));
public class JObjectStoreMessage {
  private String key;
  private NewPartition.Blob data;

  public JObjectStoreMessage(String key, NewPartition.Blob data) {
    this.key = key; = data;

  public String getKey() {
    return key;

  public void setKey(String key) {
    this.key = key;

  public NewPartition.Blob getData() {
    return data;

  public void setData(NewPartition.Blob data) { = data;

results matching ""

    No results matching ""