To get information about catalogs, layers, and partitions, add the data-client
module as a dependency to your project.
Discover Catalog Versions
This section covers the methods for fetching the versions of a catalog and information about a specific version.
Note
Versions apply only to versioned layers
Only versioned layers have versions. The examples below show how to retrieve a specific version or a range of versions, which you can then use to fetch metadata through the QueryApi methods getPartitions, getPartitionsById, getPartitionsAsIterator, getChanges, getChangesById, and getChangesAsIterator.
To find the latest version of a catalog, add the following:
val queryApi = DataClient().queryApi(catalogHrn, settings)
queryApi.getLatestVersion(None).flatMap {
case Some(version) =>
println(s"Query catalog for $version")
Future.successful(Done)
case None =>
println(s"Catalog has no versions")
Future.successful(Done)
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
queryApi
.getLatestVersion(OptionalLong.empty())
.thenAccept(
version -> {
if (version.isPresent())
System.out.println("Catalog latest version is: " + version.getAsLong());
else System.out.println("Catalog has no versions");
});
To get information about a specific catalog version, add the following:
queryApi.getVersion(0).map { versionInfo =>
println(s"Query catalog for $versionInfo")
}
queryApi
.getVersion(0L)
.thenAccept(
versionInfo -> {
System.out.println("Query catalog for " + versionInfo);
});
To get information on a range of catalog versions, add the following:
queryApi.getVersions(0, 10).map { versionInfos =>
versionInfos.foreach(println)
}
queryApi
.getVersions(0L, 10L)
.thenAccept(versionInfos -> System.out.println("Catalog versions: " + versionInfos));
Note
Using explicit versions
Since one processing job often makes several calls to the HERE platform, explicitly define a specific catalog version when getting data. Using a specific catalog version ensures possible changes in a catalog do not unexpectedly change the information in responses to subsequent requests.
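The effect of pinning a version can be illustrated without the platform. The following is a minimal sketch that uses a hypothetical in-memory stand-in for a catalog (PinnedVersionSketch and its methods are illustrative names, not part of the Data Client Library). It resolves the version once and shows that later reads stay stable even after a new version is published:

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

final class PinnedVersionSketch {

    // Hypothetical in-memory catalog: version number -> (partition ID -> payload).
    private final TreeMap<Long, Map<String, String>> versions = new TreeMap<>();

    /** Publishes a new snapshot as the next version, starting at version 0. */
    void publish(Map<String, String> snapshot) {
        long next = versions.isEmpty() ? 0L : versions.lastKey() + 1;
        versions.put(next, snapshot);
    }

    long latestVersion() {
        return versions.lastKey();
    }

    String read(long version, String partition) {
        return versions.get(version).get(partition);
    }

    public static void main(String[] args) {
        PinnedVersionSketch catalog = new PinnedVersionSketch();
        catalog.publish(Collections.singletonMap("partition1", "old payload"));

        long pinned = catalog.latestVersion(); // resolve the version once per job
        String first = catalog.read(pinned, "partition1");

        // Another publisher adds a version while the job is still running.
        catalog.publish(Collections.singletonMap("partition1", "new payload"));

        String second = catalog.read(pinned, "partition1");
        System.out.println(first.equals(second));                                // true
        System.out.println(catalog.read(catalog.latestVersion(), "partition1")); // new payload
    }
}
```

A job that asked for the latest version on every call would have seen two different payloads for the same partition.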
There is also an API to retrieve versions of a catalog that are compatible with a set of versions of other catalogs (dependencies).
A catalog version is compatible when it has one of the following properties:
- It (directly or indirectly) depends on at least one of the catalogs in dependencies, and all these dependencies are on the exact catalog version specified in dependencies.
- It doesn't depend on any of the catalogs in dependencies.
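Taken together, the two properties amount to one check: no catalog listed in dependencies may be used by the candidate version at a different version. The following is a minimal local sketch of that rule, not the service's implementation. It assumes the candidate's direct and indirect dependencies are already flattened into a map from catalog HRN to version. CompatibilityCheck, isCompatible, and the HRN strings are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class CompatibilityCheck {

    /**
     * Returns true when no catalog in `required` is used by the candidate
     * at a different version. A candidate that uses none of the required
     * catalogs is therefore compatible as well.
     */
    static boolean isCompatible(Map<String, Long> candidateDeps, Map<String, Long> required) {
        for (Map.Entry<String, Long> entry : required.entrySet()) {
            Long used = candidateDeps.get(entry.getKey());
            if (used != null && !used.equals(entry.getValue())) {
                return false; // depends on the catalog, but at another version
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Long> required = new HashMap<>();
        required.put("hrn:example:base-map", 7L); // hypothetical HRN

        Map<String, Long> sameVersion = new HashMap<>();
        sameVersion.put("hrn:example:base-map", 7L);

        Map<String, Long> otherVersion = new HashMap<>();
        otherVersion.put("hrn:example:base-map", 6L);

        Map<String, Long> unrelated = new HashMap<>();
        unrelated.put("hrn:example:weather", 3L);

        System.out.println(isCompatible(sameVersion, required));  // true
        System.out.println(isCompatible(otherVersion, required)); // false
        System.out.println(isCompatible(unrelated, required));    // true
    }
}
```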
To get all compatible versions ordered from newest to oldest, add the following:
val dependencies: Future[Set[CatalogVersion]] = queryApi
.getVersion(0)
.map(_.dependencies.map(d => CatalogVersion(d.hrn, d.version)).toSet)
val otherQueryApi = DataClient().queryApi(otherHrn)
dependencies
.flatMap(otherQueryApi.getCompatibleVersionsAsIterator)
.map { compatibleVersions: Iterator[CompatibleVersion] =>
compatibleVersions.foreach(println)
}
CompletionStage<Set<CatalogVersion>> dependencies =
queryApi
.getVersion(0L)
.thenApply(
versionInfo ->
versionInfo.getDependencies().stream()
.map(d -> new CatalogVersion(d.getHrn(), d.getVersion()))
.collect(Collectors.toSet()));
final QueryApi otherQueryApi = DataClient.get(myActorSystem).queryApi(otherHrn);
dependencies
.thenCompose(otherQueryApi::getCompatibleVersionsAsIterator)
.thenAccept(
compatibleVersions ->
compatibleVersions.forEachRemaining(
compatibleVersion -> System.out.println(compatibleVersion.toString())));
Get Partitions from a Volatile Layer
To get specific partitions from a volatile layer, use VolatilePartitionsFilter.
To get partitions from a volatile layer that have changed since a specific time:
import com.here.platform.data.client.common.VolatilePartitionsFilter._
val queryApi = DataClient().queryApi(catalogHrn, settings)
val layerName = ""
val timestamp = 0L
val partition1 = ""
val partition2 = ""
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
partitions.runForeach(println).andThen {
case Success(_) => println("Done")
case Failure(exception) => println(s"Failed with $exception")
}
queryApi
.getVolatilePartitions(layerName, since(timestamp) and byIds(Set(partition1, partition2)))
.flatMap(processPartitions)
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
long sinceTime = 0;
String layerName = "volatileLayer";
String partition1 = "partition1";
String partition2 = "partition2";
Set<String> partitions = new HashSet<String>();
partitions.add(partition1);
partitions.add(partition2);
VolatilePartitionsFilter.Builder builder = new VolatilePartitionsFilter.Builder();
builder.withIds(partitions).withSinceTimestamp(sinceTime);
Source<Partition, NotUsed> changesSource =
queryApi
.getVolatilePartitions(layerName, builder.build(), AdditionalFields.AllFields())
.toCompletableFuture()
.get();
changesSource
.runForeach(
partition -> {
System.out.println(partition);
},
myMaterializer)
.whenCompleteAsync(
(result, e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("DONE!");
}
});
Note
Filter
Use VolatilePartitionsFilter to combine simple filters when fetching data from a volatile layer. You can filter by partition ID and by a since timestamp. The getVolatilePartitions method has a counterpart, getVolatilePartitionsAsIterator, that returns an iterator of partitions.
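To make the combination of filters concrete, the following is a minimal sketch that models the two conditions as plain java.util.function.Predicate values joined with a logical AND. It does not use the library's filter classes. The Meta class, the since and byIds helpers, and the sample data are hypothetical, and the sketch assumes that "since" keeps partitions updated strictly after the given timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class VolatileFilterSketch {

    /** Hypothetical stand-in for partition metadata: an ID and a last-update timestamp. */
    static final class Meta {
        final String id;
        final long timestamp;

        Meta(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    // Assumption: "since" keeps partitions updated strictly after the timestamp.
    static Predicate<Meta> since(long timestamp) {
        return m -> m.timestamp > timestamp;
    }

    static Predicate<Meta> byIds(Set<String> ids) {
        return m -> ids.contains(m.id);
    }

    /** Returns the IDs of all entries accepted by the filter, in input order. */
    static List<String> matchingIds(List<Meta> all, Predicate<Meta> filter) {
        List<String> result = new ArrayList<>();
        for (Meta m : all) {
            if (filter.test(m)) {
                result.add(m.id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Meta> all = Arrays.asList(
            new Meta("p1", 50L), new Meta("p2", 150L), new Meta("p3", 200L));
        Set<String> ids = new HashSet<>(Arrays.asList("p1", "p2"));

        // Both conditions must hold, mirroring a combined since-and-IDs filter.
        System.out.println(matchingIds(all, since(100L).and(byIds(ids)))); // [p2]
    }
}
```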
Read Partitions from a Versioned Layer
Once you have a catalog HRN and your environment is set up, you can get metadata from the HERE platform.
To read the latest version of partitions in a catalog for a specific layer, add the following:
val queryApi = DataClient().queryApi(catalogHrn, settings)
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
partitions.runForeach(println).andThen {
case Success(_) => println("Done")
case Failure(exception) => println(s"Failed with $exception")
}
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
queryApi.getPartitions(catalogVersion, layer, AdditionalFields.All)
queryApi.getLatestVersion(None).flatMap {
case Some(version) =>
println(s"Query catalog for $version")
for {
partitions <- fetchPartitions(version, layer)
_ <- processPartitions(partitions)
} yield Done
case None =>
println(s"Catalog has no versions")
Future.successful(Done)
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
Long catalogVersion =
queryApi
.getLatestVersion(OptionalLong.empty())
.toCompletableFuture()
.get()
.orElseThrow(() -> new Exception("Catalog has no version"));
queryApi
.getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
.thenCompose(
partitions ->
partitions
.runForeach(System.out::println, myMaterializer)
.whenCompleteAsync(
(result, e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("DONE!");
}
}));
To download all partitions for all layers in a catalog, use the fetchPartitions function as follows:
def fetchPartitions(catalogVersion: Long): Future[Source[Partition, NotUsed]] =
queryApi.getConfiguration().map { config =>
Source(config.layers.toList)
.mapAsync(1) { layer =>
queryApi.getPartitions(catalogVersion, layer.name, AdditionalFields.All)
}
.flatMapConcat(identity)
}
Get Changes from a Versioned Layer
To process data incrementally, fetch metadata to determine which partitions changed between specified versions of a catalog.
To get changes for all layers in a catalog, add the following:
val queryApi = DataClient().queryApi(catalogHrn, settings)
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
partitions.runForeach(println)
def fetchPartitions(startVersion: Long,
endVersion: Long): Future[Source[Partition, NotUsed]] =
queryApi.getConfiguration().map { config =>
Source(config.layers.toList)
.mapAsync(1) { layer =>
val parts = queryApi.getChangesParts(layer.name, 2)
val source = Source
.unfoldAsync[Future[Seq[String]], Source[Partition, NotUsed]](parts.map(_.parts)) {
input =>
input.flatMap {
case Nil => Future.successful(None)
case head :: tail =>
queryApi
.getChanges(startVersion,
endVersion,
layer.name,
AdditionalFields.All,
Some(head))
.map { source =>
Some(Future.successful(tail) -> source)
}
}
}
.flatMapConcat(identity)
Future(source)
}
.flatMapConcat(identity)
}
val startVersion = 1L
queryApi.getLatestVersion(None).flatMap {
case Some(endVersion) =>
println(s"Query catalog for changes for ($startVersion, $endVersion]")
for {
partitions <- fetchPartitions(startVersion, endVersion)
_ <- processPartitions(partitions)
} yield Done
case None =>
println(s"Catalog has no versions")
Future.successful(Done)
}
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
Long startVersion = 1L;
queryApi
.getLatestVersion(OptionalLong.empty())
.thenCompose(
maybeEndVersion -> {
if (maybeEndVersion.isPresent()) {
Long endVersion = maybeEndVersion.getAsLong();
System.out.println(
"Query catalog for changes for (" + startVersion + ", " + endVersion + "]");
return queryApi
.getConfiguration()
.thenCompose(
config -> {
Source<Partition, NotUsed> changesSource =
Source.from(config.getLayers())
.map(Layer::getName)
.mapAsync(
1,
layer -> {
return queryApi.getChanges(
startVersion,
endVersion,
layer,
AdditionalFields.AllFields());
})
.flatMapConcat(i -> i);
return changesSource
.runForeach(
partition -> {
System.out.println(partition);
},
myMaterializer)
.whenCompleteAsync(
(result, e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("DONE!");
}
});
});
} else {
System.out.println("Catalog has no versions");
return CompletableFuture.completedFuture(Done.getInstance());
}
});
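The Scala example's log message writes the range as (startVersion, endVersion], which suggests that the start version is excluded and the end version is included. The following is a minimal local sketch of that interval logic, assuming this reading is correct. It does not call the platform, and ChangeRangeSketch, changedBetween, and the sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class ChangeRangeSketch {

    /** Union of partitions touched by any version v with startVersion < v <= endVersion. */
    static Set<String> changedBetween(
            TreeMap<Long, Set<String>> touchedByVersion, long startVersion, long endVersion) {
        Set<String> changed = new TreeSet<>();
        // subMap(start, false, end, true) selects the half-open range (start, end].
        for (Set<String> touched :
                touchedByVersion.subMap(startVersion, false, endVersion, true).values()) {
            changed.addAll(touched);
        }
        return changed;
    }

    public static void main(String[] args) {
        // Hypothetical history: which partitions each catalog version touched.
        TreeMap<Long, Set<String>> touched = new TreeMap<>();
        touched.put(1L, new HashSet<>(Arrays.asList("a")));
        touched.put(2L, new HashSet<>(Arrays.asList("b")));
        touched.put(3L, new HashSet<>(Arrays.asList("a", "c")));

        System.out.println(changedBetween(touched, 1L, 3L)); // [a, b, c]
        System.out.println(changedBetween(touched, 1L, 2L)); // [b]
    }
}
```

Under this reading, a job that last processed version N can pass N as the start version on its next run without reprocessing the changes it has already seen.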
Get Additional Fields for Partitions in a Versioned Layer
By default, the HERE platform only includes some base information on partitions in responses. The base information includes the layer and partition. To get additional fields, such as dataSize and checksum, you need to explicitly request them.
Note
Responses only include information on additional fields if these fields were included when the catalog was published. If these fields are not included during publication, the responses do not include these fields even if you request them.
To request information on additional fields, add the following:
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
queryApi.getPartitions(catalogVersion,
layer,
Set(AdditionalField.Checksum, AdditionalField.DataSize))
Set<AdditionalField> fields = new HashSet<AdditionalField>();
fields.add(AdditionalField.Checksum);
queryApi.getPartitions(catalogVersion, layer, fields);
For objects in the Object Store layer, metadata information is available via ObjectMetadata. Two metadata fields are available: lastModified and size.
To get the metadata of an object in the Object Store layer, add the following:
val readEngine = DataEngine().readEngine(catalogHrn, settings)
val objectMetadata =
readEngine
.getObjectMetadata(layer, key)
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);
CompletionStage<ObjectMetadata> objectMetadata = readEngine.getObjectMetadata(layer, key);
List Objects in the Object Store Layer
Objects in the Object Store layer can be listed either under the root of the layer or under a prefix. The list result returns an Akka source of ObjectStoreListItem. ObjectStoreListItem contains the following fields:
- name: Name of the key.
- keyType: Can be either ObjectStoreKeyType.Object or ObjectStoreKeyType.CommonPrefix.
  - ObjectStoreKeyType.Object: An object that was uploaded under the specified key.
  - ObjectStoreKeyType.CommonPrefix: A common prefix under the given path. For example, if two objects, "test-key1" and "test-key2/test-key3", are uploaded at the root of the layer and you list at the root, the result includes two elements: one with name "test-key1" and keyType ObjectStoreKeyType.Object, and one with name "test-key2" and keyType ObjectStoreKeyType.CommonPrefix.
- lastModified: When the object was last modified, in the ZonedDateTime format. This field is only present if the keyType is ObjectStoreKeyType.Object.
- size: The size of the object. This field is only present if the keyType is ObjectStoreKeyType.Object.
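The root-listing example above can be reproduced with a small local model. The following is a minimal sketch, not the library's implementation. It assumes "/" is the separator that turns the first path segment into a common prefix, as the "test-key2/test-key3" example suggests. RootListingSketch and listAtRoot are hypothetical names, and the result is rendered as plain strings rather than ObjectStoreListItem instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class RootListingSketch {

    /**
     * Lists keys at the root: keys without a separator are reported as
     * "Object", and the first segment of any other key as "CommonPrefix".
     */
    static List<String> listAtRoot(List<String> keys) {
        Set<String> items = new LinkedHashSet<>(); // keeps order, drops repeated prefixes
        for (String key : keys) {
            int slash = key.indexOf('/');
            if (slash < 0) {
                items.add(key + " -> Object");
            } else {
                items.add(key.substring(0, slash) + " -> CommonPrefix");
            }
        }
        return new ArrayList<>(items);
    }

    public static void main(String[] args) {
        List<String> keys =
            Arrays.asList("test-key1", "test-key2/test-key3", "test-key2/test-key4");
        for (String item : listAtRoot(keys)) {
            System.out.println(item);
        }
        // prints:
        // test-key1 -> Object
        // test-key2 -> CommonPrefix
    }
}
```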
To list objects at the root of the Object Store layer, add the following:
val objectList =
readEngine
.listObjects(layer, None)
val objectListAsCollection =
readEngine
.listObjectsAsCollection(layer, None)
Source<ObjectStoreListItem, NotUsed> objectList = readEngine.listObjects(layer, "");
CompletionStage<List<ObjectStoreListItem>> objectListAsCollection =
readEngine.listObjectsAsCollection(layer, "");
To list objects under a prefix in the Object Store layer, add the following:
val objectListUnderPrefix =
readEngine
.listObjects(layer, Some(prefix))
val objectListUnderPrefixAsCollection =
readEngine
.listObjectsAsCollection(layer, Some(prefix))
Source<ObjectStoreListItem, NotUsed> objectListUnderPrefix =
readEngine.listObjects(layer, prefix);
Note
As a user, you always get the full list, either as a Source or as a List. However, the default page size used internally is 1000, so listing an ObjectStore folder that contains 1 million objects issues 1,000 requests, which is slow. If you know or suspect that a folder contains a large number of objects, you can significantly reduce the runtime by setting a higher value for the pageSize parameter.
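The request count in the note follows from dividing the number of objects by the page size and rounding up. The following is a minimal sketch of that arithmetic, assuming one request per page and ignoring retries. PageCountEstimate and requestsNeeded are hypothetical names, not part of the library, and the page size of 10,000 is only an illustrative value.

```java
final class PageCountEstimate {

    /** Number of list requests needed: ceil(objectCount / pageSize). */
    static long requestsNeeded(long objectCount, long pageSize) {
        if (objectCount < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("objectCount must be >= 0 and pageSize > 0");
        }
        return (objectCount + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        System.out.println(requestsNeeded(1_000_000L, 1_000L));  // 1000
        System.out.println(requestsNeeded(1_000_000L, 10_000L)); // 100
        System.out.println(requestsNeeded(1_001L, 1_000L));      // 2
    }
}
```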