Get Metadata

To get information about catalogs, layers, and partitions, add the data-client module as a dependency to your project.

Discover Catalog Versions

The QueryApi includes methods for fetching the versions of a catalog and information about those versions.

Note: Versions Apply Only to Versioned Layers

Only versioned layers have versions. The examples below show how to get information about a specific version or a range of versions. You can then use these versions to fetch metadata with QueryApi methods such as getPartitions, getPartitionsById, getPartitionsAsIterator, getChanges, getChangesById, and getChangesAsIterator.

To find the latest version of a catalog, add the following:

Scala
// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// fetch latest version of catalog
queryApi.getLatestVersion(None).flatMap {
  case Some(version) =>
    println(s"Query catalog for $version")
    Future.successful(Done)

  case None =>
    println("Catalog has no versions")
    Future.successful(Done)
}

Java
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// fetch latest version of catalog
queryApi
    .getLatestVersion(OptionalLong.empty())
    .thenAccept(
        version -> {
          if (version.isPresent())
            System.out.println("Catalog latest version is: " + version.getAsLong());
          else System.out.println("Catalog has no versions");
        });

To get information about a specific catalog version, add the following:

Scala
// fetch version information for a specific version of the catalog
queryApi.getVersion(0).map { versionInfo =>
  println(s"Query catalog for $versionInfo")
}

Java
// fetch version information for a specific version of the catalog
queryApi
    .getVersion(0L)
    .thenAccept(
        versionInfo -> {
          System.out.println("Query catalog for " + versionInfo);
        });

To get information on a range of catalog versions, add the following:

Scala
// fetch version information for version range (0, 10)
queryApi.getVersions(0, 10).map { versionInfos =>
  versionInfos.foreach(println)
}

Java
// fetch version information for version range (0, 10)
queryApi
    .getVersions(0L, 10L)
    .thenAccept(versionInfos -> System.out.println("Catalog versions: " + versionInfos));

Note: Using Explicit Versions

Because a single processing job often makes several calls to the HERE platform, specify an explicit catalog version when getting data. Pinning a specific version ensures that changes published to the catalog in the meantime do not unexpectedly alter the responses to subsequent requests.
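To make the effect of pinning concrete, here is a small self-contained Java model. InMemoryCatalog and PinnedVersionDemo are hypothetical classes written only for this illustration and are not part of the Data Client Library; with the real API, you would obtain the version once from getLatestVersion and pass that same value to every subsequent call, such as getPartitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a versioned catalog; NOT part of the Data Client API.
final class InMemoryCatalog {
  // each entry is an immutable snapshot: partition ID -> data handle
  private final List<Map<String, String>> versions = new ArrayList<>();

  // publishes a new version by copying the previous snapshot and applying the updates
  synchronized long publish(Map<String, String> updates) {
    Map<String, String> next =
        versions.isEmpty() ? new TreeMap<>() : new TreeMap<>(versions.get(versions.size() - 1));
    next.putAll(updates);
    versions.add(Map.copyOf(next));
    return versions.size() - 1; // versions start at 0
  }

  synchronized long latestVersion() {
    if (versions.isEmpty()) throw new IllegalStateException("catalog has no versions");
    return versions.size() - 1;
  }

  // every read is resolved against an explicit version
  synchronized Map<String, String> partitions(long version) {
    return versions.get((int) version);
  }
}

final class PinnedVersionDemo {
  // resolves the latest version once, then reuses it for every later request
  static List<Map<String, String>> readTwicePinned(
      InMemoryCatalog catalog, Map<String, String> concurrentUpdate) {
    long pinned = catalog.latestVersion();
    Map<String, String> first = catalog.partitions(pinned);
    catalog.publish(concurrentUpdate); // another job publishes a new version in between
    Map<String, String> second = catalog.partitions(pinned);
    return List.of(first, second);
  }
}
```

Both reads use the pinned version, so the second read is unaffected by the version published in between. Asking for the latest version again before the second read would have returned the new data instead.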

There is also an API to retrieve versions of a catalog that are compatible with a set of versions of other catalogs (dependencies).

A catalog version is compatible when it has one of the following properties:

  • It depends, directly or indirectly, on at least one of the catalogs in dependencies, and every such dependency is on the exact catalog version specified in dependencies
  • It doesn't depend on any of the catalogs in dependencies
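The two rules reduce to a single check: a candidate version is incompatible only if one of its direct or transitive dependencies is on a catalog listed in dependencies but at a different version. The sketch below models this in plain Java, assuming the dependency graph is available as a map. CatVersion and Compatibility are hypothetical names for illustration only; in practice you would call getCompatibleVersionsAsIterator instead.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical type for this sketch; the library has its own CatalogVersion class.
record CatVersion(String hrn, long version) {}

final class Compatibility {
  // collects all direct and indirect dependencies of `start` (hypothetical graph input)
  static Set<CatVersion> transitiveDeps(CatVersion start, Map<CatVersion, Set<CatVersion>> directDeps) {
    Set<CatVersion> seen = new HashSet<>();
    Deque<CatVersion> stack = new ArrayDeque<>(directDeps.getOrDefault(start, Set.of()));
    while (!stack.isEmpty()) {
      CatVersion next = stack.pop();
      if (seen.add(next)) {
        stack.addAll(directDeps.getOrDefault(next, Set.of()));
      }
    }
    return seen;
  }

  // compatible if no dependency is on a required catalog at a different version;
  // this covers both "all on the exact version" and "depends on none of them"
  static boolean isCompatible(
      CatVersion candidate, Set<CatVersion> required, Map<CatVersion, Set<CatVersion>> directDeps) {
    Map<String, Long> requiredByHrn = new HashMap<>();
    for (CatVersion r : required) requiredByHrn.put(r.hrn(), r.version());
    for (CatVersion dep : transitiveDeps(candidate, directDeps)) {
      Long wanted = requiredByHrn.get(dep.hrn());
      if (wanted != null && wanted != dep.version()) return false;
    }
    return true;
  }
}
```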

To get all compatible versions ordered from newest to oldest, add the following:

Scala
// get all dependencies of a specific version of our catalog
val dependencies: Future[Set[CatalogVersion]] = queryApi
  .getVersion(0)
  .map(_.dependencies.map(d => CatalogVersion(d.hrn, d.version)).toSet)

// create queryApi for another catalog
val otherQueryApi = DataClient().queryApi(otherHrn)
// find all versions of `otherHrn` that are compatible with our catalog version
dependencies
  .flatMap(otherQueryApi.getCompatibleVersionsAsIterator)
  .map { compatibleVersions: Iterator[CompatibleVersion] =>
    compatibleVersions.foreach(println)
  }

Java
// get all dependencies of a specific version of our catalog
CompletionStage<Set<CatalogVersion>> dependencies =
    queryApi
        .getVersion(0L)
        .thenApply(
            versionInfo ->
                versionInfo
                    .getDependencies()
                    .stream()
                    .map(d -> new CatalogVersion(d.getHrn(), d.getVersion()))
                    .collect(Collectors.toSet()));

// create queryApi for another catalog
final QueryApi otherQueryApi = DataClient.get(myActorSystem).queryApi(otherHrn);

// find all versions of `otherHrn` that are compatible with our catalog version
dependencies
    .thenCompose(otherQueryApi::getCompatibleVersionsAsIterator)
    .thenAccept(
        compatibleVersions ->
            compatibleVersions.forEachRemaining(
                compatibleVersion -> System.out.println(compatibleVersion.toString())));

Get Partitions from a Volatile Layer

To get specific partitions from a volatile layer, use VolatilePartitionsFilter.

To get specific partitions of a volatile layer that have changed since a specific time, add the following:

Scala
import com.here.platform.data.client.common.VolatilePartitionsFilter._
import scala.util.{Failure, Success}

// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)
val layerName = "volatileLayer"
val timestamp = 0L // specific timestamp
val partition1 = "partition1"
val partition2 = "partition2"

// define how to process partitions
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
  partitions.runForeach(println).andThen {
    case Success(_) => println("Done")
    case Failure(exception) => println(s"Failed with $exception")
  }

// get changes of the volatile layer since a specific time, for specific partitions only
queryApi
  .getVolatilePartitions(layerName, since(timestamp) and byIds(Set(partition1, partition2)))
  .flatMap(processPartitions)

Java
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
long sinceTime = 0;
String layerName = "volatileLayer";
String partition1 = "partition1";
String partition2 = "partition2";
Set<String> partitions = new HashSet<String>();
partitions.add(partition1);
partitions.add(partition2);

VolatilePartitionsFilter.Builder builder = new VolatilePartitionsFilter.Builder();
builder.withIds(partitions).withSinceTimestamp(sinceTime);
// get changes of the volatile layer since a specific time, for specific partitions only
Source<Partition, NotUsed> changesSource =
    queryApi
        .getVolatilePartitions(layerName, builder.build(), AdditionalFields.AllFields())
        .toCompletableFuture()
        .get();
changesSource
    .runForeach(
        partition -> {
          // process each partition
          System.out.println(partition);
        },
        myMaterializer)
    .whenCompleteAsync(
        (result, e) -> {
          if (e != null) {
            e.printStackTrace();
          } else {
            System.out.println("DONE!");
          }
        });

Note

VolatilePartitionsFilter combines simple filters for fetching data from a volatile layer. You can filter by partition ID and by a since timestamp. The getVolatilePartitions method has a counterpart, getVolatilePartitionsAsIterator, that returns the partitions as an iterator.
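Combining the two filters with and keeps only partitions that satisfy both conditions. The following plain-Java sketch models that intersection with java.util.function.Predicate. VolatileMeta and VolatileFilters are hypothetical, and treating the since timestamp as an exclusive lower bound is an assumption of this sketch, not documented library behavior.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical partition metadata; the real Partition type carries more fields.
record VolatileMeta(String partition, long modifiedAt) {}

final class VolatileFilters {
  // keeps partitions modified after the timestamp (exclusive bound is an assumption)
  static Predicate<VolatileMeta> since(long timestamp) {
    return p -> p.modifiedAt() > timestamp;
  }

  // keeps only the listed partition IDs
  static Predicate<VolatileMeta> byIds(Set<String> ids) {
    return p -> ids.contains(p.partition());
  }

  // applies a combined filter and returns the matching partition IDs in input order
  static List<String> select(List<VolatileMeta> all, Predicate<VolatileMeta> filter) {
    return all.stream().filter(filter).map(VolatileMeta::partition).collect(Collectors.toList());
  }
}
```

For example, VolatileFilters.since(100L).and(VolatileFilters.byIds(Set.of("partition1", "partition2"))) mirrors the Scala expression since(timestamp) and byIds(...): a partition must pass both checks to be returned.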

Read Partitions from a Version Layer

Once you have a catalog HRN and your environment is set up, you can get metadata from the HERE platform.

To read the partitions of a specific layer in the latest catalog version, add the following:

Scala
import scala.util.{Failure, Success}

// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// define how to process partitions
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
  partitions.runForeach(println).andThen {
    case Success(_) => println("Done")
    case Failure(exception) => println(s"Failed with $exception")
  }

// fetch partitions for a specific version of a versioned layer in the catalog
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
  queryApi.getPartitions(catalogVersion, layer, AdditionalFields.All)

// partitions can be fetched only if the catalog has at least one version;
// an empty catalog has no partitions to fetch
// (`layer` holds the name of the versioned layer to read)
queryApi.getLatestVersion(None).flatMap {
  case Some(version) =>
    println(s"Query catalog for $version")

    for {
      partitions <- fetchPartitions(version, layer)
      _ <- processPartitions(partitions)
    } yield Done

  case None =>
    println("Catalog has no versions")
    Future.successful(Done)
}

Java
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// get latest version
Long catalogVersion =
    queryApi
        .getLatestVersion(OptionalLong.empty())
        .toCompletableFuture()
        .get()
        .orElseThrow(() -> new Exception("Catalog has no versions"));

// fetches partitions for a specific version of a versioned layer in catalog
queryApi
    .getPartitions(catalogVersion, layer, AdditionalFields.AllFields())
    .thenCompose(
        partitions ->
            partitions
                .runForeach(System.out::println, myMaterializer)
                .whenCompleteAsync(
                    (result, e) -> {
                      if (e != null) {
                        e.printStackTrace();
                      } else {
                        System.out.println("DONE!");
                      }
                    }));

To download all partitions for all layers in a catalog, use the fetchPartitions function as follows:

Scala
def fetchPartitions(catalogVersion: Long): Future[Source[Partition, NotUsed]] =
  queryApi.getConfiguration().map { config =>
    // for each versioned layer in the catalog configuration, fetch all partitions from the server
    Source(config.layers.toList)
      .mapAsync(1) { layer =>
        queryApi.getPartitions(catalogVersion, layer.name, AdditionalFields.All)
      }
      .flatMapConcat(identity)
  }
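The combination of mapAsync(1) and flatMapConcat in the Scala function above fetches layers one at a time and emits their partitions in layer order. For readers more familiar with Java futures, the sketch below reproduces that ordering with CompletionStage.thenCompose. SequentialLayers and its fetchLayer parameter are hypothetical stand-ins for the per-layer getPartitions call, and unlike the Scala version, the sketch collects results into a list rather than streaming them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class SequentialLayers {
  // Starts each layer's fetch only after the previous one completes (like mapAsync(1))
  // and appends results in layer order (like flatMapConcat).
  // `fetchLayer` is a hypothetical stand-in for a per-layer getPartitions call.
  static CompletionStage<List<String>> fetchAll(
      List<String> layers, Function<String, CompletionStage<List<String>>> fetchLayer) {
    CompletionStage<List<String>> acc =
        CompletableFuture.<List<String>>completedFuture(new ArrayList<>());
    for (String layer : layers) {
      acc = acc.thenCompose(soFar ->
          fetchLayer.apply(layer).thenApply(partitions -> {
            soFar.addAll(partitions);
            return soFar;
          }));
    }
    return acc;
  }
}
```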

Get Changes from a Version Layer

To process data incrementally, fetch metadata to determine which partitions changed between specified catalog versions.
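The Scala example for this section logs the requested range as (startVersion, endVersion], where startVersion is the last version already processed. The sketch below models which partitions count as changed in that half-open range, using a hypothetical in-memory write history (PartitionWrite and IncrementalChanges are illustrative names only). Keeping just the newest write per partition is an assumption of this model; with the real API, getChanges returns the change metadata for you.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record: partition `partition` was written in catalog version `version`.
record PartitionWrite(String partition, long version, String dataHandle) {}

final class IncrementalChanges {
  // Returns partitions changed in (startVersion, endVersion], keeping the newest write
  // per partition. startVersion is the last version that was already processed.
  static Map<String, PartitionWrite> changesBetween(
      List<PartitionWrite> history, long startVersion, long endVersion) {
    Map<String, PartitionWrite> changed = new LinkedHashMap<>();
    for (PartitionWrite w : history) {
      if (w.version() > startVersion && w.version() <= endVersion) {
        changed.merge(w.partition(), w, (a, b) -> a.version() >= b.version() ? a : b);
      }
    }
    return changed;
  }
}
```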

To get changes for all layers in a catalog, add the following:

Scala
// create queryApi for source catalog
val queryApi = DataClient().queryApi(catalogHrn, settings)

// define how to process partitions
def processPartitions(partitions: Source[Partition, NotUsed]): Future[Done] =
  partitions.runForeach(println)

// fetch changed partitions for a specific version range of the versioned layers in the catalog
def fetchPartitions(startVersion: Long,
                    endVersion: Long): Future[Source[Partition, NotUsed]] =
  queryApi.getConfiguration().map { config =>
    // for each versioned layer in the catalog configuration, fetch all changed partitions from the server
    Source(config.layers.toList)
      .mapAsync(1) { layer =>
        // split the layer's change metadata into 2 parts
        val parts = queryApi.getChangesParts(layer.name, 2)
        // fetch the parts one after another and concatenate the results
        val source = Source
          .unfoldAsync[Future[Seq[String]], Source[Partition, NotUsed]](parts.map(_.parts)) {
            input =>
              input.flatMap {
                case Seq() => Future.successful(None)
                case head +: tail =>
                  queryApi
                    .getChanges(startVersion,
                                endVersion,
                                layer.name,
                                AdditionalFields.All,
                                Some(head))
                    .map { source =>
                      Some(Future.successful(tail) -> source)
                    }
              }
          }
          .flatMapConcat(identity)
        Future.successful(source)
      }
      .flatMapConcat(identity)
  }

// latest version that was already processed
val startVersion = 1L

queryApi.getLatestVersion(None).flatMap {
  case Some(endVersion) =>
    println(s"Query catalog for changes for ($startVersion, $endVersion]")

    for {
      partitions <- fetchPartitions(startVersion, endVersion)
      _ <- processPartitions(partitions)
    } yield Done

  case None =>
    println("Catalog has no versions")
    Future.successful(Done)
}

Java
// create queryApi for source catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// latest version that was already processed
Long startVersion = 1L;

queryApi
    .getLatestVersion(OptionalLong.empty())
    .thenCompose(
        maybeEndVersion -> {
          if (maybeEndVersion.isPresent()) {
            Long endVersion = maybeEndVersion.getAsLong();
            System.out.println(
                "Query catalog for changes for (" + startVersion + ", " + endVersion + "]");

            return queryApi
                .getConfiguration()
                .thenCompose(
                    config -> {
                      // for each versioned layer in the catalog configuration,
                      // fetch all changed partitions from the server
                      Source<Partition, NotUsed> changesSource =
                          Source.from(config.getLayers())
                              .map(Layer::getName)
                              .mapAsync(
                                  1,
                                  layer -> {
                                    // fetch changed partitions for a specific version range
                                    // in the catalog
                                    return queryApi.getChanges(
                                        startVersion,
                                        endVersion,
                                        layer,
                                        AdditionalFields.AllFields());
                                  })
                              .flatMapConcat(i -> i);

                      return changesSource
                          .runForeach(
                              partition -> {
                                // process each partition
                                System.out.println(partition);
                              },
                              myMaterializer)
                          .whenCompleteAsync(
                              (result, e) -> {
                                if (e != null) {
                                  e.printStackTrace();
                                } else {
                                  System.out.println("DONE!");
                                }
                              });
                    });
          } else {
            System.out.println("Catalog has no versions");
            return CompletableFuture.completedFuture(Done.getInstance());
          }
        });

Get Additional Fields for Partitions in a Version Layer

By default, the HERE platform includes only basic partition information in responses, such as the layer and the partition ID. To get additional fields, such as dataSize and checksum, you must request them explicitly.

Note

Responses include additional fields only if those fields were provided when the data was published to the catalog. If they were not provided at publication time, responses omit them even when you request them.
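In effect, the response contains the intersection of the fields you request and the fields that were published. The sketch below models that rule with a hypothetical Extra enum and ResponseModel class; these are not the library's AdditionalField type, only an illustration of the behavior described in this note.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;

// Hypothetical field names for this sketch; the library's own type is AdditionalField.
enum Extra { DATA_SIZE, CHECKSUM }

final class ResponseModel {
  // published: additional fields supplied when the partition was published
  // requested: additional fields asked for in the query
  // returns only the fields that were both requested and published
  static Map<Extra, String> responseFields(Map<Extra, String> published, Set<Extra> requested) {
    Map<Extra, String> response = new EnumMap<>(Extra.class);
    for (Extra field : requested) {
      String value = published.get(field);
      if (value != null) response.put(field, value);
    }
    return response;
  }
}
```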

To request information on additional fields, add the following:

Scala
// fetch partitions with the checksum and dataSize fields for a specific version of a versioned layer
def fetchPartitions(catalogVersion: Long, layer: String): Future[Source[Partition, NotUsed]] =
  queryApi.getPartitions(catalogVersion,
                         layer,
                         Set(AdditionalField.Checksum, AdditionalField.DataSize))

Java
// fetch partitions with the checksum and dataSize fields for a specific version of a versioned layer
Set<AdditionalField> fields = new HashSet<AdditionalField>();
fields.add(AdditionalField.Checksum);
fields.add(AdditionalField.DataSize);
queryApi.getPartitions(catalogVersion, layer, fields);
