Create an output catalog to hold the matched trips using the following config file, replacing {{YOUR_CATALOG_ID}} and {{YOUR_USERNAME}} with your own values.
{"id":"{{YOUR_CATALOG_ID}}","name":"{{YOUR_USERNAME}} Path Matching Tutorial","summary":"Berlin sample matched path in GeoJSON","description":"Berlin sample matched path in GeoJSON","layers":[{"id":"matched-trips","name":"GeoJSON matched trips","summary":"GeoJSON matched trips","description":"GeoJSON matched trips","contentType":"application/vnd.geo+json","layerType":"versioned","volume":{"volumeType":"durable"},"partitioning":{"scheme":"heretile","tileLevels":[14]},"coverage":{"adminAreas":["DE"]}}]}
Using {{YOUR_PROJECT_HRN}} from the Organize your work in projects tutorial, create a catalog with a GeoJSON layer at tile level 14 (the same level as in the input catalog):
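A possible invocation with the OLP CLI, assuming the configuration above is saved as output-catalog-config.json (the file name is illustrative; check the CLI user guide for the exact flags):

olp catalog create {{YOUR_CATALOG_ID}} "{{YOUR_USERNAME}} Path Matching Tutorial" \
    --config output-catalog-config.json --scope {{YOUR_PROJECT_HRN}}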
The main() body contains the direct invocations of the Location Library that perform the path matching; the helper methods handle the marshalling of input and output data via the Data Client Library.
Scala
/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import akka.actor.ActorSystem
import com.here.hrn.HRN
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.VersionDependency
import com.here.platform.data.client.scaladsl.{CommitPartition, DataClient, NewPartition}
import com.here.platform.data.client.spark.DataClientSparkContextUtils
import com.here.platform.data.client.spark.LayerDataFrameReader._
import com.here.platform.data.client.spark.SparkSupport._
import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.core.mapmatching.OnRoad
import com.here.platform.location.dataloader.core.caching.CacheManager
import com.here.platform.location.dataloader.spark.SparkCatalogFactory
import com.here.platform.location.integration.optimizedmap.mapmatching.PathMatchers
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{udf, _}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Read sensor data from an SDII archive and output the map-matched paths
// to a GeoJSON output catalog with the same tiling scheme.
object BatchPathMatchingScala {
  private val pipelineContext = new PipelineContext

  private val outputCatalogHrn = pipelineContext.config.outputCatalog
  private val outputLayer = "matched-trips"
  private val sensorDataArchiveHrn = pipelineContext.config.inputCatalogs("sensor")
  private val sensorDataArchiveVersionLayerName = "sample-index-layer"
  private val locationCatalogHrn = pipelineContext.config.inputCatalogs("location")
  private val locationCatalogVersion = pipelineContext.job.get.inputCatalogs("location").version

  val sparkSession: SparkSession =
    SparkSession.builder().appName("BatchPathMatchingPipeline").getOrCreate()

  import sparkSession.implicits._

  def main(args: Array[String]): Unit = {
    val sensorData: Dataset[SensorData] = getSensorData

    // Set up the location library catalog factory and cache used by the path matcher.
    val locationLibraryCatalogFactory = new SparkCatalogFactory
    val locationLibraryCatalog =
      locationLibraryCatalogFactory.create(locationCatalogHrn, locationCatalogVersion)
    val locationLibraryCache = CacheManager.withLruCache

    // Map match each path inside each sensor archive tile (https://en.wikipedia.org/wiki/Map_matching)
    // and return a list of lists of coordinates.
    // Each element in the top-level list represents a path as a list of coordinates.
    val matchedPaths: Dataset[(Long, Seq[Seq[GeoCoordinate]])] =
      sensorData.mapPartitions { sensorDataIterator =>
        val pathMatcher =
          PathMatchers.carPathMatcher[GeoCoordinate](locationLibraryCatalog, locationLibraryCache)
        sensorDataIterator.map { sensorDataElement =>
          sensorDataElement.tileId ->
            sensorDataElement.positionEstimateList
              .map {
                pathMatcher
                  .matchPath(_)
                  .results
                  .flatMap {
                    case OnRoad(matched) =>
                      Some(GeoCoordinate(matched.nearest.latitude, matched.nearest.longitude))
                    case _ => None
                  }
              }
        }
      }

    publish(convertToGeoJSON(matchedPaths))
  }

  // Retrieves the contents of the sensor data archive partitions.
  private def getSensorData: Dataset[SensorData] = {
    // Extracting input data.
    // There are two different options for the query parameter: `hour_from` and `tile_id`.
    // You can either make it `hour_from>0` to get all available messages or make it
    // `tile_id==$tile_id` to get messages by the specific partition.
    // Possible tile_ids = [377893756, 377894443, 377894442, ...].
    // For the CN environment = [389695267, 389696772, 389695401, ...].
    // You can implement more complex queries with RSQL
    // (https://developer.here.com/documentation/data-client-library/dev_guide/client/rsql.html)
    val sdiiMessages: DataFrame = sparkSession
      .readLayer(sensorDataArchiveHrn, sensorDataArchiveVersionLayerName)
      .query(s"hour_from>0")
      .load()

    val sensorData = sdiiMessages
      .select($"idx_tile_id" as "tileId", $"path.positionEstimate" as "fullPositionEstimate")
      .withColumn("positionEstimate", convertToGeoCoordinate($"fullPositionEstimate"))
      .groupBy("tileId")
      .agg(collect_list("positionEstimate") as "positionEstimateList")
      .as[SensorData]
    sensorData
  }

  val convertToGeoCoordinate: UserDefinedFunction = udf((positionEstimateRows: Seq[Row]) => {
    positionEstimateRows.map { positionEstimate =>
      val lat = positionEstimate.getAs[Double]("latitude_deg")
      val lon = positionEstimate.getAs[Double]("longitude_deg")
      GeoCoordinate(lat, lon)
    }
  })

  // Publish the GeoJSON partitions to the output catalog using the Data Client Library.
  private def publish(geoJsonByTile: Dataset[(Long, String)]): Unit = {
    val masterActorSystem: ActorSystem =
      DataClientSparkContextUtils.context.actorSystem

    // Start commit on master.
    val masterPublishApi = DataClient(masterActorSystem).publishApi(outputCatalogHrn)

    // Get the latest catalog version of the output catalog.
    val baseVersion = masterPublishApi.getBaseVersion().awaitResult()

    // Fill in the direct and indirect dependencies for the output catalog, given the direct inputs.
    // This is good practice, especially if you intend to use the scheduler to
    // determine when the pipeline should run (i.e., by using the --with-scheduler option in:
    // For users using platform.here.com:
    // https://developer.here.com/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    // For users using platform.hereolp.cn:
    // https://developer.here.com/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    val dependencies =
      gatherDependencies(locationCatalogHrn, locationCatalogVersion)

    // Start a publication batch on top of the most recent catalog version.
    val batchToken =
      masterPublishApi.startBatch2(baseVersion, Some(Seq(outputLayer)), dependencies).awaitResult()

    // Send partitions to workers, and upload data and metadata.
    geoJsonByTile.foreachPartition({ partitions =>
      val workerActorSystem = DataClientSparkContextUtils.context.actorSystem
      val workerPublishApi = DataClient(workerActorSystem).publishApi(outputCatalogHrn)
      val workerWriteEngine = DataEngine(workerActorSystem).writeEngine(outputCatalogHrn)
      val committedPartitions: Iterator[CommitPartition] =
        partitions.map {
          case (tileId: Long, geoJson: String) =>
            val newPartition =
              NewPartition(
                partition = tileId.toString,
                layer = outputLayer,
                data = NewPartition.ByteArrayData(geoJson.getBytes))
            workerWriteEngine.put(newPartition).awaitResult()
        }
      workerPublishApi
        .publishToBatch(batchToken, committedPartitions)
        .awaitResult()
    })

    masterPublishApi.completeBatch(batchToken).awaitResult()
    sparkSession.stop()
  }

  // Convert the trips to GeoJSON; each trip is converted to a LineString.
  private def convertToGeoJSON(
      matchedTrips: Dataset[(Long, Seq[Seq[GeoCoordinate]])]): Dataset[(Long, String)] =
    matchedTrips.map { matchedTrip =>
      val features = matchedTrip._2
        .map(
          coordinates =>
            """{ "type": "Feature", "geometry": """ +
              """{ "type": "LineString", "coordinates": """ +
              coordinates.map(c => s"[${c.longitude}, ${c.latitude}]").mkString("[", ",", "]") +
              "}" +
              """, "properties": {} }""")
      val featureCollection =
        features.mkString("""{ "type": "FeatureCollection", "features": [""", ",", "]}")
      (matchedTrip._1, featureCollection)
    }

  // Gather the dependencies for the output catalog, which depends on the location library catalog.
  private def gatherDependencies(locationHrn: HRN,
                                 locationVersion: Long): Seq[VersionDependency] = {
    val sparkActorSystem = DataClientSparkContextUtils.context.actorSystem
    val locationLibraryQuery = DataClient(sparkActorSystem).queryApi(locationHrn)
    val locationLibraryDeps =
      locationLibraryQuery.getVersion(locationVersion).awaitResult().dependencies

    // The output catalog depends directly on the location library
    // catalog, and indirectly on its respective dependencies.
    locationLibraryDeps.map(_.copy(direct = false)) ++
      Seq(VersionDependency(locationHrn, locationVersion, direct = true))
  }

  case class SensorData(tileId: Long, positionEstimateList: Seq[Seq[GeoCoordinate]])
}
Java
/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.udf;

import akka.actor.ActorSystem;
import akka.japi.Pair;
import com.here.hrn.HRN;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.CommitPartition;
import com.here.platform.data.client.javadsl.DataClient;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.javadsl.PublishApi;
import com.here.platform.data.client.javadsl.QueryApi;
import com.here.platform.data.client.model.BatchToken;
import com.here.platform.data.client.model.VersionDependency;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import com.here.platform.location.core.geospatial.GeoCoordinate;
import com.here.platform.location.core.mapmatching.MatchResult;
import com.here.platform.location.core.mapmatching.NoTransition;
import com.here.platform.location.core.mapmatching.OnRoad;
import com.here.platform.location.core.mapmatching.javadsl.MatchResults;
import com.here.platform.location.core.mapmatching.javadsl.PathMatcher;
import com.here.platform.location.dataloader.core.Catalog;
import com.here.platform.location.dataloader.core.caching.CacheManager;
import com.here.platform.location.dataloader.spark.SparkCatalogFactory;
import com.here.platform.location.inmemory.graph.Vertex;
import com.here.platform.location.integration.optimizedmap.mapmatching.javadsl.PathMatchers;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

// Read sensor data from an SDII archive and output the map-matched paths
// to a GeoJSON output catalog with the same tiling scheme.
public class BatchPathMatchingJava {
  private static PipelineContext pipelineContext = new PipelineContext();
  private static HRN outputCatalogHrn = pipelineContext.getConfig().getOutputCatalog();
  private static String outputLayer = "matched-trips";
  private static HRN sensorDataArchiveHrn =
      pipelineContext.getConfig().getInputCatalogs().get("sensor");
  private static String sensorDataArchiveVersionLayerName = "sample-index-layer";
  private static HRN locationCatalogHrn =
      pipelineContext.getConfig().getInputCatalogs().get("location");
  private static Long locationCatalogVersion =
      pipelineContext.getJob().get().getInputCatalogs().get("location").version();
  private static SparkSession sparkSession =
      SparkSession.builder().appName("BatchPathMatchingPipeline").getOrCreate();

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Dataset<SensorData> sensorData = getSensorData();

    // Set up the location library catalog factory and cache used by the path matcher.
    SparkCatalogFactory locationLibraryCatalogFactory =
        new SparkCatalogFactory(scala.concurrent.duration.Duration.Inf());
    Catalog locationLibraryCatalog =
        locationLibraryCatalogFactory.create(locationCatalogHrn, locationCatalogVersion);
    CacheManager locationLibraryCache = CacheManager.withLruCache();

    // Map match each path inside each sensor archive tile
    // (https://en.wikipedia.org/wiki/Map_matching)
    // and return a list of lists of coordinates.
    // Each element in the top-level list represents a path as a list of coordinates.
    JavaRDD<Pair<Long, List<List<GeoCoordinate>>>> matchedPaths =
        sensorData
            .javaRDD()
            .mapPartitions(
                tiles -> {
                  PathMatcher<GeoCoordinate, Vertex, NoTransition> pathMatcher =
                      PathMatchers.carPathMatcher(locationLibraryCatalog, locationLibraryCache);
                  Iterable<SensorData> tileIterable = () -> tiles;
                  return StreamSupport.stream(tileIterable.spliterator(), false)
                      .map(
                          sensorDataElement ->
                              new Pair<>(
                                  sensorDataElement.getTileId(),
                                  getCoordinatesMatchedToPath(
                                      pathMatcher, sensorDataElement.getPositionEstimateList())))
                      .iterator();
                });

    publish(convertToGeoJSON(matchedPaths));
  }

  // Using the path matcher, get a new list of lists of GeoCoordinates,
  // each list representing a path.
  private static List<List<GeoCoordinate>> getCoordinatesMatchedToPath(
      PathMatcher<GeoCoordinate, Vertex, NoTransition> pathMatcher,
      List<List<String>> positionEstimatePathList) {
    return positionEstimatePathList
        .stream()
        .map(BatchPathMatchingJava::convertToGeoCoordinates)
        .map(
            positionEstimatePath ->
                pathMatcher
                    .matchPath(positionEstimatePath)
                    .results()
                    .stream()
                    .map(BatchPathMatchingJava::matchResultOnRoad)
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList()))
        .collect(Collectors.toList());
  }

  // Convert a list of string pairs representing coordinates to actual GeoCoordinates.
  private static List<GeoCoordinate> convertToGeoCoordinates(List<String> coordinatePairs) {
    return coordinatePairs
        .stream()
        .map(
            coordinatePair -> {
              String[] coordinates = coordinatePair.split(",", 2);
              double latitude = Double.parseDouble(coordinates[0]);
              double longitude = Double.parseDouble(coordinates[1]);
              return new GeoCoordinate(latitude, longitude);
            })
        .collect(Collectors.toList());
  }

  // Construct new GeoCoordinates for on-road matches.
  private static GeoCoordinate matchResultOnRoad(MatchResult<Vertex> matchResult) {
    if (MatchResults.isOnRoad(matchResult)) {
      OnRoad<Vertex> onRoad = (OnRoad<Vertex>) matchResult;
      GeoCoordinate nearest = onRoad.elementProjection().nearest();
      return new GeoCoordinate(nearest.latitude(), nearest.longitude());
    }
    return null;
  }

  // Retrieves the contents of the sensor data archive partitions.
  private static Dataset<SensorData> getSensorData() {
    // Extracting input data.
    // There are two different options for the query parameter: `hour_from` and `tile_id`.
    // You can either make it `hour_from>0` to get all available messages or make it
    // `tile_id==$tile_id` to get messages by the specific partition.
    // Possible tile_ids = [377893756, 377894443, 377894442, ...].
    // For the CN environment = [389695267, 389696772, 389695401, ...].
    // You can implement more complex queries with RSQL
    // (https://developer.here.com/documentation/data-client-library/dev_guide/client/rsql.html)
    Dataset<Row> sdiiMessages =
        JavaLayerDataFrameReader.create(sparkSession)
            .readLayer(sensorDataArchiveHrn, sensorDataArchiveVersionLayerName)
            .query("hour_from>0")
            .load();

    // Define the structure of a list of coordinate pairs - Spark uses it to encode data.
    ArrayType schema = DataTypes.createArrayType(DataTypes.StringType);

    // UDF to extract only coordinates from the complex structure of a path.
    UserDefinedFunction positionEstimateToCoordinatePair =
        udf(
            (UDF1<WrappedArray<Row>, List<String>>)
                positionEstimateList ->
                    JavaConversions.seqAsJavaList(positionEstimateList)
                        .stream()
                        .map(
                            positionEstimate -> {
                              Double lat = positionEstimate.getAs("latitude_deg");
                              Double lon = positionEstimate.getAs("longitude_deg");
                              return lat + "," + lon;
                            })
                        .collect(Collectors.toList()),
            schema);

    Encoder<SensorData> sensorDataEncoder = Encoders.bean(SensorData.class);

    // Extract the list of paths (each path represented as a list of coordinates).
    return sdiiMessages
        .select(
            col("idx_tile_id").as("tileId"),
            col("path.positionEstimate").as("fullPositionEstimate"))
        .withColumn(
            "positionEstimate", positionEstimateToCoordinatePair.apply(col("fullPositionEstimate")))
        .groupBy("tileId")
        .agg(collect_list("positionEstimate").as("positionEstimateList"))
        .as(sensorDataEncoder);
  }

  // Publish the GeoJSON partitions to the output catalog using the Data Client Library.
  private static void publish(JavaRDD<Pair<Long, String>> geoJsonByTile)
      throws InterruptedException, ExecutionException {
    ActorSystem masterActorSystem = DataClientSparkContextUtils.context().actorSystem();

    // Start commit on master.
    PublishApi masterPublishApi = DataClient.get(masterActorSystem).publishApi(outputCatalogHrn);

    // Get the latest catalog version of the output catalog.
    OptionalLong baseVersion = masterPublishApi.getBaseVersion().toCompletableFuture().get();

    // Fill in the direct and indirect dependencies for the output catalog, given the direct inputs.
    // This is good practice, especially if you intend to use the scheduler to
    // determine when the pipeline should run (i.e., by using the --with-scheduler option in:
    // For users using platform.here.com:
    // https://developer.here.com/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    // For users using platform.hereolp.cn:
    // https://developer.here.com/cn/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create
    List<VersionDependency> dependencies =
        gatherDependencies(locationCatalogHrn, locationCatalogVersion);

    // Start a publication batch on top of the most recent catalog version.
    BatchToken batchToken =
        masterPublishApi
            .startBatch2(
                baseVersion, Optional.of(Collections.singletonList(outputLayer)), dependencies)
            .toCompletableFuture()
            .get();

    // Send partitions to workers, and upload data and metadata.
    geoJsonByTile.foreachPartition(
        partitions -> {
          ActorSystem workerActorSystem = DataClientSparkContextUtils.context().actorSystem();
          PublishApi workerPublishApi =
              DataClient.get(workerActorSystem).publishApi(outputCatalogHrn);
          WriteEngine workerWriteEngine =
              DataEngine.get(workerActorSystem).writeEngine(outputCatalogHrn);
          ArrayList<CommitPartition> commitPartitions = new ArrayList<>();
          while (partitions.hasNext()) {
            Pair<Long, String> content = partitions.next();
            NewPartition newPartition =
                new NewPartition.Builder()
                    .withPartition(content.first().toString())
                    .withLayer(outputLayer)
                    .withData(content.second().getBytes())
                    .build();
            commitPartitions.add(
                workerWriteEngine.put(newPartition).toCompletableFuture().join());
          }
          workerPublishApi
              .publishToBatch(batchToken, commitPartitions.iterator())
              .toCompletableFuture()
              .join();
        });

    // Complete the commit.
    masterPublishApi.completeBatch(batchToken).toCompletableFuture().join();
    sparkSession.stop();
  }

  // Convert the trips to GeoJSON; each trip is converted to a LineString.
  private static JavaRDD<Pair<Long, String>> convertToGeoJSON(
      JavaRDD<Pair<Long, List<List<GeoCoordinate>>>> matchedTrips) {
    return matchedTrips.map(
        matchedTrip -> {
          String featureCollection =
              matchedTrip
                  .second()
                  .stream()
                  .map(BatchPathMatchingJava::mapToFeature)
                  .collect(
                      Collectors.joining(
                          ",", "{ \"type\": \"FeatureCollection\", \"features\": [", "]}"));
          return new Pair<>(matchedTrip.first(), featureCollection);
        });
  }

  // Convert a path to a GeoJSON Feature.
  private static String mapToFeature(List<GeoCoordinate> coordinates) {
    StringJoiner coordJoiner = new StringJoiner(",", "[", "]");
    coordinates
        .stream()
        .map(c -> "[" + c.getLongitude() + ", " + c.getLatitude() + "]")
        .forEach(coordJoiner::add);
    return "{ \"type\": \"Feature\", \"geometry\": "
        + "{ \"type\": \"LineString\", \"coordinates\": "
        + coordJoiner.toString()
        + "}"
        + ", \"properties\": {} }";
  }

  // Gather the dependencies for the output catalog, which depends on the location library catalog.
  private static List<VersionDependency> gatherDependencies(HRN locationHrn, Long locationVersion) {
    ActorSystem sparkActorSystem = DataClientSparkContextUtils.context().actorSystem();
    QueryApi locationLibraryQuery = DataClient.get(sparkActorSystem).queryApi(locationHrn);
    List<VersionDependency> locationDeps =
        locationLibraryQuery
            .getVersion(locationVersion)
            .toCompletableFuture()
            .join()
            .getDependencies();

    // The output catalog depends directly on the location library
    // catalog, and indirectly on its respective dependencies.
    List<VersionDependency> retval =
        locationDeps
            .stream()
            .map(dep -> new VersionDependency(dep.hrn(), dep.version(), false))
            .collect(Collectors.toList());
    retval.add(new VersionDependency(locationHrn, locationVersion, true));
    return retval;
  }

  /**
   * Class used for encoding data in a Dataset. Contains a tileId and a list of lists of pairs,
   * each list representing a path and each pair representing a coordinate.
   */
  public static class SensorData implements Serializable {
    private Long tileId;
    private List<List<String>> positionEstimateList;

    public Long getTileId() {
      return tileId;
    }

    public void setTileId(Long tileId) {
      this.tileId = tileId;
    }

    public List<List<String>> getPositionEstimateList() {
      return positionEstimateList;
    }

    public void setPositionEstimateList(List<List<String>> positionEstimateList) {
      this.positionEstimateList = positionEstimateList;
    }
  }
}
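Both variants publish one GeoJSON FeatureCollection per tile, with each matched trip serialized as a LineString, as built in convertToGeoJSON and mapToFeature above. A published partition therefore has the following shape (the coordinates shown here are illustrative):

{ "type": "FeatureCollection", "features": [
  { "type": "Feature",
    "geometry": { "type": "LineString",
                  "coordinates": [[13.40501, 52.52001], [13.40612, 52.52052]] },
    "properties": {} }
]}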
Declare the Catalog Inputs to the Path Matching Application
The pipeline configuration files that declare the catalog inputs to the path matching application are shown below.
Replace {{YOUR_CATALOG_HRN}} in the output-catalog block with the HRN of the catalog you created above.
pipeline-config.conf
pipeline.config {
output-catalog {hrn = "{{YOUR_CATALOG_HRN}}"}
input-catalogs {
  // In the China environment, use hrn:here-cn:data::olp-cn-here:here-optimized-map-for-location-library-china-2
location {hrn = "hrn:here:data::olp-here:here-optimized-map-for-location-library-2"}
  // In the China environment, use hrn:here-cn:data::olp-cn-here:sample-data
sensor {hrn = "hrn:here:data::olp-here:olp-sdii-sample-berlin-2"}
}
}
Both input catalogs must be linked to your project; once linked, the CLI reports:
Project resource hrn:here:data::olp-here:olp-sdii-sample-berlin-2 has been linked.
Project resource hrn:here:data::olp-here:here-optimized-map-for-location-library-2 has been linked.
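If you have not linked them yet, a sketch of the linking step (the command name, olp project resources link, and its argument order are assumptions; verify them in the Projects commands section of the CLI user guide):

olp project resources link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:olp-sdii-sample-berlin-2
olp project resources link {{YOUR_PROJECT_HRN}} hrn:here:data::olp-here:here-optimized-map-for-location-library-2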
pipeline-job.conf
// NOTE: If you are running the application in a pipeline with the scheduler,
// i.e., using the --with-scheduler option when creating the pipeline version
// (https://developer.here.com/documentation/open-location-platform-cli/user_guide/topics/pipeline/version-commands.html#pipeline-version-create)
// then this file is not needed, as the scheduler will pick up the latest versions
// of the input catalogs.
pipeline.job.catalog-versions {
output-catalog {base-version = -1}
input-catalogs {
location {
processing-type = "reprocess"
version = 1
}
}
}
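Before deploying, you can sanity-check the application locally. A minimal sketch, assuming the fat JAR built with the Maven command below (the JAR path is illustrative) and assuming PipelineContext resolves the two files via the pipeline-config.file and pipeline-job.file system properties; verify both assumptions against the Pipeline API documentation:

spark-submit \
  --master "local[*]" \
  --class BatchPathMatchingScala \
  --conf spark.driver.extraJavaOptions="-Dpipeline-config.file=pipeline-config.conf -Dpipeline-job.file=pipeline-job.conf" \
  target/path-matching-tutorial-platform.jar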
For more details on the capabilities and architecture of the Location Library, see the Developer's Guide. The Code Examples page also lists more detailed code examples using the Location Library.
You can also build this simple path matching application as a fat JAR file:
mvn -Pplatform clean package
and deploy it via the Pipeline API. For more details on pipeline deployment, see the Pipelines Developer's Guide.
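One possible deployment flow with the OLP CLI is sketched below; the pipeline, template, and version names, the JAR path, and the batch runtime version are all illustrative assumptions, so verify each against the Pipelines Developer's Guide and your own build output:

# Register the pipeline, template (fat JAR + main class), and version, then activate it.
olp pipeline create path-matching-pipeline --scope {{YOUR_PROJECT_HRN}}
olp pipeline template create path-matching-template batch-2.1.0 \
    target/path-matching-tutorial-platform.jar BatchPathMatchingScala \
    --input-catalog-ids sensor location --scope {{YOUR_PROJECT_HRN}}
olp pipeline version create path-matching-version <pipeline-id> <template-id> \
    pipeline-config.conf --scope {{YOUR_PROJECT_HRN}}
olp pipeline version activate <pipeline-id> <version-id> \
    --input-catalogs pipeline-job.conf --scope {{YOUR_PROJECT_HRN}}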