Read and Write Versioned Layer Data using the DataEngine

Project Dependencies

To run an application that uses the Data Client Library's DataEngine to read and write versioned data within a batch pipeline (Spark), add the spark-support module as a dependency of your project.

SBT
Maven
// sbt: adds the Data Client Library's Spark support module (read and write engines)
libraryDependencies ++= Seq(
  "com.here.platform.data.client" %% "spark-support" % "0.15.24"
)
<!-- Maven: the spark-support artifact of the Data Client Library (Scala 2.11 build) -->
<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_2.11</artifactId>
        <version>0.15.24</version>
    </dependency>
</dependencies>

Reading Versioned Data

This snippet shows how to use the query API within a running Spark context to consume partitions:

Scala
Java
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.SparkSupport._
/** Counts the total number of payload bytes stored in `layer` at the latest
  * version of `catalog`, downloading the data in parallel on Spark workers.
  *
  * Returns 0 when the catalog has no published version yet.
  */
def countDataBytesUsingMultiplesWorkers(sc: SparkContext, catalog: HRN, layer: String): Long = {
  // Driver-side actor system and query API: used only to look up the version.
  val driverActorSystem = DataClientSparkContextUtils.context.actorSystem
  val driverQueryApi = DataClient(driverActorSystem).queryApi(catalog)

  driverQueryApi.getLatestVersion().awaitResult() match {
    case None => 0L

    case Some(version) =>
      // parallelize needs an in-memory sequence that fits into spark.memory;
      // flatMap followed by repartition pushes the partition metadata out to
      // the workers.
      val partitionMetadata: RDD[Partition] =
        sc.parallelize(Seq(layer))
          .flatMap { layerId =>
            // Runs on a worker: each worker uses its own actor system.
            val workerQueryApi =
              DataClient(DataClientSparkContextUtils.context.actorSystem).queryApi(catalog)
            workerQueryApi.getPartitionsAsIterator(version, layerId).awaitResult()
          }
          .repartition(100)

      // Fetch the payload for every partition, keeping the metadata alongside.
      val withPayloads: RDD[(Partition, Array[Byte])] =
        partitionMetadata.mapPartitions { metadataIterator =>
          val readEngine =
            DataEngine(DataClientSparkContextUtils.context.actorSystem).readEngine(catalog)
          metadataIterator.map(p => p -> readEngine.getDataAsBytes(p).awaitResult())
        }

      // Sum the payload sizes over the whole RDD.
      withPayloads
        .map { case (_, bytes) => bytes.length.toLong }
        .reduce(_ + _)
  }
}
import com.here.platform.data.client.spark.DataClientSparkContextUtils;

import java.util.AbstractMap;
import java.util.ArrayList;

/**
 * Counts the total number of payload bytes stored in the given layers at the
 * latest version of {@code catalog}, downloading the data in parallel on
 * Spark workers.
 *
 * @param jsc the Spark context used to distribute the work
 * @param catalog the catalog to read from
 * @param layerIds the versioned layers whose partitions are counted
 * @return the summed payload size in bytes, or 0 if the catalog has no version
 */
private static long countDataBytesUsingMultiplesWorkers(
    JavaSparkContext jsc, HRN catalog, List<String> layerIds) {
  ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();

  // query current catalog for latest version
  QueryApi masterQueryApi = DataClient.get(masterActorSystem).queryApi(catalog);

  OptionalLong latestVersion =
      masterQueryApi.getLatestVersion(OptionalLong.empty()).toCompletableFuture().join();
  if (!latestVersion.isPresent()) return 0L;

  Long version = latestVersion.getAsLong();

  // parallelize requires an in-memory sequence that fits into spark.memory;
  // a flatMap followed by repartition distributes the metadata to the workers.
  JavaRDD<Partition> partitions =
      jsc.parallelize(layerIds)
          .flatMap(
              layerId -> {
                // runs on a worker: each worker uses its own actor system
                ActorSystem workerActorSystem =
                    DataClientSparkContextUtils.context().actorSystem();
                QueryApi workerQueryApi = DataClient.get(workerActorSystem).queryApi(catalog);

                return workerQueryApi
                    .getPartitionsAsIterator(version, layerId, AdditionalFields.AllFields())
                    .toCompletableFuture()
                    .join();
              })
          .repartition(100);

  JavaRDD<Map.Entry<Partition, byte[]>> partitionsAndData =
      partitions.mapPartitions(
          innerPartitions -> {
            ActorSystem workerActorSystem = DataClientSparkContextUtils.context().actorSystem();
            ReadEngine readEngine = DataEngine.get(workerActorSystem).readEngine(catalog);

            // Collect into a list of entries. Unlike a HashMap, this keeps
            // duplicate Partition entries and preserves the metadata order,
            // matching the behavior of the Scala variant above.
            List<Map.Entry<Partition, byte[]>> entries = new ArrayList<>();
            while (innerPartitions.hasNext()) {
              Partition partition = innerPartitions.next();
              byte[] data = readEngine.getDataAsBytes(partition).toCompletableFuture().join();
              entries.add(new AbstractMap.SimpleImmutableEntry<>(partition, data));
            }
            return entries.iterator();
          });
  return partitionsAndData.map(entry -> (long) entry.getValue().length).reduce(Long::sum);
}

Writing Versioned Data

The following snippets demonstrate how to use the publish API within a running Spark context.

Note: Sharing

The DataClientSparkContextUtils context must not be shared between the master and the workers; each one needs its own instance.

Scala
Java
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.SparkSupport._
case class CustomData(partition: String, layer: String, data: Array[Byte])

/** Publishes `partitions` into `catalog` as one new version, uploading the
  * payloads and metadata from Spark workers in parallel.
  *
  * The publication batch is started and completed on the driver; every worker
  * uploads its share of the partitions and attaches the resulting metadata to
  * the shared batch.
  */
def publishUsingMultipleWorkers(sc: SparkContext,
                                catalog: HRN,
                                layerIds: Seq[String],
                                partitions: Seq[CustomData]): Unit = {
  val driverActorSystem = DataClientSparkContextUtils.context.actorSystem

  // Open the publication batch on the driver, on top of the current version.
  val driverPublishApi = DataClient(driverActorSystem).publishApi(catalog)
  val baseVersion = driverPublishApi.getBaseVersion().awaitResult()
  val token: BatchToken =
    driverPublishApi.startBatch2(baseVersion, Some(layerIds), dependencies = Nil).awaitResult()

  // Ship the partitions to the workers; each worker uploads data and metadata.
  val uploadResults: RDD[Done] =
    sc.parallelize(partitions)
      .mapPartitions { customDataIterator =>
        // Executed once per worker task, covering all partitions of that task.
        val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
        val workerPublishApi = DataClient(workerActorSystem).publishApi(catalog)
        val workerWriteEngine = DataEngine(workerActorSystem).writeEngine(catalog)

        // Lazily upload each payload, yielding the committed metadata.
        val uploaded: Iterator[CommitPartition] =
          customDataIterator.map { custom =>
            workerWriteEngine
              .put(
                NewPartition(
                  partition = custom.partition,
                  layer = custom.layer,
                  data = NewPartition.ByteArrayData(custom.data)
                ))
              .awaitResult()
          }

        // Attach this worker's metadata to the shared batch.
        workerPublishApi.publishToBatch(token, uploaded).awaitResult()

        Iterator.single(Done)
      }

  // Force evaluation of the RDD so the uploads actually run.
  uploadResults.collect()

  // Seal the batch: this creates the new catalog version.
  driverPublishApi.completeBatch(token).awaitResult()
}
import com.here.platform.data.client.spark.DataClientSparkContextUtils;

/**
 * Serializable holder for one partition to publish: the partition name, the
 * id of the target layer, and the raw payload bytes. It must be serializable
 * because Spark ships instances from the driver to the workers.
 */
public static class CustomData implements Serializable {
  // partition name (key within the layer)
  String partition;
  // id of the layer this partition belongs to
  String layerId;
  // raw payload to upload
  byte[] data;

  CustomData(String partition, String layerId, byte[] data) {
    this.partition = partition;
    this.layerId = layerId;
    this.data = data;
  }
}

// number of Spark partitions, and hence parallel upload tasks, to use
private static final int PARALLELISM = 3;

/**
 * Publishes the given partitions into {@code catalog} as one new version.
 * The publication batch is started and completed on the driver; the payload
 * and metadata uploads are spread over the Spark workers.
 *
 * @param jsc the Spark context used to distribute the uploads
 * @param catalog the catalog to publish into
 * @param layerIds the layers taking part in this publication
 * @param partitions the data to upload
 */
public static void publishUsingMultipleWorkers(
    JavaSparkContext jsc, HRN catalog, List<String> layerIds, List<CustomData> partitions) {
  ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();

  // start commit on master
  PublishApi masterPublishApi = DataClient.get(masterActorSystem).publishApi(catalog);

  // Base the publication on the most recent version of the catalog.
  OptionalLong latestVersion = masterPublishApi.getBaseVersion().toCompletableFuture().join();

  // Start the publication batch. This commit has no upstream catalog
  // dependencies, so the last argument is an empty collection.
  BatchToken token =
      masterPublishApi
          .startBatch2(latestVersion, Optional.of(layerIds), Collections.emptyList())
          .toCompletableFuture()
          .join();

  // send partitions to workers and upload data and metadata
  JavaRDD<Done> commitParts =
      jsc.parallelize(partitions, PARALLELISM)
          .mapPartitions(
              customDataIterator -> {
                // Executed once per worker task, covering all its partitions.
                ActorSystem workerActorSystem =
                    DataClientSparkContextUtils.context().actorSystem();
                PublishApi workerPublishApi =
                    DataClient.get(workerActorSystem).publishApi(catalog);
                WriteEngine workerWriteEngine =
                    DataEngine.get(workerActorSystem).writeEngine(catalog);

                List<CommitPartition> committedPartitions = new ArrayList<>();
                while (customDataIterator.hasNext()) {
                  CustomData customData = customDataIterator.next();
                  com.here.platform.data.client.javadsl.NewPartition newPartition =
                      new com.here.platform.data.client.javadsl.NewPartition.Builder()
                          .withPartition(customData.partition)
                          .withData(customData.data)
                          .withLayer(customData.layerId)
                          .build();

                  // Upload the payload and remember the committed metadata.
                  CommitPartition committed =
                      workerWriteEngine.put(newPartition).toCompletableFuture().join();
                  committedPartitions.add(committed);
                }
                // Attach this worker's metadata to the shared batch.
                Done done =
                    workerPublishApi
                        .publishToBatch(token, committedPartitions.iterator())
                        .toCompletableFuture()
                        .join();
                return Collections.singletonList(done).iterator();
              });

  // execute the RDD
  commitParts.collect();

  // complete the commit: this creates the new catalog version
  masterPublishApi.completeBatch(token).toCompletableFuture().join();
}

results matching ""

    No results matching ""