Publish Metadata

Use CommitPartition to publish new or deleted partitions. You can mix creation and deletion in the same publication.

The HERE platform supports the following types of layers:

  • versioned
  • volatile
  • index
  • stream

Stream / Volatile Layers

The platform's stream and volatile layers may contain data that is smaller than its metadata, so there is no reason to publish metadata separately from the data. For information on publishing to a stream or volatile layer, see Publish Data.
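For orientation, a stream or volatile publication uploads the payload and publishes its metadata in one call. The following is only a minimal sketch, not the authoritative example from Publish Data; it assumes the writeEngine.publish helper and NewPartition type described there, and uses placeholder names such as streamLayerId and blobData:

```scala
// upload data and publish metadata in a single step (stream layer)
// NOTE: streamLayerId and blobData are placeholders; see Publish Data
// for the complete, authoritative example
val writeEngine = DataEngine().writeEngine(catalogHrn)

val newPartition = NewPartition(
  partition = partitionId,
  layer = streamLayerId,
  data = NewPartition.ByteArrayData(blobData)
)

writeEngine.publish(Source.single(newPartition))
```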

Versioned Layers

Catalogs are empty when you first create them. A newly created catalog has no version yet, so your first publication uses an empty base version.

To make your first publication to a versioned layer, add the following:

Scala
Java
// create publishApi for a catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)

// prepare a list of partitions to publish
val partitions =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.newCommitPartition(
        partition = partitionId2,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      )
    )
  )

// publish initial version
val firstPublish: Future[Done] =
  publishApi.publishBatch2(
    baseVersion = None, // publication to an empty catalog can be done with `baseVersion = None`.
    Some(Seq(versionedLayerId)),
    dependencies = Seq.empty,
    partitions = partitions
  )
// create publishApi for a catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// prepare a list of partitions to publish
CommitPartition newPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition newPartition2 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId2)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);

Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);

// publish initial version
CompletableFuture<Done> futurePublish =
    publishApi
        .publishBatch(emptyVersion, Collections.emptyList(), partitions)
        .toCompletableFuture();

When you publish updates to a versioned layer, you need to provide a base version, which is the current version of the catalog at the time of publication. The base version is used to ensure that the catalog version has not changed while the publication was being processed, which could lead to data inconsistency.

To publish an update to a versioned layer, add the following:

Scala
Java
val nextPublishPartitions =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.deletedPartition(
        partition = partitionId2,
        layer = versionedLayerId
      )
    )
  )

// for subsequent publications, the catalog base version needs to be provided
val nextPublish: Future[Done] =
  for {
    baseVersion <- publishApi.getBaseVersion()
    _ <- publishApi.publishBatch2(
      baseVersion = baseVersion,
      Some(Seq(versionedLayerId)),
      dependencies = Seq.empty,
      partitions = nextPublishPartitions
    )
  } yield Done
// prepare a list of partitions to publish
CommitPartition nextPublishPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition deletePartition2 =
    new CommitPartition.Builder()
        .deletePartition()
        .withPartition(partitionId2)
        .withLayer(layer)
        .build();

ArrayList<CommitPartition> nextPublishPartitionList = new ArrayList<>();
nextPublishPartitionList.add(nextPublishPartition1);
nextPublishPartitionList.add(deletePartition2);

Source<CommitPartition, NotUsed> nextPublishPartitions = Source.from(nextPublishPartitionList);

// for subsequent publications, the catalog base version needs to be provided
CompletableFuture<Done> futureNextPublish =
    publishApi
        .getBaseVersion()
        .thenCompose(
            baseVersion ->
                publishApi.publishBatch(
                    baseVersion, Collections.emptyList(), nextPublishPartitions))
        .toCompletableFuture();

Base Versions and Version Dependencies

When you start a batch, you must provide a base version, relative to which the publication handled by the batch is applied. The base version must be the latest version of the catalog; it ensures that the publication is relative to the version your batch assumed and that no concurrent publication to the same layer was applied in the meantime.

Consistency is important not only within a catalog but between all the catalogs in a data processing pipeline. To ensure consistency between catalogs, each batch can contain a list of VersionDependency objects that show the catalogs and versions used to produce the publication.

Note

Version dependencies should include a catalog only if its versioned layers were used to produce the new version of the given catalog. For example, if you read from a single stream layer and publish data to a versioned layer, the dependencies should be empty.

To add a version dependency, add the following:

Scala
Java
val dependencies =
  Seq(
    VersionDependency(
      hrn = upstreamCatalogHrn,
      version = 5L,
      direct = true
    )
  )

for {
  baseVersion <- publishApi.getBaseVersion()
  _ <- publishApi.publishBatch2(
    baseVersion = baseVersion,
    Some(Seq(versionedLayerId)),
    dependencies = dependencies,
    partitions = partitions
  )
} yield Done
ArrayList<VersionDependency> dependencies = new ArrayList<>();
VersionDependency dependency = new VersionDependency(upstreamCatalogHrn, 5L, true);
dependencies.add(dependency);

CompletableFuture<Done> futurePublishWithDeps =
    publishApi
        .getBaseVersion()
        .thenCompose(
            baseVersion ->
                publishApi.publishBatch2(
                    baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions))
        .toCompletableFuture();

Simplified Metadata Publication Process

Every publication to a versioned layer requires a batch token, even if the batch job is performed in a single publication request. You need to start a batch, receive a batch token, and provide that batch token with every subsequent publication. Once all publications are submitted, the batch needs to be finalized, after which the server starts processing the submitted publications. A batched publication is considered complete once all publications have been processed and a new version is published to the catalog.
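Spelled out with the batch APIs used in the Distributed Publications section below (startBatch2, publishToBatch, completeBatch), the manual flow looks roughly as follows. This is a sketch only, assuming partitions is a Source[CommitPartition, NotUsed] as in the earlier examples:

```scala
// sketch of the manual batch lifecycle for a versioned layer
val manualPublish: Future[Done] =
  for {
    // 1. start a batch and receive a batch token
    batchToken <- publishApi.startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
    // 2. submit one or more publications against the same token
    _ <- publishApi.publishToBatch(batchToken, partitions)
    // 3. finalize; the server then processes the submitted publications
    _ <- publishApi.completeBatch(batchToken)
  } yield Done
```

The helpers described next collapse these three steps into a single call.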

The Data Client Library provides helpers to perform your batched publication with a single step. The helpers automatically start a batch, publish the data using that batch, and finalize the batch. The call completes once the data has been processed and is available in the catalog.

To publish a batch, add the following:

Scala
Java
// create publishApi for a catalog
val publishApi = DataClient().publishApi(catalogHrn)

// list of dependencies for this publication
val dependencies = Seq.empty[VersionDependency]

val partitions: Source[CommitPartition, NotUsed] =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.newCommitPartition(
        partition = partitionId2,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      )
    )
  )

publishApi.publishBatch2(
  baseVersion = None,
  Some(Seq(versionedLayerId)),
  dependencies = dependencies,
  partitions = partitions
)
// create publishApi for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// list of dependencies for this publication
List<VersionDependency> dependencies = Collections.emptyList();

CommitPartition newPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition newPartition2 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId2)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);

Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);

CompletableFuture<Done> futurePublish =
    publishApi
        .publishBatch2(baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions)
        .toCompletableFuture();

Distributed Publications

The HERE platform enables you to process and publish a large number of partitions in a distributed manner.

For versioned layers, this is a three-step process:

  1. Initiate the publication process by starting a new batch publication and receiving a batch token. This operation normally happens on the master or driver node of the cluster.
  2. Different workers upload partitions, attaching them to the same batch token.
  3. Once all data is sent to the server, complete the batch upload. This operation also normally happens on the master or driver node of the cluster.

Upon receiving the complete batch request, the server starts processing publications to create the next catalog version.

To publish to a versioned layer from multiple workers, add the following:

Scala
Java
// create publishApi and writeEngine for source catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)

// start batch publication
publishApi
  .startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
  .flatMap { batchToken =>
    // start worker 1 to upload data and publish metadata
    val worker1 =
      publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    // start worker 2 to upload data and publish metadata
    val worker2 =
      publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    // wait until workers are done uploading data/metadata
    for {
      _ <- worker1
      _ <- worker2
    } yield batchToken

  }
  .flatMap { batchToken =>
    // signal the server to complete the batch publication
    publishApi.completeBatch(batchToken)
  }
// create publishApi and writeEngine for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// start a batch, publish partitions, complete batch
CompletableFuture<Done> futurePublish =
    publishApi
        .startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
        .thenCompose(
            batchToken -> {
              // start worker 1 to upload data and publish metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker1 =
                  arbitraryPendingPartitions1;
              CompletableFuture<Done> worker1 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker1.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // start worker 2 to upload data and publish metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker2 =
                  arbitraryPendingPartitions2;
              CompletableFuture<Done> worker2 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker2.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // wait until workers are done uploading
              return worker1.thenCombine(worker2, (done, done2) -> batchToken);
            })
        .thenCompose(
            batchToken -> {
              return publishApi.completeBatch(batchToken);
            })
        .toCompletableFuture();

For stream and volatile layers, distributed publication is no different from the normal publication process.

Index Layers

If your partition data is already uploaded to the Blobstore and you have the dataHandle of the uploaded data, you can index the partition in an index layer with the specified keys.

To index a partition in an index layer, add the following:

Scala
Java
// index a partition with PublishApi.publishIndex
publishApi.publishIndex(indexLayerId, commitPartitions)
// index a partition with PublishApi.publishIndex
publishApi.publishIndex(indexLayerId, commitPartitions);
