Get Data

To get data from catalogs, add the data-engine module as a dependency to your project.

The data-engine module provides high-level abstractions on top of the data-client module for working with Open Location Platform data. It can read and manage both metadata and data.
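
For example, with sbt (a minimal sketch; the group ID, artifact name, and version below are illustrative, so check the platform documentation for the exact coordinates for your environment):

// add the data-engine module to your build (coordinates are illustrative)
libraryDependencies += "com.here.platform.data.client" %% "data-engine" % "<version>"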

The Open Location Platform supports four types of data layers: versioned, volatile, stream, and index.

Versioned Layers

The data in versioned layers is available as long as the specified version of the catalog exists. This means you can cache fetched blobs on the client side.
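
For example, a simple client-side cache can memoize blobs fetched for a fixed catalog version (a minimal sketch in Scala; getDataCached and blobCache are illustrative helpers, and the accessor for the partition's data handle may be named differently in your version of the library):

import scala.collection.concurrent.TrieMap

// blobs of a fixed catalog version never change, so they are safe to memoize,
// for example keyed by the partition's data handle (accessor name assumed)
val blobCache = TrieMap.empty[String, Array[Byte]]

def getDataCached(partition: Partition): Future[Array[Byte]] =
  blobCache.get(partition.dataHandle) match {
    case Some(bytes) => Future.successful(bytes)
    case None =>
      readEngine.getDataAsBytes(partition).map { bytes =>
        blobCache.putIfAbsent(partition.dataHandle, bytes)
        bytes
      }
  }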

To get data (a blob) for a stream of partitions belonging to a versioned layer, add the following:

Scala
Java
val queryApi = DataClient().queryApi(catalogHrn, settings)

// create readEngine for source catalog
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// stream of tuple of (partition, bytes)
val dataAsBytes: Future[Source[(Partition, Array[Byte]), NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      // parallelism defines how many parallel requests would be made to fetch the data
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.getDataAsBytes(partition).map { data =>
          (partition, data)
        }
      }
    }
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// parallelism defines how many parallel requests would be made to fetch the data
int parallelism = 10;

// create readEngine for source catalog
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// stream of tuple of (partition, bytes)
CompletionStage<Source<Pair<Partition, byte[]>, NotUsed>> dataAsBytes =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            metadata ->
                metadata.mapAsync(
                    parallelism,
                    partition ->
                        readEngine
                            .getDataAsBytes(partition)
                            .thenApply(data -> new Pair<>(partition, data))));

To get data as an Akka source, add the following:

Scala
Java
// fetch data as lazy source of data
val dataAsSource: Future[Source[Source[ByteString, NotUsed], NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.getDataAsSource(partition)
      }
    }
// fetch data as lazy source of data
CompletionStage<Source<Source<ByteString, NotUsed>, NotUsed>> dataAsSource =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            partitions ->
                partitions.mapAsync(
                    parallelism, partition -> readEngine.getDataAsSource(partition)));

To transform data into a custom object, add the following:

Scala
Java
// fetch data mapped directly to custom domain object
val data: Future[Source[CustomDomainObject, NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.get(partition, bytes => CustomDomainObject.fromBytes(bytes))
      }
    }
// fetch data mapped directly to custom domain object
CompletionStage<Source<JavaCustomDomainObject, NotUsed>> data =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            partitions ->
                partitions.mapAsync(
                    parallelism,
                    partition ->
                        readEngine.get(
                            partition, bytes -> JavaCustomDomainObject.fromBytes(bytes))));

Stream Layers

Data in stream layers consists of events that are pushed to consumers as the producer publishes them.

To subscribe to a stream layer, add the following:

Scala
Java
// create queryApi for target catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// create readEngine for target catalog
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// define a function to process the payload
def processPayload(data: Array[Byte]): Done = {
  println("Received data: " + new String(data))
  Done
}

// create subscription to stream layer
val subscription: Future[Subscription] =
  queryApi.subscribe(streamingLayerId,
                     ConsumerSettings("consumer-name", consumerId = "consumer-id"))

subscription.foreach { subscription =>
  subscription.partitions
    .mapAsync(parallelism = 10) { partition: Partition =>
      readEngine.getDataAsBytes(partition)
    }
    .map { payload: Array[Byte] =>
      processPayload(payload)
    }
    .runWith(Sink.ignore)
    .andThen {
      case Success(_) => println("Done")
      case Failure(exception) => println(s"Failed with $exception")
    }
}
// create readEngine and queryApi for a catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

int parallelism = 10;

// subscribe to receive new publications from stream layer
CompletionStage<Subscription> subscriptionFuture =
    queryApi.subscribe(
        layer, new ConsumerSettings.Builder().withGroupName("test-consumer").build());

subscriptionFuture
    .thenApply(
        subscription -> {
          return subscription
              .getPartitions()
              .mapAsync(parallelism, readEngine::getDataAsBytes)
              .map(payload -> processPayload(payload))
              .runWith(Sink.ignore(), myMaterializer);
        })
    .whenCompleteAsync(
        (result, e) -> {
          if (e != null) {
            e.printStackTrace();
          } else {
            System.out.println("DONE!");
          }
        });
// define a function to process the payload
private Done processPayload(byte[] data) {
  System.out.println("Received data: " + new String(data));
  return Done.getInstance();
}

To shut down your subscription, use SubscriptionControl:

Scala
Java
// shutdown stream subscription when done
subscription.subscriptionControl.shutdown()
// shutdown stream subscription when done
subscription.getSubscriptionControl().shutdown();

To provide a handler/callback function during the subscription process, add the following:

Scala
Java
// create queryApi for target catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// define a function to process partitions
def processPartition(partition: Partition): Unit =
  println("Received partition: " + partition)

// create subscription to stream layer
val subscriptionControl: Future[SubscriptionControl] =
  queryApi.subscribe(streamingLayerId,
                     ConsumerSettings("consumer-name"),
                     partition => processPartition(partition))
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
CompletionStage<SubscriptionControl> subscriptionFuture =
    queryApi.subscribe(
        "stream-layer",
        new ConsumerSettings.Builder()
            .withGroupName("test-consumer")
            .withConsumerId("consumer-id")
            .build(),
        partition -> processPartition(partition));
// define a function to process partitions
private void processPartition(Partition partition) {
  System.out.println("Received partition: " + partition);
}

Read Data from Stream Layers on Multiple Workers

Depending on the stream layer throughput configuration, you can set up distributed workers that consume the same stream layer. If all workers share the same consumer group (defined by ConsumerSettings.groupName when you create the subscription) and each worker uses a unique ConsumerSettings.consumerId (for the http-connector only), stream events are distributed among the workers. When processing stream layers, assume at-least-once delivery semantics: the same event may be dispatched to the same or to a different worker.

To recover a failed worker, create a new subscription with the same ConsumerSettings.groupName and ConsumerSettings.consumerId (for the http-connector only).

If a consumer needs to re-process a stream layer from the beginning, create a new subscription with a different groupName.
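
For example, two workers sharing one consumer group might subscribe as follows (a minimal Scala sketch based on the subscribe call shown above; the group name and consumer IDs are illustrative):

// both workers join the same consumer group, so stream events are
// distributed between them
val workerA: Future[Subscription] =
  queryApi.subscribe(streamingLayerId,
                     ConsumerSettings("shared-consumer-group", consumerId = "worker-a"))

// typically runs on a different machine
val workerB: Future[Subscription] =
  queryApi.subscribe(streamingLayerId,
                     ConsumerSettings("shared-consumer-group", consumerId = "worker-b"))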

Initial Offsets and Checkpoint

Use ConsumerSettings.offset to configure how offsets and checkpoints are managed in your subscription; the sketch after the following list illustrates the ManualOffset case.

  • EarliestOffset: Subscribe for earliest (old) partitions available for a given group name, respecting any previous checkpoint. The checkpoints are generated automatically.

  • ManualOffset: Subscribe for earliest (old) partitions available for a given group name, respecting any previous checkpoint. You must call SubscriptionControl.acknowledge for every received partition. When required, use SubscriptionControl.checkpoint to send offsets to the Open Location Platform.

  • LatestOffset: Subscribe for the latest (new) available partitions. The subscription ignores any data already available and any previous checkpoints.
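
The following Scala sketch subscribes with ManualOffset and acknowledges each received partition. It assumes that ConsumerSettings accepts the offset as a named parameter and that checkpoint takes no arguments; the exact signatures may differ in your version of the library.

// a minimal sketch; the `offset` parameter shape is assumed
val manualSettings =
  ConsumerSettings("consumer-name", consumerId = "consumer-id", offset = ManualOffset)

queryApi.subscribe(streamingLayerId, manualSettings).foreach { subscription =>
  subscription.partitions
    .mapAsync(parallelism = 10) { partition =>
      readEngine.getDataAsBytes(partition).map { data =>
        processPayload(data)
        // with ManualOffset, acknowledge every received partition
        subscription.subscriptionControl.acknowledge(partition)
      }
    }
    .runWith(Sink.ignore)

  // when required, send the acknowledged offsets to the Open Location Platform
  // (signature assumed): subscription.subscriptionControl.checkpoint()
}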

Volatile Layers

Data in volatile layers can change over time, which means that the data (a blob) for the same partition can contain different content at different times. Volatile layers typically hold content such as traffic information and weather data.

When using volatile data, performance is often an important factor. To speed up interactions with the Open Location Platform, you can cache metadata and keep fetching blobs as needed.

To get data from a volatile layer, add the following:

Scala
Java
// create queryApi and readEngine
val queryApi = DataClient().queryApi(catalogHrn, settings)
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// download payload for partition
def downloadData(partition: Partition): Future[Option[String]] =
  readEngine
    .getDataAsBytes(partition)
    .map(bytes => Some(new String(bytes)))
    .recover { case _ => None }

// fetch metadata once, can be cached on the client
val partitions: Future[Source[Partition, NotUsed]] =
  queryApi.getVolatilePartitions(layerId)

// keep reading data for volatile layer as needed
partitions.flatMap { ps: Source[Partition, NotUsed] =>
  ps.mapAsync(parallelism = 10) { partition: Partition =>
      downloadData(partition)
    }
    .runWith(Sink.foreach(println))
}
// create queryApi and readEngine
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// download payload for partition
Function<Partition, CompletionStage<Optional<byte[]>>> fetchData =
    (partition) -> {
      return readEngine
          .getDataAsBytes(partition)
          .thenApply(Optional::of)
          .exceptionally(failure -> Optional.empty());
    };

// fetch metadata once, can be cached on the client
CompletionStage<Source<Partition, NotUsed>> partitions =
    queryApi.getVolatilePartitions(
        layer, new VolatilePartitionsFilter.Builder().build(), Collections.emptySet());

int parallelism = 10;

// keep reading data for volatile layer as needed
partitions.thenApply(
    partitionsSource -> {
      return partitionsSource
          .mapAsync(parallelism, fetchData::apply)
          .runWith(Sink.foreach(System.out::println), myMaterializer);
    });

You can pass an optional filter parameter to both getVolatilePartitions and getVolatilePartitionsAsIterator. A VolatilePartitionsFilter can be empty, a since filter, a byIds filter, or a combination of these joined with the logical and operator.

For example:

Scala
Java
// create queryApi and readEngine
val queryApi = DataClient().queryApi(catalogHrn, settings)
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// download payload for partition
def downloadData(partition: Partition): Future[Option[String]] =
  readEngine
    .getDataAsBytes(partition)
    .map(bytes => Some(new String(bytes)))
    .recover { case _ => None }

// using this time as an example:
// Friday, October 18, 2019 1:45:20 PM GMT
val timestampSinceEpochInMs = 1571406320000L
val timestampSinceEpochInMsPlusOneHour = timestampSinceEpochInMs + 3600 * 1000

// empty filter means that no filter is applied
val emptyFilter: VolatilePartitionsFilter = VolatilePartitionsFilter.empty

// filter all partitions from timestampSinceEpochInMs to now
val sinceFilter1: VolatilePartitionsFilter =
  VolatilePartitionsFilter.since(timestampSinceEpochInMs)

// combination of two since filters will effectively use the younger/higher timestamp
val sinceFilter2: VolatilePartitionsFilter = VolatilePartitionsFilter.since(
  timestampSinceEpochInMs) and VolatilePartitionsFilter
  .since(timestampSinceEpochInMsPlusOneHour)

// filter partitions with ids 1, 2 and 3
val byIdsFilter1: VolatilePartitionsFilter =
  VolatilePartitionsFilter.byIds(Set("1", "2", "3"))

// combining two byIds filters results in a filter with the intersection of the ids, in this case 1
val byIdsFilter2: VolatilePartitionsFilter = VolatilePartitionsFilter.byIds(
  Set("1", "2", "3")) and VolatilePartitionsFilter.byIds(Set("1"))

// combining two byIds filters with non-overlapping sets results in empty ids, so no partition matches
val byIdsFilter3: VolatilePartitionsFilter = VolatilePartitionsFilter.byIds(
  Set("1", "2", "3")) and VolatilePartitionsFilter.byIds(Set("4"))

// combination of byIds filter and since filter
val combinedFilter
  : VolatilePartitionsFilter = VolatilePartitionsFilter.byIds(Set("1")) and VolatilePartitionsFilter
  .since(timestampSinceEpochInMs)

// fetch metadata once, can be cached on the client
val partitions: Future[Source[Partition, NotUsed]] =
  queryApi.getVolatilePartitions(layerId, combinedFilter)

// keep reading data for volatile layer as needed
partitions.flatMap { ps: Source[Partition, NotUsed] =>
  ps.mapAsync(parallelism = 10) { partition: Partition =>
      downloadData(partition)
    }
    .runWith(Sink.foreach(println))
}
// create queryApi and readEngine
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// download payload for partition
Function<Partition, CompletionStage<Optional<byte[]>>> fetchData =
    (partition) -> {
      return readEngine
          .getDataAsBytes(partition)
          .thenApply(Optional::of)
          .exceptionally(failure -> Optional.empty());
    };

// using this time as an example:
// Friday, October 18, 2019 1:45:20 PM GMT
Long timestampSinceEpochInMs = 1571406320000L;
Long timestampSinceEpochInMsPlusOneHour = timestampSinceEpochInMs + 3600 * 1000;

// empty filter means that no filter is applied
VolatilePartitionsFilter emptyFilter = new VolatilePartitionsFilter.Builder().build();

// filter all partitions from timestampSinceEpochInMs to now
VolatilePartitionsFilter sinceFilter1 =
    new VolatilePartitionsFilter.Builder().withSinceTimestamp(timestampSinceEpochInMs).build();

// combination of two since filters will effectively use the younger/higher timestamp
VolatilePartitionsFilter sinceFilter2 =
    new VolatilePartitionsFilter.Builder()
        .withSinceTimestamp(timestampSinceEpochInMs)
        .build()
        .and(
            new VolatilePartitionsFilter.Builder()
                .withSinceTimestamp(timestampSinceEpochInMsPlusOneHour)
                .build());

String partition1 = "1";
String partition2 = "2";
String partition3 = "3";
String partition4 = "4";
Set<String> partitions1 = new HashSet<String>();
partitions1.add(partition1);
Set<String> partitions4 = new HashSet<String>();
partitions4.add(partition4);
Set<String> partitions123 = new HashSet<String>();
partitions123.add(partition1);
partitions123.add(partition2);
partitions123.add(partition3);

// filter partitions with ids 1, 2 and 3
VolatilePartitionsFilter byIdsFilter1 =
    new VolatilePartitionsFilter.Builder().withIds(partitions123).build();

// combining two byIds filters results in a filter with the intersection of the ids, in this
// case 1
VolatilePartitionsFilter byIdsFilter2 =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions123)
        .build()
        .and(new VolatilePartitionsFilter.Builder().withIds(partitions1).build());

// combining two byIds filters with non-overlapping sets results in empty ids, so no
// partition matches
VolatilePartitionsFilter byIdsFilter3 =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions123)
        .build()
        .and(new VolatilePartitionsFilter.Builder().withIds(partitions4).build());

// combination of byIds filter and since filter
VolatilePartitionsFilter combinedFilter =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions1)
        .build()
        .and(
            new VolatilePartitionsFilter.Builder()
                .withSinceTimestamp(timestampSinceEpochInMs)
                .build());

// fetch metadata once, can be cached on the client
CompletionStage<Source<Partition, NotUsed>> partitions =
    queryApi.getVolatilePartitions(layer, combinedFilter, Collections.emptySet());

int parallelism = 10;

// keep reading data for volatile layer as needed
partitions.thenApply(
    partitionsSource -> {
      return partitionsSource
          .mapAsync(parallelism, fetchData::apply)
          .runWith(Sink.foreach(System.out::println), myMaterializer);
    });

You can also construct filters with the VolatilePartitionsFilter builder, as the Java examples above show.

Fetch Data in Parallel

As shown in the examples, use the parallelism parameter to control how many parallel requests the Data Client Library makes to fetch blobs. The optimal value depends on the node configuration: RAM, CPU, and network. Using more than 100 parallel requests has a negative effect on performance.

Index Layers

To use pagination, call the QueryApi.queryIndexParts method. It returns a list of part IDs that represent layer parts, which you can use to limit the scope of a query operation and thus run queries over multiple parts in parallel. You provide the desired number of parts, and the service returns a list of part IDs. Note that if the requested number of parts would make the individual parts too small, the service may return fewer parts than requested. For an example of how to fetch index parts, see the Retrieve Index Parts subsection below.

To retrieve data from an index layer, first call the QueryApi.queryIndex method. QueryApi.queryIndex returns the IndexPartitions that match a given query.

Then, call the ReadEngine.getDataAsBytes method on each IndexPartition to retrieve the corresponding data using the blob API. Because fetching the data for one partition takes some time, and QueryApi.queryIndex may return hundreds or even thousands of partitions, we recommend fetching the data for these partitions in parallel. The example below shows how to first query the index and then retrieve the data corresponding to the IndexPartitions in parallel.

The right level of parallelism depends on the machine that runs the code and the size of the objects to retrieve:

  • If you set the level of parallelism too low, the request execution overhead keeps the network bandwidth from being fully used.
  • If you set it too high, for example more than 200 parallel downloads on a single ActorSystem, you will start seeing warnings about too much pressure on the Akka HTTP connection pool. This happens because the code queues up too many asynchronous tasks, and the Data Client Library does not provide a backpressure mechanism in this case.

Trying out several levels of parallelism is a good way to find the best download performance. You can start with 10 parallel downloads per machine and increase this number by 10 until performance degrades.

Retrieve Index Parts

To retrieve index parts, call the QueryApi.queryIndexParts method:

Scala
Java
// Query the index layer with pagination
import scala.concurrent.Await
import scala.concurrent.duration._
// the number of parts to split the layer's index partitions into
val numberOfParts = 50
val indexParts =
  Await.result(queryApi.queryIndexParts(indexLayerId, numberOfParts), 10.seconds)
// Query the index layer with pagination
int numberOfParts = 50;

IndexParts indexParts =
    queryApi.queryIndexParts(indexLayerId, numberOfParts).toCompletableFuture().join();

Query an Index Layer

To query indexed data, you must provide some search criteria in the RSQL query language.

RSQL supports the following logical operators:

  • ; or and: logical AND
  • , or or: logical OR

RSQL supports the following comparison operators:

  • ==: equal
  • !=: not equal
  • < or =lt=: less than
  • <= or =le=: less than or equal
  • > or =gt=: greater than
  • >= or =ge=: greater than or equal

Below are some examples of RSQL expressions:

  • someIntKey==42
  • someStringKey!=abc
  • someStringKey=="Hello World!"
  • someIntKey<100;someBooleanKey==true
  • (someIntKey=gt=23,someStringKey==xyz);someBooleanKey==true

Note that, to make the examples above work, the IndexLayerType of the queried index layer must contain the following IndexDefinition objects (a Scala sketch follows this list):

  • IndexDefinition("someIntKey", IndexType.Int)
  • IndexDefinition("someBooleanKey", IndexType.Boolean)
  • IndexDefinition("someStringKey", IndexType.String)

The code snippet below retrieves the partitions whose someIntKey attribute is above 42 and whose someStringKey attribute is not "abc". It returns an Akka Source to simplify fetching the data for the returned partitions in parallel; see the next subsection, Retrieve Indexed Data, for more information about how to retrieve the data.

Scala
Java
import scala.concurrent.Await
import scala.concurrent.duration._

val queryString = "someIntKey>42;someStringKey!=abc"
val parallelism = 10

val foundIndexPartitionsAsSource =
  Source
    .unfoldAsync[Future[Seq[String]], Source[IndexPartition, NotUsed]](
      Future.successful(indexParts.parts)) {
      _.flatMap {
        case Nil => Future.successful(None)
        case x +: xs =>
          queryApi
            .queryIndex(indexLayerId, queryString, Some(x))
            .map(source => Some(Future.successful(xs) -> source))
      }
    }
    .flatMapConcat(identity)
// How to query the index layer
String queryString = "someIntKey>42;someStringKey!=abc";

Source<IndexPartition, NotUsed> indexPartitionsSource =
    Source.unfoldAsync(
            indexParts.getParts(),
            parts -> {
              if (!parts.isEmpty()) {
                CompletionStage<Source<IndexPartition, NotUsed>> eventualSource =
                    queryApi.queryIndex(
                        indexLayerId, queryString, parts.stream().findFirst().get());
                return eventualSource.thenApply(
                    source ->
                        Optional.of(Pair.create(parts.subList(1, parts.size()), source)));
              } else {
                return CompletableFuture.completedFuture(Optional.empty());
              }
            })
        .flatMapConcat(s -> s);

For more information about the query format and its constraints, see also the Get the Data Handle section of the Data API Developer Guide.

Retrieve Indexed Data

The previous subsection, Query an Index Layer, showed how to query an index layer using QueryApi.queryIndex. The code snippet below shows how to use Akka streams and ReadEngine.getDataAsBytes to retrieve, in parallel, the data for each IndexPartition returned by QueryApi.queryIndex:

Scala
Java
println(
  "Download the data corresponding to the index partitions previously found by the queryIndex method")

implicit val materializer: ActorMaterializer = ActorMaterializer()
// replace the ??? with your own processing code
def youCanProcessTheDataHere(byteData: Array[Byte]): Unit = ???

val done =
  foundIndexPartitionsAsSource
    .mapAsyncUnordered(parallelism)(partition => readEngine.getDataAsBytes(partition))
    .runForeach((byteData: Array[Byte]) => youCanProcessTheDataHere(byteData))
Await.result(done, Duration.Inf)

println("Computation finished. Shutting down the HTTP connections and the actor system.")
Await.ready(CoordinatedShutdown(actorSystem).run(UnknownReason), Duration.Inf)
ActorMaterializer actorMaterializer = ActorMaterializer.create(actorSystem);

System.out.println(
    "Download the data corresponding to the index partitions previously found by the queryIndex method");

int parallelism = 10;
indexPartitionsSource
    .mapAsyncUnordered(parallelism, readEngine::getDataAsBytes)
    // Replace the method youCanProcessTheDataHere with your own code.
    .runForeach(this::youCanProcessTheDataHere, actorMaterializer)
    .toCompletableFuture()
    .join();

System.out.println(
    "Computation finished. Shutting down the HTTP connections and the actor system.");
CoordinatedShutdown.get(actorSystem)
    .runAll(CoordinatedShutdown.unknownReason())
    .toCompletableFuture()
    .join();
