The image below illustrates the data publication model. You can mix all layer types in the same publication.
Figure 1. dataservice-view
The HERE platform supports the following types of layers. See the dedicated chapter for each of them.
versioned
volatile
index
stream
objectstore
Publish to a Versioned Layer
Simplified Publication Process
Just as with the simplified metadata publication for a versioned layer, you can publish both data and metadata with a single step. The snippet below automatically starts a batch publication, publishes the data and metadata using that batch publication, and finalizes the batch. The call finishes once the data has been processed and is available in the catalog.
Scala
Java
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

// list of dependencies for this publication
val dependencies = Seq.empty[VersionDependency]

val partitions: Source[PendingPartition, NotUsed] =
  Source(
    List(
      NewPartition(
        partition = newPartitionId1,
        layer = versionedLayerId,
        data = NewPartition.ByteArrayData(blobData)),
      DeletedPartition(
        partition = deletedPartitionId,
        layer = versionedLayerId
      )))

writeEngine.publishBatch2(parallelism = 10,
                          layerIds = Some(Seq(versionedLayerId)),
                          dependencies = dependencies,
                          partitions = partitions)
// create writeEngine for source catalog
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// parallelism defines how many parallel requests would be made to fetch the data
int parallelism = 10;

// list of dependencies for this publication
List<VersionDependency> dependencies = Collections.emptyList();

NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .build();

DeletedPartition deletedPartition =
    new DeletedPartition.Builder()
        .withPartition(deletedPartitionId)
        .withLayer(layer)
        .build();

ArrayList<PendingPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition);
partitionList.add(deletedPartition);

Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

CompletableFuture<Done> futurePublish =
    writeEngine
        .publishBatch2(parallelism, Optional.of(Arrays.asList(layer)), dependencies, partitions)
        .toCompletableFuture();
Distributed Publications
The HERE platform allows you to process and publish a large number of partitions in a distributed manner.
For versioned layers, this is a three-step process:
Start the publication process to initiate a new batch publication; this returns a batch token. Normally, this operation happens on the master or driver node in the cluster.
Different workers upload data and metadata, attaching them to the same batch token.
Once all data is sent to the server, finalize the batch publication. Normally, this operation happens on the master or driver node in the cluster.
Upon receiving the complete batch request, the server starts processing the publications to create the next catalog version.
The following snippet illustrates how to publish multiple requests to a versioned layer:
Scala
Java
// create publishApi and writeEngine for source catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)

// start batch publication
publishApi
  .startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
  .flatMap { batchToken =>
    // start worker 1 with upload data and publishing metadata
    val worker1 =
      publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) { partition =>
        writeEngine.put(partition)
      })

    // start worker 2 with upload data and publishing metadata
    val worker2 =
      publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) { partition =>
        writeEngine.put(partition)
      })

    // wait until workers are done uploading data/metadata
    for {
      _ <- worker1
      _ <- worker2
    } yield batchToken
  }
  .flatMap { batchToken =>
    // signal to server complete batch publication
    publishApi.completeBatch(batchToken)
  }
// create publishApi and writeEngine for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// start a batch, publish partitions, complete batch
CompletableFuture<Done> futurePublish =
    publishApi
        .startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
        .thenCompose(
            batchToken -> {
              // start worker 1 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker1 = arbitraryPendingPartitions1;
              CompletableFuture<Done> worker1 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker1.mapAsync(2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // start worker 2 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker2 = arbitraryPendingPartitions2;
              CompletableFuture<Done> worker2 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker2.mapAsync(2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // wait until workers are done uploading
              return worker1.thenCombine(worker2, (done, done2) -> batchToken);
            })
        .thenCompose(batchToken -> {
          return publishApi.completeBatch(batchToken);
        })
        .toCompletableFuture();
Publish to a Stream Layer
Data published to a stream layer is not versioned. It becomes immediately available to consumers for processing. The data can be retrieved by subscribing to the stream layer.
The snippet below illustrates how to publish to a stream layer.
Scala
Java
// create writeEngine and queryApi for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)
val queryApi = DataClient().queryApi(catalogHrn)

// subscribe to receive new publications from stream layer
queryApi.subscribe("stream-layer",
                   ConsumerSettings("test-consumer"),
                   partition => println("Received " + new String(partition.partition)))

val partitions =
  Source.single(
    NewPartition(
      partition = newPartitionId1,
      layer = streamingLayerId,
      data = NewPartition.ByteArrayData(blobData),
      dataSize = dataSize,
      checksum = checksum,
      compressedDataSize = compressedDataSize,
      timestamp = Some(timestamp) // optional, see explanation below
    ))

writeEngine.publish(partitions)
// create writeEngine and queryApi for a catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// subscribe to receive new publications from stream layer
queryApi.subscribe("stream-layer",
                   new ConsumerSettings.Builder().withGroupName("test-consumer").build(),
                   partition -> processPartition(partition));

NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .withDataSize(dataSize)
        .withCompressedDataSize(compressedDataSize)
        .withChecksum(checksum)
        .withTimestamp(OptionalLong.of(timestamp)) // optional, see explanation below
        .build();

Source<PendingPartition, NotUsed> partitions = Source.single(newPartition);
writeEngine.publish(partitions);
If the optional parameter timestamp is given, it is used as is. If this parameter is not present, the current time (System.currentTimeMillis()) is used by default. Providing a timestamp can be useful if your workflows require you to capture the "event" time; otherwise, the current time represents the "ingestion" time.
Note: Important
Kafka data/record deletion, as configured by your stream layer TTL (Time to Live) setting, is triggered by the timestamp parameter. If you provide the timestamp, the stream layer data is deleted based on that timestamp plus the configured TTL. By default, the data is purged from Kafka using the "ingestion" timestamp plus the configured TTL. This distinction matters if your use case requires that stream data be deleted within a certain time after ingestion. The Kafka record TTL is defined in the Kafka broker.
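To make the retention arithmetic concrete, the sketch below shows approximately when a record becomes eligible for deletion; the TTL value is a hypothetical placeholder, and the actual value comes from your stream layer configuration.
Scala
// Hypothetical TTL; the actual value comes from your stream layer configuration.
val ttlMillis: Long = 24L * 60 * 60 * 1000 // e.g. a 24-hour TTL

// If you pass a timestamp in NewPartition, deletion is based on that "event" time:
val eventTimestamp: Long = 1532018660873L
val approxDeletionWithEventTime = eventTimestamp + ttlMillis

// If you omit the timestamp, the "ingestion" time is used instead:
val approxDeletionWithIngestionTime = System.currentTimeMillis() + ttlMillis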
Publish to a Volatile Layer
A volatile layer is a key/value store where values for a given key can change, and only the latest value is retrievable. As new data is published, old data is overwritten. You must publish a new version of the metadata if there will be breaking changes to the data that consumers expect to read from the layer.
If you need to publish a new version to a volatile layer, use version dependencies and upload the partitions using a batch publication.
The snippet below illustrates how to use version dependencies.
Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in Md5BlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  // given a list of partitions to commit
  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = maybeEmptyData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = maybeEmptyData
        )))

  // upload data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    partitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in Md5BlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();
          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          // list of dependencies for this publication
          List<VersionDependency> dependencies = Collections.emptyList();

          NewPartition newPartition1 =
              new NewPartition.Builder()
                  .withPartition(partitionId1)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();
          NewPartition newPartition2 =
              new NewPartition.Builder()
                  .withPartition(partitionId2)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          partitionList.add(newPartition1);
          partitionList.add(newPartition2);
          Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

          int parallelism = 10;

          // upload data
          Source<CommitPartition, NotUsed> commitPartitions =
              partitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion, Optional.of(Arrays.asList(layer)), dependencies, commitPartitions);
          return done;
        });
If you only need to update data in a volatile layer, use the generic publish method.
The snippet below illustrates how to use publish.
Scala
Java
// create queryApi for a catalog to find latest version
val queryApi = DataClient().queryApi(catalogHrn)

queryApi.getLatestVersion().flatMap { maybeLatestVersion =>
  val latestVersion =
    maybeLatestVersion.getOrElse(throw new IllegalArgumentException("No version found!"))

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(latestVersion))

  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = someData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = someData
        )))

  // publish data without batch token
  partitions
    .mapAsync(parallelism = 10) { partition =>
      writeEngine.put(partition)
    }
    .runWith(Sink.ignore)
}
// create queryApi for a catalog to find latest version
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

CompletionStage<Done> completionStage =
    queryApi
        .getLatestVersion(OptionalLong.empty())
        .thenCompose(
            maybeLatestVersion -> {
              if (!maybeLatestVersion.isPresent())
                throw new IllegalArgumentException("No version found!");
              Long latestVersion = maybeLatestVersion.getAsLong();

              // create writeEngine for a catalog with a deterministic BlobIdGenerator
              BlobIdGenerator idGenerator =
                  new StableBlobIdGenerator.Builder().withVersion(latestVersion).build();
              WriteEngine writeEngine =
                  DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

              NewPartition newPartition1 =
                  new NewPartition.Builder()
                      .withPartition(partitionId1)
                      .withLayer(layer)
                      .withData(someData)
                      .build();
              NewPartition newPartition2 =
                  new NewPartition.Builder()
                      .withPartition(partitionId2)
                      .withLayer(layer)
                      .withData(someData)
                      .build();

              ArrayList<PendingPartition> partitionList = new ArrayList<>();
              partitionList.add(newPartition1);
              partitionList.add(newPartition2);
              Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

              int parallelism = 10;

              // publish data without batch token
              CompletionStage<Done> done =
                  partitions
                      .mapAsync(parallelism, writeEngine::put)
                      .runWith(Sink.ignore(), myMaterializer);
              return done;
            });
Delete from a Volatile Layer
When you need to delete the metadata and data from a volatile layer, you can use the following two-step process.
First, delete the data using DataEngine.writeEngine with a DeletedPartition object. A DeletedPartition is similar to a NewPartition but contains the dataHandle instead of the payload. This way, the referenced blob object (the data) is deleted.
Second, delete the metadata using PublishApi.publishBatch2 with the CommitPartition object returned by the previous WriteEngine API call.
If you skip the second step, you get the same result as when a volatile partition expires: the data is gone, but the metadata is still there.
The snippet below illustrates how to delete data and metadata from volatile layer.
Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in Md5BlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  val queryApi = DataClient().queryApi(catalogHrn)
  val filter = VolatilePartitionsFilter.byIds(Set(deletePartitionId1, deletePartitionId2))
  val partitions: Seq[Partition] = Await
    .result(queryApi.getVolatilePartitionsAsIterator(volatileLayerId, filter), Duration.Inf)
    .toSeq

  // prepare list of partitions to be deleted
  val deletePartitions: Source[PendingPartition, NotUsed] =
    Source(
      partitions.map {
        case referencePartition: ReferencePartition =>
          val dataHandle = referencePartition.getDataHandle
          val partitionId = referencePartition.partition
          DeletedPartition(
            partition = partitionId,
            layer = volatileLayerId,
            dataHandle = Some(dataHandle))
      }.toList
    )

  // delete data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    deletePartitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get the partitions from partitionIds
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder()
        .withIds(new HashSet<String>(Arrays.asList(partitionId1, partitionId2)))
        .build();

final List<Partition> partitions = new ArrayList<Partition>();
try {
  queryApi
      .getVolatilePartitionsAsIterator(layerId, filter, Collections.emptySet())
      .toCompletableFuture()
      .get()
      .forEachRemaining(partitions::add);
} catch (Exception exp) {
  partitions.clear();
}

// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in Md5BlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();
          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          for (Partition p : partitions) {
            if (p instanceof ReferencePartition) {
              ReferencePartition referencePartition = (ReferencePartition) p;
              partitionList.add(
                  new DeletedPartition.Builder()
                      .withLayer(layerId)
                      .withPartition(referencePartition.getPartition())
                      .withDataHandle(referencePartition.getDataHandle())
                      .build());
            }
          }
          Source<PendingPartition, NotUsed> pendingPartitions = Source.from(partitionList);

          int parallelism = 10;

          // upload data
          Source<CommitPartition, NotUsed> commitPartitions =
              pendingPartitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion,
                  Optional.of(Arrays.asList(layerId)),
                  Collections.emptyList(),
                  commitPartitions);
          return done;
        });
Note: BlobIdGenerator
It is recommended to use the StableBlobIdGenerator to create the write engine for uploading volatile partitions. If you define your own BlobIdGenerator, ensure that the method generateVolatileBlobId(partition) is stable, that is, for a given partition it generates the same blobId on each call. By default, generateVolatileBlobId returns the result of generateBlobId. If that method is stable, the default behavior is fine; otherwise, generateVolatileBlobId must be overridden, because non-stable blob IDs for volatile partitions can create orphaned blobs.
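To illustrate the stability requirement, the sketch below derives a blob ID deterministically from the layer and partition IDs, so repeated calls for the same partition always yield the same ID. This is a hypothetical standalone helper, not the library's BlobIdGenerator interface; the function name and the MD5-based scheme are assumptions for illustration only.
Scala
import java.security.MessageDigest

// Hypothetical helper: a stable (deterministic) blob ID for a volatile partition.
// The real StableBlobIdGenerator may use a different scheme.
def stableVolatileBlobId(layerId: String, partitionId: String): String = {
  val digest = MessageDigest.getInstance("MD5")
  digest.digest(s"$layerId/$partitionId".getBytes("UTF-8")).map("%02x".format(_)).mkString
}

// The same partition always maps to the same blob ID:
assert(stableVolatileBlobId("volatile-layer", "23618402") ==
  stableVolatileBlobId("volatile-layer", "23618402"))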
Publish to an Index Layer
Data published to an index layer is not versioned, but it is indexed. To publish and index the data, you have two options:
You can publish and index the data separately by calling the methods WriteEngine.put and PublishApi.publishIndex.
Alternatively, you can call the method WriteEngine.uploadAndIndex, which both publishes and indexes the data of a partition.
The snippet below illustrates how to create a new partition that can later be published to an index layer.
Scala
Java
// How to define NewPartition for Index layer
val newPartition = NewPartition(
  partition = "",
  layer = indexLayerId,
  data = ByteArrayData(bytes),
  fields = Some(
    Map(
      "someIntKey" -> IntIndexValue(42),
      "someStringKey" -> StringIndexValue("abc"),
      "someBooleanKey" -> BooleanIndexValue(true),
      "someTimeWindowKey" -> TimeWindowIndexValue(123456789L),
      "someHereTileKey" -> HereTileIndexValue(91956L)
    )),
  metadata = Some(
    Map("someKey1" -> "someValue1", "someKey2" -> "someValue2")),
  checksum = Some(checksum),
  crc = Some(crc),
  dataSize = Some(dataSize)
)
// How to define NewPartition for Index layer
NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition("")
        .withLayer(indexLayerId)
        .withData(bytes)
        .addIntField("someIntKey", 42)
        .addStringField("someStringKey", "abc")
        .addBooleanField("someBooleanKey", true)
        .addTimeWindowField("someTimeWindowKey", 123456789L)
        .addHereTileField("someHereTileKey", 91956L)
        .addMetadata("someKey1", "someValue1")
        .addMetadata("someKey2", "someValue2")
        .withChecksum(Optional.of(checksum))
        .withDataSize(OptionalLong.of(dataSize))
        .build();
Note
The Metadata parameter is an additional key-value collection that is not related to any index key. A Metadata key is a user-defined field that can store extra information about a record such as the ingestion time: Map("ingestionTime" -> "1532018660873").
The NewPartition.fields class member is also called index attributes on the portal or indexDefinitions in the OLP CLI.
The snippet below illustrates how to upload data and index a partition with the single method WriteEngine.uploadAndIndex.
Scala
Java
// The example illustrates how to upload data and index a partition
// with the single method WriteEngine.uploadAndIndex
writeEngine.uploadAndIndex(Iterator(newPartition))
// The example illustrates how to upload data and index a partition
// with the single method WriteEngine.uploadAndIndex
Iterator<NewPartition> partitions = Arrays.asList(newPartition).iterator();
CompletionStage<Done> publish = writeEngine.uploadAndIndex(partitions);
The snippet below illustrates how to upload data with WriteEngine.put and then index the partition with PublishApi.publishIndex:
Scala
Java
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
val putAndIndex: Future[Done] = for {
  commitPartition <- writeEngine.put(newPartition)
  _ <- publishApi.publishIndex(indexLayerId, Iterator(commitPartition))
} yield Done
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
CompletionStage<Done> putAndIndex =
    writeEngine
        .put(newPartition)
        .thenCompose(
            commitPartition -> {
              Iterator<CommitPartition> commitPartitions =
                  Arrays.asList(commitPartition).iterator();
              return publishApi.publishIndex(indexLayerId, commitPartitions);
            });
Update an Index Layer
When you need to change the data in an index layer, you can use the PublishApi.updateIndex API call. The method takes 3 arguments:
layer - The layer id of the layer which should be updated.
additions - A list of partitions to add. Note that you must first upload the data blob using WriteEngine.put before you can add the corresponding partition.
deletions - A list of partitions to delete.
The following snippet demonstrates the usage of the PublishApi.updateIndex API:
Scala
Java
val updateIndex: Future[Done] = {
  // partitions to add
  // see above how to define a new partition for an index layer
  val additions = Seq(newPartition)

  // partitions to remove
  // use CommitPartition.deletedIndexPartition to define a partition and its data handle
  // that you plan to remove
  val removals = Seq(CommitPartition.deletedIndexPartition(dataHandle, indexLayerId))

  for {
    // first you have to upload corresponding blobs of the new partitions to the Blob Store
    committedAdditions <- Future.sequence(additions.map(p => writeEngine.put(p)))
    _ <- publishApi.updateIndex(indexLayerId,
                                committedAdditions.toIterator,
                                removals.toIterator)
  } yield Done
}
CompletionStage<Done> updateIndex =
    writeEngine
        // first you have to upload corresponding blobs of the new partitions
        // to the Blob Store
        .put(newPartition)
        .thenCompose(
            commitPartition -> {
              Iterator<CommitPartition> additions = Arrays.asList(commitPartition).iterator();

              // use DeleteIndexPartitionBuilder to define partitions that you plan to remove
              CommitPartition deletePartition =
                  new CommitPartition.Builder()
                      .deleteIndexPartition()
                      .withLayer(indexLayerId)
                      .withDataHandle(dataHandle)
                      .build();
              Iterator<CommitPartition> removals = Arrays.asList(deletePartition).iterator();

              return publishApi.updateIndex(indexLayerId, additions, removals);
            });
Delete from an Index Layer
When you need to delete the metadata and data in an index layer, you can use the PublishApi.deleteIndex API call. The delete operation will be scheduled when the PublishApi.deleteIndex API call is successful.
The method takes 2 arguments:
layer - The ID of the layer from which records should be deleted.
queryString - A string written in the RSQL query language to query the index layer.
The method returns:
deleteId - A string which can later be used to query the delete status.
To check the delete status, you can use the QueryApi.queryIndexDeleteStatus API call.
The method takes one argument:
deleteId - The delete request id returned from PublishApi.deleteIndex API call.
The method returns:
DeleteIndexesStatusResponse - The response provides information on the state and the number of partitions deleted at the time of the delete status request.
The snippet below demonstrates the usage of the PublishApi.deleteIndex API and the QueryApi.queryIndexDeleteStatus API:
Scala
Java
import scala.concurrent.Await
import scala.concurrent.duration._

val queryString = "someIntKey>42;someStringKey!=abc"
val deleteId =
  Await.result(publishApi.deleteIndex(indexLayerId, queryString), 45.seconds)

// Note that the delete operation for deleting records in index layer is an async operation
// This example will return the current status of the delete request
// If user wants to wait for the delete status to be in Succeeded state, user may have to
// perform multiple delete status calls
// It is recommended to use exponential backoff policy to reduce the rate of delete status
// calls to the server
val deleteStatusResponse =
  Await.result(queryApi.queryIndexDeleteStatus(indexLayerId, deleteId), 5.seconds)

println("Current state of index delete request is " + deleteStatusResponse.state)
String queryString = "someIntKey>42;someStringKey!=abc";
String deleteId =
    publishApi.deleteIndex(indexLayerId, queryString).toCompletableFuture().join();

// Note that the delete operation for deleting records in index layer is an async operation
// This example will return the current status of the delete request
// If user wants to wait for the delete status to be in Succeeded state, user may have to
// perform multiple delete status calls
// It is recommended to use exponential backoff policy to reduce the rate of delete status
// calls to the server
DeleteIndexesStatusResponse deleteStatusResponse =
    queryApi.queryIndexDeleteStatus(indexLayerId, deleteId).toCompletableFuture().join();

System.out.println("Current state of index delete request is " + deleteStatusResponse.state());
Upload an Object to the Object Store Layer
The Object Store layer is a key/value store. You can upload data to an existing or a new key. The data is mutable, and parallel writes are allowed. You can publish keys that contain "/" to create a hierarchical structure, so that you can list the keys under a common prefix.
The following code snippet shows how to upload data to the Object Store layer:
Scala
Java
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

writeEngine.uploadObject2(layer,
                          key,
                          NewPartition.ByteArrayData(blobData),
                          Some(ContentTypes.`application/json`.toString()))
// create writeEngine for a catalog
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

CompletableFuture<Done> futureUploadObject =
    writeEngine
        .uploadObject2(
            layer,
            key,
            new NewPartition.ByteArrayData(blobData),
            Optional.of(ContentTypes.APPLICATION_JSON.toString()),
            Optional.empty())
        .toCompletableFuture();
Delete an Object from the Object Store Layer
An object can be deleted from the Object Store layer. You submit a delete request and the object will eventually be deleted from the layer.
The following code snippet shows how to delete an object from the Object Store layer:
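Below is a minimal Scala sketch, assuming the write engine exposes a deleteObject(layer, key) method analogous to uploadObject2; verify the exact method name and signature against your version of the Data Client Library.
Scala
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

// Assumption: a deleteObject(layer, key) method analogous to uploadObject2;
// check the WriteEngine API of your library version for the exact signature.
val futureDelete = writeEngine.deleteObject(layer, key)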
Publish to an Interactive Map Layer
To publish data to an interactive map layer, use the Publish API. Send your data to the layer using the POST, PUT, or PATCH requests of the 'interactive' API. Use a DELETE request to delete data from an interactive map layer.
Upload Data to an Interactive Map Layer
When you want to upload a specified feature to an interactive map layer, you can use the PublishApi.putFeature API call.
The method takes 3 arguments:
layer - The layer ID of the layer which should be updated.
feature - The feature to upload in the interactive map layer.
featureId - The feature ID of the feature.
When you want to upload a specified FeatureCollection to an interactive map layer, you can use the PublishApi.putFeatureCollection API call.
The method takes 2 arguments:
layer - The layer ID of the layer which should be updated.
featureCollection - The FeatureCollection to upload in an interactive map layer.
The following snippet demonstrates the usage of the PublishApi.putFeatureCollection API:
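Below is a minimal Scala sketch based on the argument lists above. The GeoJSON string, layer ID, and feature ID are placeholders, and the FeatureCollection is built with FeatureCollection.JsonBuilder as shown later in this chapter; verify the exact signatures against your version of the Data Client Library.
Scala
// create publishApi for the catalog
val publishApi = DataClient().publishApi(catalogHrn)

// Build a FeatureCollection from a GeoJSON string (placeholder content).
val json =
  """{"type":"FeatureCollection","features":[{"type":"Feature","id":"my-feature-id","geometry":{"type":"Point","coordinates":[13.4,52.5]},"properties":{"name":"example"}}]}"""
val featureCollection = new FeatureCollection.JsonBuilder(json).build

// Upload the whole FeatureCollection to the interactive map layer (placeholder layer ID).
val putCollection = publishApi.putFeatureCollection(interactiveMapLayerId, featureCollection)
Await.result(putCollection, timeout)

// Alternatively, upload a single feature under an explicit (hypothetical) feature ID.
// Assumption: getFeatures returns a java.util.List of features.
// val feature = featureCollection.getFeatures.get(0)
// Await.result(publishApi.putFeature(interactiveMapLayerId, feature, "my-feature-id"), timeout)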
"A position is an array of numbers. There MUST be two or more elements. The first two elements are longitude and latitude, or easting and northing, precisely in that order and using decimal numbers. Altitude or elevation MAY be included as an optional third element."
"The value of the bbox member MUST be an array of length 2*n where n is the number of dimensions represented in the contained geometries, with all axes of the most southwesterly point followed by all axes of the more northeasterly point. The axes order of a bbox follows the axes order of geometries. The "bbox" values define shapes with edges that follow lines of constant longitude, latitude, and elevation."
Update an Interactive Map Layer
When you want to update a specified FeatureCollection in an interactive map layer, you can use the PublishApi.postFeatureCollection API call.
The method takes 5 arguments:
layer - The layer ID of the layer which should be updated.
featureCollection - The FeatureCollection to update in an interactive map layer.
ifExist - The action to execute when a feature with the provided ID exists. Default is PATCH.
ifNotExist - The action to execute when a feature with the provided ID does not exist, or the feature contains no ID. Default is CREATE.
transactional - Defines whether this is a transactional operation. Default is TRUE.
Alternatively, you can use the PublishApi.postFeatureModifications API call.
The method takes 3 arguments:
layer - The layer ID of the layer which should be updated.
featureModificationList - A list of FeatureModification objects. Each FeatureModification contains a list of features, a parameter named onFeatureNotExists that defines the action to execute when a feature with the provided ID does not exist, and a parameter named onFeatureExists that defines the action to execute when a feature with the provided ID exists.
transactional - Defines whether this is a transactional operation. Default is TRUE.
When you want to patch a specified feature in an interactive map layer, you can use the PublishApi.patchFeature API call.
The method takes 3 arguments:
layer - The layer ID of the layer which should be updated.
feature - The feature to update in an interactive map layer.
featureId - The feature ID of the feature.
The following snippet demonstrates the usage of the PublishApi.patchFeature API:
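Below is a minimal Scala sketch based on the argument list above. The GeoJSON string, layer ID, and feature ID are placeholders; verify the exact patchFeature signature against your version of the Data Client Library.
Scala
// create publishApi for the catalog
val publishApi = DataClient().publishApi(catalogHrn)

// A single feature taken from a FeatureCollection built from GeoJSON (placeholder content).
// Assumption: getFeatures returns a java.util.List of features.
val json =
  """{"type":"FeatureCollection","features":[{"type":"Feature","id":"my-feature-id","geometry":{"type":"Point","coordinates":[13.4,52.5]},"properties":{"name":"updated-name"}}]}"""
val feature = new FeatureCollection.JsonBuilder(json).build.getFeatures.get(0)

// Patch the feature with the given (hypothetical) feature ID in the interactive map layer.
Await.result(publishApi.patchFeature(interactiveMapLayerId, feature, "my-feature-id"), timeout)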
Upload Features to an Interactive Map Layer as a Source using Data-Engine
The following snippet shows how to upload features to an interactive map layer from a Source of features using the WriteEngine.uploadIMLFeaturesAsSource API:
Java
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);
int batchSize = 100;

// Source of features to upload
Source<Feature, NotUsed> source = Source.from(featureCollection.getFeatures());

Done done =
    writeEngine
        .uploadIMLFeaturesAsSource(layer, source, OptionalInt.of(batchSize))
        .toCompletableFuture()
        .join();
Upload FeatureCollection to an Interactive Map Layer using Data-Engine
When you want to upload a large FeatureCollection to an interactive map layer, you can use the WriteEngine.uploadIMLFeatureCollection API call.
The method takes 3 arguments:
layer - The layer ID of the layer which should be updated.
featureCollection - The FeatureCollection to upload.
batchSize - The number of features in an upload batch.
The following snippet demonstrates the usage of the writeEngine.uploadIMLFeatureCollection API:
Scala
Java
// create writeEngine
val writeEngine = DataEngine().writeEngine(catalogHrn)

// Large FeatureCollection to upload
val featureCollection = new FeatureCollection.JsonBuilder(json).build
val batchSize = 100

val response =
  writeEngine.uploadIMLFeatureCollection(layer, featureCollection, Some(batchSize))
Await.result(response, timeout)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);
int batchSize = 100;

// Large FeatureCollection to upload
FeatureCollection featureCollection = new FeatureCollection.JsonBuilder(json).build();

Done done =
    writeEngine
        .uploadIMLFeatureCollection(layer, featureCollection, OptionalInt.of(batchSize))
        .toCompletableFuture()
        .join();
Valid values for Interactive Map requests
Context
DEFAULT = The default value if none is given. For composite layers the operation occurs based on the extension rules. For normal layers this is the only valid context.
EXTENSION = The operation will be executed only in the extension and no operation will be performed in the extended layer.
SUPER = Only applicable for read-operations. The operation will be executed only in the layer being extended (super layer).