Publish Data

The image below illustrates the data publication model. You can mix all layer types in the same publication.

Figure 1. dataservice-view

The HERE platform supports the following types of layers. For details on each layer type, see the dedicated chapters:

  • versioned
  • volatile
  • index
  • stream
  • objectstore
  • interactive map

Publish to a Versioned layer

Simplified Publication Process

Just as with the simplified metadata publication for a versioned layer, you can publish both data and metadata with a single step. The snippet below automatically starts a batch publication, publishes the data and metadata using that batch publication, and finalizes the batch. The call finishes once the data has been processed and is available in the catalog.

Scala
Java
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

// list of dependencies for this publication
val dependencies = Seq.empty[VersionDependency]

val partitions: Source[PendingPartition, NotUsed] =
  Source(
    List(
      NewPartition(
        partition = newPartitionId1,
        layer = versionedLayerId,
        data = NewPartition.ByteArrayData(blobData)
      ),
      DeletedPartition(
        partition = deletedPartitionId,
        layer = versionedLayerId
      )
    )
  )

writeEngine.publishBatch2(parallelism = 10,
                          layerIds = Some(Seq(versionedLayerId)),
                          dependencies = dependencies,
                          partitions = partitions)
// create writeEngine for source catalog
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// parallelism defines how many parallel requests are made to upload the data
int parallelism = 10;

// list of dependencies for this publication
List<VersionDependency> dependencies = Collections.emptyList();

NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .build();
DeletedPartition deletedPartition =
    new DeletedPartition.Builder().withPartition(deletedPartitionId).withLayer(layer).build();

ArrayList<PendingPartition> partitionList = new ArrayList<>();
partitionList.add(newPartition);
partitionList.add(deletedPartition);

Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

CompletableFuture<Done> futurePublish =
    writeEngine
        .publishBatch2(parallelism, Optional.of(Arrays.asList(layer)), dependencies, partitions)
        .toCompletableFuture();

Distributed Publications

The HERE platform allows you to process and publish a large number of partitions in a distributed manner.

For versioned layers, this is a three-step process:

  • Start the publication process to initiate a new batch publication; in return, you receive a batch token. This operation normally happens on the master or driver node of the cluster.
  • Different workers upload data and metadata, attaching them to the same batch token.
  • Once all data is sent to the server, finalize the batch publication. This operation also normally happens on the master or driver node of the cluster.

Upon receiving the complete batch request, the server starts processing the publications to create the next catalog version.

The following snippet illustrates how to publish multiple requests to a versioned layer:

Scala
Java
// create publishApi and writeEngine for source catalog
val publishApi = DataClient().publishApi(catalogHrn, settings)
val writeEngine = DataEngine().writeEngine(catalogHrn)

// start batch publication
publishApi
  .startBatch2(None, Some(Seq(versionedLayerId)), Seq.empty)
  .flatMap { batchToken =>
    //start worker 1 with upload data and publishing metadata
    val worker1 =
      publishApi.publishToBatch(batchToken, partitions1.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    //start worker 2 with upload data and publishing metadata
    val worker2 =
      publishApi.publishToBatch(batchToken, partitions2.mapAsync(parallelism = 2) {
        partition =>
          writeEngine.put(partition)
      })

    // wait until workers are done uploading data/metadata
    for {
      _ <- worker1
      _ <- worker2
    } yield batchToken

  }
  .flatMap { batchToken =>
    //signal to server complete batch publication
    publishApi.completeBatch(batchToken)
  }
// create publishApi and writeEngine for source catalog
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// start a batch, publish partitions, complete batch
CompletableFuture<Done> futurePublish =
    publishApi
        .startBatch2(baseVersion, Optional.empty(), Collections.emptyList())
        .thenCompose(
            batchToken -> {
              // start worker 1 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker1 =
                  arbitraryPendingPartitions1;
              CompletableFuture<Done> worker1 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker1.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // start worker 2 with upload data and publishing metadata
              Source<PendingPartition, NotUsed> partitionsOnWorker2 =
                  arbitraryPendingPartitions2;
              CompletableFuture<Done> worker2 =
                  publishApi
                      .publishToBatch(
                          batchToken,
                          partitionsOnWorker2.mapAsync(
                              2, partition -> writeEngine.put(partition)))
                      .toCompletableFuture();

              // wait until workers are done uploading
              return worker1.thenCombine(worker2, (done, done2) -> batchToken);
            })
        .thenCompose(
            batchToken -> {
              return publishApi.completeBatch(batchToken);
            })
        .toCompletableFuture();

Publish to a Stream Layer

Data published to a stream layer is not versioned. It becomes immediately available to consumers for processing. The data can be retrieved by subscribing to the stream layer.

The snippet below illustrates how to publish to a stream layer.

Scala
Java
// create writeEngine and queryApi for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)
val queryApi = DataClient().queryApi(catalogHrn)

// subscribe to receive new publications from stream layer
queryApi.subscribe("stream-layer",
                   ConsumerSettings("test-consumer"),
                   partition => println("Received " + new String(partition.partition)))

val partitions =
  Source.single(
    NewPartition(
      partition = newPartitionId1,
      layer = streamingLayerId,
      data = NewPartition.ByteArrayData(blobData),
      dataSize = dataSize,
      checksum = checksum,
      compressedDataSize = compressedDataSize,
      timestamp = Some(timestamp) // optional, see explanation below
    )
  )

writeEngine.publish(partitions)
// create writeEngine and queryApi for a catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

// subscribe to receive new publications from stream layer
queryApi.subscribe(
    "stream-layer",
    new ConsumerSettings.Builder().withGroupName("test-consumer").build(),
    partition -> processPartition(partition));

NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition(partitionId)
        .withData(blobData)
        .withLayer(layer)
        .withDataSize(dataSize)
        .withCompressedDataSize(compressedDataSize)
        .withChecksum(checksum)
        .withTimestamp(OptionalLong.of(timestamp)) // optional, see explanation below
        .build();

Source<PendingPartition, NotUsed> partitions = Source.single(newPartition);

writeEngine.publish(partitions);

If the optional timestamp parameter is given, it is used as is. If it is not present, the current time (System.currentTimeMillis()) is used by default. Providing a timestamp is useful when your workflow requires you to capture "event" time; otherwise, the current time represents the "ingestion" time.

Note

Kafka data/record deletion, as configured by your stream layer TTL (Time to Live) setting, is triggered by the timestamp parameter. If you provide the timestamp, the stream layer data will be deleted based on that timestamp plus the configured TTL. By default, the data is purged from Kafka based on the "ingestion" timestamp plus the configured TTL. This distinction is important if your use case requires that the stream data be deleted within a certain time after ingestion. The Kafka record TTL is defined in the Kafka broker.

Publish to a Volatile Layer

A volatile layer is a key/value store where values for a given key can change and only the latest value is retrievable. As new data is published, old data is overwritten. You must publish a new version of the metadata if you introduce breaking changes to the data that consumers expect to read from the layer.

If you need to publish a new version to a volatile layer, use version dependencies to upload the partitions using the batch publication.

The snippet below illustrates how to use version dependencies.

Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in StableBlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  // given a list partitions to commit
  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = maybeEmptyData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = maybeEmptyData
        )
      )
    )

  // upload data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    partitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in StableBlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();

          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          // list of dependencies for this publication
          List<VersionDependency> dependencies = Collections.emptyList();

          NewPartition newPartition1 =
              new NewPartition.Builder()
                  .withPartition(partitionId1)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();

          NewPartition newPartition2 =
              new NewPartition.Builder()
                  .withPartition(partitionId2)
                  .withLayer(layer)
                  .withData(maybeEmptyData)
                  .build();

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          partitionList.add(newPartition1);
          partitionList.add(newPartition2);

          Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

          int parallelism = 10;

          // upload data
          Source<CommitPartition, NotUsed> commitPartitions =
              partitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion,
                  Optional.of(Arrays.asList(layer)),
                  dependencies,
                  commitPartitions);
          return done;
        });

If you only need to update data in a volatile layer, use the generic publish method.

The snippet below illustrates how to use publish.

Scala
Java
// create queryApi for a catalog to find latest version
val queryApi = DataClient().queryApi(catalogHrn)

queryApi.getLatestVersion().flatMap { maybeLatestVersion =>
  val latestVersion =
    maybeLatestVersion.getOrElse(throw new IllegalArgumentException("No version found!"))

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(latestVersion))

  val partitions: Source[PendingPartition, NotUsed] =
    Source(
      List(
        NewPartition(
          partition = newPartitionId1,
          layer = volatileLayerId,
          data = someData
        ),
        NewPartition(
          partition = newPartitionId2,
          layer = volatileLayerId,
          data = someData
        )
      )
    )

  // publish data without batch token
  partitions
    .mapAsync(parallelism = 10) { partition =>
      writeEngine.put(partition)
    }
    .runWith(Sink.ignore)
}
// create queryApi for a catalog to find latest version
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

CompletionStage<Done> completionStage =
    queryApi
        .getLatestVersion(OptionalLong.empty())
        .thenCompose(
            maybeLatestVersion -> {
              if (!maybeLatestVersion.isPresent())
                throw new IllegalArgumentException("No version found!");

              Long latestVersion = maybeLatestVersion.getAsLong();

              // create writeEngine for a catalog with a deterministic BlobIdGenerator
              BlobIdGenerator idGenerator =
                  new StableBlobIdGenerator.Builder().withVersion(latestVersion).build();

              WriteEngine writeEngine =
                  DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

              NewPartition newPartition1 =
                  new NewPartition.Builder()
                      .withPartition(partitionId1)
                      .withLayer(layer)
                      .withData(someData)
                      .build();

              NewPartition newPartition2 =
                  new NewPartition.Builder()
                      .withPartition(partitionId2)
                      .withLayer(layer)
                      .withData(someData)
                      .build();

              ArrayList<PendingPartition> partitionList = new ArrayList<>();
              partitionList.add(newPartition1);
              partitionList.add(newPartition2);

              Source<PendingPartition, NotUsed> partitions = Source.from(partitionList);

              int parallelism = 10;

              // publish data without batch token
              CompletionStage<Done> done =
                  partitions
                      .mapAsync(parallelism, writeEngine::put)
                      .runWith(Sink.ignore(), myMaterializer);

              return done;
            });

Delete from a Volatile Layer

When you need to delete the metadata and data from a volatile layer, you can use the following two-step process.

  1. First, delete the data using DataEngine.writeEngine with a DeletedPartition object. A DeletedPartition is similar to a NewPartition but contains the dataHandle instead of the payload. This way, the referenced blob object (the data) is deleted.

  2. Second, delete the metadata using PublishApi.publishBatch2 with the CommitPartition objects returned by the previous writeEngine call.

If you skip the second step, you get the same result as when a volatile partition expires: the data is gone, but the metadata is still there.

The snippet below illustrates how to delete data and metadata from volatile layer.

Scala
Java
// get base version to commit a new version
val publishApi = DataClient().publishApi(catalogHrn)

publishApi.getBaseVersion().flatMap { baseVersion =>
  // compute next version to be used in StableBlobIdGenerator
  val nextVersion =
    baseVersion
      .map(_ + 1L)
      .getOrElse(0L)

  // create writeEngine for a catalog with a deterministic BlobIdGenerator
  val writeEngine =
    DataEngine().writeEngine(catalogHrn, new StableBlobIdGenerator(nextVersion))

  // list of dependencies for this publication
  val dependencies = Seq.empty[VersionDependency]

  val queryApi = DataClient().queryApi(catalogHrn)
  val filter = VolatilePartitionsFilter.byIds(Set(deletePartitionId1, deletePartitionId2))
  val partitions: Seq[Partition] = Await
    .result(queryApi.getVolatilePartitionsAsIterator(volatileLayerId, filter), Duration.Inf)
    .toSeq

  // prepare list of partitions to be deleted
  val deletePartitions: Source[PendingPartition, NotUsed] =
    Source(
      partitions.map {
        case referencePartition: ReferencePartition =>
          val dataHandle = referencePartition.getDataHandle
          val partitionId = referencePartition.partition
          DeletedPartition(
            partition = partitionId,
            layer = volatileLayerId,
            dataHandle = Some(dataHandle)
          )
      }.toList
    )

  // delete data
  val commitPartitions: Source[CommitPartition, NotUsed] =
    deletePartitions.mapAsync(parallelism = 10) { pendingPartition =>
      writeEngine.put(pendingPartition)
    }

  // publish version to metadata
  publishApi
    .publishBatch2(baseVersion, Some(Seq(volatileLayerId)), dependencies, commitPartitions)
}
// get the partitions from partitionIds
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);
VolatilePartitionsFilter filter =
    new VolatilePartitionsFilter.Builder()
        .withIds(new HashSet<String>(Arrays.asList(partitionId1, partitionId2)))
        .build();

final List<Partition> partitions = new ArrayList<Partition>();
try {
  queryApi
      .getVolatilePartitionsAsIterator(layerId, filter, Collections.emptySet())
      .toCompletableFuture()
      .get()
      .forEachRemaining(partitions::add);
} catch (Exception exp) {
  partitions.clear();
}

// get base version to commit a new version
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

publishApi
    .getBaseVersion()
    .thenCompose(
        baseVersion -> {
          // compute next version to be used in StableBlobIdGenerator
          Long nextVersion = baseVersion.isPresent() ? baseVersion.getAsLong() + 1 : 0;

          // create writeEngine for a catalog with a deterministic BlobIdGenerator
          BlobIdGenerator idGenerator =
              new StableBlobIdGenerator.Builder().withVersion(nextVersion).build();

          WriteEngine writeEngine =
              DataEngine.get(myActorSystem).writeEngine(catalogHrn, idGenerator);

          ArrayList<PendingPartition> partitionList = new ArrayList<>();
          for (Partition p : partitions) {
            if (p instanceof ReferencePartition) {
              ReferencePartition referencePartition = (ReferencePartition) p;
              partitionList.add(
                  new DeletedPartition.Builder()
                      .withLayer(layerId)
                      .withPartition(referencePartition.getPartition())
                      .withDataHandle(referencePartition.getDataHandle())
                      .build());
            }
          }

          Source<PendingPartition, NotUsed> pendingPartitions = Source.from(partitionList);

          int parallelism = 10;

          // delete data
          Source<CommitPartition, NotUsed> commitPartitions =
              pendingPartitions.mapAsync(parallelism, writeEngine::put);

          // publish version to metadata
          CompletionStage<Done> done =
              publishApi.publishBatch2(
                  baseVersion,
                  Optional.of(Arrays.asList(layerId)),
                  Collections.emptyList(),
                  commitPartitions);
          return done;
        });

Note

BlobIdGenerator
It is recommended to use the StableBlobIdGenerator to create the write engine for uploading volatile partitions. If you define your own BlobIdGenerator, ensure that the method generateVolatileBlobId(partition) is stable, that is, for a given partition it generates the same blobId on each call. By default, generateVolatileBlobId returns the result of generateBlobId. If that method is stable, no further changes are needed; otherwise, generateVolatileBlobId must be overridden, because non-stable blobIds for volatile partitions can create orphaned blobs.

A custom BlobIdGenerator example is shown below:

Scala
Java
class CustomBlobIdGenerator extends BlobIdGenerator {

  override def generateBlobId(partition: NewPartition): String =
    UUID.randomUUID.toString

  override def generateVolatileBlobId(partition: NewPartition): String =
    "volatile-partition-" + partition.partition

}
public class JavaCustomBlobIdGenerator implements BlobIdGenerator {
  @Override
  public String generateBlobId(NewPartition partition) {
    return UUID.randomUUID().toString();
  }

  @Override
  public String generateVolatileBlobId(NewPartition partition) {
    return "volatile-partition-" + partition.partition();
  }
}

Publish to an Index Layer

Data published to an index layer is not versioned but is indexed. To publish and index the data, you have two options:

  • You can separately publish and index the data by calling the methods WriteEngine.put and PublishApi.index.
  • Or you can call the method WriteEngine.uploadAndIndex that both publishes and indexes the data of a partition.

The snippet below illustrates how to create a new partition that can later be published to an index layer.

Scala
Java
// How to define NewPartition for Index layer
val newPartition = NewPartition(
  partition = "",
  layer = indexLayerId,
  data = ByteArrayData(bytes),
  fields = Some(
    Map(
      "someIntKey" -> IntIndexValue(42),
      "someStringKey" -> StringIndexValue("abc"),
      "someBooleanKey" -> BooleanIndexValue(true),
      "someTimeWindowKey" -> TimeWindowIndexValue(123456789L),
      "someHereTileKey" -> HereTileIndexValue(91956L)
    )),
  metadata = Some(
    Map(
      "someKey1" -> "someValue1",
      "someKey2" -> "someValue2"
    )),
  checksum = Some(checksum),
  crc = Some(crc),
  dataSize = Some(dataSize)
)
// How to define NewPartition for Index layer
NewPartition newPartition =
    new NewPartition.Builder()
        .withPartition("")
        .withLayer(indexLayerId)
        .withData(bytes)
        .addIntField("someIntKey", 42)
        .addStringField("someStringKey", "abc")
        .addBooleanField("someBooleanKey", true)
        .addTimeWindowField("someTimeWindowKey", 123456789L)
        .addHereTileField("someHereTileKey", 91956L)
        .addMetadata("someKey1", "someValue1")
        .addMetadata("someKey2", "someValue2")
        .withChecksum(Optional.of(checksum))
        .withDataSize(OptionalLong.of(dataSize))
        .build();

Note

The Metadata parameter is an additional key-value collection that is not related to any index key. A Metadata key is a user-defined field that can store extra information about a record such as the ingestion time: Map("ingestionTime" -> "1532018660873").

The NewPartition.fields class member is also called index attributes on the portal or indexDefinitions in the OLP CLI.

The snippet below illustrates how to upload data and index a partition with the single method WriteEngine.uploadAndIndex.

Scala
Java
// This example illustrates how to upload data and index a partition
// with the single method WriteEngine.uploadAndIndex
writeEngine.uploadAndIndex(Iterator(newPartition))
// This example illustrates how to upload data and index a partition
// with the single method WriteEngine.uploadAndIndex
Iterator<NewPartition> partitions = Arrays.asList(newPartition).iterator();
CompletionStage<Done> publish = writeEngine.uploadAndIndex(partitions);

The snippet below illustrates how to upload data with WriteEngine.put and then index the partition with PublishApi.publishIndex.

Scala
Java
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
val putAndIndex: Future[Done] =
  for {
    commitPartition <- writeEngine.put(newPartition)
    _ <- publishApi.publishIndex(indexLayerId, Iterator(commitPartition))
  } yield Done
// How to upload data with WriteEngine.put and
// index the partition with PublishApi.publishIndex
CompletionStage<Done> putAndIndex =
    writeEngine
        .put(newPartition)
        .thenCompose(
            commitPartition -> {
              Iterator<CommitPartition> commitPartitions =
                  Arrays.asList(commitPartition).iterator();
              return publishApi.publishIndex(indexLayerId, commitPartitions);
            });

Update an Index Layer

When you need to change the data in an index layer, you can use the PublishApi.updateIndex API call. The method takes 3 arguments:

  • layer - The layer id of the layer which should be updated.
  • additions - A list of partitions to add. Note that you must first upload the data blob using WriteEngine.put before you can add the corresponding partition.
  • deletions - A list of partitions to delete.

The following snippet demonstrates the usage of the PublishApi.updateIndex API:

Scala
Java
val updateIndex: Future[Done] = {
  // partitions to add
  // see above how to define a new partition for an index layer
  val additions = Seq(newPartition)
  // partitions to remove
  // use CommitPartition.deletedIndexPartition to define, by its data handle, a partition
  // that you plan to remove
  val removals = Seq(CommitPartition.deletedIndexPartition(dataHandle, indexLayerId))

  for {
    // first you have to upload corresponding blobs of the new partitions to the Blob Store
    committedAdditions <- Future.sequence(additions.map(p => writeEngine.put(p)))
    _ <- publishApi.updateIndex(indexLayerId,
                                committedAdditions.toIterator,
                                removals.toIterator)
  } yield Done
}
CompletionStage<Done> updateIndex =
    writeEngine
        // first you have to upload corresponding blobs of the new partitions
        // to the Blob Store
        .put(newPartition)
        .thenCompose(
            commitPartition -> {
              Iterator<CommitPartition> additions = Arrays.asList(commitPartition).iterator();

              // use DeleteIndexPartitionBuilder to define partitions that you plan to remove
              CommitPartition deletePartition =
                  new CommitPartition.Builder()
                      .deleteIndexPartition()
                      .withLayer(indexLayerId)
                      .withDataHandle(dataHandle)
                      .build();

              Iterator<CommitPartition> removals = Arrays.asList(deletePartition).iterator();

              return publishApi.updateIndex(indexLayerId, additions, removals);
            });

Delete from an Index Layer

When you need to delete the metadata and data in an index layer, you can use the PublishApi.deleteIndex API call. The delete operation will be scheduled when the PublishApi.deleteIndex API call is successful.

The method takes 2 arguments:

  • layer - The layer ID of the layer from which records should be deleted.
  • queryString - A string written in the RSQL query language to query the index layer.

The method returns:

  • deleteId - A string which can later be used to query the delete status.

To check the delete status, you can use the QueryApi.queryIndexDeleteStatus API call.

The method takes one argument:

  • deleteId - The delete request id returned from PublishApi.deleteIndex API call.

The method returns:

  • DeleteIndexesStatusResponse - The response provides information on the state and the number of partitions deleted at the time of the delete status request.

The snippet below demonstrates the usage of the PublishApi.deleteIndex API and the QueryApi.queryIndexDeleteStatus API:

Scala
Java
import scala.concurrent.Await
import scala.concurrent.duration._

val queryString = "someIntKey>42;someStringKey!=abc"
val deleteId =
  Await.result(publishApi.deleteIndex(indexLayerId, queryString), 45.seconds)

// Note that deleting records from an index layer is an asynchronous operation
// This example returns the current status of the delete request
// If you want to wait for the delete status to reach the Succeeded state, you may have to
// perform multiple delete status calls
// It is recommended to use an exponential backoff policy to reduce the rate of delete status
// calls to the server
val deleteStatusResponse =
  Await.result(queryApi.queryIndexDeleteStatus(indexLayerId, deleteId), 5.seconds)
println("Current state of index delete request is " + deleteStatusResponse.state)
String queryString = "someIntKey>42;someStringKey!=abc";
String deleteId =
    publishApi.deleteIndex(indexLayerId, queryString).toCompletableFuture().join();

// Note that deleting records from an index layer is an asynchronous operation
// This example returns the current status of the delete request
// If you want to wait for the delete status to reach the Succeeded state, you may have to
// perform multiple delete status calls
// It is recommended to use an exponential backoff policy to reduce the rate of delete status
// calls to the server
DeleteIndexesStatusResponse deleteStatusResponse =
    queryApi.queryIndexDeleteStatus(indexLayerId, deleteId).toCompletableFuture().join();
System.out.println(
    "Current state of index delete request is " + deleteStatusResponse.state());

Upload an object to the Object Store Layer

The Object Store layer is a key/value store. You can upload data to an existing key or to a new key. The data is mutable and parallel writes are allowed. You can publish keys containing / to create a hierarchical structure, which lets you list the keys under a common prefix.

The following code snippet shows how to upload data to the Object Store layer:

Scala
Java
// create writeEngine for a catalog
val writeEngine = DataEngine().writeEngine(catalogHrn)

writeEngine.uploadObject2(layer,
                          key,
                          NewPartition.ByteArrayData(blobData),
                          Some(ContentTypes.`application/json`.toString()))
// create writeEngine for a catalog
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

CompletableFuture<Done> futureUploadObject =
    writeEngine
        .uploadObject2(
            layer,
            key,
            new NewPartition.ByteArrayData(blobData),
            Optional.of(ContentTypes.APPLICATION_JSON.toString()),
            Optional.empty())
        .toCompletableFuture();

Delete an Object from the Object Store Layer

An object can be deleted from the Object Store layer. You submit a delete request and the object will eventually be deleted from the layer.

The following code snippet shows how to delete an object from Object Store layer:

Scala
Java
// create writeEngine
val writeEngine = DataEngine().writeEngine(catalogHrn)

writeEngine.deleteObject(layer, key)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

CompletableFuture<Done> futureDeleteObject =
    writeEngine.deleteObject(layer, key).toCompletableFuture();

Copy an object within the Object Store Layer

Object Store allows server-side copying of an object within the same layer.

The following code snippet shows how to copy an object within the same Object Store layer:

Scala
Java
// create writeEngine
val writeEngine = DataEngine().writeEngine(catalogHrn)

writeEngine.copyObject(layer, destinationKey, sourceKey)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

CompletableFuture<Done> futureCopyObject =
    writeEngine.copyObject(layer, destinationKey, sourceKey).toCompletableFuture();

Publish to an Interactive Map Layer

To publish data to an interactive map layer, use the Publish API. Send your data to the layer using POST, PUT, or PATCH requests of the interactive API. Use a DELETE request to delete data from an interactive map layer.

Upload Data to an Interactive Map Layer

When you want to upload the specified feature to an interactive map layer, you can use the PublishApi.putFeature API call, as sketched after the argument list below.

The method takes 3 arguments:

  • layer - The layer ID of the layer which should be updated.
  • feature - The feature to upload to the interactive map layer.
  • featureId - The feature ID of the feature.
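
The following is a minimal Scala sketch of the PublishApi.putFeature call; it mirrors the patchFeature example further down, and the Feature construction reuses the pattern from the putFeatureCollection snippet.

Scala
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

// the feature and the ID under which it is stored
val featureId = "feature-1"
val feature =
  Feature(id = Some(featureId),
          geometry = Some(Point(coordinates = Some(immutable.Seq(10.0, 12.0)))),
          properties = Some(Map("prop1" -> "some-value")))

// upload (create or replace) the feature under the given feature ID
val futureResponse = publishApi.putFeature(layerId, feature, featureId)
val response = Await.result(futureResponse, timeout)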

When you want to upload the specified FeatureCollection to an interactive map layer, you can use the PublishApi.putFeatureCollection API call.

The method takes 2 arguments:

  • layer - The layer ID of the layer which should be updated.
  • featureCollection - The FeatureCollection to upload to an interactive map layer.

The following snippet demonstrates the usage of the PublishApi.putFeatureCollection API:

Scala
Java
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

val featureCollection =
  FeatureCollection(
    features = immutable.Seq(
      Feature(id = Some("feature-1"),
              geometry = Some(Point(coordinates = Some(immutable.Seq(10.0, 12.0)))),
              properties = Some(Map("prop1" -> "some-value", "prop2" -> 10)))
    ))

val futureResponse = publishApi.putFeatureCollection(layerId, featureCollection)
val response = Await.result(futureResponse, timeout)
// create publishApi
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// Map of properties in the feature
Map<String, Object> properties = new HashMap<>();
properties.put("prop1", "some-value");
properties.put("prop2", 10);

Feature feature =
    new Feature.Builder()
        .withId("feature-1")
        .withGeometry(
            new Point.Builder()
                .withCoordinates(new ArrayList<>(Arrays.asList(10.0, 12.0)))
                .build())
        .withProperties(properties)
        .build();
FeatureCollection featureCollection =
    new FeatureCollection.Builder()
        .withFeatures(new ArrayList<>(Arrays.asList(feature)))
        .build();

FeatureCollection response =
    publishApi.putFeatureCollection(layerId, featureCollection).toCompletableFuture().join();

Geo-coordinates

The interactive map layer works with geo-coordinates. The meaning of a geo-coordinate is defined by GeoJSON (RFC 7946). Citing the RFC 7946 chapter "Position":

"A position is an array of numbers. There MUST be two or more elements. The first two elements are longitude and latitude, or easting and northing, precisely in that order and using decimal numbers. Altitude or elevation MAY be included as an optional third element."

For bounding boxes, citing the RFC 7946 chapter "Bounding Box":

"The value of the bbox member MUST be an array of length 2*n where n is the number of dimensions represented in the contained geometries, with all axes of the most southwesterly point followed by all axes of the more northeasterly point. The axes order of a bbox follows the axes order of geometries. The "bbox" values define shapes with edges that follow lines of constant longitude, latitude, and elevation."

Update an Interactive Map Layer

When you want to update the specified FeatureCollection in an interactive map layer, you can use the PublishApi.postFeatureCollection API call. A minimal sketch follows the argument list below.

The method takes 5 arguments:

  • layer - The layer ID of the layer which should be updated.
  • featureCollection - The FeatureCollection to update in an interactive map layer.
  • ifExist - The action to execute when a feature with the provided ID exists. Default is PATCH.
  • ifNotExist - The action to execute when a feature with the provided ID does not exist, or the feature contains no ID. Default is CREATE.
  • transactional - Defines whether this is a transactional operation. Default is TRUE.
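
The following minimal Scala sketch illustrates PublishApi.postFeatureCollection, assuming the three optional parameters can be omitted so that the documented defaults (PATCH, CREATE, transactional) apply:

Scala
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

val featureCollection =
  FeatureCollection(
    features = immutable.Seq(
      Feature(id = Some("feature-1"),
              geometry = Some(Point(coordinates = Some(immutable.Seq(10.0, 12.0)))),
              properties = Some(Map("prop1" -> "updated-value")))
    ))

// features with existing IDs are patched (ifExist = PATCH),
// missing ones are created (ifNotExist = CREATE), as a single transaction
val futureResponse = publishApi.postFeatureCollection(layerId, featureCollection)
val response = Await.result(futureResponse, timeout)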

Alternatively, you can use the PublishApi.postFeatureModifications API call.

The method takes 3 arguments:

  • layer - The layer ID of the layer which should be updated.
  • featureModificationList - A list of FeatureModification objects. Each FeatureModification contains a list of features, an onFeatureNotExists parameter that defines the action to execute when a feature with the provided ID does not exist, and an onFeatureExists parameter that defines the action to execute when a feature with the provided ID exists.
  • transactional - Defines whether this is a transactional operation. Default is TRUE.

When you want to patch the specified feature in an interactive map layer, you can use the PublishApi.patchFeature API call.

The method takes 3 arguments:

  • layer - The layer ID of the layer which should be updated.
  • feature - The feature to update in an interactive map layer.
  • featureId - The feature ID of the feature.

The following snippet demonstrates the usage of the PublishApi.patchFeature API:

Scala
Java
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

val featureId = "feature-1"
val feature = Feature(geometry = Some(Point(coordinates = Some(immutable.Seq(10.0, 15.0)))))

val futureResponse = publishApi.patchFeature(layerId, feature, featureId)
val response = Await.result(futureResponse, timeout)
// create publishApi
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// The featureId to be updated
String featureId = "feature-1";

// Updated feature
Feature feature =
    new Feature.Builder()
        .withGeometry(
            new Point.Builder()
                .withCoordinates(new ArrayList<>(Arrays.asList(10.0, 15.0)))
                .build())
        .build();

Feature response =
    publishApi.patchFeature(layerId, feature, featureId).toCompletableFuture().join();

Delete from an Interactive Map Layer

When you want to delete the specified features from an interactive map layer you can use the PublishApi.deleteFeatures API call.

The method takes 3 arguments:

  • layer - The layer ID of the layer which should be updated.
  • ids - List of feature IDs to be deleted.
  • context - Interactive Map context (optional parameter) - see below for the list of valid values.

The following snippet demonstrates the usage of the PublishApi.deleteFeatures API:

Scala
Java
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

// List of feature ids to delete
val ids = Seq("feature-1", "feature-2")

val futureResponse = publishApi.deleteFeatures(layerId, ids, Some(context))
val response = Await.result(futureResponse, timeout)

val deletedIds: Seq[String] = response.deleted.get
// create publishApi
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

List<String> ids = new ArrayList<>(Arrays.asList("feature-1", "feature-2"));

FeatureCollection response =
    publishApi.deleteFeatures(layerId, ids, Optional.of(context)).toCompletableFuture().join();

// Check the list of deleted feature Ids
List<String> deletedFeatures = response.getDeleted();

Delete changesets from an Interactive Map Layer

When you want to delete one or more changesets from an interactive map layer you can use the PublishApi.deleteFeatureChanges API call.

The method takes 2 arguments:

  • layer - The layer ID of the layer which should be updated.
  • versionQuery - The query parameter used to specify the versions to be deleted.

The following snippet demonstrates the usage of the PublishApi.deleteFeatureChanges API:

Scala
Java
// create publishApi
val publishApi = DataClient().publishApi(catalogHrn, settings)

// delete all changesets below version=8L
val response = publishApi.deleteFeatureChanges(
  layerId,
  VersionQuery.lessThan(8L)
)
// create publishApi
PublishApi publishApi = DataClient.get(myActorSystem).publishApi(catalogHrn);

// delete all changesets below version=8L
publishApi.deleteFeatureChanges(layerId, VersionQuery.lessThan(8L));

Upload Features to an Interactive Map Layer using Data-Engine

When you want to upload features to an interactive map layer you can use the writeEngine.uploadIMLFeaturesAsSource API call.

The method takes 3 arguments:

  • layer - The layer ID of the layer which should be updated.
  • features - Source of the features to be uploaded.
  • batchSize - The number of features in an upload batch.

The following snippet demonstrates the usage of the writeEngine.uploadIMLFeaturesAsSource API:

Scala
Java
// create writeEngine
val writeEngine = DataEngine().writeEngine(catalogHrn)

val batchSize = 100

// Source of Feature to upload
val source: Source[Feature, NotUsed] = Source(featureCollection.features)

val response = writeEngine.uploadIMLFeaturesAsSource(layerId, source, Some(batchSize))
Await.result(response, timeout)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

int batchSize = 100;

// Source of feature to upload
Source<Feature, NotUsed> source = Source.from(featureCollection.getFeatures());

Done done =
    writeEngine
        .uploadIMLFeaturesAsSource(layerId, source, OptionalInt.of(batchSize))
        .toCompletableFuture()
        .join();

Upload FeatureCollection to an Interactive Map Layer using Data-Engine

When you want to upload a large FeatureCollection to an interactive map layer you can use the writeEngine.uploadIMLFeatureCollection API call.

The method takes 3 arguments:

  • layer - The layer ID of the layer which should be updated.
  • featureCollection - The FeatureCollection to upload.
  • batchSize - The number of features in an upload batch.

The following snippet demonstrates the usage of the writeEngine.uploadIMLFeatureCollection API:

Scala
Java
// create writeEngine
val writeEngine = DataEngine().writeEngine(catalogHrn)

// Large FeatureCollection to upload
val featureCollection = new FeatureCollection.JsonBuilder(json).build
val batchSize = 100

val response =
  writeEngine.uploadIMLFeatureCollection(layerId, featureCollection, Some(batchSize))
Await.result(response, timeout)
// create writeEngine
WriteEngine writeEngine = DataEngine.get(myActorSystem).writeEngine(catalogHrn);

int batchSize = 100;

// Large feature collection to upload
FeatureCollection featureCollection = new FeatureCollection.JsonBuilder(json).build();

Done done =
    writeEngine
        .uploadIMLFeatureCollection(layerId, featureCollection, OptionalInt.of(batchSize))
        .toCompletableFuture()
        .join();

Valid values for Interactive Map requests

Context

  • DEFAULT = The default value if none is given. For composite layers the operation occurs based on the extension rules. For normal layers this is the only valid context.
  • EXTENSION = The operation will be executed only in the extension and no operation will be performed in the extended layer.
  • SUPER = Only applicable for read-operations. The operation will be executed only in the layer being extended (super layer).

VersionQuery

For deleting changesets from an interactive map layer, the only filter currently supported is lessThan a given version. The filter can be constructed using the factory method VersionQuery.lessThan(version).
