Copy a catalog using the Data Processing Library
Objectives: Copy the GeoJSON contents of a source catalog using the Data Processing Library.
Complexity: Intermediate
Time to complete: 30 min
Prerequisites: Verify Your Credentials, Verify Maven Settings, Organize your work in projects
Source code: Download
This example demonstrates how to create a catalog using the OLP CLI, configure it with a new versioned layer, populate the layer with GeoJSON contents, and write a Data Processing Library application to copy these contents to another catalog.
Create the source catalog
The OLP CLI requires a valid set of HERE credentials to authenticate itself with the Data Services, so make sure that the Verify your credentials tutorial returns the expected result.
If you have not already done so, download and unzip the Java and Scala examples and the CLI. Add the tools/OLP_CLI folder of the unzipped file to your ${PATH}.
Give your source catalog a unique identifier and name, for example, {{YOUR_USERNAME}}-geojson-dpl-tutorial.
Replace {{YOUR_SRC_CATALOG_ID}} in the following command with your own identifier and {{YOUR_PROJECT_HRN}} with the project HRN from the Organize your work in projects tutorial, then run the command:
olp catalog create {{YOUR_SRC_CATALOG_ID}} "{{YOUR_USERNAME}} geojson example" \
--summary "geojson example" \
--description "geojson example" \
--scope {{YOUR_PROJECT_HRN}}
The CLI returns as follows:
Catalog {{YOUR_SRC_CATALOG_HRN}} has been created.
Add a layer to contain the raw GeoJSON contents. This versioned layer has the content type "application/vnd.geo+json", uses "heretile" partitioning at zoom level 12, and covers the administrative area "DE" (Germany).
olp catalog layer add {{YOUR_SRC_CATALOG_HRN}} geojson "geojson example" \
--versioned \
--content-type=application/vnd.geo+json \
--partitioning=heretile:12 \
--coverage=DE \
--summary "geojson example" \
--description "geojson example" \
--scope {{YOUR_PROJECT_HRN}}
The CLI returns as follows:
Layer geojson has been added to the catalog.
Note
If a billing tag is required in your realm, use the --billing-tags: "YOUR_BILLING_TAG" parameter.
Make a folder called source-partitions.
mkdir source-partitions
Make a file in source-partitions named 23618359 and populate it with the following GeoJSON. This represents a polygon inside partition 23618359.
{
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [[
[13.351969844791705, 52.52978386622147],
[13.330565116221472, 52.551188594791704],
[13.300294258778528, 52.551188594791704],
[13.278889530208295, 52.52978386622147],
[13.278889530208295, 52.49951300877853],
[13.300294258778528, 52.478108280208296],
[13.330565116221472, 52.478108280208296],
[13.351969844791705, 52.49951300877853],
[13.351969844791705, 52.52978386622147]
]]
},
"properties": {
"tooltip": "GREEN",
"style" : {
"color" : "#228B22",
"fill" : "#228B22",
"opacity": 1.0
},
"width": 5
}
}]
}
Make a file in source-partitions named 23618402 and populate it with the following GeoJSON. This represents a polygon inside partition 23618402.
{
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [[
[13.439860469791705, 52.52978386622147],
[13.418455741221472, 52.551188594791704],
[13.388184883778528, 52.551188594791704],
[13.366780155208295, 52.52978386622147],
[13.366780155208295, 52.49951300877853],
[13.388184883778528, 52.478108280208296],
[13.418455741221472, 52.478108280208296],
[13.439860469791705, 52.49951300877853],
[13.439860469791705, 52.52978386622147]
]]
},
"properties": {
"tooltip": "GREEN",
"style" : {
"color" : "#228B22",
"fill" : "#228B22",
"opacity": 1.0
},
"width": 5
}
}]
}
Upload your GeoJSON partitions to the GeoJSON layer of your catalog:
olp catalog layer partition put {{YOUR_SRC_CATALOG_HRN}} geojson --input=./source-partitions --scope {{YOUR_PROJECT_HRN}}
The CLI returns as follows:
Partitions 23618402, 23618359 were successfully uploaded.
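To double-check the uploaded content, you can also download a partition back to your machine. This sketch assumes the olp catalog layer partition get command and its --partitions option, which recent versions of the OLP CLI provide:
olp catalog layer partition get {{YOUR_SRC_CATALOG_HRN}} geojson --partitions 23618359 --scope {{YOUR_PROJECT_HRN}}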
You can inspect the content of the partitions in the platform portal. To do this, navigate to the portal, find {{YOUR_SRC_CATALOG_HRN}} in the list, and open this catalog. Select the geojson layer to view its details and inspect its data in the Inspect tab.
You can see outlines for your two partitions in the Berlin area on the map. Zoom in and click a partition to render its green octagon. Hold your mouse over the octagon to open a tooltip with the text "GREEN".
Also, you can list partitions with the OLP CLI by executing the following command:
olp catalog layer partition list {{YOUR_SRC_CATALOG_HRN}} geojson --scope {{YOUR_PROJECT_HRN}}
For more details on working with raw GeoJSON content as Workspace Data, see this document.
Create the destination catalog
Give your destination catalog a unique identifier and name, for example, {{YOUR_USERNAME}}-geojson-copy-dpl-tutorial.
Replace {{YOUR_DST_CATALOG_ID}} in the following example with your own identifier, and create the destination catalog and geojson layer in the same way as the source catalog shown previously.
olp catalog create {{YOUR_DST_CATALOG_ID}} "{{YOUR_USERNAME}} geojson example copy" \
--summary "geojson example copy" \
--description "geojson example copy" \
--scope {{YOUR_PROJECT_HRN}}
olp catalog layer add {{YOUR_DST_CATALOG_HRN}} geojson "geojson example" \
--versioned \
--content-type=application/vnd.geo+json \
--partitioning=heretile:12 \
--coverage=DE \
--summary "geojson example copy" \
--description "geojson example copy" \
--scope {{YOUR_PROJECT_HRN}}
Alternatively, you can query the layer configuration of the GeoJSON layer of your source catalog as JSON, and then use that to configure your destination catalog.
olp catalog layer show {{YOUR_SRC_CATALOG_HRN}} geojson --json --scope {{YOUR_PROJECT_HRN}}
The CLI returns as follows:
{
"coverage": {"adminAreas": ["DE"]},
"summary": "geojson example",
"volume": {"volumeType": "Durable"},
"layerType": "Versioned",
"billingTags": [],
"name": "geojson example",
"contentEncoding": "",
"description": "geojson example",
"partitioning": {
"scheme": "heretile",
"tileLevels": [12]
},
"id": "geojson",
"contentType": "application/vnd.geo+json",
"tags": []
}
You can then add the output to a config.json file and use it to update your catalog with the GeoJSON layer configuration:
{
"id": {{YOUR_CATALOG_ID}},
"name": "{{YOUR_USERNAME}} geojson example copy",
"summary": "geojson example copy",
"description": "geojson example copy",
"layers": [
{{json output from the "olp catalog layer show" command above}}
]
}
olp catalog update {{YOUR_DST_CATALOG_HRN}} --config config.json --scope {{YOUR_PROJECT_HRN}}
Add a layer to contain the internal compilation state. The Data Processing Library uses this layer for stateful processing in subsequent runs. For more details on the state layer, see the Developer's Guide.
olp catalog layer add {{YOUR_DST_CATALOG_HRN}} state "internal compilation state" \
--content-type="application/octet-stream" \
--versioned --partitioning=generic \
--summary "internal compilation state" \
--description "internal compilation state" \
--scope {{YOUR_PROJECT_HRN}}
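Optionally, you can confirm that both the geojson and state layers are configured by displaying the catalog details (this assumes the olp catalog show command from the CLI):
olp catalog show {{YOUR_DST_CATALOG_HRN}} --scope {{YOUR_PROJECT_HRN}}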
Copy the GeoJSON layer contents
Create the following folder structure for the project:
copy-geojson
└── src
└── main
├── java
└── resources
└── scala
To do this with a single bash command:
mkdir -p copy-geojson/src/main/{java,resources,scala}
The POM for this example is identical to that of the first Maven example, except for its parent and dependencies sections. The first dependencies block that follows is for the Scala application; the second is for the Java application:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.51.5</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>batch-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>pipeline-runner_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>batch-catalog-dataservice_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>context-logging_${scala.compat.version}</artifactId>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>batch-core-java_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>pipeline-runner-java_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.processing</groupId>
<artifactId>batch-catalog-dataservice_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>context-logging_${scala.compat.version}</artifactId>
</dependency>
</dependencies>
In the resources folder, create a file called application.conf and fill it with these contents:
here.platform.data-processing.driver {
appName = "CopyCatalogScalaTutorial"
}
Configure the logger using the following contents for resources/log4j.properties:
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %-7p %c{1}: %m%n
log4j.logger.org.apache.spark=WARN
log4j.logger.org.spark_project=WARN
The code for a 1:1 compiler that copies the GeoJSON contents is shown below, first in Scala and then in Java. The compiler maps each input partition to the partition with the same tile ID in the output catalog (mappingFn), downloads the input payload (compileInFn), and republishes it unchanged (compileOutFn).
import com.here.platform.data.processing.blobstore.{Payload, Retriever}
import com.here.platform.data.processing.build.BuildInfo
import com.here.platform.data.processing.catalog.Catalog
import com.here.platform.data.processing.catalog.Layer
import com.here.platform.data.processing.compiler._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner
import com.here.platform.data.processing.driver.{
DriverBuilder,
DriverContext,
DriverSetupWithBuilder
}
import com.here.platform.pipeline.logging.{ContextLogging, LogContext}
import com.here.platform.data.processing.spark.partitioner.Partitioner
object CopyCatalogScala extends PipelineRunner with DriverSetupWithBuilder {
type Data = Array[Byte]
val srcCatalogId = Catalog.Id("src")
val geojsonLayerId = Layer.Id("geojson")
class Compiler(retriever: Retriever)
extends Direct1ToNCompiler[Data]
with InputLayers
with OutputLayers
with ContextLogging {
override def inLayers: Map[Catalog.Id, Set[Layer.Id]] = Map(srcCatalogId -> Set(geojsonLayerId))
override def outLayers: Set[Layer.Id] = Set(geojsonLayerId)
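    // Map each input partition to the partition with the same tile ID in the output catalog.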
override def mappingFn(inKey: InKey): Iterable[OutKey] =
Iterable(inKey.copy(catalog = outCatalogId))
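    // Fetch the raw GeoJSON payload of the input partition.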
override def compileInFn(in: (InKey, InMeta)): Data = {
logger.info("compileInFn (" + in.key + ")")
retriever.getPayload(in.key, in.meta).content
}
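    // Republish the payload unchanged under the output key.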
override def compileOutFn(outKey: OutKey, intermediate: Data): Option[Payload] = {
logger.info("compileOutFn (" + outKey + ")")
Some(Payload(intermediate))
}
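    // Returning None keeps the framework's default partitioners.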
override def inPartitioner(parallelism: Int): Option[Partitioner[InKey]] = None
override def outPartitioner(parallelism: Int): Option[Partitioner[OutKey]] = None
}
val applicationVersion: String = BuildInfo.version
def configureCompiler(completeConfig: CompleteConfig,
context: DriverContext,
builder: DriverBuilder): builder.type = {
val taskBuilder = builder.newTaskBuilder("copycatalogscala")
val compiler = new Compiler(context.inRetriever(srcCatalogId))
builder.addTask(taskBuilder.withDirect1ToNCompiler(compiler).build())
}
}
import com.here.platform.data.processing.build.BuildInfo;
import com.here.platform.data.processing.driver.runner.pipeline.java.PipelineRunner;
import com.here.platform.data.processing.java.Pair;
import com.here.platform.data.processing.java.blobstore.Payload;
import com.here.platform.data.processing.java.blobstore.Retriever;
import com.here.platform.data.processing.java.catalog.partition.Key;
import com.here.platform.data.processing.java.catalog.partition.Meta;
import com.here.platform.data.processing.java.compiler.Direct1ToNCompiler;
import com.here.platform.data.processing.java.compiler.InputLayers;
import com.here.platform.data.processing.java.compiler.OutputLayers;
import com.here.platform.data.processing.java.driver.Default;
import com.here.platform.data.processing.java.driver.DriverBuilder;
import com.here.platform.data.processing.java.driver.DriverContext;
import com.here.platform.data.processing.java.driver.TaskBuilder;
import com.here.platform.data.processing.java.driver.config.CompleteConfig;
import com.here.platform.data.processing.java.spark.partitioner.PartitionerOfKey;
import com.here.platform.pipeline.logging.java.ContextAwareLogger;
import java.util.*;
class Compiler implements Direct1ToNCompiler<byte[]>, InputLayers, OutputLayers {
private String srcCatalogId = "src";
private String geojsonLayerId = "geojson";
private Retriever retriever;
private ContextAwareLogger contextAwareLogger;
Compiler(DriverContext driverContext) {
this.retriever = driverContext.inRetriever(srcCatalogId);
this.contextAwareLogger = new ContextAwareLogger(getClass());
}
@Override
public Map<String, Set<String>> inLayers() {
return Collections.unmodifiableMap(
new HashMap<String, Set<String>>() {
{
put(
srcCatalogId,
new HashSet<String>() {
{
add(geojsonLayerId);
}
});
}
});
}
@Override
public Set<String> outLayers() {
return Collections.unmodifiableSet(
new HashSet<String>() {
{
add(geojsonLayerId);
}
});
}
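  // Map each input partition to the same partition in the default output catalog.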
@Override
public Iterable<Key> mappingFn(Key inKey) {
return Collections.singletonList(
new Key(Default.OutCatalogId(), inKey.layer(), inKey.partition()));
}
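  // Fetch the raw GeoJSON payload of the input partition.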
@Override
public byte[] compileInFn(Pair<Key, Meta> in) {
contextAwareLogger.info("compileInFn (" + in.getKey() + ")");
return retriever.getPayload(in.getKey(), in.getValue()).content();
}
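  // Republish the payload unchanged under the output key.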
@Override
public Optional<Payload> compileOutFn(Key outKey, byte[] intermediate) {
contextAwareLogger.info("compileOutFn (" + outKey + ")");
return Optional.of(new Payload(intermediate));
}
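  // Returning Optional.empty() keeps the framework's default partitioners.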
@Override
public Optional<PartitionerOfKey> inPartitioner(int parallelism) {
return Optional.empty();
}
@Override
public Optional<PartitionerOfKey> outPartitioner(int parallelism) {
return Optional.empty();
}
}
public class CopyCatalogJava {
public static void main(String... args) {
new PipelineRunner() {
@Override
public String applicationVersion() {
return BuildInfo.version();
}
@Override
public DriverBuilder configureCompiler(
CompleteConfig completeConfig, DriverContext context, DriverBuilder builder) {
TaskBuilder taskBuilder = builder.newTaskBuilder("copycatalogjava");
Compiler compiler = new Compiler(context);
return builder.addTask(taskBuilder.withDirect1ToNCompiler(compiler, byte[].class).build());
}
}.main(args);
}
}
Run the copy as a full compilation
To run the application locally, execute one of the following commands (Scala or Java version, respectively):
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogScala \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="reprocess" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogJava \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="reprocess" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
After the local job is completed, you can inspect the copied contents in the platform portal. To do this, navigate to the portal, find {{YOUR_DST_CATALOG_HRN}} in the list, and open this catalog. Select the geojson layer to view its details and inspect its data in the Inspect tab.
Update the source catalog
Make a new GeoJSON partition:
Make a folder called new-partitions.
mkdir new-partitions
Make a file in new-partitions named 23618408 and populate it with the following GeoJSON.
{
"type": "FeatureCollection",
"features": [{
"type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [[
[13.439860469791705, 52.61767449122147],
[13.418455741221472, 52.639079219791704],
[13.388184883778528, 52.639079219791704],
[13.366780155208295, 52.61767449122147],
[13.366780155208295, 52.58740363377853],
[13.388184883778528, 52.565998905208296],
[13.418455741221472, 52.565998905208296],
[13.439860469791705, 52.58740363377853],
[13.439860469791705, 52.61767449122147]
]]
},
"properties": {
"tooltip": "GREEN",
"style" : {
"color" : "#228B22",
"fill" : "#228B22",
"opacity": 1.0
},
"width": 5
}
}]
}
Upload your new partition to the GeoJSON layer of your source catalog:
olp catalog layer partition put {{YOUR_SRC_CATALOG_HRN}} geojson --input=./new-partitions --scope {{YOUR_PROJECT_HRN}}
The CLI returns as follows:
Partition 23618408 was successfully uploaded.
After the upload is complete, you can inspect the new partition in the platform portal. To do this, navigate to the portal, find {{YOUR_SRC_CATALOG_HRN}} in the list, and open this catalog. Select the geojson layer to view its details and inspect its data in the Inspect tab.
Run incremental compilation
To run the application locally, execute one of the following commands (Scala or Java version, respectively):
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogScala \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.since-version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=1 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="changes" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogJava \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.since-version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=1 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="changes" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
Within the logging output, you can see lines like the ones below, from the Scala and Java runs, respectively. Notice that only the new partition is encountered during compileInFn and compileOutFn.
2018/07/09 17:17:44.015 INFO Direct1ToNCompilerExecutor: Incremental compilation execution // Task=copycatalogscala
2018/07/09 17:17:45.290 INFO CopyCatalogScala$Compiler: compileInFn (Key('src,'geojson,23618408))
2018/07/09 17:17:45.655 INFO CopyCatalogScala$Compiler: compileOutFn (Key('output,'geojson,23618408))
2018/07/09 17:27:09.383 INFO Direct1ToNCompilerExecutor: Incremental compilation execution // Task=copycatalogjava
2018/07/09 17:27:11.070 INFO Compiler: compileInFn (Key('src,'geojson,23618408))
2018/07/09 17:27:11.913 INFO Compiler: compileOutFn (Key('output,'geojson,23618408))
Upon completion of the local job, you can inspect the copied contents on the portal at https://platform.here.com/data/{{YOUR_DST_CATALOG_HRN}}/geojson/inspect (replace {{YOUR_DST_CATALOG_HRN}} with your real catalog HRN).
Note
You can also rerun the full compilation from version 0 of your source catalog. That effectively deletes the new partition from your destination catalog and restores it to the same layer contents as version 0 of your source catalog.
Similarly, you can apply the delta between version 0 and version 1 of your source catalog to any given state of the destination catalog using incremental compilation. That is, given an empty destination catalog, you can copy over only the single new partition added between version 0 and version 1 of the source catalog by running the incremental compilation as demonstrated above.
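For example, to restore the destination catalog to match version 0 of the source again, re-run the full compilation with the same flags as before (Scala variant shown; the Java variant differs only in the main class):
mvn compile exec:java \
-Dexec.mainClass=CopyCatalogScala \
-Dpipeline.config.input-catalogs.src.hrn="{{YOUR_SRC_CATALOG_HRN}}" \
-Dpipeline.config.output-catalog.hrn="{{YOUR_DST_CATALOG_HRN}}" \
-Dpipeline.job.catalog-versions.input-catalogs.src.version=0 \
-Dpipeline.job.catalog-versions.input-catalogs.src.processing-type="reprocess" \
-Dexec.args="--master local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}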
For additional details on the topics covered in this tutorial, you can refer to the following source: Data Processing Library Developer Guide.