Get Data

To get data from catalogs, add the data-engine module as a dependency to your project.

The data-engine module provides high-level abstractions on top of data-client for working with HERE platform data. This module can read and manage both metadata and data.

This section covers the following layer types: versioned, stream, volatile, index, object store, and interactive map.

Versioned Layers

The data in versioned layers is available as long as the specified version of the catalog exists. This means you can cache fetched blobs on the client side.

To get data (a blob) for a stream of partitions belonging to a versioned layer, add the following:

Scala
val queryApi = DataClient().queryApi(catalogHrn, settings)

// create readEngine for source catalog
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// stream of tuple of (partition, bytes)
val dataAsBytes: Future[Source[(Partition, Array[Byte]), NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      // parallelism defines how many parallel requests would be made to fetch the data
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.getDataAsBytes(partition).map { data =>
          (partition, data)
        }
      }
    }

Java
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// parallelism defines how many parallel requests would be made to fetch the data
int parallelism = 10;

// create readEngine for source catalog
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// stream of tuple of (partition, bytes)
CompletionStage<Source<Pair<Partition, byte[]>, NotUsed>> dataAsBytes =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            metadata ->
                metadata.mapAsync(
                    parallelism,
                    partition ->
                        readEngine
                            .getDataAsBytes(partition)
                            .thenApply(data -> new Pair<>(partition, data))));

To get data as an Akka source, add the following:

Scala
// fetch data as lazy source of data
val dataAsSource: Future[Source[Source[ByteString, NotUsed], NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.getDataAsSource(partition)
      }
    }

Java
// fetch data as lazy source of data
CompletionStage<Source<Source<ByteString, NotUsed>, NotUsed>> dataAsSource =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            partitions ->
                partitions.mapAsync(
                    parallelism, partition -> readEngine.getDataAsSource(partition)));

To transform data into a custom object, add the following:

Scala
// fetch data mapped directly to custom domain object
val data: Future[Source[CustomDomainObject, NotUsed]] =
  queryApi
    .getPartitions(version, layer)
    .map { partitions =>
      partitions.mapAsync(parallelism = 10) { partition =>
        readEngine.get(partition, bytes => CustomDomainObject.fromBytes(bytes))
      }
    }

Java
// fetch data mapped directly to custom domain object
CompletionStage<Source<JavaCustomDomainObject, NotUsed>> data =
    queryApi
        .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
        .thenApply(
            partitions ->
                partitions.mapAsync(
                    parallelism,
                    partition ->
                        readEngine.get(
                            partition, bytes -> JavaCustomDomainObject.fromBytes(bytes))));

Stream Layer

Data in stream layers consists of events pushed to consumers as long as the producer publishes them.

To subscribe to a stream layer, add the following:

Scala
// create queryApi for target catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// create readEngine for target catalog
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// define a function how to process payload
def processPayload(data: Array[Byte]): Done = {
  // print the payload size; concatenating the array itself would only print its reference
  println(s"Received ${data.length} bytes")
  Done
}

// create subscription to stream layer
val subscription: Future[Subscription] =
  queryApi.subscribe(streamingLayerId,
                     ConsumerSettings("consumer-name", consumerId = "consumer-id"))

subscription.foreach { subscription =>
  subscription.partitions
    .mapAsync(parallelism = 10) { partition: Partition =>
      readEngine.getDataAsBytes(partition)
    }
    .map { payload: Array[Byte] =>
      processPayload(payload)
    }
    .runWith(Sink.ignore)
    .andThen {
      case Success(_) => println("Done")
      case Failure(exception) => println(s"Failed with $exception")
    }
}

Java
// create readEngine and queryApi for a catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

int parallelism = 10;

// subscribe to receive new publications from stream layer
CompletionStage<Subscription> subscriptionFuture =
    queryApi.subscribe(
        layer, new ConsumerSettings.Builder().withGroupName("test-consumer").build());

subscriptionFuture
    .thenApply(
        subscription -> {
          return subscription
              .getPartitions()
              .mapAsync(parallelism, readEngine::getDataAsBytes)
              .map(payload -> processPayload(payload))
              .runWith(Sink.ignore(), myMaterializer);
        })
    .whenCompleteAsync(
        (result, e) -> {
          if (e != null) {
            e.printStackTrace();
          } else {
            System.out.println("DONE!");
          }
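        });

// define a function how to process payload
private Done processPayload(byte[] data) {
  // print the payload size; concatenating the array itself would only print its reference
  System.out.println("Received " + data.length + " bytes");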
  return Done.getInstance();
}

To shut down your subscription, use SubscriptionControl:

Scala
// shut down the stream subscription when done
subscription.subscriptionControl.shutdown()

Java
// shutdown stream subscription when done
subscription.getSubscriptionControl().shutdown();

To provide a handler/callback function during the subscription process, add the following:

Scala
// create queryApi for target catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// define a function how to process partitions
def processPartition(partition: Partition): Unit =
  println("Received partition: " + partition)

// create subscription to stream layer
val subscriptionControl: Future[SubscriptionControl] =
  queryApi.subscribe(streamingLayerId,
                     ConsumerSettings("consumer-name"),
                     partition => processPartition(partition))

Java
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
CompletionStage<SubscriptionControl> subscriptionFuture =
    queryApi.subscribe(
        "stream-layer",
        new ConsumerSettings.Builder()
            .withGroupName("test-consumer")
            .withConsumerId("consumer-id")
            .build(),
        partition -> processPartition(partition));
// define a function how to process partitions
private void processPartition(Partition partition) {
  System.out.println("Received partition: " + partition);
}

Read Data from Stream Layers on Multiple Workers

Depending on the stream layer throughput configuration, you can set up distributed workers that consume the same stream layer. If all workers share the same consumer group (defined by ConsumerSettings.groupName when you create the subscription) and each worker has a unique ConsumerSettings.consumerId (for http-connector only), stream events are distributed between the workers. Stream layers are processed with at-least-once delivery semantics, so the same event may be dispatched more than once, to the same worker or to a different one.

To recover a worker after a failure, create a new subscription with the same ConsumerSettings.groupName and ConsumerSettings.consumerId (for http-connector only).

If a consumer needs to re-process a stream layer from the beginning, create a new subscription with a different groupName.
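Because delivery is at-least-once, a worker may see the same event twice, so event handling is usually written to be idempotent. The following is a minimal sketch of one way to do that, using only the Java standard library. It assumes that every event carries some stable unique key; the class name DedupingHandler and the eventKey parameter are hypothetical and are not part of the Data Client Library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Skips events whose key was seen recently; remembers at most `capacity` keys. */
final class DedupingHandler {
  private final Map<String, Boolean> seen;
  private final Consumer<String> delegate;

  DedupingHandler(int capacity, Consumer<String> delegate) {
    this.delegate = delegate;
    // Insertion-ordered map that evicts the oldest key once capacity is exceeded.
    this.seen =
        new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
            return size() > capacity;
          }
        };
  }

  /** Returns true if the event was processed, false if it was skipped as a duplicate. */
  synchronized boolean handle(String eventKey) {
    if (seen.putIfAbsent(eventKey, Boolean.TRUE) != null) {
      return false; // already processed by this worker
    }
    delegate.accept(eventKey);
    return true;
  }
}
```

This only removes duplicates seen by a single worker. When the same event can reach different workers, the set of processed keys would have to live in storage shared by all of them.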

Initial Offsets and Checkpoint

Use ConsumerSettings.offset to configure how offsets and checkpoints are managed in your subscription.

  • EarliestOffset: Subscribe for earliest (old) partitions available for a given group name, respecting any previous checkpoint. The checkpoints are generated automatically.

  • ManualOffset: Subscribe for earliest (old) partitions available for a given group name, respecting any previous checkpoint. You must call SubscriptionControl.acknowledge for every received partition. When required, use SubscriptionControl.checkpoint to send offsets to the platform.

  • LatestOffset: Subscribe for latest (new) available partitions. Any data already available or checkpoints are ignored by the subscription.

Volatile Layer

Data in volatile layers can change over time. This means the data (a blob) for the same partition can contain different content at different times. Volatile layers typically hold traffic information, weather, and similar content.

When using volatile data, performance is often an important factor. To speed up interactions with the HERE platform, you can cache the metadata and fetch blobs as needed.

To get data from a volatile layer, add the following:

Scala
// create queryApi and readEngine
val queryApi = DataClient().queryApi(catalogHrn, settings)
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// download payload for partition
def downloadData(partition: Partition): Future[Option[String]] =
  readEngine
    .getDataAsBytes(partition)
    .map(bytes => Some(new String(bytes)))
    .recover { case _ => None }

// fetch metadata once, can be cached on the client
val partitions: Future[Source[Partition, NotUsed]] =
  queryApi.getVolatilePartitions(layerId)

// keep reading data for volatile layer as needed
partitions.flatMap { ps: Source[Partition, NotUsed] =>
  ps.mapAsync(parallelism = 10) { partition: Partition =>
      downloadData(partition)
    }
    .runWith(Sink.foreach(println))
}

Java
// create queryApi and readEngine
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// download payload for partition
Function<Partition, CompletionStage<Optional<byte[]>>> fetchData =
    (partition) -> {
      return readEngine
          .getDataAsBytes(partition)
          .thenApply(Optional::of)
          .exceptionally(failure -> Optional.empty());
    };

// fetch metadata once, can be cached on the client
CompletionStage<Source<Partition, NotUsed>> partitions =
    queryApi.getVolatilePartitions(
        layer, new VolatilePartitionsFilter.Builder().build(), Collections.emptySet());

int parallelism = 10;

// keep reading data for volatile layer as needed
partitions.thenApply(
    partitionsSource -> {
      return partitionsSource
          .mapAsync(parallelism, fetchData::apply)
          .runWith(Sink.foreach(System.out::println), myMaterializer);
    });

Both getVolatilePartitions and getVolatilePartitionsAsIterator accept an optional filter parameter. A VolatilePartitionsFilter can be empty, a since filter, a byIds filter, or a combination of these. Multiple filters can be joined with the logical and operator.

For example:

Scala
// create queryApi and readEngine
val queryApi = DataClient().queryApi(catalogHrn, settings)
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// download payload for partition
def downloadData(partition: Partition): Future[Option[String]] =
  readEngine
    .getDataAsBytes(partition)
    .map(bytes => Some(new String(bytes)))
    .recover { case _ => None }

// using this time as an example:
// Friday, October 18, 2019 1:45:20 PM GMT
val timestampSinceEpochInMs = 1571406320000L
val timestampSinceEpochInMsPlusOneHour = timestampSinceEpochInMs + 3600 * 1000

// empty filter means that no filter is applied
val emptyFilter: VolatilePartitionsFilter = VolatilePartitionsFilter.empty

// filter all partitions from timestampSinceEpochInMs to now
val sinceFilter1: VolatilePartitionsFilter =
  VolatilePartitionsFilter.since(timestampSinceEpochInMs)

// combination of two since filters will effectively use the younger/higher timestamp
val sinceFilter2: VolatilePartitionsFilter = VolatilePartitionsFilter.since(
  timestampSinceEpochInMs) and VolatilePartitionsFilter
  .since(timestampSinceEpochInMsPlusOneHour)

// filter partitions with ids 1, 2 and 3
val byIdsFilter1: VolatilePartitionsFilter =
  VolatilePartitionsFilter.byIds(Set("1", "2", "3"))

// combination of two byIds filter results in a filter with the intersection of the ids, in this case 1
val byIdsFilter2: VolatilePartitionsFilter = VolatilePartitionsFilter.byIds(
  Set("1", "2", "3")) and VolatilePartitionsFilter.byIds(Set("1"))

// combination of two byIds filter with non-overlapping sets results in empty ids, so no partition
val byIdsFilter3: VolatilePartitionsFilter = VolatilePartitionsFilter.byIds(
  Set("1", "2", "3")) and VolatilePartitionsFilter.byIds(Set("4"))

// combination of byIds filter and since filter
val combinedFilter
  : VolatilePartitionsFilter = VolatilePartitionsFilter.byIds(Set("1")) and VolatilePartitionsFilter
  .since(timestampSinceEpochInMs)

// fetch metadata once, can be cached on the client
val partitions: Future[Source[Partition, NotUsed]] =
  queryApi.getVolatilePartitions(layerId, combinedFilter)

// keep reading data for volatile layer as needed
partitions.flatMap { ps: Source[Partition, NotUsed] =>
  ps.mapAsync(parallelism = 10) { partition: Partition =>
      downloadData(partition)
    }
    .runWith(Sink.foreach(println))
}

Java
// create queryApi and readEngine
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// download payload for partition
Function<Partition, CompletionStage<Optional<byte[]>>> fetchData =
    (partition) -> {
      return readEngine
          .getDataAsBytes(partition)
          .thenApply(Optional::of)
          .exceptionally(failure -> Optional.empty());
    };

// using this time as an example:
// Friday, October 18, 2019 1:45:20 PM GMT
Long timestampSinceEpochInMs = 1571406320000L;
Long timestampSinceEpochInMsPlusOneHour = timestampSinceEpochInMs + 3600 * 1000;

// empty filter means that no filter is applied
VolatilePartitionsFilter emptyFilter = new VolatilePartitionsFilter.Builder().build();

// filter all partitions from timestampSinceEpochInMs to now
VolatilePartitionsFilter sinceFilter1 =
    new VolatilePartitionsFilter.Builder().withSinceTimestamp(timestampSinceEpochInMs).build();

// combination of two since filters will effectively use the younger/higher timestamp
VolatilePartitionsFilter sinceFilter2 =
    new VolatilePartitionsFilter.Builder()
        .withSinceTimestamp(timestampSinceEpochInMs)
        .build()
        .and(
            new VolatilePartitionsFilter.Builder()
                .withSinceTimestamp(timestampSinceEpochInMsPlusOneHour)
                .build());

String partition1 = "1";
String partition2 = "2";
String partition3 = "3";
String partition4 = "4";
Set<String> partitions1 = new HashSet<String>();
partitions1.add(partition1);
Set<String> partitions4 = new HashSet<String>();
partitions4.add(partition4);
Set<String> partitions123 = new HashSet<String>();
partitions123.add(partition1);
partitions123.add(partition2);
partitions123.add(partition3);

// filter partitions with ids 1, 2 and 3
VolatilePartitionsFilter byIdsFilter1 =
    new VolatilePartitionsFilter.Builder().withIds(partitions123).build();

// combination of two byIds filter results in a filter with the intersection of the ids, in this
// case 1
VolatilePartitionsFilter byIdsFilter2 =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions123)
        .build()
        .and(new VolatilePartitionsFilter.Builder().withIds(partitions1).build());

// combination of two byIds filter with non-overlapping sets results in empty ids, so no
// partition
VolatilePartitionsFilter byIdsFilter3 =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions123)
        .build()
        .and(new VolatilePartitionsFilter.Builder().withIds(partitions4).build());

// combination of byIds filter and since filter
VolatilePartitionsFilter combinedFilter =
    new VolatilePartitionsFilter.Builder()
        .withIds(partitions1)
        .build()
        .and(
            new VolatilePartitionsFilter.Builder()
                .withSinceTimestamp(timestampSinceEpochInMs)
                .build());

// fetch metadata once, can be cached on the client
CompletionStage<Source<Partition, NotUsed>> partitions =
    queryApi.getVolatilePartitions(layer, combinedFilter, Collections.emptySet());

int parallelism = 10;

// keep reading data for volatile layer as needed
partitions.thenApply(
    partitionsSource -> {
      return partitionsSource
          .mapAsync(parallelism, fetchData::apply)
          .runWith(Sink.foreach(System.out::println), myMaterializer);
    });

You can also use the VolatilePartitionsFilter builder.
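The combination rules stated in the comments of the example (two since filters keep the later timestamp, two byIds filters keep the intersection of the IDs) can be modeled in a few lines of plain Java. This is only an illustrative sketch of those rules: FilterModel is a hypothetical class and does not reflect how the library implements VolatilePartitionsFilter.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Illustrative model of the filter combination rules; not the library class. */
final class FilterModel {
  final Optional<Long> since;
  final Optional<Set<String>> ids;

  private FilterModel(Optional<Long> since, Optional<Set<String>> ids) {
    this.since = since;
    this.ids = ids;
  }

  static FilterModel empty() {
    return new FilterModel(Optional.empty(), Optional.empty());
  }

  static FilterModel since(long timestampMs) {
    return new FilterModel(Optional.of(timestampMs), Optional.empty());
  }

  static FilterModel byIds(Set<String> ids) {
    return new FilterModel(Optional.empty(), Optional.of(new HashSet<>(ids)));
  }

  /** Logical and: the later since timestamp wins, ID sets are intersected. */
  FilterModel and(FilterModel other) {
    Optional<Long> combinedSince;
    if (since.isPresent() && other.since.isPresent()) {
      combinedSince = Optional.of(Math.max(since.get(), other.since.get()));
    } else {
      combinedSince = since.isPresent() ? since : other.since;
    }

    Optional<Set<String>> combinedIds;
    if (ids.isPresent() && other.ids.isPresent()) {
      Set<String> intersection = new HashSet<>(ids.get());
      intersection.retainAll(other.ids.get());
      combinedIds = Optional.of(intersection);
    } else {
      combinedIds = ids.isPresent() ? ids : other.ids;
    }
    return new FilterModel(combinedSince, combinedIds);
  }
}
```

In this model, combining byIds of 1, 2, 3 with byIds of 4 yields an empty ID set (no partition matches), which is different from the empty filter, where no restriction is applied at all.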

Fetch Data in Parallel

As shown in the examples above, use the parallelism parameter to control how many parallel requests the Data Client Library makes to fetch blobs. The optimal value depends on the node configuration, RAM, CPU, and network. Using more than 100 parallel requests has been observed to have a negative effect on performance.

Index Layer

To paginate a query, call the QueryApi.queryIndexParts method. It returns a list of part IDs that represent the parts of the layer. You can use them to limit the scope of a query operation, which makes it possible to run queries on multiple parts in parallel. You provide the desired number of parts and the service returns a list of part IDs. If the requested number would make the parts too small, the service may return fewer parts than requested. For an example of how to fetch index parts, see the Retrieve Index Parts subsection below.

To retrieve data from the index layer, first call the QueryApi.queryIndex method, which returns the IndexPartitions that match a given query.

Then, call the ReadEngine.getDataAsBytes method on each IndexPartition to retrieve the corresponding data using the blob API. Because fetching the data for one partition takes some time, and QueryApi.queryIndex may return hundreds or even thousands of partitions, we recommend fetching the data for these partitions in parallel. The example below shows how to first query the index and then retrieve the data for the returned IndexPartitions in parallel.

The right level of parallelism depends on the machine that runs the code and the size of the objects to retrieve:

  • If you set the level of parallelism too low, the network bandwidth will not be fully used because of the request execution overhead.
  • If you set the parallelism too high, for example more than 200 parallel downloads on a single ActorSystem, you'll start seeing warnings about too much pressure being put on the Akka HTTP connection pool. This happens because the code creates too many asynchronous tasks, and the Data Client Library does not provide any backpressure mechanism in this case.

Trying out several levels of parallelism is a good way to find the best download performance. You can start with 10 parallel downloads per machine and increase this number by 10 until you see performance degrade.
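The search procedure described above (start at 10, step by 10, stop at the first degradation) can be sketched as a small helper. This is an assumption-laden illustration: ParallelismTuner and the throughputAt callback are hypothetical names, and the callback is expected to run a representative download with the given parallelism and return a throughput figure that you measure yourself.

```java
import java.util.function.IntToDoubleFunction;

/** Steps the parallelism up until the measured throughput stops improving. */
final class ParallelismTuner {
  /**
   * @param throughputAt measures throughput (higher is better) at a given parallelism
   * @param start first parallelism level to try, for example 10
   * @param step increment between levels, for example 10
   * @param max upper bound on the parallelism to try
   * @return the last level that still improved the throughput
   */
  static int findBest(IntToDoubleFunction throughputAt, int start, int step, int max) {
    int best = start;
    double bestThroughput = throughputAt.applyAsDouble(start);
    for (int p = start + step; p <= max; p += step) {
      double throughput = throughputAt.applyAsDouble(p);
      if (throughput <= bestThroughput) {
        break; // performance no longer improves, keep the previous level
      }
      best = p;
      bestThroughput = throughput;
    }
    return best;
  }
}
```

A single measurement per level is noisy in practice, so it may be worth averaging several runs inside the callback before comparing levels.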

Retrieve Index Parts

To retrieve the index parts, call the QueryApi.queryIndexParts method.

Scala
// Query the index layer with pagination
import scala.concurrent.Await
import scala.concurrent.duration._
// number of parts into which you want to split all the index partitions in the layer
val numberOfParts = 50
val indexParts =
  Await.result(queryApi.queryIndexParts(indexLayerId, numberOfParts), 10.seconds)

Java
// Query the index layer with pagination
int numberOfParts = 50;

IndexParts indexParts =
    queryApi.queryIndexParts(indexLayerId, numberOfParts).toCompletableFuture().join();

Query an Index layer

To query indexed data, you must provide some search criteria in the RSQL query language.

RSQL supports the following logical operators:

Operator                  Description
;  (alternative: and)     Logical AND
,  (alternative: or)      Logical OR

RSQL supports the following comparison operators:

Operator                  Description
==                        Equal
!=                        Not Equal
<  (alternative: =lt=)    Less Than
<= (alternative: =le=)    Less or Equal
>  (alternative: =gt=)    Greater Than
>= (alternative: =ge=)    Greater or Equal

Below are some examples of RSQL expressions:

  • someIntKey==42
  • someStringKey!=abc
  • someStringKey=="Hello World!"
  • someIntKey<100;someBooleanKey==true
  • (someIntKey=gt=23,someStringKey==xyz);someBooleanKey==true

Note that, to make the examples above work, the IndexLayerType of the queried index layer must contain the following IndexDefinition objects:

  • IndexDefinition("someIntKey", IndexType.Int)
  • IndexDefinition("someBooleanKey", IndexType.Boolean)
  • IndexDefinition("someStringKey", IndexType.String)
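When query strings are assembled from variables, a small helper can keep the operators and quoting consistent. The sketch below is a hypothetical helper written for illustration (the Rsql class is not part of the Data Client Library). It assumes that values containing whitespace or RSQL reserved characters must be wrapped in double quotes, as in the "Hello World!" example above, and it rejects values that themselves contain a double quote rather than guessing at an escaping rule.

```java
/** Hypothetical helper that assembles RSQL expressions as plain strings. */
final class Rsql {
  private Rsql() {}

  static String eq(String key, Object value) {
    return key + "==" + quote(value);
  }

  static String ne(String key, Object value) {
    return key + "!=" + quote(value);
  }

  static String gt(String key, Object value) {
    return key + "=gt=" + quote(value);
  }

  static String lt(String key, Object value) {
    return key + "=lt=" + quote(value);
  }

  /** Joins expressions with the logical AND operator. */
  static String and(String... expressions) {
    return String.join(";", expressions);
  }

  /** Joins expressions with the logical OR operator, grouped in parentheses. */
  static String or(String... expressions) {
    return "(" + String.join(",", expressions) + ")";
  }

  private static String quote(Object value) {
    String text = String.valueOf(value);
    if (text.indexOf('"') >= 0) {
      throw new IllegalArgumentException("double quotes in values are not supported here");
    }
    // Unquoted values must not contain whitespace or reserved characters.
    boolean plain = text.matches("[^\"'();,=!~<> ]+");
    return plain ? text : "\"" + text + "\"";
  }
}
```

For example, Rsql.and(Rsql.gt("someIntKey", 42), Rsql.ne("someStringKey", "abc")) produces someIntKey=gt=42;someStringKey!=abc, which is equivalent to writing the query string by hand.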

The following code snippet retrieves the partitions whose someIntKey attribute is greater than 42 and whose someStringKey attribute is not "abc". It returns an Akka Source to simplify fetching the data for the returned partitions in parallel. For more information about how to retrieve the data, see the next subsection, Retrieve Indexed Data.

Scala
import scala.concurrent.Await
import scala.concurrent.duration._

val queryString = "someIntKey>42;someStringKey!=abc"
val parallelism = 10

val foundIndexPartitionsAsSource: Source[IndexPartition, NotUsed] =
  Source(indexParts.parts)
    .mapAsync(parallelism) { part =>
      queryApi
        .queryIndex(indexLayerId, queryString, Some(part))
    }
    .flatMapConcat(identity)

Java
// How to query the index layer
String queryString = "someIntKey>42;someStringKey!=abc";

Source<IndexPartition, NotUsed> indexPartitionsSource =
    Source.from(indexParts.getParts())
        .mapAsync(10, part -> queryApi.queryIndex(indexLayerId, queryString, part))
        .flatMapConcat(s -> s);

For more information about the format of and constraints on the queries, see also the Get the Data Handle section of the Data API Developer Guide.

Retrieve Indexed Data

The previous subsection Query an Index layer showed how to query an index layer using QueryApi.queryIndex. The code snippet below illustrates how to retrieve the data corresponding to each partition in parallel from the IndexPartitions returned by QueryApi.queryIndex using Akka streams and ReadEngine.getDataAsBytes:

Scala
println(
  "Download the data corresponding to the index partitions previously found by the queryIndex method")

implicit val materializer: ActorMaterializer = ActorMaterializer()
def youCanProcessTheDataHere(byteData: Array[Byte]): Unit = ???

foundIndexPartitionsAsSource
  .mapAsyncUnordered(parallelism)(partition => readEngine.getDataAsBytes(partition))
  .runForeach((byteData: Array[Byte]) => youCanProcessTheDataHere(byteData))
  .await

println("Computation finished. Shutting down the HTTP connections and the actor system.")
Await.ready(CoordinatedShutdown(actorSystem).run(UnknownReason), Duration.Inf)

Java
ActorMaterializer actorMaterializer = ActorMaterializer.create(actorSystem);

System.out.println(
    "Download the data corresponding to the index partitions previously found by the queryIndex method");

int parallelism = 10;
indexPartitionsSource
    .mapAsyncUnordered(parallelism, readEngine::getDataAsBytes)
    // Replace the method youCanProcessTheDataHere with your own code.
    .runForeach(this::youCanProcessTheDataHere, actorMaterializer)
    .toCompletableFuture()
    .join();

System.out.println(
    "Computation finished. Shutting down the HTTP connections and the actor system.");
CoordinatedShutdown.get(actorSystem)
    .runAll(CoordinatedShutdown.unknownReason())
    .toCompletableFuture()
    .join();

Object Store Layer

Data in the Object Store layer is referenced by a key that you control. The data is mutable and parallel writes are allowed. This means that if multiple writers write to the same key in parallel, the last request to finish on the server wins. Client code should be prepared for this situation when getting the data for that key.

To get data as an Akka source or as an array of bytes, add the following:

Scala
// create readEngine
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// full object as dataSource
val dataAsSource: Future[Source[ByteString, NotUsed]] =
  readEngine
    .getObjectDataAsSource(layer, key)

// full object as an array of bytes
val dataAsBytes: Future[Array[Byte]] =
  readEngine
    .getObjectDataAsBytes(layer, key)

// partial object with provided range as dataSource
val dataAsSourceWithRange: Future[Source[ByteString, NotUsed]] =
  readEngine
    .getObjectDataAsSource(layer, key, ByteRange.fromRange(5, 10))

// partial object with provided range as an Array of bytes
val dataAsBytesWithRange: Future[Array[Byte]] =
  readEngine
    .getObjectDataAsBytes(layer, key, ByteRange.fromRange(5, 10))

Java
// create readEngine
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// full object as dataSource
CompletionStage<Source<ByteString, NotUsed>> dataAsSource =
    readEngine.getObjectDataAsSource(layer, key, ByteRange.all());

// partial object with provided range as dataSource
CompletionStage<Source<ByteString, NotUsed>> dataAsSourceWithRange =
    readEngine.getObjectDataAsSource(layer, key, ByteRange.fromRange(5, 10));

// full object as an Array of bytes
CompletionStage<byte[]> dataAsByteArray =
    readEngine.getObjectDataAsBytes(layer, key, ByteRange.all());

// partial object as an Array of bytes with provided range
CompletionStage<byte[]> dataAsByteArrayWithRange =
    readEngine.getObjectDataAsBytes(layer, key, ByteRange.fromRange(5, 10));

Interactive Map Layer

The data in an interactive map layer is available as long as the specified catalog exists.

To get the data from an interactive map layer, use the following methods.

To return all the features found for the provided list of IDs in an interactive map layer, you can use the QueryApi.getFeatureCollectionByIds API call.

The method takes 3 arguments:

  • layer - The layer ID of the layer.
  • ids - List of feature IDs to be retrieved from the interactive map layer.
  • selection - List of properties to be returned in the features result list.

The snippet below demonstrates the usage of the QueryApi.getFeatureCollectionByIds API:

Scala
// create queryApi
val queryApi = DataClient().queryApi(catalogHrn, settings)

val ids = Seq("feature-1", "feature-2")

val response: Future[FeatureCollection] =
  queryApi.getFeatureCollectionByIds(layer, ids, Set.empty)
val featureCollection = Await.result(response, timeout)

Java
// create queryApi
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// list of ids to query
List<String> ids = new ArrayList<>(Arrays.asList("feature-1"));

FeatureCollection featureCollection =
    queryApi
        .getFeatureCollectionByIds(layer, ids, Collections.emptySet())
        .toCompletableFuture()
        .join();

To return the features which are inside a bounding box of an interactive map layer, you can use the QueryApi.getFeatureCollectionByBbox API call.

The method takes 5 arguments:

  • layer - The layer ID of the layer.
  • bbox - The bounding box in which to search for features.
  • searchParam - List of additional feature filters resulting in a subset of features.
  • selection - List of properties to be returned in the features result list.
  • limit - The maximum number of features in the response (Default is 30K and maximum is 100K).

To return the features selected by tile type and tile id of an interactive map layer, you can use the QueryApi.getFeatureCollectionByTile API call.

The method takes 6 arguments:

  • layer - The layer ID of the layer.
  • tileId - The tile ID to be queried.
  • tileType - The type of tile identifier. Available values are quadkey, web, tms and here.
  • searchParam - List of additional feature filters resulting in a subset of features.
  • selection - List of properties to be returned in the features result list.
  • limit - The maximum number of features in the response (Default is 30K and maximum is 100K).

To return the features by search params from an interactive map layer, you can use the QueryApi.getFeatureCollectionBySearchParam API call.

The method takes 4 arguments:

  • layer - The layer ID of the layer.
  • searchParam - List of additional feature filters resulting in a subset of features.
  • selection - List of properties to be returned in the features result list.
  • limit - The maximum number of features in the response (Default is 30K and maximum is 100K).

The snippet below demonstrates the usage of the QueryApi.getFeatureCollectionBySearchParam API:

Scala
// create queryApi
val queryApi = DataClient().queryApi(catalogHrn, settings)

// Adding search parameters to fetch the filtered data
val searchParams = Set(SearchParam("p.prop1", SearchOperator.EQUAL, "some-value1"),
                       SearchParam.fromString("p.prop2>=10"))

// Adding list of properties to be returned in the features
val selection = Set("p.prop1", "p.prop2")

// Limit the number of features in the response to 100
val limit = 100

val response: Future[FeatureCollection] =
  queryApi.getFeatureCollectionBySearchParam(layer, searchParams, selection, Some(limit))
val featureCollection = Await.result(response, timeout)

Java
// create queryApi
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// Adding search parameters to fetch the filtered data
SearchParam searchParam1 = new SearchParam("p.prop1", SearchOperator.EQUAL, "some-value");
SearchParam searchParam2 = SearchParam.fromString("p.prop2>=10");
Set<SearchParam> searchParams = new HashSet<>();
searchParams.add(searchParam1);
searchParams.add(searchParam2);

// Adding list of properties to be returned in the features
Set<String> selection = new HashSet<>(Arrays.asList("p.prop1", "p.prop2"));

// Limit the number of features in the response to 100
int limit = 100;

FeatureCollection featureCollection =
    queryApi
        .getFeatureCollectionBySearchParam(
            layer, searchParams, selection, OptionalInt.of(limit))
        .toCompletableFuture()
        .join();

To return the features which are inside the specified circle with the specified latitude and longitude as center, you can use the QueryApi.getFeatureCollectionBySpatialSearchCircle API call. The method takes 7 arguments:

  • layer - The layer ID of the layer.
  • latitude - The latitude in WGS 84 decimal degrees (-90 to +90) of the center point.
  • longitude - The longitude in WGS 84 decimal degrees (-180 to +180) of the center point.
  • radius - The radius of the circle in meters.
  • searchParam - List of additional feature filters resulting in a subset of features.
  • selection - List of properties to be returned in the features result list.
  • limit - The maximum number of features in the response (Default is 30K and maximum is 100K).

To return the features which intersect the specified reference feature's geometry, you can use the QueryApi.getFeatureCollectionBySpatialSearchFeature API call.

The method takes 8 arguments:

  • layer - The layer ID of the layer.
  • refCatalogHrn - The catalog HRN where the layer containing the referenced feature is stored.
  • refLayerId - The layer ID where the referenced feature is stored.
  • refFeatureId - The feature ID in the referenced layer.
  • radius - The radius in meters that is added as a buffer to the geometry.
  • searchParam - List of additional feature filters resulting in a subset of features.
  • selection - List of properties to be returned in the features result list.
  • limit - The maximum number of features in the response (Default is 30K and maximum is 100K).

To return the features which intersect the provided geometry, you can use the QueryApi.getFeatureCollectionBySpatialSearchGeometry API call.

The method takes 6 arguments:

  • layer - The layer ID of the layer.
  • geometry - The geometry which is used as an origin for the search.
  • radius - Radius in meters which is added as a buffer to the geometry.
  • searchParam - List of additional feature filters resulting in a subset of features.
  • selection - List of properties to be returned in the features result list.
  • limit - The maximum number of features in the response (Default is 30K and maximum is 100K).

The snippet below demonstrates the usage of the QueryApi.getFeatureCollectionBySpatialSearchGeometry API:

Scala
Java
// create queryApi
val queryApi = DataClient().queryApi(catalogHrn, settings)

val geometry = Point(coordinates = Some(immutable.Seq(10.0, 12.0)))
val radius = 50

val response: Future[FeatureCollection] =
  queryApi.getFeatureCollectionBySpatialSearchGeometry(layer,
                                                       geometry,
                                                       Some(radius),
                                                       Set.empty,
                                                       Set.empty,
                                                       None)
val featureCollection = Await.result(response, timeout)
// create queryApi
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

Geometry geometry =
    new Point.Builder().withCoordinates(new ArrayList<>(Arrays.asList(10.0, 12.0))).build();
int radius = 50;

FeatureCollection featureCollection =
    queryApi
        .getFeatureCollectionBySpatialSearchGeometry(
            layer,
            geometry,
            OptionalInt.of(radius),
            Collections.emptySet(),
            Collections.emptySet(),
            OptionalInt.empty())
        .toCompletableFuture()
        .join();

To iterate over all the features in an interactive map layer, you can use the QueryApi.getFeatureCollectionByIterate API call.

The method takes 4 arguments:

  • layer - The layer ID of the layer.
  • pageToken - The page token where the iteration will continue.
  • selection - List of properties to be returned in the features result list.
  • limit - The maximum number of features in the response (Default is 30K and maximum is 100K).
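
Token-based iteration like this usually follows a simple loop: request a page, collect its features, and continue with the returned token until none is left. The sketch below shows that loop against a hypothetical Page type and fetch function. It assumes that the first request carries no page token and that the last page returns no next token; neither assumption is confirmed here, and the sketch does not call the real QueryApi.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Sketch only: IteratePages and Page are hypothetical, not library types.
final class IteratePages {
    /** Hypothetical page shape: a batch of items plus the token of the next page, if any. */
    static final class Page<T> {
        final List<T> items;
        final Optional<String> nextPageToken;

        Page(List<T> items, Optional<String> nextPageToken) {
            this.items = items;
            this.nextPageToken = nextPageToken;
        }
    }

    /**
     * Collects all items by calling fetchPage repeatedly.
     * Assumption: the first call uses an empty token and iteration ends
     * when a page comes back without a next token.
     */
    static <T> List<T> collectAll(Function<Optional<String>, Page<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        Optional<String> token = Optional.empty();
        do {
            Page<T> page = fetchPage.apply(token);
            all.addAll(page.items);
            token = page.nextPageToken;
        } while (token.isPresent());
        return all;
    }
}
```

In real code, the fetch function would wrap the iterate call and map its response into the page shape; collecting everything into one list is only reasonable for layers that fit in memory.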

To return the statistics information of an interactive map layer, you can use the QueryApi.getIMLStatistics API call.

The method takes 1 argument:

  • layer - The layer ID of the layer.

The snippet below demonstrates the usage of the QueryApi.getIMLStatistics API:

Scala
Java
// create queryApi
val queryApi = DataClient().queryApi(catalogHrn, settings)

val response = queryApi.getIMLStatistics(layer)
val statistics = Await.result(response, timeout)

// Check the total features in the layer
println("FeatureCount: " + statistics.getCount.getValue)
// create queryApi
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

Statistics statistics = queryApi.getIMLStatistics(layer).toCompletableFuture().join();

To export all the features from an interactive map layer, you can use the readEngine.exportIMLFeatures API call.

The method takes 2 arguments:

  • layer - The layer ID of the layer.
  • batchSize - The batch size used when iterating over the interactive map layer.

The snippet below demonstrates the usage of the readEngine.exportIMLFeatures API:

Scala
Java
// create readEngine
val readEngine = DataEngine().readEngine(catalogHrn, settings)

val batchSize = 100

val futureResponse = readEngine.exportIMLFeatures(layer, Some(batchSize)).runWith(Sink.seq)

val response: Seq[Seq[Feature]] = Await.result(futureResponse, timeout)
// create readEngine
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

int batchSize = 100;

Source<List<Feature>, NotUsed> responseSource =
    readEngine.exportIMLFeatures(layer, OptionalInt.of(batchSize));

List<List<Feature>> response =
    responseSource.runWith(Sink.seq(), myMaterializer).toCompletableFuture().join();
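
The export arrives as a list of batches. If you need a single flat list, a small generic helper is enough. The sketch below is plain Java with no library types; BatchUtil and flatten are hypothetical names, and it assumes the whole export fits in memory.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: BatchUtil is a hypothetical helper, not a library class.
final class BatchUtil {
    /** Concatenates the batches in order into one list. */
    static <T> List<T> flatten(List<List<T>> batches) {
        return batches.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }
}
```

For large layers, processing each batch as it arrives from the source avoids holding every feature in memory at once.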

NOTE: You can pass additional filters as SearchParams to restrict the result to a subset of features. The allowed prefixes for property search are:

  'p.' - used to access values stored in the 'properties' property of the feature
  'f.' - used to access values which are added by default to the stored feature. The possible values are: 'f.id', 'f.createdAt' and 'f.updatedAt'.

  Example -
  p.property_1=property_value_1 or
  f.special_property_1=special_property_value_1

You can specify search criteria in SearchParams using the following operators:

  "=" - equals
  "!=" - not equals
  ">=" or "=gte=" - greater than or equals
  "<=" or "=lte=" - less than or equals
  ">" or "=gt=" - greater than
  "<" or "=lt=" - less than
  "@>" or "=cs=" - contains

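To make the expression syntax concrete, the sketch below splits a filter string such as p.prop2>=10 into its key, operator, and value, using the prefixes and operators listed above. It only illustrates the syntax: SearchExpr is a hypothetical class, and this is not how SearchParam.fromString is implemented.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: SearchExpr is a hypothetical class, not the library's parser.
final class SearchExpr {
    // Operators from the list above, in both symbolic and "=xx=" spellings.
    private static final List<String> OPERATORS = Arrays.asList(
            "=gte=", "=lte=", "=gt=", "=lt=", "=cs=",
            "!=", ">=", "<=", "@>", "=", ">", "<");

    final String key;
    final String operator;
    final String value;

    private SearchExpr(String key, String operator, String value) {
        this.key = key;
        this.operator = operator;
        this.value = value;
    }

    /** Splits the expression at the leftmost operator; the longest operator wins a tie. */
    static SearchExpr parse(String expr) {
        int bestPos = -1;
        String bestOp = null;
        for (String op : OPERATORS) {
            int pos = expr.indexOf(op);
            if (pos < 0) {
                continue;
            }
            boolean earlier = bestOp == null || pos < bestPos;
            boolean longerAtSamePos = bestOp != null && pos == bestPos
                    && op.length() > bestOp.length();
            if (earlier || longerAtSamePos) {
                bestPos = pos;
                bestOp = op;
            }
        }
        if (bestOp == null) {
            throw new IllegalArgumentException("no operator found in: " + expr);
        }
        String key = expr.substring(0, bestPos);
        if (!key.startsWith("p.") && !key.startsWith("f.")) {
            throw new IllegalArgumentException("key must start with 'p.' or 'f.': " + expr);
        }
        return new SearchExpr(key, bestOp, expr.substring(bestPos + bestOp.length()));
    }
}
```

Picking the leftmost operator and preferring the longest match at that position keeps ">=" from being read as ">" followed by "=10", and "=gte=" from being read as a plain "=".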