Use CommitPartition to publish new or deleted partitions. You can mix creation and deletion in the same publication.
The HERE platform supports the following types of layers:
- versioned
- volatile
- index
- stream
- objectstore
Stream / Volatile Layers
In stream and volatile layers, the data may be smaller than its metadata, so there is no reason to publish the metadata separately from the data. For information on publishing to a stream or volatile layer, see Publish Data.
Versioned Layers
Catalogs are empty when you first create them. All versioned layers are at version ∅.
To make your first publication to a versioned layer, add the following:
// Scala
val publishApi = DataClient().publishApi(catalogHrn, settings)

val partitions =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.newCommitPartition(
        partition = partitionId2,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      )
    )
  )

val firstPublish: Future[Done] =
  publishApi.publishBatch2(
    baseVersion = None,
    Some(Seq(versionedLayerId)),
    dependencies = Seq.empty,
    partitions = partitions
  )
// Java
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

CommitPartition newPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition newPartition2 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId2)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);

Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);

CompletableFuture<Done> futurePublish =
    publishApi
        .publishBatch2(
            emptyVersion,
            Optional.of(Arrays.asList(layer)),
            Collections.emptyList(),
            partitions)
        .toCompletableFuture();
When you publish updates to a versioned layer, you need to provide a base version, which is the current version of the catalog at the time of publication. The base version is used to ensure that the catalog version has not changed during the publication processing, which could lead to data inconsistency.
To publish an update to a versioned layer, add the following:
// Scala
val nextPublishPartitions =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.deletedPartition(
        partition = partitionId2,
        layer = versionedLayerId
      )
    )
  )

val nextPublish: Future[Done] =
  for {
    baseVersion <- publishApi.getBaseVersion()
    _ <- publishApi.publishBatch2(
      baseVersion = baseVersion,
      Some(Seq(versionedLayerId)),
      dependencies = Seq.empty,
      partitions = nextPublishPartitions
    )
  } yield Done
// Java
CommitPartition nextPublishPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition deletePartition2 =
    new CommitPartition.Builder()
        .deletePartition()
        .withPartition(partitionId2)
        .withLayer(layer)
        .build();

// Add both partitions to the list that feeds the source below.
ArrayList<CommitPartition> nextPublishPartitionList = new ArrayList<>();
nextPublishPartitionList.add(nextPublishPartition1);
nextPublishPartitionList.add(deletePartition2);

Source<CommitPartition, NotUsed> nextPublishPartitions = Source.from(nextPublishPartitionList);

CompletableFuture<Done> futureNextPublish =
    publishApi
        .getBaseVersion()
        .thenCompose(
            baseVersion ->
                publishApi.publishBatch2(
                    baseVersion,
                    Optional.of(Arrays.asList(layer)),
                    Collections.emptyList(),
                    nextPublishPartitions))
        .toCompletableFuture();
Base Versions and Version Dependencies
When you start a batch, you must provide a base version, to which the publication handled by the batch is relative. The base version must be the latest version of the catalog. The base version is used to ensure that the publication is relative to the version your batch assumed, and no concurrent publication to the same layer was applied.
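The base version check works like optimistic concurrency control. The following is a minimal in-memory sketch of that idea, not the Data Client Library API: the class `CatalogModel` and its methods are hypothetical names introduced for illustration, and numbering versions from 0 is an assumption of the model. It shows a publication being rejected when the catalog has moved past the base version the publisher read.

```java
import java.util.OptionalLong;

// Hypothetical in-memory model of a catalog; not the Data Client Library API.
final class CatalogModel {
    // Empty means that no version has been published yet.
    private OptionalLong latestVersion = OptionalLong.empty();

    // Returns the version a new publication should use as its base.
    synchronized OptionalLong getBaseVersion() {
        return latestVersion;
    }

    // Commits a new version only if baseVersion is still the latest version.
    // Assumption: versions are numbered 0, 1, 2, ... in this model.
    synchronized boolean publish(OptionalLong baseVersion) {
        if (!baseVersion.equals(latestVersion)) {
            return false; // another publication changed the catalog in the meantime
        }
        long next = latestVersion.isPresent() ? latestVersion.getAsLong() + 1 : 0L;
        latestVersion = OptionalLong.of(next);
        return true;
    }

    public static void main(String[] args) {
        CatalogModel catalog = new CatalogModel();
        OptionalLong seenByA = catalog.getBaseVersion();
        OptionalLong seenByB = catalog.getBaseVersion();
        System.out.println(catalog.publish(seenByA)); // accepted: base version is current
        System.out.println(catalog.publish(seenByB)); // rejected: base version is stale
        System.out.println(catalog.publish(catalog.getBaseVersion())); // accepted after re-reading
    }
}
```

In this model, a rejected publisher recovers by reading the base version again and retrying, which mirrors calling getBaseVersion() immediately before publishing.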
Consistency is important not only within a catalog but between all the catalogs in a data processing pipeline. To ensure consistency between catalogs, each batch can contain a list of VersionDependency objects that show the catalogs and versions used to produce the publication.
Note
Include a catalog in the version dependencies only if one of its versioned layers was used to produce the new version of the catalog you publish to. For example, if you read from a single stream layer and publish data to a versioned layer, then the dependencies should be empty.
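The rule in this note can be sketched as a filter over the inputs a pipeline has read. The code below is only an illustration under stated assumptions: `DependencySketch`, `Input`, `Dependency`, and `LayerType` are hypothetical stand-ins rather than library types, the HRN strings are placeholders, and `Dependency` merely mirrors the three fields (hrn, version, direct) of VersionDependency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper types for illustration; not the Data Client Library API.
final class DependencySketch {
    enum LayerType { VERSIONED, VOLATILE, STREAM, INDEX }

    // One input the pipeline read from. The version is only meaningful
    // for versioned layers and is ignored for all other layer types.
    static final class Input {
        final String catalogHrn;
        final long version;
        final LayerType layerType;

        Input(String catalogHrn, long version, LayerType layerType) {
            this.catalogHrn = catalogHrn;
            this.version = version;
            this.layerType = layerType;
        }
    }

    // Simplified stand-in for VersionDependency(hrn, version, direct).
    static final class Dependency {
        final String hrn;
        final long version;
        final boolean direct;

        Dependency(String hrn, long version, boolean direct) {
            this.hrn = hrn;
            this.version = version;
            this.direct = direct;
        }
    }

    // Keeps only inputs read from versioned layers; each becomes a direct dependency.
    static List<Dependency> directDependencies(List<Input> inputs) {
        List<Dependency> result = new ArrayList<>();
        for (Input input : inputs) {
            if (input.layerType == LayerType.VERSIONED) {
                result.add(new Dependency(input.catalogHrn, input.version, true));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Input> inputs = Arrays.asList(
            new Input("hrn:example:upstream-a", 5L, LayerType.VERSIONED),
            new Input("hrn:example:upstream-b", 0L, LayerType.STREAM));
        // Only the versioned input contributes a dependency.
        System.out.println(directDependencies(inputs).size());
    }
}
```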
To add a version dependency, add the following:
// Scala
val dependencies =
  Seq(
    VersionDependency(
      hrn = upstreamCatalogHrn,
      version = 5L,
      direct = true
    )
  )

for {
  baseVersion <- publishApi.getBaseVersion()
  _ <- publishApi.publishBatch2(
    baseVersion = baseVersion,
    Some(Seq(versionedLayerId)),
    dependencies = dependencies,
    partitions = partitions
  )
} yield Done
// Java
ArrayList<VersionDependency> dependencies = new ArrayList<>();
VersionDependency dependency = new VersionDependency(upstreamCatalogHrn, 5L, true);
dependencies.add(dependency);

CompletableFuture<Done> futurePublishWithDeps =
    publishApi
        .getBaseVersion()
        .thenCompose(
            baseVersion ->
                publishApi.publishBatch2(
                    baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions))
        .toCompletableFuture();
Every publication to a versioned layer requires a batch token, even if the batch job is performed in a single publication request. You need to start a batch, receive a batch token, and provide that batch token for every subsequent publication. Once all publications are submitted, the batch needs to be finalized, after which the server starts processing the submitted publications. A batched publication is considered completed once all publications have been processed and a new version is published to the catalog.
The Data Client Library provides helpers to perform your batched publication in a single step. The helpers automatically start a batch, publish the data using that batch, and finalize the batch. The call completes once the data has been processed and is available in the catalog.
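The batch lifecycle described above (start, publish with the token, finalize) can be sketched as a small state machine. This is an in-memory illustration only, not the library implementation: `BatchLifecycleSketch` and all of its methods are hypothetical names, partitions are reduced to plain strings, and the random token is a placeholder for whatever the server would issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

// Hypothetical in-memory model of one batch; not the Data Client Library API.
final class BatchLifecycleSketch {
    private final String token = UUID.randomUUID().toString(); // placeholder token
    private final List<String> pending = new ArrayList<>();
    private boolean completed = false;

    private BatchLifecycleSketch() {}

    // Step 1: starting a batch yields a token that identifies it.
    static BatchLifecycleSketch startBatch() {
        return new BatchLifecycleSketch();
    }

    String token() {
        return token;
    }

    // Step 2: every publication must carry the token of a batch that is still open.
    void publishToBatch(String batchToken, String partitionId) {
        requireOpen(batchToken);
        pending.add(partitionId);
    }

    // Step 3: finalizing closes the batch and hands the publications over for processing.
    List<String> completeBatch(String batchToken) {
        requireOpen(batchToken);
        completed = true;
        return new ArrayList<>(pending);
    }

    private void requireOpen(String batchToken) {
        if (completed || !token.equals(batchToken)) {
            throw new IllegalStateException("batch is closed or the token does not match");
        }
    }

    // Single-step helper: starts a batch, publishes everything, and finalizes it.
    static List<String> publishBatch(List<String> partitionIds) {
        BatchLifecycleSketch batch = startBatch();
        for (String id : partitionIds) {
            batch.publishToBatch(batch.token(), id);
        }
        return batch.completeBatch(batch.token());
    }

    public static void main(String[] args) {
        System.out.println(publishBatch(Arrays.asList("partition-1", "partition-2")));
    }
}
```

The `publishBatch` helper here plays the same role as the single-step helpers in the library: callers never see the token, and the three lifecycle steps always run in order.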
To publish a batch, add the following:
// Scala
val publishApi = DataClient().publishApi(catalogHrn)

val dependencies = Seq.empty[VersionDependency]

val partitions: Source[CommitPartition, NotUsed] =
  Source(
    List(
      CommitPartition.newCommitPartition(
        partition = partitionId1,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      ),
      CommitPartition.newCommitPartition(
        partition = partitionId2,
        layer = versionedLayerId,
        dataHandle = "example-data-handle"
      )
    )
  )

publishApi.publishBatch2(
  baseVersion = None,
  Some(Seq(versionedLayerId)),
  dependencies = dependencies,
  partitions = partitions
)
// Java
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

List<VersionDependency> dependencies = Collections.emptyList();

CommitPartition newPartition1 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId1)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

CommitPartition newPartition2 =
    new CommitPartition.Builder()
        .newPartition()
        .withPartition(partitionId2)
        .withDataHandle("<example-data-handle>")
        .withLayer(layer)
        .build();

ArrayList<CommitPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition1);
partitionList.add(newPartition2);

Source<CommitPartition, NotUsed> partitions = Source.from(partitionList);

CompletableFuture<Done> futurePublish =
    publishApi
        .publishBatch2(baseVersion, Optional.of(Arrays.asList(layer)), dependencies, partitions)
        .toCompletableFuture();
Distributed Publications
The HERE platform enables you to process and publish a large number of partitions in a distributed manner.
For versioned layers, this is a three-step process:
- Initiate the publication process by starting a new batch publication and receiving a batch token. This operation normally happens on the master or driver node in the cluster.
- Have the workers upload their partitions, attaching them to the same batch token.
- Once all data is sent to the server, complete the batch upload. This operation normally happens on the master or driver node in the cluster.
Upon receiving the complete batch request, the server starts processing publications to create the next catalog version.
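The division of work between the driver and the workers can be sketched without the platform. This is a simplified simulation under stated assumptions, not the Data Client Library API: `DistributedPublishSketch` is a hypothetical class, the batch token is a placeholder string, the workers are plain asynchronous tasks instead of cluster nodes, and a thread-safe queue stands in for the server.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical simulation of a distributed publication; not the Data Client Library API.
final class DistributedPublishSketch {
    // Thread-safe store standing in for the server side of one batch.
    private final Queue<String> uploaded = new ConcurrentLinkedQueue<>();

    // Worker step: upload one slice of partitions under the shared batch token.
    CompletableFuture<Void> worker(String batchToken, List<String> partitionIds) {
        return CompletableFuture.runAsync(() -> {
            for (String id : partitionIds) {
                uploaded.add(batchToken + ":" + id);
            }
        });
    }

    // Driver steps: obtain the token, fan out to the workers, and complete
    // the batch only after every worker has finished uploading.
    CompletableFuture<Integer> run(List<String> slice1, List<String> slice2) {
        String batchToken = "example-batch-token"; // placeholder for a real token
        CompletableFuture<Void> worker1 = worker(batchToken, slice1);
        CompletableFuture<Void> worker2 = worker(batchToken, slice2);
        return CompletableFuture.allOf(worker1, worker2)
            .thenApply(ignored -> uploaded.size()); // "complete": count what was received
    }

    public static void main(String[] args) {
        int received = new DistributedPublishSketch()
            .run(Arrays.asList("p1", "p2"), Arrays.asList("p3"))
            .join();
        System.out.println(received + " partitions attached to the batch");
    }
}
```

The point of the sketch is the ordering: the completion step is chained after all worker futures, so the batch is never finalized while uploads are still in flight.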
To publish multiple requests to a versioned layer, add the following:
// Scala
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)

publishApi
  .startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
  .flatMap { batchToken =>
    val worker1 =
      publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    val worker2 =
      publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    for {
      _ <- worker1
      _ <- worker2
    } yield batchToken
  }
  .flatMap { batchToken =>
    publishApi.completeBatch(batchToken)
  }
// Java
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

CompletableFuture<Done> futurePublish =
    publishApi
        .startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
        .thenCompose(
            batchToken -> {
              Source<PendingPartition, NotUsed> partitionsOnWorker1 =
                  arbitraryPendingPartitions1;
              CompletableFuture<Done> worker1 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker1.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              Source<PendingPartition, NotUsed> partitionsOnWorker2 =
                  arbitraryPendingPartitions2;
              CompletableFuture<Done> worker2 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker2.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              return worker1.thenCombine(worker2, (done, done2) -> batchToken);
            })
        .thenCompose(
            batchToken -> {
              return publishApi.completeBatch(batchToken);
            })
        .toCompletableFuture();
For stream and volatile layers, distributed publication does not differ from the normal publication process.
Index Layer
If your partition data is already uploaded to the Blobstore and you have the dataHandle of the uploaded data, you can index the partition in an index layer with the specified keys.
To index a partition in an index layer, add the following:
// Scala
publishApi.publishIndex(indexLayerId, commitPartitions)

// Java
publishApi.publishIndex(indexLayerId, commitPartitions);