Read historic index layer data

This tutorial demonstrates how to read historic index layer data. In this example, the historic index layer partitioning uses a temporal partition of a 24 hour timeWindow and spatial partitioning with a HERE Tiling scheme at zoom level 10. There is one entry for each day in the timeWindow column, and multiple HERE tile zoom level 10 entries for each day.

When you are reading from an index layer certain restrictions apply. For infomation about restrictions and known limiataions, see the Data API Developer's Guide.

Procedure

To get data from catalogs, add the data-engine module as a dependency to your project.

The data-engine module provides high level abstractions on top of the data-client when working with HERE platform data. This module can read and manage both metadata and data.

For more information on adding the data-engine module as a dependency to your project, see the Data Client Library Guide.

To read historic index layer data, the fields for timeWindow and tileId must be set:

  • timeWindow - Set a value that indicates the number of milliseconds since the beginning of 1970 (per the TIMEWINDOW data type). When used as an index field, this field should be set to the beginning of the 24-hour period for the data download.
  • tileId – Set to a level 10 HEREtile ID. For more information on tile IDs and the HEREtile Tiling Scheme, see HERE Lanes.

Code

The following code snippet demonstrates how to read data from the historic index layer of a HERE Probe Data catalog.

package com.here.paap.local

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import com.here.hpd.schema.v1.probe_tile.ProbePoint
import com.here.hrn.HRN
import com.here.paap.util.AkkaImplicit
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.{ HereTileIndexValue, TimeWindowIndexValue }
import com.here.platform.data.client.scaladsl.{ DataClient, IndexPartition }

import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, ExecutionContext, Future }

object IndexLayerReader extends App {
  implicit val system = ActorSystem("probe-services")
  implicit val mat    = ActorMaterializer()
  implicit val ec     = ExecutionContext.global

  lazy val catalogHrn = "hrn:here:data::olp-here-probe:prasadsurve-3aartexkhyaz8ekcw1jau4yk8xqw1kqhf"
  lazy val layerId    = "prasadsurve-3aartexkhyaz8ekcw1jau4yk8xqw1kqhf-0"
  val queryApi        = DataClient().queryApi(HRN(catalogHrn))
  val readEngine      = DataEngine().readEngine(HRN(catalogHrn))

  val queryString = "timeWindow==1640908800000;tileId==1476147"

  val partitions = fetchIndexPartitions(queryString)

  partitions
    .map(partition => {
      loadDrives(partition)
    })
    .runWith(Sink.ignore)
  val parallelism = 10

  def fetchIndexPartitions(baseQuery: String): Source[IndexPartition, NotUsed] = {
    println(baseQuery)
    implicit val ec   = AkkaImplicit.ec
    val driveMessages = queryApi.queryIndex(layerId, Some(baseQuery))
    Await.result(driveMessages, Duration.Inf)
  }

  def loadDrives(partition: IndexPartition): Unit = {
    val bytes = readEngine.getDataAsBytes(partition)
    val dataF = bytes
      .map(partitionData => {
        val tileId     = partition.fields.get.getOrElse("tileId", "").asInstanceOf[HereTileIndexValue].tile
        val timeWindow = partition.fields.get.getOrElse("timeWindow", "").asInstanceOf[TimeWindowIndexValue].time
        val probeData  = ProbeData.parseFrom(partitionData)

        println(
          s" TILE_ID = ${tileId}, TIME_WINDOW = ${timeWindow}, PROBE DATA SIZE = ${probeData.probepoints.size}, DATA = ${probeData.probepoints.take(10)} "
        ) 
      })
    Await.result(dataF, Duration.Inf)
  }
}

Return

The following is a sample of what is returned from the above code snippet.

TILE_ID = 1476147, TIME_WINDOW = 1640822400000, PROBE DATA SIZE = 187671, DATA = Vector(ProbePoint(166.0,52.5314333,13.1548266,2KAtw1LTqqgthruZKxDEg,Some(Timestamp(1640891599,0)),42,), ProbePoint(346.0,52.5269383,13.1604431,2KAtw1LTqqgthruZKxDEg,Some(Timestamp(1640891959,0)),57,), ProbePoint(18.0,52.550941,13.1652703,2KAtw1LTqqgthruZKxDEg,Some(Timestamp(1640892289,0)),0,), ProbePoint(259.0,52.4583141,13.3319228,ac1FLlQKRFu2pzIWowDgDg,Some(Timestamp(1640888898,0)),44,), ProbePoint(208.0,52.4616646,13.3243461,ac1FLlQKRFu2pzIWowDgDg,Some(Timestamp(1640889228,0)),0,), ProbePoint(325.0,52.461683,13.3237886,ac1FLlQKRFu2pzIWowDgDg,Some(Timestamp(1640889288,0)),0,), ProbePoint(180.0,52.5275811,13.3432411,JToKOxTTjahquXZ04p26Q,Some(Timestamp(1640887309,0)),21,), ProbePoint(359.0,52.5273811,13.3433388,JToKOxTTjahquXZ04p26Q,Some(Timestamp(1640887369,0)),27,), ProbePoint(5.0,52.531732,13.3433613,JToKOxTTjahquXZ04p26Q,Some(Timestamp(1640887429,0)),34,), ProbePoint(236.0,52.5278053,13.2188525,Z3xV3JN1SQCMnvnJ5trtg,Some(Timestamp(1640889064,0)),49,))

Additional Information

For more information on the properties that appear in this tutorial, see the HERE Probe Data Catalog Specification.

results matching ""

    No results matching ""