Read and Write Indexed Data

Read Partitions from Index Layers into DataStream

To retrieve data from an index layer, first call the method QueryApi.queryIndex, which returns the IndexPartitions that match a given query. To query indexed data, you must provide search criteria, as described in Query an Index Layer.

The snippet below shows how to query an index layer in a catalog.

Scala
Java
// create a FlinkDataClient
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// RSQL expression
val queryString = "intKey==42"

// query an index layer
// with some query string and all parts
val indexPartitions: DataStream[IndexPartition] =
  env.addSource(queryApi.queryIndex(indexLayer, Some(queryString)))
// -- or -- with the default query ("timestamp=ge=0") and all parts
//   env.addSource(queryApi.queryIndex(indexLayer))
// -- or -- with some query string and a specific part
//   env.addSource(queryApi.queryIndex(indexLayer, Some(queryString), partId = Some("123")))
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// RSQL expression
String queryString = "intKey==42";

// query an index layer
// with some query string and all parts
DataStream<IndexPartition> partitions =
    env.addSource(queryApi.queryIndex(indexLayer, Optional.of(queryString)));
// -- or -- with the default query ("timestamp=ge=0") and all parts
// partitions = env.addSource(queryApi.queryIndex(indexLayer, Optional.empty()));
// -- or -- with some query string and a specific part
// partitions = env.addSource(queryApi.queryIndex(indexLayer, Optional.of(queryString), "123"));

Notes:

  • If the query string is not defined, the default value "timestamp=ge=0" is used, which means that all partitions are read (see the query-string examples below).
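
The query string is an RSQL expression over the index attributes defined in the layer configuration. A few illustrative query strings, assuming index attributes named intKey, stringKey, and timestamp (a sketch only; substitute the attribute names from your own index definition):

// equality on a single index attribute
val byInt = "intKey==42"
// comparison operators use the =op= form (=lt=, =le=, =gt=, =ge=)
val byTime = "timestamp=ge=1546300800000"
// combine criteria with ';' (logical AND)
val combined = "stringKey==abc;intKey=gt=10"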

Query index partitions as a stream:

Scala
Java
// create a FlinkDataClient
val client = new FlinkDataClient()
val queryApi = client.queryApi(hrn)

// RSQL expression
val queryString = "intKey==42"

// query an index layer as a stream
val parts = Some(100) // number of parts to split the query into
val indexPartitions: DataStream[IndexPartition] =
  queryApi.queryIndexAsStream(indexLayer, Some(queryString), parts)
FlinkDataClient client = new FlinkDataClient();
FlinkQueryApi queryApi = client.queryApi(hrn);

// RSQL expression
String queryString = "intKey==42";

// query an index layer as a stream
DataStream<IndexPartition> partitions =
    queryApi.queryIndexAsStream(env, indexLayer, Optional.of(queryString));
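
Note that queryIndex and queryIndexAsStream return partition metadata only. To download the payload of each IndexPartition, you can pair the query source with a read engine. The snippet below is a minimal Scala sketch, assuming FlinkDataClient.readEngine and FlinkReadEngine.getDataAsBytes, and following the same lifecycle pattern as UploadDataFunction in the next section:

/** Map function that downloads the payload referenced by each partition's metadata */
class DownloadDataFunction(hrn: HRN)
    extends RichMapFunction[IndexPartition, Array[Byte]]
    with Serializable {
  // initialize DataClient lazily on the task manager
  @transient
  private lazy val dataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val readEngine: FlinkReadEngine =
    dataClient.readEngine(hrn)

  // terminate DataClient
  override def close(): Unit =
    dataClient.terminate()

  // fetch the data blob for the given partition metadata
  override def map(partition: IndexPartition): Array[Byte] =
    readEngine.getDataAsBytes(partition)
}

val payloads: DataStream[Array[Byte]] =
  indexPartitions.map(new DownloadDataFunction(hrn))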

Write Partitions to Index Layer

Data published to an index layer is not versioned but indexed. To publish and index data, you must write it and index it in two separate steps, by calling WriteEngine.put and then PublishApi.publishIndex.

The snippet below uses a Flink window to publish new partitions to an index layer every minute.

Scala
Java
/** Map function that uploads data and returns metadata */
class UploadDataFunction(hrn: HRN)
    extends RichMapFunction[PendingPartition, CommitPartition]
    with Serializable {
  // initialize DataClient
  @transient
  private lazy val dataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val writeEngine: FlinkWriteEngine =
    dataClient.writeEngine(hrn)

  // terminate DataClient
  override def close(): Unit =
    dataClient.terminate()

  // upload the data and return the resulting commit metadata
  override def map(pendingPartition: PendingPartition): CommitPartition =
    writeEngine.put(pendingPartition)
}

/** Window function that publishes an index for all CommitPartitions in the window */
class PublishIndexWindowFunction(hrn: HRN, indexLayer: String)
    extends RichAllWindowFunction[CommitPartition, String, TimeWindow]
    with Serializable {
  // initialize DataClient
  @transient
  private lazy val flinkDataClient: FlinkDataClient =
    new FlinkDataClient()

  @transient
  private lazy val publishApi: FlinkPublishApi =
    flinkDataClient.publishApi(hrn)

  // terminate DataClient
  override def close(): Unit =
    flinkDataClient.terminate()

  override def apply(window: TimeWindow,
                     partitions: Iterable[CommitPartition],
                     out: Collector[String]): Unit = {
    publishApi.publishIndex(indexLayer, partitions.iterator)
    out.collect(s"indexing a new partition of $indexLayer layer is a success")
  }
}

// given a stream of new partitions
val pendingPartitions: DataStream[PendingPartition] =
  getPendingPartitionsStream()

// for each input: upload data and collect metadata
val newPartitions: DataStream[CommitPartition] =
  pendingPartitions.map(new UploadDataFunction(hrn))

// group metadata by 1 minute window
val newPartitionsWindow: AllWindowedStream[CommitPartition, TimeWindow] =
  newPartitions.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))

// index new partition on window trigger
val results: DataStream[String] =
  newPartitionsWindow.apply(new PublishIndexWindowFunction(hrn, indexLayer))
/** Map function that uploads data and returns metadata */
class UploadDataFunction extends RichMapFunction<PendingPartition, CommitPartition>
    implements Serializable {

  private HRN hrn;

  private transient FlinkDataClient dataClient;

  private transient FlinkWriteEngine writeEngine;

  public UploadDataFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);
  }

  @Override
  public void close() throws Exception {
    dataClient.terminate();
  }

  @Override
  public CommitPartition map(PendingPartition pendingPartition) throws Exception {
    return writeEngine.put(pendingPartition);
  }
}

/** Window function that publishes an index for all CommitPartitions in the window */
class PublishIndexWindowFunction
    extends RichAllWindowFunction<CommitPartition, String, TimeWindow> implements Serializable {

  private HRN hrn;

  private String indexLayer;

  private transient FlinkDataClient dataClient;

  private transient FlinkPublishApi publishApi;

  public PublishIndexWindowFunction(HRN hrn, String indexLayer) {
    this.hrn = hrn;
    this.indexLayer = indexLayer;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    publishApi = dataClient.publishApi(hrn);
  }

  @Override
  public void close() throws Exception {
    dataClient.terminate();
  }

  @Override
  public void apply(
      TimeWindow window, Iterable<CommitPartition> commitPartitions, Collector<String> out)
      throws Exception {
    publishApi.publishIndex(indexLayer, commitPartitions.iterator());
    out.collect("indexing a new partition of " + indexLayer + " layer is a success");
  }
}

// given a stream of new partitions
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();

// for each input: upload data and collect metadata
DataStream<CommitPartition> newPartitions = pendingPartitions.map(new UploadDataFunction(hrn));

// group metadata by 1 minute window
AllWindowedStream<CommitPartition, TimeWindow> newPartitionsWindow =
    newPartitions.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)));

// index new partition on window trigger
DataStream<String> results =
    newPartitionsWindow.apply(new PublishIndexWindowFunction(hrn, indexLayer));
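
The example assumes that getPendingPartitionsStream() produces PendingPartitions whose index attributes are already populated. As a rough sketch of what constructing such a partition can look like (assuming the NewPartition model with index field values such as IntIndexValue, StringIndexValue, and TimeWindowIndexValue; the attribute names and types must match the layer's index definition):

val payload: Array[Byte] = "example".getBytes
val partitionId = "my-partition" // illustrative id
val pendingPartition: PendingPartition = NewPartition(
  partition = partitionId,
  layer = indexLayer,
  data = NewPartition.ByteArrayData(payload),
  // index attribute values; names and types must match the layer configuration
  fields = Some(Map(
    "intKey" -> IntIndexValue(42),
    "stringKey" -> StringIndexValue("abc"),
    "timestamp" -> TimeWindowIndexValue(1546300800000L)
  ))
)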

Update and Delete Partitions of Index Layer

For updating and deleting partitions of an index layer, the FlinkDataClient PublishApi supports the same functions as the DataClient PublishApi. For details, refer to the Update Index Data and Delete Index Data chapters of the DataClient PublishApi documentation.
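
For orientation only, a minimal sketch of what an update and a delete can look like from Flink; the method names and signatures below (updateIndex, deleteIndex) are assumptions modeled on those chapters, so verify them against the linked documentation:

// partitions to add and dataHandles to remove in a single index update
val additions: Iterator[CommitPartition] = Iterator.empty // assumed parameter
val deletions: Iterator[String] = Iterator.empty // assumed parameter: dataHandles
publishApi.updateIndex(indexLayer, additions, deletions)

// delete all index partitions matching an RSQL query (assumed signature)
publishApi.deleteIndex(indexLayer, "intKey==42")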
