Build a Batch Pipeline with Maven Archetypes (Scala)
To build a batch pipeline with the Data Processing Library, you use the SDK Maven Archetypes to create a skeleton for the project, and the HERE platform portal to manage credentials, create a catalog, and manage access rights.
This example demonstrates how to create a pipeline that reads the Road Topology & Geometry layer in the HERE Map Content catalog and then writes the number of segment references (cardinality) for every topology node in each input partition.
Credentials
You require two types of credentials:
- Platform Credentials - These credentials provide access to the platform API. First, create a new application at https://platform.here.com/profile/access-credentials. Once the application is created, click Create a key to download the credentials. By default, the Data Processing Library looks for credentials in the $HOME/.here/credentials.properties file, so make sure your credentials file is placed in this location (see the example after this list).
- Repository Credentials - These credentials enable you to access the repository where the Data Processing Library is hosted. Go to https://platform.here.com/profile/repository and click Generate Credentials. This downloads the settings.xml file. Copy this file to the $HOME/.m2/ folder.
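For reference, a downloaded credentials.properties file typically contains entries similar to the following (values redacted here; use the file exactly as downloaded rather than creating it by hand):
here.user.id = HERE-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
here.client.id = xxxxxxxxxxxx
here.access.key.id = xxxxxxxxxxxx
here.access.key.secret = xxxxxxxxxxxxxxxxxxxxxxxx
here.token.endpoint.url = https://account.api.here.com/oauth2/token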
Create the Output Catalog
First, create a new catalog to serve as the output catalog for the pipeline. The catalog has one layer where, for each partition of the Road Topology & Geometry layer, there is a partition containing the cardinalities of the topology nodes in that partition. You also need one additional layer, state, which is reserved for the Data Processing Library.
Log into the HERE platform. Select the Data tab and do the following:
- Click Add new catalog.
- Specify a CATALOG NAME, and a CATALOG ID for your catalog, such as batch-processing-quickstart-username.
- Next, add a CATALOG SUMMARY and a CONTACT EMAIL.
- Click Save and wait for the Data API to create your new catalog.
Then, give your application read/write access to the catalog, as follows:
- Select your catalog by searching for its name in the Search for data box.
- Go to Sharing, and in Manage Sharing select SHARE CATALOG WITH App. Insert your application ID, click Grant and check read and write.
- Finally, click Grant to enable your changes.
Add layers to the catalog:
- Click Add new layer and create a layer with node-cardinality as its ID. You can use node-cardinality as the layer's name too, or choose a different, human-readable name.
- You need a HERE Tile layer, and the zoom level must be the same as the input Road Topology & Geometry layer, 12. Select Versioned for Layer Type, which you must use for every layer processed by a batch pipeline.
- Keep the default Content Type of application/x-protobuf so you can use Protocol Buffers to encode the partitions. Leave the Schema field set to None.
- Then, click Save to complete the layer creation.
- Proceed with a second layer, state, and configure it according to the second row of the following table, which lists the configuration of all layers in the catalog.
| Layer ID | Partitioning | Zoom Level | Layer Type | Content Type | Schema |
| --- | --- | --- | --- | --- | --- |
| node-cardinality | HERE Tile | 12 | Versioned | application/x-protobuf | None |
| state | Generic | N/A | Versioned | application/octet-stream | None |
The catalog is now fully configured. Proceed with creating a project.
Create a Project
The Data SDK includes Maven Archetypes to simplify the process of creating new batch pipelines. Using the Maven Archetypes, you can build a complete project structure using a few shell commands. The archetype automatically generates POM files that include all of the basic dependencies, sample configuration files, and source files you can edit to implement your own logic. You need to create at least three projects:
- a top-level project, for convenience, to compile all sub-projects with a single POM file
- a nested Schema project, to build Java/Scala bindings for your Protocol Buffers schema
- a Process project, to build the processing logic
The following steps assume that Maven is installed and the mvn executable is in your PATH. You must run all of the commands below from a bash shell. The tree command is used to show the folder structures; you can use ls -R as a replacement.
First, create a top-level project named nodecardinality by running the following command, and press ENTER when prompted to confirm:
$ pwd
~/projects
$ mvn archetype:generate -DarchetypeGroupId=org.codehaus.mojo.archetypes \
-DarchetypeArtifactId=pom-root \
-DarchetypeVersion=1.1 \
-DgroupId=com.example \
-DartifactId=nodecardinality \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinality
This creates a nodecardinality folder in the current directory, containing the following files:
$ pwd
~/projects
$ tree
.
`-- nodecardinality
`-- pom.xml
1 directory, 1 file
Sub-projects are created from within this folder, so navigate to the nodecardinality folder. First, create the Schema project by running the following command, and press ENTER when prompted to confirm:
$ pwd
~/projects/nodecardinality
$ mvn archetype:generate -DarchetypeGroupId=com.here.platform.schema \
-DarchetypeArtifactId=project_archetype \
-DarchetypeVersion=X.Y.Z \
-DgroupId=com.example.nodecardinality \
-DartifactId=schema \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinality.schema \
-DmajorVersion=0
For specific documentation about the latest version of the archetype included in the SDK, see the Archetypes Developer Guide.
This creates a project template in the nodecardinality/schema
folder containing a project to build the schema for your output catalog:
$ pwd
~/projects/nodecardinality
$ tree
.
|-- pom.xml
`-- schema
|-- ds
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| `-- resources
| |-- ResourcesReadMe.txt
| `-- renderers
| `-- ReadMe.txt
|-- java
| `-- pom.xml
|-- pom.xml
|-- proto
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| |-- proto
| | `-- com
| | `-- example
| | `-- nodecardinality
| | `-- schema
| | `-- v0
| | `-- schema.proto
| `-- resources
| `-- description.md
|-- scala
| `-- pom.xml
`-- schema.yml
20 directories, 13 files
Finally, still within the nodecardinality folder, run the following command to create a processor template that includes a Direct1ToNCompiler, and press ENTER when prompted to confirm:
$ pwd
~/projects/nodecardinality
$ mvn archetype:generate -DarchetypeGroupId=com.here.platform \
-DarchetypeArtifactId=batch-direct1ton-scala-archetype \
-DarchetypeVersion=X.Y.Z \
-DgroupId=com.example.nodecardinality \
-DartifactId=processor \
-Dversion=1.0.0 \
-Dpackage=com.example.nodecardinality.processor
Refer to the Archetypes Developer Guide for specific documentation about the latest version of the archetype included in the SDK.
An additional processor
folder is now added to the nodecardinality
project:
$ pwd
~/projects/nodecardinality
$ tree
.
|-- pom.xml
|-- processor
| |-- config
| | |-- pipeline-config.conf
| | `-- pipeline-job.conf
| |-- pom.xml
| `-- src
| `-- main
| |-- resources
| | |-- application.conf
| | `-- log4j.properties
| `-- scala
| `-- com
| `-- example
| `-- nodecardinality
| `-- processor
| |-- Compiler.scala
| |-- CompilerConfig.scala
| |-- IntermediateData.scala
| |-- LayersDef.scala
| `-- Main.scala
`-- schema
|-- ds
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| `-- resources
| |-- ResourcesReadMe.txt
| `-- renderers
| `-- ReadMe.txt
|-- java
| `-- pom.xml
|-- pom.xml
|-- proto
| |-- pom.xml
| `-- src
| |-- assembly
| | `-- proto.xml
| `-- main
| |-- proto
| | `-- com
| | `-- example
| | `-- nodecardinality
| | `-- schema
| | `-- v0
| | `-- schema.proto
| `-- resources
| `-- description.md
|-- scala
| `-- pom.xml
`-- schema.yml
30 directories, 23 files
Schema Sub-project
The nodecardinality/schema folder contains the skeleton of a Maven project that builds Java and Scala libraries (usually referred to as bindings) to serialize and deserialize partitions encoded as Protocol Buffers. This is necessary to encode partitions in the output node-cardinality layer as Protocol Buffers and to specify a custom partition schema.
The project folder contains:
- the main POM file, pom.xml, used to compile the project
- a java folder containing a POM file to build the Java bindings for the Protocol Buffers
- a scala folder containing a POM file to build the Scala bindings for the Protocol Buffers
- a ds folder containing a sub-project that bundles the resulting bindings and Protocol Buffer definitions in a ZIP file, which can be published to the platform Artifactory repository to enable the decoding of partitions from the platform portal
- a proto folder containing the Protocol Buffer definitions. This is the sub-project you are going to customize to specify the output schema.
To create a custom Protocol Buffer schema, you need to add .proto
files to the nodecardinality/schema/proto/src
folder. For more information on Protocol Buffers, refer to the Protocol Buffers Documentation.
The skeleton project you have just created already contains a .proto
file you can edit to quickly define the schema of the output partitions.
Open the nodecardinality/schema/proto/src/main/proto/com/example/nodecardinality/schema/v0/schema.proto file and search for the main message definition:
syntax = "proto3";
package com.example.nodecardinality.schema.v0;
message MainProtobufMessage {
int32 lat = 1;
int32 lon = 2;
}
Change the name of the main message from MainProtobufMessage
to NodeCardinalityPartition
, remove the sample fields lat
and lon
, and add a repeated
field named node_cardinality
of type NodeCardinality
.
Then, define an auxiliary message type NodeCardinality
with two fields, the ID of the node (id
) and the cardinality of the node (cardinality
). This new Protobuf definition looks like this:
syntax = "proto3";
package com.example.nodecardinality.schema.v0;
message NodeCardinalityPartition {
repeated NodeCardinality node_cardinality = 1;
}
message NodeCardinality {
string id = 1;
uint32 cardinality = 2;
}
Since you changed the name of the main message, remember to update the configuration used to build the Schema bundle. Open the POM file of the ds
sub-project (nodecardinality/schema/ds/pom.xml
) and locate the configuration for the layer-manifest-plugin
:
<plugin>
<groupId>com.here.platform.schema.maven_plugins</groupId>
<artifactId>layer-manifest-plugin</artifactId>
<version>${here.plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>write-manifest</goal>
</goals>
</execution>
</executions>
<configuration>
<mainMessage>com.example.nodecardinality.schema.v0.MainProtobufMessage</mainMessage>
<inputDir>${project.build.directory}/proto</inputDir>
<writeManifest>true</writeManifest>
</configuration>
</plugin>
Change the mainMessage
path in the plugin configuration, replacing com.example.nodecardinality.schema.v0.MainProtobufMessage
with com.example.nodecardinality.schema.v0.NodeCardinalityPartition
:
<plugin>
<groupId>com.here.platform.schema.maven_plugins</groupId>
<artifactId>layer-manifest-plugin</artifactId>
<version>${here.plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>write-manifest</goal>
</goals>
</execution>
</executions>
<configuration>
<mainMessage>com.example.nodecardinality.schema.v0.NodeCardinalityPartition</mainMessage>
<inputDir>${project.build.directory}/proto</inputDir>
<writeManifest>true</writeManifest>
</configuration>
</plugin>
Now compile the Schema project by running mvn install
from the nodecardinality/schema
folder. Alternatively, you can also run this command from the top-level project.
This builds two libraries, schema_v0_java and schema_v0_scala_${scala.compat.version}, which provide APIs to create a NodeCardinalityPartition object, serialize it to a byte array, and deserialize it from a byte array.
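As a quick illustration, here is a minimal sketch of how the generated Scala bindings could be used; the case class constructors, toByteArray, and parseFrom shown here match how the bindings are used later in Compiler.scala, and the node ID value is just an example:
import com.example.nodecardinality.schema.v0.schema.{NodeCardinality, NodeCardinalityPartition}

// Build a partition with a single node entry and serialize it to bytes.
val partition = NodeCardinalityPartition(Seq(NodeCardinality(id = "example-node-id", cardinality = 3)))
val bytes: Array[Byte] = partition.toByteArray

// Deserialize it back from the byte array.
val decoded: NodeCardinalityPartition = NodeCardinalityPartition.parseFrom(bytes)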
To write the output partitions, use schema_v0_scala_${scala.compat.version} in your processing logic. To read the input partitions from the Road Topology & Geometry layer, use the corresponding Scala bindings for that layer, provided by com.here.schema.rib.topology-geometry_v2_scala.
In the next section, you add those dependencies to the processor's sub-project.
Processor Sub-project
The nodecardinality/processor
folder contains the skeleton project of a batch processing pipeline. This project builds the final processing application.
The project folder contains the following components:
- the main POM file, pom.xml, used to compile the project
- a src folder containing the Scala source files implementing the logic
- a config folder containing configuration files that can be used to run the pipeline locally, outside the Pipeline API
Pipeline Configuration
To perform a batch processing job, the Data Processing Library requires the following:
- HERE Resource Names (HRNs) for all the input catalogs and the output catalog.
- Versions for all the input catalogs for the batch job to process.
For pipelines in the SCHEDULED state, this information is automatically provided by the Pipeline API via two configuration files in HOCON format:
- pipeline-config.conf, providing the HRNs of the input catalogs and the output catalog
- pipeline-job.conf, providing the versions of the input catalogs to be processed
You upload the first file to the HERE Workspace pipeline when the pipeline is deployed; it never changes between compilations. The job configuration, on the other hand, is created on the fly by the Pipeline API whenever a new job needs to run, for example, when a new version of an input catalog exists and the output catalog needs to be updated.
During local development, when a batch pipeline is run without the Pipeline API, you must provide these configuration files yourself by setting two Java system properties:
- pipeline-config.file, containing the path to the pipeline-config.conf file
- pipeline-job.file, containing the path to the pipeline-job.conf file
The nodecardinality/processor/config
folder contains templates for both files, which you can edit and use for local development.
The pipeline configuration's template file (nodecardinality/processor/config/pipeline-config.conf
) looks like this:
pipeline.config {
output-catalog { hrn = "hrn:here:data:::myoutput" }
input-catalogs {
input-catalog-1 { hrn = "hrn:here:data:::myinput1" }
input-catalog-2 { hrn = "hrn:here:data:::myinput2" }
}
}
pipeline.config.output-catalog.hrn
indicates the output catalog's HRN. To read the HRN of the catalog you just created, open the catalog in the platform portal. If the catalog ID is batch-processing-quickstart
, the corresponding HRN is hrn:here:data:::batch-processing-quickstart
.
pipeline.config.input-catalogs
contains the HRN of all input catalogs, indexed by arbitrary symbolic identifiers, used to identify a specific input catalog in the job configuration and in the processing logic. The template file contains two input catalogs, with identifiers input-catalog-1
and input-catalog-2
and their corresponding sample HRNs. In this project, there is only one input catalog, HERE Map Content, with the HRN hrn:here:data::olp-here:rib-2
. Delete those two sample input catalogs, add one with the HRN mentioned above, and then choose rib
as its catalog ID:
pipeline.config {
output-catalog { hrn = "hrn:here:data:::batch-processing-quickstart" }
input-catalogs {
rib { hrn = "hrn:here:data::olp-here:rib-2" }
}
}
The nodecardinality/processor/config/pipeline-job.conf
file is a template for the job configuration. It contains the following:
pipeline.job.catalog-versions {
input-catalogs {
input-catalog-1 {
processing-type = "reprocess"
version = 0
}
input-catalog-2 {
processing-type = "reprocess"
version = 2
}
}
}
For each input catalog specified in pipeline-config.conf, the pipeline.job.catalog-versions.input-catalogs section contains:
- the version of the catalog to use for processing
- the type of processing that the Data Processing Library uses for incremental compilation
The pipeline.job.catalog-versions.input-catalogs.<input-catalog-ID>.processing-type setting can have three different values, denoting three types of processing:
- reprocess: this type indicates that the catalog should be fully processed; results from a previous compilation are not used to reduce the amount of data being processed. This is the simplest type of processing to use when you are dealing with a manually written job configuration. It effectively disables incremental compilation, a feature of the Data Processing Library that reduces the amount of data processed by using the results of a previous compilation. With this type of processing, you must provide the version of the catalog to process.
- no_changes: this type indicates that you want to reuse the same version of the catalog used when the output catalog was last compiled. This type of processing lets the Data Processing Library skip some compilation steps. You must provide the version of the catalog to process. A valid processing configuration requires the version to be equal to the version used in the last compilation. The Data Processing Library makes sure this condition holds true before the processing starts; otherwise, incremental compilation is disabled.
- changes: this type indicates that you want to process a new version of the catalog (version) given the version processed in the last compilation (since-version); see the sketch after this list. This type of processing may be used by the Data Processing Library to optimize processing, reducing the amount of data actually reprocessed. The processing configuration is valid as long as the version of the catalog used in the last compilation is indeed since-version; otherwise, incremental compilation is disabled.
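For illustration only, a changes entry in a manually written pipeline-job.conf might look like the following sketch (the version numbers are hypothetical); for this quick start you use reprocess instead:
pipeline.job.catalog-versions {
  input-catalogs {
    rib {
      processing-type = "changes"
      version = 2
      since-version = 1
    }
  }
}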
It is important to understand that both no_changes and changes are only used to enable optimizations internally in the Data Processing Library. Conceptually, for any input catalog, the library always fully processes a given version. For this quick start, and for local development in general, rely exclusively on the reprocess processing type. Once a pipeline is deployed, it is the Pipeline API's duty to provide a valid job configuration that takes maximum advantage of the Data Processing Library's optimization capabilities.
At the time of writing, the latest version of the HERE Map Content catalog available is 1.
Let's configure the rib catalog using reprocess for processing-type and 1 for version.
This is how pipeline-job.conf
should look now:
pipeline.job.catalog-versions {
input-catalogs {
rib {
processing-type = "reprocess"
version = 1
}
}
}
Dependencies
The SDK Maven Archetypes provide all basic dependencies in the pom.xml file of the processor sub-project. You must manually add any custom dependencies used by your processing logic there. For this project, two more dependencies are necessary:
- com.here.schema.rib.topology-geometry_v2_scala, to deserialize the input partitions
- com.example.nodecardinality.schema_v0_scala_${scala.compat.version}, which you have just created, to serialize the output partitions
Open the nodecardinality/processor/pom.xml file. There is already a commented-out placeholder for the bindings created by the Schema project; to find it, search for DATA_MODEL_PROJECT_NAME in the file.
Uncomment that dependency, then fill in the {DATA_MODEL_PROJECT_NAME} and {DATA_MODEL_PROJECT_VERSION} placeholders with schema_v0 and 1.0.0, respectively:
<dependency>
<groupId>com.example.nodecardinality</groupId>
<artifactId>schema_v0_scala_${scala.compat.version}</artifactId>
<version>1.0.0</version>
</dependency>
Then, add a dependency on com.here.schema.rib.topology-geometry_v2_scala
:
<dependency>
<groupId>com.here.schema.rib</groupId>
<artifactId>topology-geometry_v2_scala</artifactId>
<version>2.8.0</version>
</dependency>
Processing Logic
Now implement the actual processing logic by editing the Scala source files that the Maven archetypes created. The processor/src/main/scala/com/example/nodecardinality/processor folder contains five source files:
- Main.scala: contains the main entry point of the processing application, as a subclass of PipelineRunner. The Driver is configured with a single DriverTask containing one Direct1ToNCompiler (implemented in Compiler.scala).
- CompilerConfig.scala: contains the compiler configuration, a class you may define to configure the business logic through the application.conf configuration file. In this example, the business logic does not need to expose any configuration parameters, so the default implementation is sufficient.
- IntermediateData.scala: defines the IntermediateData class used by the compiler defined in Compiler.scala.
- Compiler.scala: implements the actual processing logic as a Direct1ToNCompiler.
- LayersDef.scala: defines the input and output layers.
First, decide which compilation pattern and intermediate data to use for the task at hand. This quick start focuses on functional patterns. Compared to the RDD-based patterns, you get incremental compilation with no intervention, and you do not have to deal with Spark
caveats such as partitioning, shuffling, or persistence.
The underlying Spark application is still interesting to better understand how the functional patterns work. All of the patterns implement different flavors of the following scheme, sketched in plain Spark terms after this list:
- The input metadata is retrieved and an RDD of Key and Meta pairs is created. Key uniquely identifies a partition, and contains the catalog ID, the layer ID, and the partition name. Meta contains information about the payload of the partition, and can be used together with the corresponding Key to retrieve the content of a partition (payload).
- A CompileIn transformation is applied to the input RDD. The purpose of this step is to define the mapping between input and output partitions and to preprocess the input data into an intermediate representation that you define. In most compilation patterns, this step corresponds to a flatMap, where a compileInFn that returns a sequence of (Key, IntermediateData) pairs given a single (Key, Meta) pair is applied to all elements of the RDD. This is then followed by a groupBy transformation to group all intermediate representations with the same output key together.
- The resulting RDD of Key and Iterable<IntermediateData> pairs is then processed by applying a CompileOut transformation, where a Payload is produced for each Key from the grouped intermediate representations.
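The following is a rough, hypothetical sketch of that scheme expressed directly in Spark. It is not the Data Processing Library's actual API: the Key, Meta, IntermediateData, and Payload case classes are simplified stand-ins, and compileInFn and compileOutFn are placeholders for the functions described above.
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object FunctionalPatternSketch {
  // Hypothetical, simplified stand-ins for the library's types.
  case class Key(catalog: String, layer: String, partition: String)
  case class Meta(handle: String)
  case class IntermediateData(key: Key, meta: Meta)
  case class Payload(content: Array[Byte])

  def scheme(sc: SparkContext,
             inputMetadata: Seq[(Key, Meta)],
             compileInFn: ((Key, Meta)) => Seq[(Key, IntermediateData)],
             compileOutFn: (Key, Iterable[IntermediateData]) => Option[Payload]): RDD[(Key, Payload)] = {
    // 1. Retrieve the input metadata as an RDD of (Key, Meta) pairs.
    val input: RDD[(Key, Meta)] = sc.parallelize(inputMetadata)
    // 2. CompileIn: map each input pair to intermediate data keyed by output partition,
    //    then group all intermediate values that target the same output key.
    val grouped: RDD[(Key, Iterable[IntermediateData])] =
      input.flatMap(compileInFn).groupByKey()
    // 3. CompileOut: produce an optional Payload for each output key.
    grouped.flatMap { case (key, values) => compileOutFn(key, values).map(key -> _) }
  }
}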
In this project, for each input partition of the HERE Map Content's topology-geometry
layer, you want to create an output partition with the same Tile ID in the output catalog's node-cardinality
layer. The mapping between input and output does not depend on the content of the input partitions; you just need the Tile ID that is part of the partition's Key
. For this reason you can use a direct compiler.
You will implement a direct 1:1 compilation, since each input partition is used to produce exactly one output partition. This is a special case of 1:N compilation, so you can simply use the Direct1ToNCompiler pattern.
You still have to decide what IntermediateData
to use between CompileIn and CompileOut. Since you are performing a direct 1:1
compilation, you can implement the processing logic in CompileOut directly. That means you can forward the Key
and Meta
objects of the input partition from the CompileIn transformation to the CompileOut transformation and process the input data.
Notice that more complex IntermediateData
classes containing a processed version of the input partition are usually necessary, especially when an input partition is used to compile multiple output partitions and you want to avoid processing the same data multiple times.
In this case, an IntermediateData class that simply wraps the Key and Meta of the input partition is all you need.
This is the IntermediateData
class provided by the archetypes:
case class IntermediateData(attribute1: String, attribute2: String)
Let's rewrite the implementation to wrap a Key
and Meta
pair:
import com.here.platform.data.processing.compiler.{InKey, InMeta}
case class IntermediateData(key: InKey, meta: InMeta)
Save the file, then open the processor/src/main/scala/com/example/nodecardinality/processor/LayersDef.scala file. Here, replace the placeholders for the input and output layers:
object In {
val CatalogId = Catalog.Id("input-catalog-1")
val LayerId = Layer.Id("input-layer-1")
}
object Out {
val LayerId = Layer.Id("output-layer-1")
}
For input-catalog-1, you have to use the symbolic ID configured in pipeline-config.conf (rib). The input-layer-1 is topology-geometry and the output-layer-1 is node-cardinality:
object In {
val CatalogId = Catalog.Id("rib")
val LayerId = Layer.Id("topology-geometry")
}
object Out {
val LayerId = Layer.Id("node-cardinality")
}
Now open the processor/src/main/scala/com/example/nodecardinality/processor/Compiler.scala
file.
You need to import the Scala bindings for the HERE Map Content topology-geometry
layer and for your output layer:
import com.example.nodecardinality.schema.v0.schema._
import com.here.schema.rib.v2.topology_geometry_partition._
In a direct compiler, the CompileIn function is split into:
- a mappingFn function that returns a sequence of output Keys given an input Key
- a compileInFn function that returns an IntermediateData object given an input Key and Meta
The mapping established by mappingFn
is used to send the IntermediateData
object to the corresponding output Key
, which is then compiled in the compileOutFn
function.
Search for the mapping function:
override def mappingFn(inKey: InKey): Iterable[OutKey] = ???
Each input Key has to be mapped to an output Key with the same partition name, a catalog ID equal to the output catalog (Default.OutCatalogId), and a layer ID equal to the output layer (Out.LayerId).
First, import the Default
object:
import com.here.platform.data.processing.driver.Default
Then implement mappingFn
as follows:
override def mappingFn(inKey: InKey): Iterable[OutKey] =
Iterable(OutKey(Default.OutCatalogId, Out.LayerId, inKey.partition))
In compileInFn
you simply return an IntermediateData
built out of the Key
and Meta
of the input partition.
Replace the existing compileInFn
method with the following:
override def compileInFn(in: (InKey, InMeta)): IntermediateData =
IntermediateData(in.key, in.meta)
Then replace compileOutFn
with the following:
override def compileOutFn(outKey: OutKey, intermediate: IntermediateData): Option[Payload] = {
val payload = retriever.getPayload(intermediate.key, intermediate.meta)
val partition = TopologyGeometryPartition.parseFrom(payload.content)
val outputPartition =
NodeCardinalityPartition(
partition.node.map(node => NodeCardinality(node.identifier, node.segmentRef.size)))
Some(Payload(outputPartition.toByteArray))
}
Here, the return values of mappingFn and compileInFn from above are passed to compileOutFn as the OutKey of the output partition and the IntermediateData containing the InKey and InMeta pair from the corresponding input partition.
First, retrieve the payload of the input partition using the retriever object that is initialized when the Compiler object is constructed:
val payload = retriever.getPayload(intermediate.key, intermediate.meta)
Decode the corresponding topology partition using the Scala bindings of the HERE Map Content's topology-geometry
layer:
val partition = TopologyGeometryPartition.parseFrom(payload.content)
Next, create the corresponding output partition using the Scala bindings from your Schema project. The bindings generate a case class for each Protocol Buffer message; here, you construct an instance of NodeCardinalityPartition. It has a single constructor argument representing the repeated field node_cardinality, as defined in the schema. The value of this argument is created by iterating over the list of nodes in the topology partition and constructing, for each such node, a NodeCardinality object with the node's ID (node.identifier) and its cardinality (node.segmentRef.size).
val outputPartition =
NodeCardinalityPartition(
partition.node.map(node => NodeCardinality(node.identifier, node.segmentRef.size)))
To publish the data, serialize the output partition to a byte array (outputPartition.toByteArray), wrap it in a Payload object, and return the optional payload. If you don't want a specific output partition in the output catalog, you can return None. In this case, however, you want to publish an output partition for each available input partition:
Some(Payload(outputPartition.toByteArray))
Now, build the entire project from the top-level folder:
$ pwd
~/projects/nodecardinality
$ mvn install
Run The Processor Locally
Processing a global catalog like the HERE Map Content can be time consuming. However, you can limit the number of partitions to process during local development by adding one or more partition filters to the application.conf
file.
In this case you use a BoundingBoxFilter
to process only the partitions inside a bounding box containing the city of Berlin. Open the processor/src/main/resources/application.conf
file and append this partition filter configuration:
here.platform.data-processing.executors.partitionKeyFilters = [
{
className = "BoundingBoxFilter"
param = {
boundingBox {
north = 52.67551
south = 52.338261
east = 13.76116
west = 13.08835
}
}
}
]
Make sure to rerun mvn install
after making this change. From the processor module folder, you can then run the compilation job using the configuration files set up above:
$ pwd
~/projects/nodecardinality/processor
$ mvn exec:java -Dexec.mainClass=com.example.nodecardinality.processor.Main \
-Dpipeline-config.file=config/pipeline-config.conf \
-Dpipeline-job.file=config/pipeline-job.conf \
-Dexec.args="--master local[*]"
The Maven exec plugin is used with the main class set to com.example.nodecardinality.processor.Main and the configuration files config/pipeline-config.conf and config/pipeline-job.conf. The PipelineRunner main method accepts an optional --master <spark-master> command line argument to set the master URL for the cluster. Use local[*] to run Spark locally.
Inspect The Catalog
To inspect the new catalog, do the following:
- Log into the HERE platform. Search for the catalog from the Data tab and select it to switch to the catalog view.
- Select the node-cardinality layer and select the Inspect tab.
- Set the zoom level to 8 and search the map for Berlin.
- The output partitions are highlighted on the map, covering the bounding box of Berlin you specified.
Decode Partitions
To decode the output partitions, you need to configure the layer to use your custom schema. First, upload your schema to the platform. From the nodecardinality/schema/
folder, run mvn deploy
:
$ pwd
~/projects/nodecardinality/schema
$ mvn deploy
A schema with the same group ID, artifact ID, and version can only be deployed once. To avoid collisions, ensure that your group ID (com.example.nodecardinality in this example) is unique.
In the Data view of the portal, click Browse schemas to display a list of deployed schemas your user can access, and make sure your schema, schema_v0
, is there.
From the node-cardinality layer view of the portal, click More and then select Reconfigure layer to access the layer configuration page. You can now change most of the parameters you set during the creation of the layer, including the schema configuration. Locate the Schema configuration, select schema_v0 from the drop-down menu, and then click Save at the bottom of the page.
From the Inspect tab, you can now select any partition. The decoded partition is displayed on the panel to the right.
Run the Pipeline on a Cluster
Using the Pipeline API, you can run the batch pipeline you just created on a cluster. You can deploy your pipeline in one of two ways:
- as a scheduled pipeline, which automatically runs every time there is a new version of an input catalog
- as a run-once pipeline, which uses a pipeline-job.conf configuration you provide
To use the Pipeline API, your App ID must belong to a Group. Your administrator must set up a Group ID in your account and assign your application to the group. Finally, you must grant that Group ID Read and Write access to the output catalog.
To deploy the processor to the HERE Workspace pipeline, first package it into a fat JAR. The pom.xml
file generated by the Archetypes contains a platform
profile for this purpose:
$ pwd
~/projects/nodecardinality/processor
$ mvn -Pplatform package
The above command creates a fat JAR of the processor as processor/target/processor-1.0.0-platform.jar. Use this file to create a Pipeline Template.
Refer to the Pipelines Developer's Guide for detailed instructions on deploying and running pipelines from the HERE platform portal.
Additionally, you can deploy and run a pipeline with the OLP CLI. Refer to the OLP CLI User Guide for details.