Get Metadata

To get information about catalogs, layers, and partitions, add the data-client module as a dependency to your project.

Discover Catalog Versions

The QueryApi includes methods for fetching the versions of a catalog and information about those versions.

Note

Versions apply only to versioned layers
Only versioned layers have versions. The examples below return information about a specific version or a range of versions. You can use that information to fetch metadata with QueryApi methods such as getPartitions, getPartitionsById, getPartitionsAsIterator, getChanges, getChangesById, and getChangesAsIterator.

To find the latest version of a catalog, add the following:

Scala
Java
// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// fetch latest version of catalog
queryApi.getLatestVersion(None).flatMap {
  case Some(version) =>
    println(s"Query catalog for $version")
    Future.successful(Done)

  case None =>
    println(s"Catalog has no versions")
    Future.successful(Done)
}
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// fetch latest version of catalog
queryApi
    .getLatestVersion(OptionalLong.empty())
    .thenAccept(
        version -> {
          if (version.isPresent())
            System.out.println("Catalog latest version is: " + version.getAsLong());
          else System.out.println("Catalog has no versions");
        });

To get information about a specific catalog version, add the following:

Scala
Java
// fetch version information for specific version of catalog
queryApi.getVersion(0).map { versionInfo =>
  println(s"Query catalog for $versionInfo")
}
// fetch version information for specific version of catalog
queryApi
    .getVersion(0L)
    .thenAccept(
        versionInfo -> {
          System.out.println("Query catalog for " + versionInfo);
        });

To get information on a range of catalog versions, add the following:

Scala
Java
// fetch version information for version range (0, 10)
queryApi.getVersions(0, 10).map { versionInfos =>
  versionInfos.foreach(println)
}
// fetch version details for version range (0, 10)
queryApi
    .getVersions(0L, 10L)
    .thenAccept(versionInfos -> System.out.println("Catalog versions: " + versionInfos));

Note

Using explicit versions
Since one processing job often makes several calls to the HERE platform, explicitly define a specific catalog version when getting data. Using a specific catalog version ensures possible changes in a catalog do not unexpectedly change the information in responses to subsequent requests.

There is also an API to retrieve versions of a catalog that are compatible with a set of versions of other catalogs (dependencies).

A catalog version is compatible when it has one of the following properties:

  • It depends (directly or indirectly) on at least one of the catalogs in dependencies, and each of those dependencies is on exactly the catalog version specified in dependencies
  • It doesn't depend on any of the catalogs in dependencies
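The two conditions above can be folded into one check: for every catalog listed in dependencies that the candidate version depends on, the versions must match. The following Java sketch models that rule in memory. It is not part of the Data Client Library: CompatibilityCheck and isCompatible are hypothetical names, the HRNs are made up, and a catalog version is reduced to a map from catalog HRN to the version it depends on, with direct and indirect dependencies already flattened.

```java
import java.util.Map;

// Hypothetical in-memory model of the compatibility rule; not the Data Client Library API.
// A catalog version is reduced to its flattened (direct and indirect) dependencies:
// catalog HRN -> catalog version.
final class CompatibilityCheck {

  // True if, for every requested catalog that the candidate depends on, the candidate
  // depends on exactly the requested version. Depending on none of them is also compatible.
  static boolean isCompatible(Map<String, Long> candidateDeps, Map<String, Long> requested) {
    for (Map.Entry<String, Long> req : requested.entrySet()) {
      Long actual = candidateDeps.get(req.getKey());
      if (actual != null && !actual.equals(req.getValue())) {
        return false; // depends on this catalog, but on a different version
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // hypothetical HRNs, for illustration only
    String a = "hrn:here:data::example:catalog-a";
    String b = "hrn:here:data::example:catalog-b";
    Map<String, Long> requested = Map.of(a, 5L);

    System.out.println(isCompatible(Map.of(a, 5L), requested)); // true: same version
    System.out.println(isCompatible(Map.of(a, 4L), requested)); // false: different version
    System.out.println(isCompatible(Map.of(b, 1L), requested)); // true: no overlap
  }
}
```

The sketch only makes the rule concrete; to query real catalogs, use getCompatibleVersionsAsIterator.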

To get all compatible versions ordered from newest to oldest, add the following:

Scala
Java
// Get all dependencies of a specific version of our catalog
val dependencies: Future[Set[CatalogVersion]] = queryApi
  .getVersion(0)
  .map(_.dependencies.map(d => CatalogVersion(d.hrn, d.version)).toSet)

val otherQueryApi = DataClient().queryApi(otherHrn)
// Find all versions of `otherHrn` that are compatible with our catalog version
dependencies
  .flatMap(otherQueryApi.getCompatibleVersionsAsIterator)
  .map { compatibleVersions: Iterator[CompatibleVersion] =>
    compatibleVersions.foreach(println)
  }
// get all dependencies of a specific version of our catalog
CompletionStage<Set<CatalogVersion>> dependencies =
    queryApi
        .getVersion(0L)
        .thenApply(
            versionInfo ->
                versionInfo.getDependencies().stream()
                    .map(d -> new CatalogVersion(d.getHrn(), d.getVersion()))
                    .collect(Collectors.toSet()));

// create queryApi for another catalog
final QueryApi otherQueryApi = DataClient.get(myActorSystem).queryApi(otherHrn);

// find all versions of `otherHrn` that are compatible with our catalog version
dependencies
    .thenCompose(otherQueryApi::getCompatibleVersionsAsIterator)
    .thenAccept(
        compatibleVersions ->
            compatibleVersions.forEachRemaining(
                compatibleVersion -> System.out.println(compatibleVersion.toString())));

Get Partitions from a Volatile Layer

To get specific partitions from a volatile layer, use VolatilePartitionsFilter.

To get specific partitions from a volatile layer that have changed since a specific time, add the following:

Scala
Java
import com.here.platform.data.client.common.VolatilePartitionsFilter._
import scala.util.{Failure, Success}

// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)
val layerName = "volatileLayer"
val timestamp = 0L // specific timestamp
val partition1 = "partition1"
val partition2 = "partition2"

// define a function to process partitions
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
  partitions.runForeach(println).andThen {
    case Success(_) => println("Done")
    case Failure(exception) => println(s"Failed with $exception")
  }

// get changes of volatile layer since specific time and for specific partitions
queryApi
  .getVolatilePartitions(layerName, since(timestamp) and byIds(Set(partition1, partition2)))
  .flatMap(processPartitions)
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
long sinceTime = 0;
String layerName = "volatileLayer";
String partition1 = "partition1";
String partition2 = "partition2";
Set<String> partitions = new HashSet<String>();
partitions.add(partition1);
partitions.add(partition2);

VolatilePartitionsFilter.Builder builder = new VolatilePartitionsFilter.Builder();
builder.withIds(partitions).withSinceTimestamp(sinceTime);
//  get changes of volatile layer since specific time and for specific partitions
Source<Partition, NotUsed> changesSource =
    queryApi
        .getVolatilePartitions(layerName, builder.build(), AdditionalFields.AllFields())
        .toCompletableFuture()
        .get();
changesSource
    .runForeach(
        partition -> {
          // process each partition
          System.out.println(partition);
        },
        myMaterializer)
    .whenCompleteAsync(
        (result, e) -> {
          if (e != null) {
            e.printStackTrace();
          } else {
            System.out.println("DONE!");
          }
        });

Note

Use VolatilePartitionsFilter to combine simple filters when fetching data from a volatile layer. You can filter by partition ID and by a since timestamp. The getVolatilePartitions method has a counterpart, getVolatilePartitionsAsIterator, that returns the partitions as an iterator.
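Conceptually, combining a since filter with a by-IDs filter keeps only the partitions that satisfy both conditions. The Java sketch below models that combination with java.util.function.Predicate. PartitionMeta, since, byIds, and select are hypothetical names for illustration, not the VolatilePartitionsFilter API, and treating "since" as strictly later than the timestamp is an assumption.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical model of combining filters; not the VolatilePartitionsFilter API.
final class VolatileFilterSketch {

  // hypothetical partition metadata: an ID and the time it last changed
  static final class PartitionMeta {
    final String id;
    final long timestamp;

    PartitionMeta(String id, long timestamp) {
      this.id = id;
      this.timestamp = timestamp;
    }
  }

  // keeps partitions that changed after `time` (strictly later: an assumption)
  static Predicate<PartitionMeta> since(long time) {
    return p -> p.timestamp > time;
  }

  // keeps partitions whose ID is in `ids`
  static Predicate<PartitionMeta> byIds(Set<String> ids) {
    return p -> ids.contains(p.id);
  }

  // applies a filter and returns the IDs of the matching partitions, in input order
  static List<String> select(List<PartitionMeta> all, Predicate<PartitionMeta> filter) {
    return all.stream().filter(filter).map(p -> p.id).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<PartitionMeta> all = List.of(
        new PartitionMeta("partition1", 100L),
        new PartitionMeta("partition2", 5L),
        new PartitionMeta("partition3", 200L));

    // both conditions must hold, like combining the two filters with `and`
    Predicate<PartitionMeta> filter = since(50L).and(byIds(Set.of("partition1", "partition2")));
    System.out.println(select(all, filter)); // [partition1]
  }
}
```

Here partition2 is excluded by the time condition and partition3 by the ID condition, so only partition1 remains.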

Read Partitions from a Version Layer

Once you have a catalog HRN and your environment is set up, you can get metadata from the HERE platform.

To read the latest version of partitions in a catalog for a specific layer, add the following:

Scala
Java
// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// define a function to process partitions
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
  partitions.runForeach(println).andThen {
    case Success(_) => println("Done")
    case Failure(exception) => println(s"Failed with $exception")
  }

// fetches partitions for a specific version of a versioned layer in catalog
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
  queryApi.getPartitions(catalogVersion, layer, AdditionalFields.All)

// Partitions can be fetched only if the catalog has at least one version,
// so handle the case of an empty catalog separately
queryApi.getLatestVersion(None).flatMap {
  case Some(version) =>
    println(s"Query catalog for $version")

    for {
      partitions <- fetchPartitions(version, layer)
      _ <- processPartitions(partitions)
    } yield Done

  case None =>
    println(s"Catalog has no versions")
    Future.successful(Done)
}
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// get latest version
Long catalogVersion =
    queryApi
        .getLatestVersion(OptionalLong.empty())
        .toCompletableFuture()
        .get()
        .orElseThrow(() -> new Exception("Catalog has no version"));

// fetches partitions for a specific version of a versioned layer in catalog
queryApi
    .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
    .thenCompose(
        partitions ->
            partitions
                .runForeach(System.out::println, myMaterializer)
                .whenCompleteAsync(
                    (result, e) -> {
                      if (e != null) {
                        e.printStackTrace();
                      } else {
                        System.out.println("DONE!");
                      }
                    }));

To fetch the partitions of all layers in a catalog, define the fetchPartitions function as follows:

Scala
def fetchPartitions(catalogVersion: Long): Future[Source[Partition, NotUsed]] =
  queryApi.getConfiguration().map { config =>
    // fetch all partitions of each layer listed in the catalog configuration
    // (assumes that every layer in the catalog is versioned)
    Source(config.layers.toList)
      .mapAsync(1) { layer =>
        queryApi.getPartitions(catalogVersion, layer.name, AdditionalFields.All)
      }
      .flatMapConcat(identity)
  }
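In the function above, mapAsync(1) keeps at most one getPartitions request in flight, and flatMapConcat emits the partitions of each layer in layer order. As a rough plain-Java analogue, assuming a hypothetical fetchLayer function that returns a layer's partition names asynchronously, the same sequential, order-preserving behavior can be sketched with CompletableFuture. SequentialLayers and fetchAll are illustrative names, not library API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Plain-Java analogue of Source(layers).mapAsync(1)(fetch).flatMapConcat(identity);
// a sketch for illustration, not the Data Client Library API.
final class SequentialLayers {

  // Fetches the layers one after another and concatenates their partitions in layer order.
  static CompletableFuture<List<String>> fetchAll(
      List<String> layers, Function<String, CompletableFuture<List<String>>> fetchLayer) {
    CompletableFuture<List<String>> acc =
        CompletableFuture.<List<String>>completedFuture(new ArrayList<>());
    for (String layer : layers) {
      // the next request starts only after the previous one has completed
      acc = acc.thenCompose(
          soFar -> fetchLayer.apply(layer).thenApply(parts -> {
            List<String> next = new ArrayList<>(soFar);
            next.addAll(parts);
            return next;
          }));
    }
    return acc;
  }

  public static void main(String[] args) {
    // hypothetical fetcher that returns two partition names per layer
    Function<String, CompletableFuture<List<String>>> fakeFetch =
        layer -> CompletableFuture.completedFuture(List.of(layer + "/p1", layer + "/p2"));
    System.out.println(fetchAll(List.of("roads", "signs"), fakeFetch).join());
    // prints [roads/p1, roads/p2, signs/p1, signs/p2]
  }
}
```

Unlike an Akka Source, this sketch collects everything into an in-memory list, so it illustrates the ordering and the one-request-at-a-time behavior rather than streaming.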

Get Changes from a Version Layer

To incrementally process data, fetch metadata to determine which partitions change between specified versions in a catalog.
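The examples below pass startVersion as the latest version that was already processed, and the Scala example describes the range as (startVersion, endVersion]: the start is excluded and the end is included. A minimal Java sketch of that bookkeeping decides whether there is anything new to process. IncrementalRange and pending are hypothetical names, and the catalog's latest version is passed in as an OptionalLong, mirroring what getLatestVersion returns in the Java examples.

```java
import java.util.Optional;
import java.util.OptionalLong;

// Hypothetical helper for incremental processing; not part of the Data Client Library.
final class IncrementalRange {

  // a half-open version range (startExclusive, endInclusive]
  static final class Range {
    final long startExclusive;
    final long endInclusive;

    Range(long startExclusive, long endInclusive) {
      this.startExclusive = startExclusive;
      this.endInclusive = endInclusive;
    }

    @Override
    public String toString() {
      return "(" + startExclusive + ", " + endInclusive + "]";
    }
  }

  // Returns the range still to process, or empty if the catalog has no versions
  // or nothing was published after lastProcessed.
  static Optional<Range> pending(long lastProcessed, OptionalLong latest) {
    if (!latest.isPresent() || latest.getAsLong() <= lastProcessed) {
      return Optional.empty();
    }
    return Optional.of(new Range(lastProcessed, latest.getAsLong()));
  }

  public static void main(String[] args) {
    System.out.println(pending(1L, OptionalLong.of(4L)).map(Range::toString).orElse("nothing to do"));
    // (1, 4]
    System.out.println(pending(4L, OptionalLong.of(4L)).map(Range::toString).orElse("nothing to do"));
    // nothing to do
    System.out.println(pending(1L, OptionalLong.empty()).map(Range::toString).orElse("nothing to do"));
    // nothing to do
  }
}
```

After a successful run, the next run would use the processed endVersion as its startVersion.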

To get changes for all layers in a catalog, add the following:

Scala
Java
// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// define a function to process partitions
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
  partitions.runForeach(println)

// fetches changed partitions for a version range of every versioned layer in the catalog
def fetchPartitions(startVersion: Long,
                    endVersion: Long): Future[Source[Partition, NotUsed]] =
  queryApi.getConfiguration().map { config =>
    // for each layer in the catalog configuration, fetch its changes from the server
    Source(config.layers.toList)
      .mapAsync(1) { layer =>
        // split the changes of this layer into 2 parts and fetch the parts one after another
        val parts = queryApi.getChangesParts(layer.name, 2)
        val source = Source
          .unfoldAsync[Future[Seq[String]], Source[Partition, NotUsed]](parts.map(_.parts)) {
            input =>
              input.flatMap {
                // no parts left: complete the stream
                case Seq() => Future.successful(None)
                // fetch the changes for the next part and keep the remaining parts as state
                case head +: tail =>
                  queryApi
                    .getChanges(startVersion,
                                endVersion,
                                layer.name,
                                AdditionalFields.All,
                                Some(head))
                    .map { source =>
                      Some(Future.successful(tail) -> source)
                    }
              }
          }
          .flatMapConcat(identity)
        Future.successful(source)
      }
      .flatMapConcat(identity)
  }

// latest version that was already processed
val startVersion = 1L

queryApi.getLatestVersion(None).flatMap {
  case Some(endVersion) =>
    println(s"Query catalog for changes for ($startVersion, $endVersion]")

    for {
      partitions <- fetchPartitions(startVersion, endVersion)
      _ <- processPartitions(partitions)
    } yield Done

  case None =>
    println(s"Catalog has no versions")
    Future.successful(Done)
}
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// latest version that was already processed
Long startVersion = 1L;

queryApi
    .getLatestVersion(OptionalLong.empty())
    .thenCompose(
        maybeEndVersion -> {
          if (maybeEndVersion.isPresent()) {
            Long endVersion = maybeEndVersion.getAsLong();
            System.out.println(
                "Query catalog for changes for (" + startVersion + ", " + endVersion + "]");

            return queryApi
                .getConfiguration()
                .thenCompose(
                    config -> {
                      // for each layer in the catalog configuration,
                      // fetch its changes from the server
                      Source<Partition, NotUsed> changesSource =
                          Source.from(config.getLayers())
                              .map(Layer::getName)
                              .mapAsync(
                                  1,
                                  layer -> {
                                    // fetch the changed partitions of this layer
                                    // for the version range (startVersion, endVersion]
                                    return queryApi.getChanges(
                                        startVersion,
                                        endVersion,
                                        layer,
                                        AdditionalFields.AllFields());
                                  })
                              .flatMapConcat(i -> i);

                      return changesSource
                          .runForeach(
                              partition -> {
                                // process each partition
                                System.out.println(partition);
                              },
                              myMaterializer)
                          .whenCompleteAsync(
                              (result, e) -> {
                                if (e != null) {
                                  e.printStackTrace();
                                } else {
                                  System.out.println("DONE!");
                                }
                              });
                    });
          } else {
            System.out.println("Catalog has no versions");
            return CompletableFuture.completedFuture(Done.getInstance());
          }
        });

Get Additional Fields for Partitions in a Version Layer

By default, responses from the HERE platform include only basic information about partitions: the layer and the partition. To get additional fields, such as dataSize and checksum, you need to request them explicitly.

Note

Responses only include information on additional fields if these fields were included when the catalog was published. If these fields are not included during publication, the responses do not include these fields even if you request them.
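Because a requested field can still be missing when it was not published, code that consumes additional fields should tolerate their absence. The Java sketch below illustrates this with a hypothetical PartitionInfo type whose dataSize is an OptionalLong; it is not the library's Partition class, and AdditionalFieldsSketch, knownTotalSize, and missingSizes are illustrative names.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical model; not the Data Client Library's Partition type.
final class AdditionalFieldsSketch {

  // hypothetical partition metadata; dataSize is empty when it was not published
  static final class PartitionInfo {
    final String id;
    final OptionalLong dataSize;

    PartitionInfo(String id, OptionalLong dataSize) {
      this.id = id;
      this.dataSize = dataSize;
    }
  }

  // sum of the sizes that are actually present
  static long knownTotalSize(List<PartitionInfo> partitions) {
    return partitions.stream()
        .filter(p -> p.dataSize.isPresent())
        .mapToLong(p -> p.dataSize.getAsLong())
        .sum();
  }

  // number of partitions for which no size was returned
  static long missingSizes(List<PartitionInfo> partitions) {
    return partitions.stream().filter(p -> !p.dataSize.isPresent()).count();
  }

  public static void main(String[] args) {
    List<PartitionInfo> partitions = List.of(
        new PartitionInfo("a", OptionalLong.of(100L)),
        new PartitionInfo("b", OptionalLong.empty()),
        new PartitionInfo("c", OptionalLong.of(50L)));
    System.out.println("known total: " + knownTotalSize(partitions)
        + ", missing: " + missingSizes(partitions));
    // known total: 150, missing: 1
  }
}
```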

To request information on additional fields, add the following:

Scala
Java
// fetches partitions for a specific version of a versioned layer in catalog
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
  queryApi.getPartitions(catalogVersion,
                         layer,
                         Set(AdditionalField.Checksum, AdditionalField.DataSize))
// fetches partitions for a specific version of a versioned layer in catalog
Set<AdditionalField> fields = new HashSet<AdditionalField>();
fields.add(AdditionalField.Checksum);
fields.add(AdditionalField.DataSize);
queryApi.getPartitions(catalogVersion, layer, fields);

Get Metadata of an Object in the Object Store Layer

For objects in the Object Store layer, metadata is available through ObjectMetadata, which provides two fields: lastModified and size.

To get the metadata of an object in the Object Store layer, add the following:

Scala
Java
// create readEngine
val readEngine = DataEngine().readEngine(catalogHrn, settings)

// get object metadata
val objectMetadata =
  readEngine
    .getObjectMetadata(layer, key)
// create readEngine
ReadEngine readEngine = DataEngine.get(myActorSystem).readEngine(catalogHrn);

// get object metadata
CompletionStage<ObjectMetadata> objectMetadata = readEngine.getObjectMetadata(layer, key);

List Objects in the Object Store Layer

You can list objects in the Object Store layer either at the root of the layer or under a prefix. The result is an Akka Source of ObjectStoreListItem elements. ObjectStoreListItem contains the following fields:

  • name: Name of the key.
  • keyType: Either ObjectStoreKeyType.Object or ObjectStoreKeyType.CommonPrefix.
    • ObjectStoreKeyType.Object: An object that was uploaded under the specified key.
    • ObjectStoreKeyType.CommonPrefix: A common prefix under the given path. For example, if two objects, "test-key1" and "test-key2/test-key3", are uploaded at the root of the layer and you list the root, the result includes two elements: one with name "test-key1" and keyType ObjectStoreKeyType.Object, and one with name "test-key2" and keyType ObjectStoreKeyType.CommonPrefix.
  • lastModified: When the object was last modified, as a ZonedDateTime. This field is only present if keyType is ObjectStoreKeyType.Object.
  • size: The size of the object. This field is only present if keyType is ObjectStoreKeyType.Object.
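The CommonPrefix behavior described above can be reproduced in memory. The following Java sketch is a hypothetical model, not the ReadEngine implementation; PrefixListing, list, and KeyType are illustrative names. It assumes that "/" is the delimiter, that a non-empty prefix ends with "/", and that object names are returned as full keys.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of listing with common prefixes; not the ReadEngine API.
final class PrefixListing {

  enum KeyType { OBJECT, COMMON_PREFIX }

  // Lists the direct children of `prefix` ("" for the layer root) among `keys`.
  static Map<String, KeyType> list(List<String> keys, String prefix) {
    Map<String, KeyType> result = new LinkedHashMap<>();
    for (String key : keys) {
      if (!key.startsWith(prefix)) {
        continue; // not under the requested prefix
      }
      String rest = key.substring(prefix.length());
      int slash = rest.indexOf('/');
      if (slash < 0) {
        // no further "/" after the prefix: an object directly under it
        result.put(prefix + rest, KeyType.OBJECT);
      } else {
        // deeper keys collapse into one common prefix entry
        result.putIfAbsent(prefix + rest.substring(0, slash), KeyType.COMMON_PREFIX);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> keys = List.of("test-key1", "test-key2/test-key3");
    System.out.println(list(keys, ""));           // {test-key1=OBJECT, test-key2=COMMON_PREFIX}
    System.out.println(list(keys, "test-key2/")); // {test-key2/test-key3=OBJECT}
  }
}
```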

To list objects at the root of Object Store layer, add the following:

Scala
Java
// list objects as Source
val objectList =
  readEngine
    .listObjects(layer, None)

// list objects as Collection
val objectListAsCollection =
  readEngine
    .listObjectsAsCollection(layer, None)
// list objects as Source
Source<ObjectStoreListItem, NotUsed> objectList = readEngine.listObjects(layer, "");

// list objects as Collection
CompletionStage<List<ObjectStoreListItem>> objectListAsCollection =
    readEngine.listObjectsAsCollection(layer, "");

To list objects under a prefix in the Object Store layer, add the following:

Scala
Java
val objectListUnderPrefix =
  readEngine
    .listObjects(layer, Some(prefix))

val objectListUnderPrefixAsCollection =
  readEngine
    .listObjectsAsCollection(layer, Some(prefix))
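Source<ObjectStoreListItem, NotUsed> objectListUnderPrefix =
    readEngine.listObjects(layer, prefix);

CompletionStage<List<ObjectStoreListItem>> objectListUnderPrefixAsCollection =
    readEngine.listObjectsAsCollection(layer, prefix);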

Note

You always receive the full list, either as a Source or as a List. Internally, however, the results are fetched in pages with a default page size of 1000, so listing an Object Store folder that contains one million objects takes 1000 requests, which is slow. If you know or suspect that a folder contains a large number of objects, you can reduce the runtime considerably by setting a higher value for the pageSize parameter.
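The number of internal list requests is the object count divided by the page size, rounded up. The Java sketch below does that arithmetic. ListRequests and requestsNeeded are hypothetical names, counting one request for an empty listing is an assumption, and which pageSize values the service accepts is not stated in this section.

```java
// Back-of-the-envelope estimate of paginated list requests; a hypothetical helper.
final class ListRequests {

  // Estimated number of list requests: ceil(objectCount / pageSize).
  // Counting one request for an empty listing is an assumption.
  static long requestsNeeded(long objectCount, int pageSize) {
    if (pageSize <= 0) {
      throw new IllegalArgumentException("pageSize must be positive");
    }
    long pages = (objectCount + pageSize - 1) / pageSize; // ceiling division
    return Math.max(1L, pages);
  }

  public static void main(String[] args) {
    System.out.println(requestsNeeded(1_000_000L, 1000));   // 1000 (default page size)
    System.out.println(requestsNeeded(1_000_000L, 10_000)); // 100
  }
}
```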
