Read Volatile Layer Data

The Data Client Library provides the class LayerDataFrameReader, a custom Spark DataFrameReader for creating DataFrames that contain the data of all supported layer types, including volatile layers.

All formats supported by DataFrameReader are also supported by LayerDataFrameReader. In addition, it supports formats such as Apache Avro, Apache Parquet, Protobuf, and raw byte arrays (octet-stream).

Read process

The read operation works according to the following steps:

  1. The Spark connector first contacts the server to retrieve information such as the layer type, layer schema, and layer encoding format.
  2. Partitions within the layer are filtered using the provided filter query. If no query is provided, the default value "mt_timestamp=ge=0" is used, which matches all partitions.
  3. At this stage, the layer format is known. The connector creates the corresponding Spark file format and, using the partition data, produces an iterator of rows (records).
  4. Implicit columns are added to each row, depending on the layer type and the partition metadata.
  5. The resulting rows are handed over to the Spark framework, which returns the finalized DataFrame.
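
The format-specific examples later in this guide all follow the same pattern. As a minimal sketch of these steps (the partition IDs in the query are placeholders), the whole flow reduces to a readLayer, query, and load chain:

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId) // step 1: the connector fetches layer type, schema, and encoding format
  .query("mt_partition=in=(partition1, partition2)") // step 2: filter partitions; omit to match all partitions
  .load() // steps 3-5: decode the partition data into rows and build the DataFrame

df.printSchema()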

DataFrame columns

Besides the user-defined columns, which derive from the partition data, the Spark connector provides additional columns that represent the data partitioning information and the partition payload attributes.

Data columns

These correspond to user-defined columns and derive from the partition data.

Layer partitioning columns

Column name | Data Type | Meaning
mt_partition | String | Partition ID

Partition payload attribute columns

Column name | Data Type | Meaning
mt_dataHandle | String | Handle of the data (deprecated)
mt_metadata | Map[String, String] | Metadata of the partition (deprecated)
mt_timestamp | Long | Timestamp of creation (UTC)
mt_checksum | String | Checksum of the payload
mt_crc | String | CRC of the payload
mt_dataSize | Long | Size of the payload
mt_compressedDataSize | Long | Compressed size of the payload
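
Once a DataFrame has been loaded (see the format-specific examples below), these implicit columns can be selected and filtered like any other column. The snippet below is a minimal sketch that assumes df was obtained from readLayer followed by load:

Scala
// df: a DataFrame previously loaded with sparkSession.readLayer(catalogHrn, layerId).load()
// Show the partition ID together with selected payload attributes, skipping empty payloads.
df.select("mt_partition", "mt_timestamp", "mt_dataSize")
  .where("mt_dataSize > 0")
  .show()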

Project Dependencies

If you want to create an application that uses the HERE platform Spark Connector to read data from a volatile layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
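
For orientation, an sbt dependency declaration typically looks like the sketch below. The group ID, artifact name, and version shown are placeholders; use the exact coordinates listed in the Dependencies for Spark Connector chapter.

Scala
// build.sbt -- placeholder coordinates for illustration only; take the exact
// group ID, artifact name, and version from "Dependencies for Spark Connector".
libraryDependencies += "com.here.platform.data.client" %% "spark-support" % "<sdk-version>"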

Read Parquet-Encoded Data

The following snippet demonstrates how to access a Parquet-encoded DataFrame from a volatile layer of a catalog. Note that the Parquet schema is expected to be bundled with the data. Therefore, you don't need to specify the format explicitly.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing parquet-encoded SDII data)
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)
val df = reader.load()

df.printSchema()

val messagesWithAtLeastOneSignRecognition = df
  .select("pathEvents.signRecognition")
  .where("size(pathEvents.signRecognition) > 0")

val count = messagesWithAtLeastOneSignRecognition.count()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing parquet-encoded SDII data)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

dataFrame.printSchema();

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    dataFrame
        .select("pathEvents.signRecognition")
        .where("size(pathEvents.signRecognition) > 0");

Long count = messagesWithAtLeastOneSignRecognition.count();

Read Avro-Encoded Data

The following snippet demonstrates how to access an Avro-encoded DataFrame from a volatile layer of a catalog. Note that the Avro schema is expected to be bundled with the data. Therefore, you don't need to specify the format explicitly.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing avro-encoded SDII data)
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)

val df = reader.load()

df.printSchema()

val messagesWithAtLeastOneSignRecognition = df
  .select("pathEvents.signRecognition")
  .where("size(pathEvents.signRecognition) > 0")

val count = messagesWithAtLeastOneSignRecognition.count()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing avro-encoded SDII data)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

Dataset<Row> messagesWithAtLeastOneSignRecognition =
    dataFrame
        .select("pathEvents.signRecognition")
        .where("size(pathEvents.signRecognition) > 0");

Long count = messagesWithAtLeastOneSignRecognition.count();

Read Protobuf-Encoded Data

The following snippet demonstrates how to access a Protobuf-encoded DataFrame from a volatile layer of a catalog. Note that the Protobuf schema is expected to be referenced from the layer configuration. Therefore, you don't need to specify the format explicitly.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing protobuf-encoded SDII data that holds a copy of real data
// from the "indexed-locations" layer of the "hrn:here:data:::rib-2" catalog)
val dataFrame = sparkSession
  .readLayer(catalogHrn, layerId)
  .query("mt_partition=in=(DEU, CUB)")
  .load()

val tileIdLists = dataFrame
  .select(col("partition_name"), explode(col("tile_id")).as("tile"))

tileIdLists.show()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing protobuf-encoded SDII data that holds a copy of real data
// from the "indexed-locations" layer of the "hrn:here:data:::rib-2" catalog)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query("mt_partition=in=(DEU, CUB)")
        .load();

Dataset<Row> tileIdLists =
    dataFrame.select(
        dataFrame.col("partition_name"),
        explode(dataFrame.col("tile_id")).as("tile"));

tileIdLists.show();

Note that to read Protobuf data from a layer, the schema must be specified in the layer configuration and must be available on the Artifact Service. Furthermore, the schema must have a ds variant. For more information on how to maintain schemas, see the Archetypes Developer's Guide.

Read JSON-Encoded Data

The following snippet demonstrates how to access a JSON-encoded DataFrame from a volatile layer of a catalog. In this example, the JSON object contains the property intVal as an integer and strVal as a string.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of volatile layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=(${partitionIds mkString ", "})")
  .load()

df.select("mt_partition", "intVal").where("intVal > 0").show()

df.printSchema()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing JSON-encoded data)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(
            "mt_partition=in=("
                + partitionId1
                + ", "
                + partitionId2
                + ", "
                + partitionId3
                + ")")
        .load();

dataFrame.select("mt_partition", "intVal").where("intVal > 0").show();

dataFrame.printSchema();

Read Text-Encoded Data

The following snippet demonstrates how to access a text-encoded DataFrame from a volatile layer of a catalog. In this example, each row contains the partition data as a string.

Note

Restrictions
When reading text data, each line becomes a row with a single string column named value by default. Therefore, the text data source exposes only the single column value per row.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of volatile layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=(${partitionIds mkString ", "})")
  .load()

df.select("mt_partition", "value").show()

df.printSchema()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing text-encoded data)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(
            "mt_partition=in=("
                + partitionId1
                + ", "
                + partitionId2
                + ", "
                + partitionId3
                + ")")
        .load();

dataFrame.select("mt_partition", "value").show();

dataFrame.printSchema();

Read CSV-Encoded Data

The following snippet demonstrates how to access a CSV-encoded DataFrame from a volatile layer of a catalog. In this example, the CSV row contains the columns field1 as an integer and field2 as a string.

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of volatile layer)
val df = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition=in=(${partitionIds mkString ", "})")
  .load()

df.select("mt_partition", "field1").where("field1 > 0").show()

df.printSchema()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer containing CSV-encoded data)
Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(
            "mt_partition=in=("
                + partitionId1
                + ", "
                + partitionId2
                + ", "
                + partitionId3
                + ")")
        .load();

dataFrame.select("mt_partition", "field1").where("field1 > 0").show();

dataFrame.printSchema();

Read Other Formats

The following snippet demonstrates how to access data in any arbitrary format from a volatile layer of a catalog:

Scala
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.{Encoders, SparkSession}
// val sparkSession: org.apache.spark.sql.SparkSession
// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of a volatile layer)
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

if (compressed)
  reader.option("olp.connector.data-decompression-timeout", 300000)
val dataFrame = reader.load()

dataFrame.printSchema()

val dataFrameStringContent = dataFrame
  .map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)(Encoders.STRING)
  .first()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameReader;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

Dataset<Row> dataFrame =
    JavaLayerDataFrameReader.create(sparkSession)
        .readLayer(catalogHrn, layerId)
        .query(String.format("mt_partition==%s", tileId))
        .load();

dataFrame.printSchema();

List<String> dataFrameStringContent =
    dataFrame
        .map(
            (MapFunction<Row, String>) row -> new String(row.<byte[]>getAs("data")),
            Encoders.STRING())
        .collectAsList();

Note

The raw format refers to application/octet-stream in the layer configuration and is not to be confused with the raw layer configuration.

By default, the option .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") is set.
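
If the payload uses a different pattern, the option can be overridden on the reader before calling load. The following is a minimal sketch; the pattern value is only an assumed example.

Scala
// Assumed example: override the default timestampFormat before loading.
val reader = sparkSession
  .readLayer(catalogHrn, layerId)
  .query(s"mt_partition==$tileId")

reader.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")

val df = reader.load()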

In contrast to index and versioned layers, volatile partitions can expire. Although the metadata is kept when the volatile data expires, the Spark connector skips all expired partitions when the load method is called.

For information on RSQL, see RSQL.
