Read from a catalog in a batch application
Objectives: Estimate the total length of the road geometries of the HERE Map Content.
Complexity: Beginner
Time to complete: 30 min
Prerequisites: Organize your work in projects
Source code: Download
When developing an application in the HERE Workspace for deployment as a pipeline job, you can choose between two runtime environments:
- You can use batch to run Spark-based applications.
- You can use stream to run Flink-based applications.
This example demonstrates a simple Spark batch application that downloads data from the topology-geometry layer of the HERE Map Content catalog in order to estimate the total length of the road geometries present in the map.
The topology-geometry layer contains the HERE Map Content topology and the geometry of the road segments. The spatial partitioning scheme for this layer is HereTile. For more information on HereTile partitioning, see this document. Each segment also contains a length attribute that represents its total length in meters.
First, download the metadata for the layer, which contains the list of its partitions, using the queryMetadata method, and select a random sample of about 1/1000 of the available partitions.
For each selected partition, download the related data and sum all the segment lengths available in that partition. Then reduce the resulting RDD of doubles to a single number, summing up the values from all the selected tiles.
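Condensed, the core of that flow boils down to a few RDD operations. The following is only a sketch that reuses the names from the full Scala listing shown later in this tutorial; the complete, runnable version follows below.
// Sketch of the flow, using the names from the full Scala listing below.
val sampledMetadata: RDD[Partition] =
  queryMetadata(hereMapContent, hereMapContentVersion, "topology-geometry", sparkContext, actorSystem)
    .sample(withReplacement = true, sampleFraction)    // keep ~1/1000 of the partitions
val roadLengths: RDD[Double] =
  sampledMetadata
    .map(readTopologyGeometry(hereMapContent))         // download each sampled partition
    .map(_.segment.map(_.length).sum)                  // sum the segment lengths per partition
val totalMeters: Double =
  roadLengths.reduce(_ + _) / sampleFraction           // scale up to estimate the whole map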
Set up the project in Maven
To develop an application that runs on pipelines with Spark, use the sdk-batch-bom_2.12 as the parent POM of your application:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.51.5</version>
<relativePath/>
</parent>
Then adjust the dependencies for Scala and Java:
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.schema.rib</groupId>
<artifactId>topology-geometry_v2_scala_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.schema.rib</groupId>
<artifactId>topology-geometry_v2_java</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>spark-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.hrn</groupId>
<artifactId>hrn_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
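The dependencies above reference the ${scala.compat.version} property. If this property is not already provided by the parent BOM in your setup (an assumption worth verifying in your own pom.xml), you can define it explicitly, for example:
<properties>
  <scala.compat.version>2.12</scala.compat.version>
</properties>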
Implement the application
This application implements MapReduce over the partitions in the topology-geometry layer, summing the lengths of all road segments in each partition.
Instead of summing lengths over all the partitions, this application samples a small subset of partitions and divides the sum of their lengths by the sampling rate to estimate the total length over all partitions. This produces a reasonable estimate in a fraction of the time.
Note
At the time of writing, there are approximately 59 million km of geometries in the HERE Map Content catalog.
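As a hypothetical worked example of the scaling step: if the sampled partitions (about 1/1000 of the map) contain roughly 59,000 km of geometry in total, the estimated length for the whole map is 59,000 km / (1/1000) ≈ 59 million km, consistent with the figure quoted above.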
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.actor.CoordinatedShutdown.UnknownReason
import com.here.hrn.HRN
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.scaladsl.{DataClient, Partition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.pipeline.PipelineContext
import com.here.schema.rib.v2.topology_geometry_partition.TopologyGeometryPartition
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import com.here.platform.data.client.spark.SparkSupport._
object SparkPipelineScala {
private val logger = LoggerFactory.getLogger(SparkPipelineScala.getClass)
private val sampleFraction = 1.0 / 1000.0
def main(args: Array[String]): Unit = {
val sparkContext: SparkContext = new SparkContext(new SparkConf().setAppName("SparkPipeline"))
val pipelineContext = new PipelineContext
val hereMapContent = pipelineContext.config.inputCatalogs("hereMapContent")
val hereMapContentVersion = pipelineContext.job.get.inputCatalogs("hereMapContent").version
val outputCatalog = pipelineContext.config.outputCatalog
val config: Config = ConfigFactory.empty
.withValue("here.platform.data-client.endpoint-locator.discovery-service-env",
ConfigValueFactory.fromAnyRef("custom"))
.withValue(
"here.platform.data-client.endpoint-locator.discovery-service-url",
ConfigValueFactory.fromAnyRef("https://api-lookup.data.api.platform.here.com")
)
val appName = "SparkPipelineExampleScala"
implicit lazy val actorSystem: ActorSystem = ActorSystem.create(appName, config)
try {
val topologyLayerMetadata: RDD[Partition] =
queryMetadata(hereMapContent,
hereMapContentVersion,
"topology-geometry",
sparkContext,
actorSystem)
.sample(withReplacement = true, sampleFraction)
val topologyPartitions: RDD[TopologyGeometryPartition] =
topologyLayerMetadata.map(readTopologyGeometry(hereMapContent))
val roadLengths: RDD[Double] =
topologyPartitions.map(_.segment.map(_.length).sum)
val totalMeters: Double =
roadLengths.reduce(_ + _) / sampleFraction
logger.info(f"The estimated length of all roads in the map is: $totalMeters%.0fm")
logger.info(s"The configured output catalog is: $outputCatalog")
} finally {
val shutdown = CoordinatedShutdown(actorSystem).run(UnknownReason)
Await.result(shutdown, Duration.Inf)
}
}
private def queryMetadata(catalog: HRN,
catalogVersion: Long,
layerName: String,
sparkContext: SparkContext,
actorSystem: ActorSystem): RDD[Partition] = {
val query = DataClient(actorSystem).queryApi(catalog)
val partitions = query.getPartitionsAsIterator(catalogVersion, layerName)
sparkContext.parallelize(partitions.awaitResult.toStream)
}
private def readTopologyGeometry(catalog: HRN)(partition: Partition) =
TopologyGeometryPartition.parseFrom(read(catalog)(partition))
private def read(catalog: HRN)(partition: Partition) =
DataEngine(DataClientSparkContextUtils.context.actorSystem)
.readEngine(catalog)
.getDataAsBytes(partition)
.awaitResult()
}
import akka.actor.ActorSystem;
import akka.actor.CoordinatedShutdown;
import com.google.protobuf.InvalidProtocolBufferException;
import com.here.hrn.HRN;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.pipeline.PipelineContext;
import com.here.schema.rib.v2.TopologyGeometry;
import com.here.schema.rib.v2.TopologyGeometryPartitionOuterClass.TopologyGeometryPartition;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SparkPipelineJava {
private static final double sampleFraction = 1.0 / 1000.0;
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(SparkPipelineJava.class);
JavaSparkContext sparkContext =
new JavaSparkContext(new SparkConf().setAppName("SparkPipeline"));
PipelineContext pipelineContext = new PipelineContext();
HRN hereMapContent = pipelineContext.getConfig().getInputCatalogs().get("hereMapContent");
Long hereMapContentVersion =
pipelineContext.getJob().get().getInputCatalogs().get("hereMapContent").version();
HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();
Config config = ConfigFactory.empty();
config =
config.withValue(
"here.platform.data-client.endpoint-locator.discovery-service-env",
ConfigValueFactory.fromAnyRef("custom"));
config =
config.withValue(
"here.platform.data-client.endpoint-locator.discovery-service-url",
ConfigValueFactory.fromAnyRef("https://api-lookup.data.api.platform.here.com"));
ActorSystem sparkActorSystem = ActorSystem.create("SparkPipelineExampleJava", config);
try {
JavaRDD<Partition> topologyLayerMetadata =
queryMetadata(
hereMapContent,
hereMapContentVersion,
"topology-geometry",
sparkContext,
sparkActorSystem)
.sample(true, sampleFraction);
TopologyGeometryReader readTopologyGeometry = new TopologyGeometryReader(hereMapContent);
JavaRDD<TopologyGeometryPartition> topologyGeometryPartition =
topologyLayerMetadata.map(readTopologyGeometry::read);
JavaRDD<Double> roadLengths =
topologyGeometryPartition.map(
partition ->
partition
.getSegmentList()
.stream()
.map(TopologyGeometry.Segment::getLength)
.mapToDouble(Double::doubleValue)
.sum());
Double totalMeters = roadLengths.reduce(Double::sum) / sampleFraction;
logger.info(
String.format("The estimated length of all roads in the map is: %.0fm", totalMeters));
logger.info(String.format("The configured output catalog is: %s", outputCatalog));
} finally {
CoordinatedShutdown.get(sparkActorSystem)
.runAll(CoordinatedShutdown.unknownReason())
.toCompletableFuture()
.join();
}
}
private static JavaRDD<Partition> queryMetadata(
HRN catalog,
Long catalogVersion,
String layerName,
JavaSparkContext sparkContext,
ActorSystem sparkActorSystem) {
QueryApi query = DataClient.get(sparkActorSystem).queryApi(catalog);
ArrayList<Partition> partitions = new ArrayList<>();
query
.getPartitionsAsIterator(catalogVersion, layerName, Collections.emptySet())
.toCompletableFuture()
.join()
.forEachRemaining(partitions::add);
return sparkContext.parallelize(partitions);
}
}
class TopologyGeometryReader implements Serializable {
private HRN catalog;
TopologyGeometryReader(HRN catalog) {
this.catalog = catalog;
}
TopologyGeometryPartition read(Partition partition) throws InvalidProtocolBufferException {
return TopologyGeometryPartition.parseFrom(readRaw(partition));
}
private byte[] readRaw(Partition partition) {
return DataEngine.get(DataClientSparkContextUtils.context().actorSystem())
.readEngine(catalog)
.getDataAsBytes(partition)
.toCompletableFuture()
.join();
}
}
This pipeline-config.conf file declares the HRN for HERE Map Content as the hereMapContent input catalog for the pipeline, as well as an HRN for the output catalog. This pipeline does not write to an output catalog, so the output HRN is just a dummy value.
In a production pipeline, the output HRN would point to an existing catalog to which the app and/or sharing group has write permissions. For more on managing these permissions, see this document.
pipeline.config {
//Please use "hrn:here-cn:data::olp-cn-here:dummy" on China environment
output-catalog {hrn = "hrn:here:data::olp-here:dummy"}
input-catalogs {
//Please, use hrn:here-cn:data::olp-cn-here:here-map-content-china-2 on China Environment
hereMapContent {hrn = "hrn:here:data::olp-here:rib-2"}
}
}
This tutorial uses a public catalog, the HERE Map Content catalog. Before the catalog can be used within your project, it must first be linked to the project. To do this, replace {{YOUR_PROJECT_HRN}} with the actual HRN of the project you created as described in the Organize your work in projects tutorial and execute the following command:
olp project resource link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:rib-2
If your command is successful, the CLI returns the following message:
Project resource hrn:here:data::olp-here:rib-2 has been linked.
This pipeline-job.conf file declares the versions of the hereMapContent input catalog and the dummy output catalog.
pipeline.job.catalog-versions {
output-catalog {base-version = -1}
input-catalogs {
hereMapContent {
processing-type = "reprocess"
// Please, use "version = 0" on China Environment
version = 4
}
}
}
Run the application
To run the application locally, execute one of the following commands, depending on whether you use the Scala or the Java version:
mvn compile exec:java \
-Dexec.mainClass=SparkPipelineScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=SparkPipelineJava \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
Note the set of -Dhere.platform.data-client.request-signer.credentials.here-account.* parameters. These parameters pass data from the credentials.properties file and {{YOUR_PROJECT_HRN}} to the Data Client Library. For more details about Data Client Library initialization, see Set Your Credentials via Java System Properties.
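For reference, the credentials.properties file downloaded from the platform (usually placed at ~/.here/credentials.properties) typically contains entries along the following lines; verify the keys against your own downloaded file, and note that the values below are placeholders:
here.user.id = <your user or app id>
here.client.id = <your client id>
here.access.key.id = <your access key id>
here.access.key.secret = <your access key secret>
here.token.endpoint.url = https://account.api.here.com/oauth2/token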
From here, you can try running this job in a pipeline. For more information, see the Pipeline commands section of the OLP CLI Developer Guide.
Remove the call to sample on the metadata and the scaling of the final result in the totalMeters variable, as sketched below. This transforms the program from an estimator into a parallel program that sums up all the values in the catalog in a few minutes.
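A minimal sketch of those two changes, using the variable names from the Scala example above:
// Use all partitions instead of a sample: drop the .sample(...) call.
val topologyLayerMetadata: RDD[Partition] =
  queryMetadata(hereMapContent, hereMapContentVersion, "topology-geometry", sparkContext, actorSystem)

// Do not scale the result: drop the division by sampleFraction.
val totalMeters: Double = roadLengths.reduce(_ + _)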
You can also try to publish the information on the total number of kilometers into a text/plain layer with generic partitioning and a single partition containing the computed value. See the tutorial on Organize your work in projects and the Command Line Interface Developer Guide for more information on creating catalogs and configuring layers.