This tutorial demonstrates how to use the HERE Traffic API client in a Spark application. The HERE Traffic API client is a part of the Data Client Base Library. The Data Client Base Library provides Scala/Java libraries that you can use in your projects to access the HERE platform. It offers low-level, stateless, and thread-safe programmatic access to the HERE platform Data APIs as well as to HERE platform services, such as the Geocode API, Routing API, RevGeocode API, HERE Traffic API, and so on. For more information, see the Data Client Base Library documentation.
The HERE Traffic API is a RESTful API that:
Provides access to real-time traffic flow data in JSON format, including information on the speed and jam factor for the region(s) defined in each request. The Traffic API v7 can also deliver additional data, such as the geometry of the road segments in relation to the flow.
Provides aggregated information about traffic incidents in JSON format, including the type and location of each traffic incident, status, start and end time, and other relevant data. This data is useful to dynamically optimize route calculations.
In this tutorial, we will get a snapshot of the real-time traffic flow, upload the data to a versioned layer, and visualize the results on the map. Roads with a light load are marked with a green line, roads with a medium load with a yellow line, and roads with a high load with a red line.
To get real-time traffic conditions, you can download the source code at the beginning of the tutorial and store it in a folder of your choice, or create the Maven project from scratch. In either case, the project needs the following dependencies (a pom.xml sketch follows the list):
com.here.platform.data.client.base:ols-traffic-v7_${scala.compat.version} to use the HERE Traffic API client.
com.here.platform.data.client:local-support_${scala.compat.version} to write data to a local catalog.
com.here.platform.data.client:spark-support_${scala.compat.version} to write data to a catalog on the platform.
org.apache.spark:spark-core_${scala.compat.version} to run a Java/Scala Spark Application.
com.here.platform.pipeline:pipeline-interface_${scala.compat.version} to get information about input catalogs from the PipelineContext.
com.here.platform.location:location-integration-here-commons_${scala.compat.version} containing the tools to calculate partition Tile IDs for certain geocoordinates. For more details, see Calculate partition Tile IDs.
com.here.hrn:hrn_${scala.compat.version} to parse catalog HRNs.
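For example, the Traffic API client dependency could be declared in the pom.xml as follows. This is a sketch: it assumes the version is managed by the HERE SDK BOM; otherwise, add an explicit <version> element.

<dependency>
    <groupId>com.here.platform.data.client.base</groupId>
    <artifactId>ols-traffic-v7_${scala.compat.version}</artifactId>
</dependency>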
Once you have created the Maven project, the next step is to write the application code and run it.
Use the Traffic API client
As mentioned before, this tutorial shows how to write a simple Spark application that retrieves data from the HERE Traffic API and visualizes it on the map. All the retrieved data is added to the versioned layer so that it can be visualized with the local data inspector. For more details, see Local development and testing with CLI.
HERE Real-Time Traffic comprises highly accurate data from multiple sources, including connected car probes, roadway sensors, and live operations centers.
Real-Time Traffic contains two types of traffic data:
Flow - information about the speed of travel and congestion along a segment of a roadway. Flow data is updated every minute.
Incidents - information about events that affect the flow of traffic or that may be important for drivers to know. Incident data is updated every two minutes.
Real-Time Traffic covers more than 70 countries globally. For full coverage information, see Traffic coverage.
The HERE Traffic API provides the following endpoints:
flow - returns real-time traffic flow data for the requested area.
incidents - returns traffic incident data for the requested area.
For more information about HERE Traffic API endpoints, see the Traffic API Reference.
We will use the Real-Time Flow Information endpoint to get real-time traffic flow data for a geospatial area.
A basic flow request consists of a geospatial filter (in) and the type of location referencing to return (locationReferencing). The response will contain the traffic flow information of the road segments that are within the requested area.
The geospatial filter in can be a circle, a bounding box or a corridor, each having its own format. In our application below, we will use a circle specified by a point with latitude and longitude and a radius: in=circle:52.51652,13.37885;r=3000. This parameter defines a circle with a center at the Brandenburg Gate and a radius of 3 km. For more details on the geospatial filtering types, see Geospatial filtering.
The locationReferencing parameter can be one or more of tmc, olr, and shape. A location reference describes a location. A location can be a particular point, curve, or two-dimensional shape on the surface of the earth. Quite often, when a location is used in an application, it refers to a particular anthropogenic or geographic feature, such as a road, building, mountain, or body of water. In this tutorial, we will use the shape location reference, which allows us to visualize the data on the map in a simple way. For more information on the location referencing types, see Location referencing.
A request to this endpoint returns an array of flow items, each having a location and a currentFlow element. Each location has a length in meters, a description that is usually a street name, and the location references of the location, depending on which were requested with the locationReferencing parameter; in our case, these are shapes made up of latitude/longitude points. The currentFlow element of each flow item carries the speed, jam factor, and traversability information of the flow item.
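For orientation, a single flow item may look roughly like the following trimmed sketch. The values are illustrative, the field set is based on the description above and the application code below, and the real payload contains more fields:

{
  "location": {
    "description": "Unter den Linden",
    "length": 412.0,
    "shape": { "links": [ { "points": [ { "lat": 52.5165, "lng": 13.3788 } ] } ] }
  },
  "currentFlow": {
    "speed": 8.6,
    "speedUncapped": 8.6,
    "freeFlow": 13.9,
    "jamFactor": 3.2,
    "confidence": 0.95,
    "traversability": "open"
  }
}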
The code snippet below demonstrates how to get real-time flow information with the parameters described above:
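This is a minimal Java sketch; the wrapper class FlowSnippet and the printed summary are illustrative, and the same call appears in the full application listing below (a Scala variant is also shown there):

import com.here.platform.data.client.base.javadsl.BaseClient;
import com.here.platform.data.client.base.javadsl.BaseClientJava;
import com.here.platform.data.client.base.ols.generated.javadsl.api.traffic.RealTimeTrafficApi;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowResponse;
import java.util.Collections;

public class FlowSnippet {
  public static void main(String[] args) {
    BaseClient client = BaseClientJava.instance();
    RealTimeTrafficApi trafficApi = new RealTimeTrafficApi(client);
    // request real-time flow for a 3 km circle around the Brandenburg Gate
    FlowResponse flowResponse =
        trafficApi
            .getFlow()
            .withIn("circle:52.51652,13.37885;r=3000")
            .withLocationReferencing(Collections.singletonList("shape"))
            .build()
            .toEntity();
    System.out.println(flowResponse.getResults().size() + " flow items received");
  }
}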
After getting acquainted with the API client, we can proceed to the application review.
Implement the application
Let's look at the implementation of this Spark application. In the code snippet below, you can see that the main method logic is implemented in 3 methods:
getRealTimeTrafficFlow()
prepareGeoJsonData()
uploadPartitions()
Let's take a closer look at the implementations of these methods.
The getRealTimeTrafficFlow() method is the main part of the application for obtaining real-time traffic flow. All other subsequent methods mentioned above process the data received from this method. If you only want to get a real-time traffic flow without further processing, you just need to add the com.here.platform.data.client.base:ols-traffic-v7_${scala.compat.version} dependency mentioned above and add this method to your application.
The getRealTimeTrafficFlow() method creates a HERE Traffic API client and retrieves the real-time traffic flow using the parameters described at the beginning of this section. The HERE Traffic API requires authentication. By default, the credentials stored in $HOME/.here/credentials.properties are used. For more details, see the Data Client Base Library documentation.
The prepareGeoJsonData() method splits traffic data into partitions using HereTileResolver (for details, see Calculate partition Tile IDs) and converts the data to GeoJSON per tile. The display style depends on the jamFactor property: roads with a light load are marked with a green line, roads with a medium load with a yellow line, and roads with a high load with a red line. For more details, see GeoJSON Data and Style GeoJSON Visualization. In this method, we use the LineString, Geometry, Feature, and FeatureCollection classes that represent GeoJSON objects. For more details, see the Data Client Library documentation.
The uploadPartitions() method sends GeoJSON data to the versioned catalog using the Data Client Library.
For more information on what the application does, see the comments in the code below.
Java
Scala
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.here.hrn.HRN;
import com.here.platform.data.client.base.javadsl.BaseClient;
import com.here.platform.data.client.base.javadsl.BaseClientJava;
import com.here.platform.data.client.base.ols.generated.javadsl.api.traffic.RealTimeTrafficApi;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.CurrentFlow;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowItem;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowResponse;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.JLocationShape;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.model.PendingPartition;
import com.here.platform.data.client.model.geojson.Feature;
import com.here.platform.data.client.model.geojson.FeatureCollection;
import com.here.platform.data.client.model.geojson.LineString;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.location.core.geospatial.GeoCoordinate;
import com.here.platform.location.integration.herecommons.geospatial.HereTileLevel;
import com.here.platform.location.integration.herecommons.geospatial.javadsl.HereTileResolver;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class RealTimeTrafficApiJava {
  private static final String LAYER_ID = "realtime-traffic";

  public static void main(String[] args)
      throws JsonProcessingException, ExecutionException, InterruptedException {
    // get real time traffic flow
    FlowResponse flowResponse = getRealTimeTrafficFlow("circle:52.51652,13.37885;r=3000");

    // convert the retrieved data to GeoJSON and split it into tiles
    List<PartitionData> partitionData = prepareGeoJsonData(flowResponse.getResults());

    // publish data to output catalog
    PipelineContext pipelineContext = new PipelineContext();
    uploadPartitions(pipelineContext.getConfig().getOutputCatalog(), partitionData);
  }

  /** Get real-time traffic flow from the RealTimeTrafficApi */
  private static FlowResponse getRealTimeTrafficFlow(String geo) {
    BaseClient client = BaseClientJava.instance();
    RealTimeTrafficApi trafficApi = new RealTimeTrafficApi(client);
    return trafficApi
        .getFlow()
        .withIn(geo)
        .withLocationReferencing(Collections.singletonList("shape"))
        .withAdvancedFeatures(Collections.singletonList("deepCoverage"))
        .build()
        .toEntity();
  }

  /** Convert the real-time traffic flow response to GeoJSON and split it by tiles */
  private static List<PartitionData> prepareGeoJsonData(List<FlowItem> flowItems)
      throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

    List<PartitionData> customDataList = new ArrayList<>();
    Map<Long, List<Feature>> tileIdToFeatureList = new HashMap<>();

    // go through the flow items and create features from the data
    flowItems.forEach(
        flowItem -> {
          List<List<Double>> coordinates = new ArrayList<>();

          // get coordinates from the location to create the geometry from
          flowItem
              .getLocation()
              .getShape()
              .orElse(new JLocationShape.Builder().build())
              .getLinks()
              .stream()
              .flatMap(link -> link.getPoints().stream())
              .forEach(point -> coordinates.add(Arrays.asList(point.getLng(), point.getLat())));

          // create lineString geometry
          LineString lineString = new LineString.Builder().withCoordinates(coordinates).build();

          // create feature using the lineString geometry and flow properties, such as speed,
          // jamFactor, etc
          Feature feature =
              new Feature.Builder()
                  .withId(UUID.randomUUID().toString())
                  .withGeometry(lineString)
                  .withProperties(getGeoJsonProperties(flowItem.getCurrentFlow()))
                  .build();

          // find tile ID for the given coordinate and save feature per each tile ID
          lineString
              .getCoordinates()
              .forEach(
                  list -> {
                    double latitude = list.get(1);
                    double longitude = list.get(0);
                    GeoCoordinate coordinate = new GeoCoordinate(latitude, longitude);
                    long tileId =
                        new HereTileResolver(new HereTileLevel(15)).fromCoordinate(coordinate);
                    List<Feature> featureList =
                        tileIdToFeatureList.computeIfAbsent(tileId, (k) -> new ArrayList<>());
                    featureList.add(feature);
                  });
        });

    // create partition data from the features
    for (Map.Entry<Long, List<Feature>> entry : tileIdToFeatureList.entrySet()) {
      Long tileId = entry.getKey();
      FeatureCollection featureCollection =
          new FeatureCollection.Builder().withFeatures(entry.getValue()).build();
      PartitionData customData =
          new PartitionData(
              Long.toString(tileId), LAYER_ID, objectMapper.writeValueAsBytes(featureCollection));
      customDataList.add(customData);
    }
    return customDataList;
  }

  /** Upload data to the versioned layer using Data Client */
  public static void uploadPartitions(HRN catalog, List<PartitionData> partitions)
      throws ExecutionException, InterruptedException {
    ActorSystem actorSystem = DataClientSparkContextUtils.context().actorSystem();

    // create writeEngine for source catalog
    WriteEngine writeEngine = DataEngine.get(actorSystem).writeEngine(catalog);

    // parallelism defines how many parallel requests would be made to upload the data
    int parallelism = 10;

    // create a list of partitions to upload
    ArrayList<PendingPartition> partitionList = new ArrayList<>();
    partitions.forEach(
        partitionData -> {
          NewPartition newPartition =
              new NewPartition.Builder()
                  .withPartition(partitionData.partition)
                  .withData(partitionData.data)
                  .withLayer(LAYER_ID)
                  .build();
          partitionList.add(newPartition);
        });

    // upload partitions to the catalog
    writeEngine
        .publishBatch2(
            parallelism,
            Optional.of(Collections.singletonList(LAYER_ID)),
            Collections.emptyList(),
            Source.from(partitionList))
        .toCompletableFuture()
        .get();
  }

  /** Generate the real-time traffic flow properties for the Feature object */
  private static Map<String, Object> getGeoJsonProperties(CurrentFlow currentFlow) {
    Map<String, Object> properties = new HashMap<>();
    properties.put("style", generateFillColor(currentFlow.getJamFactor()));
    properties.put("speed", currentFlow.getSpeed().orElse(0.0));
    properties.put("speedUncapped", currentFlow.getSpeedUncapped().orElse(0.0));
    properties.put("freeFlow", currentFlow.getFreeFlow());
    properties.put("jamFactor", currentFlow.getJamFactor());
    properties.put("confidence", currentFlow.getConfidence().orElse(0.0));
    properties.put("traversability", currentFlow.getTraversability().orElse("empty"));
    return properties;
  }

  /**
   * Generate an RGB encoded color. For a high jam factor the color has a larger red component,
   * for a low jam factor a larger green component. Each color component can have a value between
   * 0 and 255; the blue component is not used.
   */
  private static Map<String, Object> generateFillColor(double jamFactor) {
    Map<String, Object> style = new HashMap<>();
    int green;
    int red;
    if (jamFactor <= 3) {
      green = 255;
      red = (int) (255 * (jamFactor * 10) / 100 * 2);
    } else {
      green = (int) (255 * ((100 - (jamFactor * 10)) / 100));
      red = 255;
    }
    style.put("color", "rgb(" + red + ", " + green + ", 0)");
    style.put("weight", "5");
    return style;
  }

  /** Class to store the metadata and real-time traffic flow data */
  @SuppressWarnings("serial")
  static class PartitionData implements Serializable {
    String partition;
    String layerId;
    byte[] data;

    PartitionData(String partition, String layerId, byte[] data) {
      this.partition = partition;
      this.layerId = layerId;
      this.data = data;
    }
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import akka.stream.scaladsl.Source
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.here.hrn.HRN
import com.here.platform.data.client.base.ols.generated.codecs.traffic.JsonSupport._
import com.here.platform.data.client.base.ols.generated.scaladsl.api.traffic.RealTimeTrafficApi
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.{
  CurrentFlow,
  FlowItem,
  FlowResponse
}
import com.here.platform.data.client.base.scaladsl.BaseClient
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.geojson.{Feature, FeatureCollection, LineString}
import com.here.platform.data.client.scaladsl.NewPartition
import com.here.platform.data.client.spark.DataClientSparkContextUtils.context.actorSystem
import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.integration.herecommons.geospatial.{
  HereTileLevel,
  HereTileResolver
}
import com.here.platform.pipeline.PipelineContext
import java.util
import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext.Implicits.global

object RealTimeTrafficApiScala {
  private val LAYER_ID = "realtime-traffic"

  def main(args: Array[String]): Unit = {
    // get real time traffic flow
    val flowResponse = getRealTimeTrafficFlow("circle:52.51652,13.37885;r=3000")

    // convert the retrieved data to GeoJSON and split it into tiles
    val partitionData = prepareGeoJsonData(flowResponse.getResults)

    // publish data to output catalog
    val pipelineContext = new PipelineContext
    uploadPartitions(pipelineContext.config.outputCatalog, partitionData)
  }

  /**
    * Get real-time traffic flow from the RealTimeTrafficApi
    */
  def getRealTimeTrafficFlow(str: String): FlowResponse = {
    val client = BaseClient()
    val trafficApi = client.of[RealTimeTrafficApi]
    trafficApi
      .getFlow(
        in = str,
        locationReferencing = List("shape"),
        advancedFeatures = List("deepCoverage"))
      .toEntity()
  }

  /**
    * Convert the real-time traffic flow response to GeoJSON
    * and split it by tiles
    */
  def prepareGeoJsonData(flowItems: util.List[FlowItem]): List[PartitionData] = {
    val objectMapper = new ObjectMapper()
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
    objectMapper.registerModule(new DefaultScalaModule)

    val customDataList = ListBuffer[PartitionData]()
    val tileIdToFeatureList = scala.collection.mutable.Map[Long, ListBuffer[Feature]]()

    // go through the flow items and create features from the data
    flowItems.forEach(item => {
      val coordinates = ListBuffer(ListBuffer[Double]())

      // get coordinates from the location to create the geometry from
      item.getLocation.shape.get.links
        .flatMap(link => link.points)
        .foreach(point => {
          coordinates += ListBuffer(point.getLng, point.getLat)
        })

      // create lineString geometry
      coordinates.remove(0)
      val lineString = LineString(
        coordinates = Option(coordinates.map(_.toList).toList)
      )

      // create feature using the lineString geometry and flow properties, such as speed,
      // jamFactor, etc
      val feature = Feature.apply(
        id = Option(java.util.UUID.randomUUID.toString),
        geometry = Option(lineString),
        properties = Option(getGeoJsonProperties(item.getCurrentFlow).toMap))

      // find tile ID for the given coordinate and save feature per each tile ID
      lineString.getCoordinates.forEach(list => {
        val latitude = list.get(1)
        val longitude = list.get(0)
        val coordinate = GeoCoordinate.apply(latitude, longitude)
        val tileId = new HereTileResolver(HereTileLevel(15)).fromCoordinate(coordinate).value
        val featureList = tileIdToFeatureList.getOrElseUpdate(tileId, ListBuffer())
        featureList += feature
      })
    })

    // create partition data from the features
    tileIdToFeatureList
      .entrySet()
      .forEach(entry => {
        val tileId = entry.getKey
        val featureCollection = FeatureCollection.apply(features = entry.getValue.toList)
        val customData = PartitionData.apply(
          tileId.toString,
          LAYER_ID,
          objectMapper.writeValueAsBytes(featureCollection))
        customDataList += customData
      })

    customDataList.toList
  }

  /**
    * Upload data to the versioned layer using Data Client
    */
  def uploadPartitions(catalog: HRN, partitionsData: Seq[PartitionData]): Unit = {
    // create writeEngine for a catalog
    val writeEngine = DataEngine().writeEngine(catalog)
    val partitionList = ListBuffer[NewPartition]()

    // create a list of partitions to upload
    partitionsData.foreach(partitionsData => {
      val partition = NewPartition(
        partition = partitionsData.partition,
        layer = LAYER_ID,
        data = NewPartition.ByteArrayData(partitionsData.data))
      partitionList += partition
    })

    // upload partitions to the catalog
    writeEngine.publishBatch2(
      parallelism = 10,
      layerIds = Some(Seq(LAYER_ID)),
      dependencies = Seq.empty,
      partitions = Source.apply(partitionList.toList))
  }

  /**
    * Generate the real-time traffic flow properties for the Feature object
    */
  def getGeoJsonProperties(currentFlow: CurrentFlow): scala.collection.mutable.Map[String, Any] = {
    val properties = scala.collection.mutable.Map[String, Any]()
    properties += ("style" -> generateFillColor(currentFlow.getJamFactor))
    properties += ("speed" -> currentFlow.getSpeed.orElse(0.0))
    properties += ("speedUncapped" -> currentFlow.getSpeedUncapped.orElse(0.0))
    properties += ("freeFlow" -> currentFlow.getFreeFlow)
    properties += ("jamFactor" -> currentFlow.getJamFactor)
    properties += ("confidence" -> currentFlow.getConfidence.orElse(0.0))
    properties += ("traversability" -> currentFlow.getTraversability.orElse("empty"))
    properties
  }

  /**
    * Generate an RGB encoded color. For a high jam factor the color has a larger red component,
    * for a low jam factor a larger green component. Each color component can have a value
    * between 0 and 255; the blue component is not used.
    */
  private def generateFillColor(jamFactor: Double) = {
    val style = scala.collection.mutable.Map[String, Any]()
    var green = 0
    var red = 0
    if (jamFactor <= 3) {
      green = 255
      red = (255 * (jamFactor * 10) / 100 * 2).toInt
    } else {
      green = (255 * ((100 - (jamFactor * 10)) / 100)).toInt
      red = 255
    }
    val color = "rgb(" + red.toString + ", " + green.toString + ", 0)"
    style += ("color" -> color)
    style += ("weight" -> "5")
    style
  }

  /**
    * Class to store the metadata and real-time traffic flow data
    */
  case class PartitionData(partition: String, layer: String, data: Array[Byte])
}
Once the code is complete, you can prepare the resources and run the application.
Run the application locally
To run the application, you need to prepare the resources - create a catalog with a versioned layer.
In this tutorial, we will run our application locally, so it is enough to create local catalogs. Since local catalogs exist only on your machine, they are not subject to naming conflicts within your realm, and you can use any name you want.
Although we do not use the input catalog in this tutorial, we need to create it to fill in the input-catalog field in the config file; otherwise, you will get an error about an invalid catalog HRN.
Run the following OLP CLI command from the root of the tutorial folder to create a local input catalog:
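A sketch of the command, assuming the input catalog ID input-batch-catalog (any ID works locally):

olp local catalog create input-batch-catalog input-batch-catalog --summary "Input catalog" --description "Input catalog"

The local output catalog can then be created from a configuration file, for example (the catalog ID output-batch-catalog matches the HRN inspected later in this tutorial, and the file name is taken from the platform section below):

olp local catalog create output-batch-catalog output-batch-catalog --config output-catalog-configuration.json

The output-catalog-configuration.json file contains the following configuration: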
{"id":"realtime-traffic-output","name":"Real time traffic output catalog","summary":"Catalog with real time traffic","description":"Catalog with real time traffic","layers":[{"id":"realtime-traffic","name":"realtime-traffic","summary":"RealTimeTraffic","description":"RealTimeTraffic","contentType":"application/geo+json","layerType":"versioned","volume":{"volumeType":"durable"},"partitioning":{"scheme":"heretile","tileLevels":[15]}}]}
Let's take a closer look at the parameters of the versioned layer. The "contentType": "application/geo+json" property means that the layer contains data in GeoJSON format. The "partitioning": {"scheme": "heretile", "tileLevels": [15]} property means that the data is stored as a quadtree with tiles at zoom level 15. The local data inspector can visualize this tiling schema and GeoJSON without any additional rendering plugins. After you have created the catalogs, run the application from the root of the downloaded tutorial using the following command:
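The exact invocation depends on the tutorial's POM; a sketch consistent with the parameters explained below (RealTimeTrafficApiScala is the Scala entry point; use RealTimeTrafficApiJava for the Java version):

mvn compile exec:java \
  -Dexec.mainClass=RealTimeTrafficApiScala \
  -Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
  -Dspark.master=local[*] \
  -Dpipeline-config.file=local-pipeline-config.conf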
exec.mainClass – entry point to run your application.
here.platform.data-client.endpoint-locator.discovery-service-env=local – configures the Data Client Library to only use local catalogs.
spark.master=local[*] – configures a local Spark run with as many worker threads as there are logical cores on your machine.
pipeline-config.file=local-pipeline-config.conf – configuration file with information about the input and output catalogs.
After the application finishes successfully, you can see the data that was added to the versioned layer in the console.
Once the GeoJSON data is published to the catalog, let's view the results using the following command:
olp local catalog layer inspect hrn:local:data:::output-batch-catalog realtime-traffic
This command will open the browser tab showing a real-time traffic flow:
Now we can proceed to run this application on the platform.
Run the application on the platform
Before running the application on the platform, we need to prepare all the resources for it.
We will run the application in the project. To do this, let's create a project using the OLP CLI.
olp project create $PROJECT_ID $PROJECT_NAME
The OLP CLI should return the following message:
Project YOUR_PROJECT_HRN has been created.
Save the YOUR_PROJECT_HRN value to the $PROJECT_HRN console variable to simplify running the subsequent commands.
Since we will be using the HERE Traffic service, we need to link it to the project so that our HERE Traffic API client can resolve the base URL of the service. To do this, link the service to the project using the following command:
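The general form is sketched below; the HERE Traffic service HRN is a placeholder you should replace with the actual service HRN (look it up in the platform portal), and the subcommand form is an assumption to verify against olp --help:

olp project resource link $PROJECT_HRN <HERE-Traffic-service-HRN>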
The next step is to create input and output catalogs in the project we have created before. The --scope parameter is used to specify the project in the OLP CLI.
Note that the input catalog is not used in our application. We need this catalog to specify the input-catalog property in the pipeline-config.conf file.
Let's execute the following command from the root of the tutorial folder to create an input catalog:
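A sketch, with placeholder catalog ID and name; save the returned HRN into the $INPUT_CATALOG_HRN variable:

olp catalog create realtime-traffic-input realtime-traffic-input --summary "Input catalog" --description "Input catalog" --scope $PROJECT_HRN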
The output catalog should be created using the output-catalog-configuration.json file. Run the following command from the root of the tutorial folder to create the output catalog:
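A sketch, assuming the catalog ID from the configuration file shown earlier; save the returned HRN into the $OUTPUT_CATALOG_HRN variable:

olp catalog create realtime-traffic-output "Real time traffic output catalog" --config output-catalog-configuration.json --scope $PROJECT_HRN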
The next step is to configure the source code to use the output catalog we have created before. Replace the INPUT_CATALOG_HRN and OUTPUT_CATALOG_HRN placeholders in the pipeline-config.conf file with the HRNs of the catalogs you created as described above.
The application has to be packaged with all dependencies in a Fat JAR to be deployed on the platform. The Java/Scala SDK provides a platform profile to generate the Fat JAR. The following command uses the profile to generate the Fat JAR:
mvn -Pplatform clean package
The real-time-traffic-tutorial-<tutorial-version>-platform.jar Fat JAR should be created in the target folder.
Now you have all the resources used by the application, and the application configured to use them and prepared to be deployed on the platform.
Let's create a pipeline in the project scope using the following OLP CLI command:
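A sketch, with a placeholder pipeline name; save the returned ID into the $PIPELINE_ID variable:

olp pipeline create real-time-traffic-pipeline --scope $PROJECT_HRN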
To create your own pipeline template, you need to specify the template name, the batch runtime environment (in this tutorial we run a Spark application, which uses the batch environment on the platform), the Fat JAR created by the mvn -Pplatform clean package command, the main class, the project that the pipeline belongs to, and the input catalog IDs that are expected in the pipeline version configuration.
Let's create a pipeline template by running the following OLP CLI command from the root of the tutorial folder:
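A sketch following the parameters listed above; the template name, runtime environment version (batch-4.1 here), main class, and input catalog ID are assumptions to adapt to your setup, and the returned ID should be saved into the $PIPELINE_TEMPLATE_ID variable:

olp pipeline template create real-time-traffic-template batch-4.1 target/real-time-traffic-tutorial-<tutorial-version>-platform.jar RealTimeTrafficApiScala --input-catalog-ids pipeline-input --scope $PROJECT_HRN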
Let's create a pipeline version in the project scope.
olp pipeline version create test-spark-version $PIPELINE_ID $PIPELINE_TEMPLATE_ID pipeline-config.conf --scope $PROJECT_HRN
Note
If a billing tag is required in your realm, use the --billing-tag <your-billing-tag> parameter.
Once you have created the batch pipeline, pipeline template and pipeline version, you can run the application on the platform by activating the pipeline version.
To activate the pipeline version, run the following OLP CLI command:
olp pipeline version activate $PIPELINE_ID $PIPELINE_VERSION_ID --scope $PROJECT_HRN
After the pipeline is activated, you need to wait until the pipeline reaches the Completed state. For this purpose, execute the following OLP CLI command:
olp pipeline version wait $PIPELINE_ID $PIPELINE_VERSION_ID \
  --job-state completed --timeout 600 \
  --scope $PROJECT_HRN
After your pipeline has reached the Completed state, you can check the result of the application.
For this purpose, execute the following OLP CLI command:
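A sketch mirroring the local inspect command used earlier, assuming $OUTPUT_CATALOG_HRN holds the output catalog HRN:

olp catalog layer inspect $OUTPUT_CATALOG_HRN realtime-traffic --scope $PROJECT_HRN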
This command will open the browser tab showing a real-time traffic flow:
Conclusion
In this tutorial, you have learned how to use the HERE Traffic API client in a Java/Scala Spark application to get a snapshot of the real-time traffic flow and visualize it on a map.
Further information
For more details on the topics covered in this tutorial, see the following sources: