Read historic index layer data
This tutorial demonstrates how to read historic index layer data. In this example, the historic index layer partitioning uses a temporal partition of a 24 hour timeWindow and spatial partitioning with a HERE Tiling scheme at zoom level 10. There is one entry for each day in the timeWindow column, and multiple HERE tile zoom level 10 entries for each day.
When you are reading from an index layer certain restrictions apply. For infomation about restrictions and known limiataions, see the Data API Developer's Guide.
Procedure
To get data from catalogs, add the data-engine
module as a dependency to your project.
The data-engine
module provides high level abstractions on top of the data-client
when working with HERE platform data. This module can read and manage both metadata and data.
For more information on adding the data-engine module as a dependency to your project, see the Data Client Library Guide.
To read historic index layer data, the fields for timeWindow
and tileId
must be set:
-
timeWindow
- Set a value that indicates the number of milliseconds since the beginning of 1970 (per the TIMEWINDOW data type). When used as an index field, this field should be set to the beginning of the 24-hour period for the data download. -
tileId
– Set to a level 10 HEREtile ID. For more information on tile IDs and the HEREtile Tiling Scheme, see HERE Lanes.
Code
The following code snippet demonstrates how to read data from the historic index layer of a HERE Probe Data catalog.
package com.here.paap.local
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import com.here.hpd.schema.v1.probe_tile.ProbePoint
import com.here.hrn.HRN
import com.here.paap.util.AkkaImplicit
import com.here.platform.data.client.engine.scaladsl.DataEngine
import com.here.platform.data.client.model.{ HereTileIndexValue, TimeWindowIndexValue }
import com.here.platform.data.client.scaladsl.{ DataClient, IndexPartition }
import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, ExecutionContext, Future }
object IndexLayerReader extends App {
implicit val system = ActorSystem("probe-services")
implicit val mat = ActorMaterializer()
implicit val ec = ExecutionContext.global
lazy val catalogHrn = "hrn:here:data::olp-here-probe:prasadsurve-3aartexkhyaz8ekcw1jau4yk8xqw1kqhf"
lazy val layerId = "prasadsurve-3aartexkhyaz8ekcw1jau4yk8xqw1kqhf-0"
val queryApi = DataClient().queryApi(HRN(catalogHrn))
val readEngine = DataEngine().readEngine(HRN(catalogHrn))
val queryString = "timeWindow==1640908800000;tileId==1476147"
val partitions = fetchIndexPartitions(queryString)
partitions
.map(partition => {
loadDrives(partition)
})
.runWith(Sink.ignore)
val parallelism = 10
def fetchIndexPartitions(baseQuery: String): Source[IndexPartition, NotUsed] = {
println(baseQuery)
implicit val ec = AkkaImplicit.ec
val driveMessages = queryApi.queryIndex(layerId, Some(baseQuery))
Await.result(driveMessages, Duration.Inf)
}
def loadDrives(partition: IndexPartition): Unit = {
val bytes = readEngine.getDataAsBytes(partition)
val dataF = bytes
.map(partitionData => {
val tileId = partition.fields.get.getOrElse("tileId", "").asInstanceOf[HereTileIndexValue].tile
val timeWindow = partition.fields.get.getOrElse("timeWindow", "").asInstanceOf[TimeWindowIndexValue].time
val probeData = ProbeData.parseFrom(partitionData)
println(
s" TILE_ID = ${tileId}, TIME_WINDOW = ${timeWindow}, PROBE DATA SIZE = ${probeData.probepoints.size}, DATA = ${probeData.probepoints.take(10)} "
)
})
Await.result(dataF, Duration.Inf)
}
}
Return
The following is a sample of what is returned from the above code snippet.
TILE_ID = 1476147, TIME_WINDOW = 1640822400000, PROBE DATA SIZE = 187671, DATA = Vector(ProbePoint(166.0,52.5314333,13.1548266,2KAtw1LTqqgthruZKxDEg,Some(Timestamp(1640891599,0)),42,), ProbePoint(346.0,52.5269383,13.1604431,2KAtw1LTqqgthruZKxDEg,Some(Timestamp(1640891959,0)),57,), ProbePoint(18.0,52.550941,13.1652703,2KAtw1LTqqgthruZKxDEg,Some(Timestamp(1640892289,0)),0,), ProbePoint(259.0,52.4583141,13.3319228,ac1FLlQKRFu2pzIWowDgDg,Some(Timestamp(1640888898,0)),44,), ProbePoint(208.0,52.4616646,13.3243461,ac1FLlQKRFu2pzIWowDgDg,Some(Timestamp(1640889228,0)),0,), ProbePoint(325.0,52.461683,13.3237886,ac1FLlQKRFu2pzIWowDgDg,Some(Timestamp(1640889288,0)),0,), ProbePoint(180.0,52.5275811,13.3432411,JToKOxTTjahquXZ04p26Q,Some(Timestamp(1640887309,0)),21,), ProbePoint(359.0,52.5273811,13.3433388,JToKOxTTjahquXZ04p26Q,Some(Timestamp(1640887369,0)),27,), ProbePoint(5.0,52.531732,13.3433613,JToKOxTTjahquXZ04p26Q,Some(Timestamp(1640887429,0)),34,), ProbePoint(236.0,52.5278053,13.2188525,Z3xV3JN1SQCMnvnJ5trtg,Some(Timestamp(1640889064,0)),49,))
For more information on the properties that appear in this tutorial, see the HERE Probe Data Catalog Specification.