Use HERE Platform Service

Objectives: Use the HERE Traffic API client in a Spark application to retrieve and visualize a real-time traffic flow on a map

Complexity: Beginner

Time to complete: 30 min

Prerequisites: Get your credentials, Verify your credentials

Source code: Download

This tutorial demonstrates how to use the HERE Traffic API client in a Spark application. The HERE Traffic API client is part of the Data Client Base Library, which provides Scala/Java libraries that you can use in your projects to access the HERE platform. The library offers low-level, stateless, and thread-safe programmatic access to HERE platform Data APIs and to HERE platform services such as the Geocode API, Routing API, RevGeocode API, and HERE Traffic API. For more information, see the Data Client Base Library documentation.

The HERE Traffic API is a RESTful API that:

  • Provides access to real-time traffic flow data in JSON format, including information on the speed and jam factor for the region(s) defined in each request. The Traffic API v7 can also deliver additional data, such as the geometry of the road segments in relation to the flow.
  • Provides aggregated information about traffic incidents in JSON format, including the type and location of each traffic incident, status, start and end time, and other relevant data. This data is useful to dynamically optimize route calculations.

In this tutorial, we will get a snapshot of the real-time traffic flow, upload the data to a versioned layer, and visualize the results on a map. Roads with a light load are marked with a green line, roads with a medium load with a yellow line, and roads with a high load with a red line.

Figure 1. Real-time traffic

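The tutorial covers the following topics:

  • Set up a Maven project
  • Use the Traffic API client
  • Implement the application
  • Run the application locally
  • Run the application on the platform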

Set up a Maven project

To set up the project, you can either download the source code from the link at the beginning of the tutorial and store it in a folder of your choice, or create the folder structure for your project from scratch as follows:

here-realtime-traffic-client
└── src
    └── main
        ├── java
        ├── resources
        └── scala

You can do this with a single bash command:

mkdir -p here-realtime-traffic-client/src/main/{java,resources,scala}

The Maven POM file is similar to the one in the Verify Maven Settings tutorial, with the parent POM and dependencies sections updated.

The parent POM is sdk-batch-bom_2.12, because this tutorial is designed to run Spark applications both locally and on the platform.

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.54.3</version>
    <relativePath/>
</parent>

The following dependencies are used:

  • com.here.platform.data.client.base:ols-traffic-v7_${scala.compat.version} to use the HERE Traffic API client.
  • com.here.platform.data.client:local-support_${scala.compat.version} to write data to a local catalog.
  • com.here.platform.data.client:spark-support_${scala.compat.version} to write data to a catalog on the platform.
  • org.apache.spark:spark-core_${scala.compat.version} to run a Java/Scala Spark application.
  • com.here.platform.pipeline:pipeline-interface_${scala.compat.version} to get information about the input and output catalogs from the PipelineContext.
  • com.here.platform.location:location-integration-here-commons_${scala.compat.version} containing the tools to calculate partition Tile IDs for certain geocoordinates. For more details, see Calculate partition Tile IDs.
  • com.here.hrn:hrn_${scala.compat.version} to parse catalog HRNs.

Dependencies:

<dependencies>
    <dependency>
        <groupId>com.here.platform.data.client.base</groupId>
        <artifactId>ols-traffic-v7_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>local-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.location</groupId>
        <artifactId>location-integration-here-commons_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>

</dependencies>

Once you have created the Maven project, the next step is to write the application code and run it.

Use the Traffic API client

As mentioned before, this tutorial shows how to write a simple Spark application that retrieves data from the HERE Traffic API and visualizes it on a map. The retrieved data is published to a versioned layer so that it can be visualized with the local data inspector. For more details, see Local development and testing with CLI.

HERE Real-Time Traffic comprises highly accurate data from multiple sources, including connected car probes, roadway sensors, and live operations centers.

Real-Time Traffic contains two types of traffic data:

  • Flow - information about the speed of travel and congestion along a segment of a roadway. Flow data is updated every minute.
  • Incidents - information about events that affect the flow of traffic or that may be important for drivers to know. Incident data is updated every two minutes.

Real-Time Traffic covers more than 70 countries globally. For full coverage information, see Traffic coverage.

The HERE Traffic API provides several endpoints, including endpoints for real-time traffic flow and for traffic incidents. For more information about HERE Traffic API endpoints, see the Traffic API Reference.

We will use the Real-Time Flow Information endpoint to get real-time traffic flow data for a geospatial area.

A basic flow request consists of a geospatial filter (in) and the type of location referencing to return (locationReferencing). The response will contain the traffic flow information of the road segments that are within the requested area.

The in geospatial filter can be a circle, a bounding box, or a corridor, each with its own format. In the application below, we use a circle specified by a center point (latitude and longitude) and a radius in meters: in=circle:52.51652,13.37885;r=3000. This parameter defines a circle centered on the Brandenburg Gate with a radius of 3 km. For more details on the geospatial filtering types, see Geospatial filtering.
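The in filter is a plain string, so it is easy to get wrong when the center and radius come from variables. The following sketch shows one way to build a circle filter in the circle:<latitude>,<longitude>;r=<radius> form used above. CircleFilter is a hypothetical helper, not part of the HERE SDK, and its range checks are sanity checks assumed for this sketch rather than documented API limits.

```java
import java.math.BigDecimal;

/**
 * Hypothetical helper (not part of the HERE SDK) that builds the value of the "in" parameter
 * for a circular geospatial filter: circle:<latitude>,<longitude>;r=<radius in meters>.
 */
final class CircleFilter {

  private CircleFilter() {}

  static String circle(double latitude, double longitude, int radiusMeters) {
    // sanity checks assumed for this sketch; written so that NaN is rejected as well
    if (!(latitude >= -90 && latitude <= 90)) {
      throw new IllegalArgumentException("latitude out of range: " + latitude);
    }
    if (!(longitude >= -180 && longitude <= 180)) {
      throw new IllegalArgumentException("longitude out of range: " + longitude);
    }
    if (radiusMeters <= 0) {
      throw new IllegalArgumentException("radius must be positive: " + radiusMeters);
    }
    return "circle:" + plain(latitude) + "," + plain(longitude) + ";r=" + radiusMeters;
  }

  /**
   * Formats a coordinate without locale-specific separators or scientific notation:
   * String.format("%f", 52.5) prints "52,500000" under a German default locale, and
   * Double.toString(0.0001) prints "1.0E-4".
   */
  private static String plain(double value) {
    return BigDecimal.valueOf(value).stripTrailingZeros().toPlainString();
  }

  public static void main(String[] args) {
    // Brandenburg Gate with a 3 km radius, as in this tutorial
    System.out.println(circle(52.51652, 13.37885, 3000));
  }
}
```

For example, CircleFilter.circle(52.51652, 13.37885, 3000) returns the same string as the one used in this section, which you could pass to withIn() in Java or to the in parameter in Scala.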

The locationReferencing parameter can be one or more of tmc, olr, and shape. A location reference describes a location, which can be a particular point, curve, or two-dimensional shape on the surface of the earth. Quite often, when a location is used in an application, it refers to a particular anthropogenic or geographic feature, such as a road, building, mountain, or body of water. In this tutorial, we use the shape location reference, because it returns the road geometry as coordinates that can be drawn on a map directly. For more information on the location referencing types, see Location referencing.

A request to this endpoint returns an array of flow items, each having a location and a currentFlow element. Each location has a length in meters, a description (usually a street name), and the location references requested with the locationReferencing parameter. In our case, these are shape points given as latitude/longitude pairs. The currentFlow element of each flow item carries the speed, jam factor, and traversability information for that item.
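One detail that is easy to get wrong when converting shape points: each point carries separate latitude and longitude values, while GeoJSON (RFC 7946) orders every position as [longitude, latitude]. That is why the application code adds point.getLng() before point.getLat(). The following self-contained sketch illustrates the conversion. ShapePoint is a hypothetical stand-in for the SDK's point type, and the second sample point is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the SDK's shape point type, which exposes getLat() and getLng(). */
final class ShapePoint {
  final double lat;
  final double lng;

  ShapePoint(double lat, double lng) {
    this.lat = lat;
    this.lng = lng;
  }
}

final class ShapeToGeoJson {

  private ShapeToGeoJson() {}

  /** GeoJSON (RFC 7946) orders each position as [longitude, latitude]. */
  static List<List<Double>> toPositions(List<ShapePoint> points) {
    List<List<Double>> positions = new ArrayList<>();
    for (ShapePoint point : points) {
      positions.add(Arrays.asList(point.lng, point.lat));
    }
    return positions;
  }

  /** Serializes the points as a GeoJSON LineString geometry without a JSON library. */
  static String toLineString(List<ShapePoint> points) {
    StringBuilder json = new StringBuilder("{\"type\":\"LineString\",\"coordinates\":[");
    List<List<Double>> positions = toPositions(points);
    for (int i = 0; i < positions.size(); i++) {
      List<Double> position = positions.get(i);
      json.append(i == 0 ? "" : ",")
          .append('[').append(position.get(0)).append(',').append(position.get(1)).append(']');
    }
    return json.append("]}").toString();
  }

  public static void main(String[] args) {
    // the first point is the circle center used in this tutorial, the second is made up
    List<ShapePoint> shape =
        Arrays.asList(new ShapePoint(52.51652, 13.37885), new ShapePoint(52.5163, 13.3777));
    System.out.println(toLineString(shape));
  }
}
```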

The code snippet below demonstrates how to get real-time flow information with the parameters described above:

Java:

    FlowResponse response = trafficApi
            .getFlow()
            .withIn("circle:52.51652,13.37885;r=3000")
            .withLocationReferencing(Collections.singletonList("shape"))
            .withAdvancedFeatures(Collections.singletonList("deepCoverage"))
            .build()
            .toEntity();

Scala:

    val response = trafficApi
      .getFlow(in = "circle:52.51652,13.37885;r=3000",
        locationReferencing = List("shape"),
        advancedFeatures = List("deepCoverage")
      )
      .toEntity()

Now that you are familiar with the API client, let's review the application.

Implement the application

Let's look at the implementation of this Spark application. As the code below shows, the logic of the main method is split into three methods:

  • getRealTimeTrafficFlow()
  • prepareGeoJsonData()
  • uploadPartitions()

Let's take a closer look at the implementations of these methods.

The getRealTimeTrafficFlow() method is the core of the application: it obtains the real-time traffic flow, and the other two methods process the data it returns. If you only want to get the real-time traffic flow without further processing, you only need the com.here.platform.data.client.base:ols-traffic-v7_${scala.compat.version} dependency mentioned above and this method.

The getRealTimeTrafficFlow() method creates a HERE Traffic API client and retrieves the real-time traffic flow using the parameters described in the previous section. The HERE Traffic API requires authentication. By default, the credentials stored in $HOME/.here/credentials.properties are used. For more details, see the Data Client Base Library documentation.
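One way to rule out credential problems early is to check the default credentials file before running the application. The following sketch does that. CredentialsCheck is a hypothetical helper, not part of the Data Client Base Library, and the list of expected keys reflects the properties a downloaded platform credentials file usually contains; treat that list as an assumption and compare it with your own file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check; not part of the Data Client Base Library. */
final class CredentialsCheck {

  // keys a platform credentials.properties file usually contains (assumption, compare with yours)
  static final String[] EXPECTED_KEYS = {
    "here.user.id",
    "here.client.id",
    "here.access.key.id",
    "here.access.key.secret",
    "here.token.endpoint.url"
  };

  private CredentialsCheck() {}

  /** Default location: .here/credentials.properties in the user's home directory. */
  static Path defaultPath() {
    return Paths.get(System.getProperty("user.home"), ".here", "credentials.properties");
  }

  static Properties load(Path file) throws IOException {
    Properties properties = new Properties();
    try (InputStream in = Files.newInputStream(file)) {
      properties.load(in);
    }
    return properties;
  }

  /** Returns the expected keys that are missing or blank. */
  static List<String> missingKeys(Properties properties) {
    List<String> missing = new ArrayList<>();
    for (String key : EXPECTED_KEYS) {
      String value = properties.getProperty(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    return missing;
  }

  public static void main(String[] args) throws IOException {
    Path file = defaultPath();
    if (!Files.isRegularFile(file)) {
      System.out.println("No credentials file at " + file);
      return;
    }
    List<String> missing = missingKeys(load(file));
    System.out.println(missing.isEmpty() ? "All expected keys present" : "Missing keys: " + missing);
  }
}
```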

The prepareGeoJsonData() method splits the traffic data into partitions using HereTileResolver (for details, see Calculate partition Tile IDs) and converts the data for each tile to GeoJSON. The line color depends on the jamFactor property: roads with a light load are marked with a green line, roads with a medium load with a yellow line, and roads with a high load with a red line. For more details, see GeoJSON Data and Style GeoJSON Visualization. This method uses GeoJSON model classes such as LineString, Geometry, Feature, and FeatureCollection. For more details, see the Data Client Library documentation.
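The color formula in generateFillColor() in the code below actually produces a gradient rather than three fixed colors: green stays at full intensity up to a jam factor of 3 while red rises, and above 3 red is at full intensity while green falls to 0 at a jam factor of 10 (HERE jam factors range from 0 to 10). The following standalone sketch copies that formula from the tutorial code so that you can print the resulting colors for a few sample jam factors.

```java
/** Standalone copy of the jam-factor-to-color formula from generateFillColor() in this tutorial. */
final class JamFactorColor {

  private JamFactorColor() {}

  static String color(double jamFactor) {
    int red;
    int green;
    if (jamFactor <= 3) {
      // green at full intensity, red rises from 0 (jam factor 0) to 153 (jam factor 3)
      green = 255;
      red = (int) (255 * (jamFactor * 10) / 100 * 2);
    } else {
      // red at full intensity, green falls from about 178 (just above 3) to 0 (jam factor 10)
      red = 255;
      green = (int) (255 * ((100 - (jamFactor * 10)) / 100));
    }
    return "rgb(" + red + ", " + green + ", 0)";
  }

  public static void main(String[] args) {
    for (double jamFactor : new double[] {0, 2, 3, 4, 5, 8, 10}) {
      System.out.println(jamFactor + " -> " + color(jamFactor));
    }
  }
}
```

For example, a jam factor of 0 yields rgb(0, 255, 0), 5 yields rgb(255, 127, 0), and 10 yields rgb(255, 0, 0).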

The uploadPartitions() method publishes the GeoJSON data to the versioned layer of the output catalog using the Data Client Library.

For more information on what the application does, see the comments in the code below.

The Java implementation comes first, followed by the Scala implementation.

Java:
/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.here.hrn.HRN;
import com.here.platform.data.client.base.javadsl.BaseClient;
import com.here.platform.data.client.base.javadsl.BaseClientJava;
import com.here.platform.data.client.base.ols.generated.javadsl.api.traffic.RealTimeTrafficApi;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.CurrentFlow;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowItem;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.FlowResponse;
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.JLocationShape;
import com.here.platform.data.client.engine.javadsl.DataEngine;
import com.here.platform.data.client.engine.javadsl.WriteEngine;
import com.here.platform.data.client.javadsl.NewPartition;
import com.here.platform.data.client.model.PendingPartition;
import com.here.platform.data.client.model.geojson.Feature;
import com.here.platform.data.client.model.geojson.FeatureCollection;
import com.here.platform.data.client.model.geojson.LineString;
import com.here.platform.data.client.spark.DataClientSparkContextUtils;
import com.here.platform.location.core.geospatial.GeoCoordinate;
import com.here.platform.location.integration.herecommons.geospatial.HereTileLevel;
import com.here.platform.location.integration.herecommons.geospatial.javadsl.HereTileResolver;
import com.here.platform.pipeline.PipelineContext;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ExecutionException;

public class RealTimeTrafficApiJava {

  private static final String LAYER_ID = "realtime-traffic";

  public static void main(String[] args)
      throws JsonProcessingException, ExecutionException, InterruptedException {

    // get real time traffic flow
    FlowResponse flowResponse = getRealTimeTrafficFlow("circle:52.51652,13.37885;r=3000");

    // convert the retrieved data to GeoJson and split them in tiles
    List<PartitionData> partitionData = prepareGeoJsonData(flowResponse.getResults());

    // publish data to output catalog
    PipelineContext pipelineContext = new PipelineContext();
    uploadPartitions(pipelineContext.getConfig().getOutputCatalog(), partitionData);
  }

  /** Get real-time traffic flow from the RealTimeTrafficApi */
  private static FlowResponse getRealTimeTrafficFlow(String geo) {
    BaseClient client = BaseClientJava.instance();
    RealTimeTrafficApi trafficApi = new RealTimeTrafficApi(client);

    return trafficApi
        .getFlow()
        .withIn(geo)
        .withLocationReferencing(Collections.singletonList("shape"))
        .withAdvancedFeatures(Collections.singletonList("deepCoverage"))
        .build()
        .toEntity();
  }

  /** Convert the real-time traffic flow response to GeoJSON and split it by tiles */
  private static List<PartitionData> prepareGeoJsonData(List<FlowItem> flowItems)
      throws JsonProcessingException {

    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);

    List<PartitionData> customDataList = new ArrayList<>();
    Map<Long, List<Feature>> tileIdToFeatureList = new HashMap<>();

    // go through the flow items and create features from the data
    flowItems.forEach(
        flowItem -> {
          List<List<Double>> coordinates = new ArrayList<>();
          // get coordinates from the location to create the geometry from
          flowItem
              .getLocation()
              .getShape()
              .orElse(new JLocationShape.Builder().build())
              .getLinks()
              .stream()
              .flatMap(link -> link.getPoints().stream())
              .forEach(point -> coordinates.add(Arrays.asList(point.getLng(), point.getLat())));

          // create lineString geometry
          LineString lineString = new LineString.Builder().withCoordinates(coordinates).build();

          // create feature using the lineString geometry and flow properties, such as speed,
          // jamFactor, etc
          Feature feature =
              new Feature.Builder()
                  .withId(UUID.randomUUID().toString())
                  .withGeometry(lineString)
                  .withProperties(getGeoJsonProperties(flowItem.getCurrentFlow()))
                  .build();

          // find the tile IDs covered by the coordinates; a set ensures that the feature is
          // added only once per tile even if several coordinates fall into the same tile
          HereTileResolver tileResolver = new HereTileResolver(new HereTileLevel(15));
          Set<Long> tileIds = new LinkedHashSet<>();
          for (List<Double> point : coordinates) {
            // GeoJSON coordinates are stored as [longitude, latitude]
            GeoCoordinate coordinate = new GeoCoordinate(point.get(1), point.get(0));
            tileIds.add(tileResolver.fromCoordinate(coordinate));
          }
          for (Long tileId : tileIds) {
            tileIdToFeatureList.computeIfAbsent(tileId, k -> new ArrayList<>()).add(feature);
          }
        });

    // create partition data from the features
    for (Map.Entry<Long, List<Feature>> entry : tileIdToFeatureList.entrySet()) {
      Long tileId = entry.getKey();
      FeatureCollection featureCollection =
          new FeatureCollection.Builder().withFeatures(entry.getValue()).build();
      PartitionData customData =
          new PartitionData(
              Long.toString(tileId), LAYER_ID, objectMapper.writeValueAsBytes(featureCollection));
      customDataList.add(customData);
    }
    return customDataList;
  }

  /** Upload data to the versioned layer using Data Client */
  public static void uploadPartitions(HRN catalog, List<PartitionData> partitions)
      throws ExecutionException, InterruptedException {

    ActorSystem actorSystem = DataClientSparkContextUtils.context().actorSystem();

    // create writeEngine for the output catalog
    WriteEngine writeEngine = DataEngine.get(actorSystem).writeEngine(catalog);

    // parallelism defines how many upload requests are made in parallel
    int parallelism = 10;

    // create a list of partitions to upload
    ArrayList<PendingPartition> partitionList = new ArrayList<>();
    partitions.forEach(
        partitionData -> {
          NewPartition newPartition =
              new NewPartition.Builder()
                  .withPartition(partitionData.partition)
                  .withData(partitionData.data)
                  .withLayer(LAYER_ID)
                  .build();
          partitionList.add(newPartition);
        });

    // upload partitions to the catalog
    writeEngine
        .publishBatch2(
            parallelism,
            Optional.of(Collections.singletonList(LAYER_ID)),
            Collections.emptyList(),
            Source.from(partitionList))
        .toCompletableFuture()
        .get();
  }

  /** Generate the real-time traffic flow properties for the Feature object */
  private static Map<String, Object> getGeoJsonProperties(CurrentFlow currentFlow) {

    Map<String, Object> properties = new HashMap<>();
    properties.put("style", generateFillColor(currentFlow.getJamFactor()));
    properties.put("speed", currentFlow.getSpeed().orElse(0.0));
    properties.put("speedUncapped", currentFlow.getSpeedUncapped().orElse(0.0));
    properties.put("freeFlow", currentFlow.getFreeFlow());
    properties.put("jamFactor", currentFlow.getJamFactor());
    properties.put("confidence", currentFlow.getConfidence().orElse(0.0));
    properties.put("traversability", currentFlow.getTraversability().orElse("empty"));

    return properties;
  }

  /**
   * Generate an RGB-encoded color. For a high jam factor the color has a larger red component,
   * for a low jam factor a larger green component. Each color component is between 0 and 255;
   * the blue component is not used.
   */
  private static Map<String, Object> generateFillColor(double jamFactor) {
    Map<String, Object> style = new HashMap<>();
    int green;
    int red;
    if (jamFactor <= 3) {
      green = 255;
      red = (int) (255 * (jamFactor * 10) / 100 * 2);
    } else {
      green = (int) (255 * ((100 - (jamFactor * 10)) / 100));
      red = 255;
    }
    style.put("color", "rgb(" + red + ", " + green + ", 0)");
    style.put("weight", "5");
    return style;
  }

  /** Class to store the metadata and real-time traffic flow data */
  @SuppressWarnings("serial")
  static class PartitionData implements Serializable {

    String partition;

    String layerId;

    byte[] data;

    PartitionData(String partition, String layerId, byte[] data) {
      this.partition = partition;
      this.layerId = layerId;
      this.data = data;
    }
  }
}

Scala:

/*
 * Copyright (c) 2018-2023 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import akka.stream.scaladsl.Source
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.here.hrn.HRN
import com.here.platform.data.client.base.ols.generated.codecs.traffic.JsonSupport._
import com.here.platform.data.client.base.ols.generated.scaladsl.api.traffic.RealTimeTrafficApi
import com.here.platform.data.client.base.ols.generated.scaladsl.model.traffic.{
  CurrentFlow,
  FlowItem,
  FlowResponse
}
import com.here.platform.data.client.base.scaladsl.BaseClient
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.geojson.{Feature, FeatureCollection, LineString}
import com.here.platform.data.client.scaladsl.NewPartition
import com.here.platform.data.client.spark.DataClientSparkContextUtils.context.actorSystem
import com.here.platform.location.core.geospatial.GeoCoordinate
import com.here.platform.location.integration.herecommons.geospatial.{
  HereTileLevel,
  HereTileResolver
}
import com.here.platform.pipeline.PipelineContext

import java.util
import scala.collection.convert.ImplicitConversions.`map AsJavaMap`
import scala.collection.mutable.ListBuffer
import scala.concurrent.ExecutionContext.Implicits.global

object RealTimeTrafficApiScala {
  private val LAYER_ID = "realtime-traffic"

  def main(args: Array[String]): Unit = {

    // get real time traffic flow
    val flowResponse = getRealTimeTrafficFlow("circle:52.51652,13.37885;r=3000")

    // convert the retrieved data to GeoJson and split them in tiles
    val partitionData = prepareGeoJsonData(flowResponse.getResults)

    // publish data to output catalog
    val pipelineContext = new PipelineContext
    uploadPartitions(pipelineContext.config.outputCatalog, partitionData)

  }

  /**
    * Get real-time traffic flow from the RealTimeTrafficApi
    */
  def getRealTimeTrafficFlow(str: String): FlowResponse = {

    val client = BaseClient()
    val trafficApi = client.of[RealTimeTrafficApi]

    trafficApi
      .getFlow(
        in = str,
        locationReferencing = List("shape"),
        advancedFeatures = List("deepCoverage")
      )
      .toEntity()
  }

  /**
    * Convert the real-time traffic flow response to GeoJSON
    * and split it by tiles
    */
  def prepareGeoJsonData(flowItems: util.List[FlowItem]): List[PartitionData] = {

    val objectMapper = new ObjectMapper()
    objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
    objectMapper.registerModule(new DefaultScalaModule)

    // a single resolver is reused for all coordinates
    val tileResolver = new HereTileResolver(HereTileLevel(15))
    val tileIdToFeatureList = scala.collection.mutable.Map[Long, ListBuffer[Feature]]()

    // go through the flow items and create features from the data
    flowItems.forEach(item => {

      // get coordinates from the location to create the geometry from;
      // GeoJSON stores each position as [longitude, latitude]
      val coordinates: List[List[Double]] = item.getLocation.shape.get.links
        .flatMap(link => link.points)
        .map(point => List[Double](point.getLng, point.getLat))
        .toList

      // create lineString geometry
      val lineString = LineString(coordinates = Option(coordinates))

      // create feature using the lineString geometry and flow properties, such as speed,
      // jamFactor, etc
      val feature = Feature(
        id = Option(java.util.UUID.randomUUID.toString),
        geometry = Option(lineString),
        properties = Option(getGeoJsonProperties(item.getCurrentFlow).toMap)
      )

      // find the distinct tile IDs covered by the coordinates, so that the feature is
      // added only once per tile even if several coordinates fall into the same tile
      coordinates
        .map(position => tileResolver.fromCoordinate(GeoCoordinate(position(1), position(0))).value)
        .distinct
        .foreach(tileId => tileIdToFeatureList.getOrElseUpdate(tileId, ListBuffer()) += feature)
    })

    // create partition data from the features
    tileIdToFeatureList.map {
      case (tileId, features) =>
        PartitionData(
          tileId.toString,
          LAYER_ID,
          objectMapper.writeValueAsBytes(FeatureCollection(features = features.toList))
        )
    }.toList
  }

  /**
    * Upload data to the versioned layer using Data Client
    */
  def uploadPartitions(catalog: HRN, partitionsData: Seq[PartitionData]): Unit = {
    // create writeEngine for the output catalog
    val writeEngine = DataEngine().writeEngine(catalog)

    // create a list of partitions to upload
    val partitionList = partitionsData.map { partitionData =>
      NewPartition(
        partition = partitionData.partition,
        layer = LAYER_ID,
        data = NewPartition.ByteArrayData(partitionData.data)
      )
    }.toList

    // upload partitions to the catalog and block until the publication completes,
    // as the Java version does with get(), so that upload failures surface in main
    scala.concurrent.Await.result(
      writeEngine.publishBatch2(parallelism = 10,
                                layerIds = Some(Seq(LAYER_ID)),
                                dependencies = Seq.empty,
                                partitions = Source(partitionList)),
      scala.concurrent.duration.Duration.Inf
    )
  }

  /**
    * Generate the real-time traffic flow properties for the Feature object
    */
  def getGeoJsonProperties(currentFlow: CurrentFlow): scala.collection.mutable.Map[String, Any] = {

    val properties = scala.collection.mutable.Map[String, Any]()

    properties += ("style" -> generateFillColor(currentFlow.getJamFactor))
    properties += ("speed" -> currentFlow.getSpeed.orElse(0.0))
    properties += ("speedUncapped" -> currentFlow.getSpeedUncapped.orElse(0.0))
    properties += ("freeFlow" -> currentFlow.getFreeFlow)
    properties += ("jamFactor" -> currentFlow.getJamFactor)
    properties += ("confidence" -> currentFlow.getConfidence.orElse(0.0))
    properties += ("traversability" -> currentFlow.getTraversability.orElse("empty"))

    properties

  }

  /**
    * Generate an RGB-encoded color. For a high jam factor the color has a larger red component,
    * for a low jam factor a larger green component. Each color component is between 0 and 255;
    * the blue component is not used.
    */
  private def generateFillColor(jamFactor: Double): scala.collection.mutable.Map[String, Any] = {
    val (red, green) =
      if (jamFactor <= 3) ((255 * (jamFactor * 10) / 100 * 2).toInt, 255)
      else (255, (255 * ((100 - (jamFactor * 10)) / 100)).toInt)
    scala.collection.mutable.Map[String, Any](
      "color" -> s"rgb($red, $green, 0)",
      "weight" -> "5"
    )
  }

  /**
    * Class to store the metadata and real-time traffic flow data
    */
  case class PartitionData(partition: String, layer: String, data: Array[Byte])
}

Once the code is complete, you can prepare the resources and run the application.

Run the application locally

To run the application, you first need to prepare its resources: a catalog with a versioned layer.

In this tutorial, we run the application locally, so local catalogs are sufficient. Local catalogs are stored on your machine, so they are not subject to naming conflicts within your realm, and you can give them any name you want.

Although the application does not use the input catalog, you need to create one to fill in the input-catalog field in the pipeline configuration file; otherwise, you get an error about an invalid catalog HRN.

Run the following OLP CLI command from the root of the tutorial folder to create a local input catalog:

olp local catalog create batch-catalog batch-catalog --summary "Input catalog" --description "Input catalog"

Run the following OLP CLI command from the root of the tutorial folder to create a local output catalog with the versioned layer:

olp local catalog create output-batch-catalog output-batch-catalog --config output-catalog-configuration.json

The structure of the output-catalog-configuration.json file is as follows:


{
  "id": "realtime-traffic-output",
  "name": "Real time traffic output catalog",
  "summary": "Catalog with real time traffic",
  "description": "Catalog with real time traffic",
  "layers": [
    {
      "id": "realtime-traffic",
      "name": "realtime-traffic",
      "summary": "RealTimeTraffic",
      "description": "RealTimeTraffic",
      "contentType": "application/geo+json",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "heretile",
        "tileLevels": [15]
      }
    }
  ]
}
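The tileLevels value in this configuration corresponds to the HereTileLevel(15) that the application passes to HereTileResolver. To get a rough idea of how large a level-15 tile is, the following sketch computes its size, assuming the HERE tiling scheme in which a tile at level L spans 360 / 2^L degrees in both longitude and latitude, and approximating distances on a spherical Earth. HereTileSize is a hypothetical helper for illustration only; use HereTileResolver to compute actual tile IDs.

```java
import java.util.Locale;

/**
 * Hypothetical helper: approximate size of a tile, assuming the HERE tiling scheme in which a
 * tile at level L spans 360 / 2^L degrees in both longitude and latitude.
 */
final class HereTileSize {

  // mean Earth radius; spherical approximation, so the results are rough
  private static final double EARTH_RADIUS_KM = 6371.0;
  private static final double KM_PER_DEGREE = 2 * Math.PI * EARTH_RADIUS_KM / 360;

  private HereTileSize() {}

  static double tileSizeDegrees(int level) {
    return 360.0 / (1L << level);
  }

  /** North-south extent in km (the same at every latitude on a sphere). */
  static double tileHeightKm(int level) {
    return tileSizeDegrees(level) * KM_PER_DEGREE;
  }

  /** East-west extent in km; meridians converge, so tiles get narrower away from the equator. */
  static double tileWidthKm(int level, double latitude) {
    return tileHeightKm(level) * Math.cos(Math.toRadians(latitude));
  }

  public static void main(String[] args) {
    int level = 15;
    // Locale.ROOT keeps the decimal point independent of the default locale
    System.out.printf(Locale.ROOT, "level %d: %.6f deg per tile, about %.2f km x %.2f km at 52.5 N%n",
        level, tileSizeDegrees(level), tileWidthKm(level, 52.5), tileHeightKm(level));
  }
}
```

Under these assumptions, a level-15 tile near Berlin is roughly 0.74 km wide and 1.22 km tall, so the 3 km circle used in this tutorial covers dozens of level-15 tiles.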

Let's take a closer look at the parameters of the versioned layer. The "contentType": "application/geo+json" property means that the layer contains data in GeoJSON format. The "scheme": "heretile" partitioning property means that the data is partitioned using the HERE tiling scheme, which is a quadtree, and "tileLevels": [15] means that the data is stored at zoom level 15. The local data inspector can visualize this tiling scheme and GeoJSON without any additional rendering plugins.

After you have created the catalogs, run the application from the root of the downloaded tutorial using the following command:

Java:

mvn compile exec:java -q -D"exec.mainClass"="RealTimeTrafficApiJava" \
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
-Dspark.master=local[*] \
-Dpipeline-config.file=local-pipeline-config.conf

Scala:

mvn compile exec:java -q -D"exec.mainClass"="RealTimeTrafficApiScala" \
-Dhere.platform.data-client.endpoint-locator.discovery-service-env=local \
-Dspark.master=local[*] \
-Dpipeline-config.file=local-pipeline-config.conf

The command has the following parameters:

  • exec.mainClass – entry point to run your application.
  • here.platform.data-client.endpoint-locator.discovery-service-env=local – configures the Data Client Library to only use local catalogs.
  • spark.master=local[*] – configures a local Spark run with as many worker threads as there are logical cores on your machine.
  • pipeline-config.file=local-pipeline-config.conf – configuration file with information about the input and output catalogs.

After the application finishes successfully, you can see the data that was added to the versioned layer in the console.

Once the GeoJSON data is published to the catalog, let's view the results using the following command:

olp local catalog layer inspect hrn:local:data:::output-batch-catalog realtime-traffic

This command opens a browser tab that shows the real-time traffic flow on a map.

Now we can proceed to run this application on the platform.

Run the application on the platform

Before running the application on the platform, we need to prepare all the resources for it.

We will run the application in a project, so let's first create one using the OLP CLI:

olp project create $PROJECT_ID $PROJECT_NAME

The OLP CLI should return the following message:

Project YOUR_PROJECT_HRN has been created.

Save the YOUR_PROJECT_HRN value in the $PROJECT_HRN shell variable to simplify the commands that follow.

Since the application uses the HERE Traffic service, we need to link this service to the project so that the HERE Traffic client can resolve the base URL of the service. To do this, run the following command:

olp project resource link $PROJECT_HRN hrn:here:service::olp-here:traffic-api-7

The next step is to create the input and output catalogs in the project created above. The --scope parameter specifies the project in OLP CLI commands.

Note that the application does not use the input catalog. The catalog is needed only to specify the input-catalog property in the pipeline-config.conf file.

Let's execute the following command from the root of the tutorial folder to create an input catalog:

olp catalog create $CATALOG_ID $CATALOG_NAME --summary "Input catalog" --description "Input catalog" --scope $PROJECT_HRN

The OLP CLI should return the following message:

Catalog YOUR_INPUT_CATALOG_HRN has been created.

The output catalog is created using the output-catalog-configuration.json file. Run the following command from the root of the tutorial folder to create the output catalog, using a catalog ID and name that differ from those of the input catalog:

olp catalog create $CATALOG_ID $CATALOG_NAME --config output-catalog-configuration.json --scope $PROJECT_HRN

The next step is to configure the application to use the catalogs created above. In the pipeline-config.conf file, replace the INPUT_CATALOG_HRN and OUTPUT_CATALOG_HRN placeholders with the HRNs of the input and output catalogs you created.
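If you prefer to script this step, the following Java sketch replaces both placeholders in pipeline-config.conf in place. It is a hypothetical helper and is not part of the tutorial sources. The file layout shown in its comment is an assumption based on the property names mentioned in this tutorial.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical helper: fills the INPUT_CATALOG_HRN and OUTPUT_CATALOG_HRN placeholders.
 *
 * Assumed layout of pipeline-config.conf (HOCON), for illustration only:
 *   pipeline.config {
 *     output-catalog { hrn = "OUTPUT_CATALOG_HRN" }
 *     input-catalogs { input-catalog { hrn = "INPUT_CATALOG_HRN" } }
 *   }
 */
public class FillPipelineConfig {

    /** Returns the config text with both placeholders replaced by the given HRNs. */
    public static String fill(String config, String inputHrn, String outputHrn) {
        return config.replace("INPUT_CATALOG_HRN", inputHrn)
                     .replace("OUTPUT_CATALOG_HRN", outputHrn);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: FillPipelineConfig <input-catalog-hrn> <output-catalog-hrn>");
            System.exit(1);
        }
        Path file = Paths.get("pipeline-config.conf"); // run from the root of the tutorial folder
        String config = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        Files.write(file, fill(config, args[0], args[1]).getBytes(StandardCharsets.UTF_8));
    }
}
```

Run it once with the two HRNs returned by the catalog create commands. After the first run the placeholders are gone, so running it again has no effect.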

The application has to be packaged with all dependencies in a Fat JAR to be deployed on the platform. The Java/Scala SDK provides a platform profile to generate the Fat JAR. The following command uses the profile to generate the Fat JAR:

mvn -Pplatform clean package

The real-time-traffic-tutorial-<tutorial-version>-platform.jar Fat JAR should be created in the target folder.

All the resources that the application needs are now in place. The application is configured to use them and is packaged for deployment on the platform.

Let's create a pipeline in the project scope using the following OLP CLI command:

olp pipeline create $PIPELINE_ID --scope $PROJECT_HRN

The OLP CLI should return the following message:

Pipeline YOUR_PIPELINE_ID has been created.

The next step is to create a pipeline template.

To create a pipeline template, you need to specify:

  • The template name.
  • The runtime environment. This tutorial runs a Spark application, so it uses the batch environment (batch-3.0) on the platform.
  • The Fat JAR created by the mvn -Pplatform clean package command.
  • The main class.
  • The project that the pipeline belongs to.
  • The input catalog IDs expected in the pipeline version configuration.

Let's create a pipeline template by running the following OLP CLI command from the root of the tutorial folder:

Java Application:

olp pipeline template create traffic-template \
batch-3.0 target/real-time-traffic-tutorial-<tutorial-version>-platform.jar \
RealTimeTrafficApiJava --input-catalog-ids pipeline-config.conf \
--scope $PROJECT_HRN

Scala Application:

olp pipeline template create traffic-template \
batch-3.0 target/real-time-traffic-tutorial-<tutorial-version>-platform.jar \
RealTimeTrafficApiScala --input-catalog-ids pipeline-config.conf \
--scope $PROJECT_HRN

The next step is to create a pipeline version in the project scope. In the following command, $PIPELINE_TEMPLATE_ID holds the ID of the pipeline template created in the previous step:

olp pipeline version create test-spark-version $PIPELINE_ID $PIPELINE_TEMPLATE_ID pipeline-config.conf --scope $PROJECT_HRN

Note

If a billing tag is required in your realm, use the --billing-tag <your-billing-tag> parameter.

Once you have created the batch pipeline, pipeline template and pipeline version, you can run the application on the platform by activating the pipeline version.

To activate the pipeline version, run the following OLP CLI command:

olp pipeline version activate $PIPELINE_ID $PIPELINE_VERSION_ID --scope $PROJECT_HRN

Here, $PIPELINE_VERSION_ID holds the ID of the pipeline version you created above.

After activation, wait until the pipeline reaches the Completed state. To do this, run the following OLP CLI command:

olp pipeline version wait $PIPELINE_ID $PIPELINE_VERSION_ID \
--job-state completed --timeout 600 \
--scope $PROJECT_HRN
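The wait command blocks until the job reaches the requested state or the timeout (600 in this command) expires. The following Java sketch illustrates that polling pattern in general terms. It is not the OLP CLI implementation. The state supplier stands in for a hypothetical status query, so the state names it returns are placeholders.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/**
 * Generic sketch of a "poll until state or timeout" loop. Not the OLP CLI implementation;
 * stateSupplier stands in for a hypothetical job-status query.
 */
public class WaitForState {

    /**
     * Polls stateSupplier until it returns expectedState (case-insensitive) or the timeout
     * elapses. Returns true if the state was reached, false on timeout or interruption.
     */
    public static boolean waitFor(Supplier<String> stateSupplier, String expectedState,
                                  Duration timeout, Duration pollInterval) {
        Instant deadline = Instant.now().plus(timeout);
        while (true) {
            if (expectedState.equalsIgnoreCase(stateSupplier.get())) {
                return true;
            }
            if (!Instant.now().isBefore(deadline)) {
                return false; // timed out before reaching the expected state
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical sequence of states returned by successive status queries.
        String[] states = {"running", "running", "completed"};
        int[] calls = {0};
        Supplier<String> fakeStatus = () -> states[Math.min(calls[0]++, states.length - 1)];
        boolean done = waitFor(fakeStatus, "completed", Duration.ofSeconds(600), Duration.ofMillis(10));
        System.out.println(done ? "Reached the completed state" : "Timed out");
    }
}
```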

After the pipeline reaches the Completed state, check the result of the application by running the following OLP CLI command:

olp catalog layer inspect $OUTPUT_CATALOG_HRN realtime-traffic

This command opens a browser tab that shows the real-time traffic flow on a map.

Conclusion

In this tutorial, you have learned how to use the HERE Traffic API client in a Java or Scala Spark application to get a snapshot of the real-time traffic flow and visualize it on a map.

Further information

For more details on the topics covered in this tutorial, see the following sources:
