Read and Write to the Objectstore layer

Read data from the Objectstore layer into a DataStream

The snippet below shows how to retrieve data from the Objectstore layer in a catalog.

Scala
Java
// create the Flink data client and a read engine for the catalog
val client = new FlinkDataClient()
val readEngine = client.readEngine(hrn)

// keys of the objects to fetch from the Objectstore layer
val keys = Seq("someKey1", "someKey2")

// Read the objects eagerly and expose them as a DataStream of raw bytes
val objects: DataStream[Array[Byte]] =
  env.fromCollection(
    keys.map(key => readEngine.getObjectDataAsBytes(objectStoreLayer, key))
  )
// create the Flink data client and a read engine for the catalog
FlinkDataClient client = new FlinkDataClient();
FlinkReadEngine readEngine = client.readEngine(hrn);

// keys of the objects to fetch from the Objectstore layer
String key1 = "someKey1";
String key2 = "someKey2";

// Fetch each object in full (ByteRange.all) and expose them as a DataStream
List<byte[]> dataCollection = new ArrayList<>();
for (String key : new String[] {key1, key2}) {
  dataCollection.add(readEngine.getObjectDataAsBytes(objectStoreLayer, key, ByteRange.all()));
}
DataStream<byte[]> objects = env.fromCollection(dataCollection);

Upload data to the Objectstore layer

The snippet below shows how to upload data to the Objectstore layer.

Scala
Java
// given a stream of objects to be uploaded
val objects: DataStream[ObjectStoreMessage] = streamOfObjectStoreMessages()

// attach the custom sink that writes each message into the Objectstore layer
val uploadSink = new UploadObjectStoreDataSink(hrn, objectStoreLayer)
objects.addSink(uploadSink)
/** Sink function with access to DataClient.
  *
  * Writes every incoming [[ObjectStoreMessage]] into the Objectstore layer
  * `layerId` of the catalog identified by `hrn`.
  */
class UploadObjectStoreDataSink(hrn: HRN, layerId: String)
    extends RichSinkFunction[ObjectStoreMessage]
    with Serializable {

  // DataClient is created lazily on the task manager; @transient keeps it
  // out of the serialized job graph
  @transient
  private lazy val flinkDataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val flinkWriteEngine: FlinkWriteEngine =
    flinkDataClient.writeEngine(hrn)

  /** Uploads one object; the key and payload come from the message itself. */
  override def invoke(value: ObjectStoreMessage, context: SinkFunction.Context[_]): Unit =
    flinkWriteEngine.uploadObject(layerId, value.key, value.data)

  /** Terminates the DataClient when the sink is disposed. */
  override def close(): Unit =
    flinkDataClient.terminate()

}

case class ObjectStoreMessage(key: String, data: NewPartition.Blob)
/**
 * Sink function with access to DataClient.
 *
 * <p>This example writes an object into the Objectstore layer {@code layerId} of the catalog
 * identified by {@code hrn}.
 */
class UploadObjectStoreDataSink extends RichSinkFunction<JObjectStoreMessage>
    implements Serializable {
  // catalog and layer to write to; assigned once at construction
  private final HRN hrn;
  private final String layerId;

  // created in open() on the task manager; transient so they are never
  // serialized with the job graph
  private transient FlinkDataClient dataClient;

  private transient FlinkWriteEngine writeEngine;

  public UploadObjectStoreDataSink(HRN hrn, String layerId) {
    this.hrn = hrn;
    this.layerId = layerId;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    // initialize DataClient and a write engine for the target catalog
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);
  }

  @Override
  public void close() {
    // terminate DataClient; guard against close() running before open()
    // (Flink calls close() even when open() failed or was never invoked)
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  @Override
  public void invoke(JObjectStoreMessage message, Context context) {
    // upload one object; the empty Optionals keep the engine's defaults for the
    // trailing parameters — NOTE(review): confirm their meaning against the
    // FlinkWriteEngine.uploadObject signature
    writeEngine.uploadObject(
        layerId, message.getKey(), message.getData(), Optional.empty(), Optional.empty());
  }
}

// given a stream of objects to be uploaded
DataStream<JObjectStoreMessage> objects = getObjectStreamMessages();

// attach the custom sink that uploads each message to the Objectstore layer
UploadObjectStoreDataSink uploadSink = new UploadObjectStoreDataSink(hrn, objectStoreLayer);
objects.addSink(uploadSink);
/** Message pairing an Objectstore key with the blob payload to upload. */
public class JObjectStoreMessage {
  private String objectKey;
  private NewPartition.Blob blobData;

  public JObjectStoreMessage(String key, NewPartition.Blob data) {
    this.objectKey = key;
    this.blobData = data;
  }

  /** @return the Objectstore key of this message */
  public String getKey() {
    return objectKey;
  }

  public void setKey(String key) {
    this.objectKey = key;
  }

  /** @return the blob payload to upload */
  public NewPartition.Blob getData() {
    return blobData;
  }

  public void setData(NewPartition.Blob data) {
    this.blobData = data;
  }
}

results matching ""

    No results matching ""