Write Volatile Layer Data

The spark-support module provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to volatile layers.

Project Dependencies

If you want to create an application that uses the HERE platform Spark Connector to write data to a volatile layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.

Formats

The Spark Connector provides write functionality for the following formats:

  • Protobuf
  • Avro
  • Parquet
  • Raw

Note: Usage specifics

For the Raw format, you must implement a data converter from the DataConverter trait/interface and set it with the withDataConverter method. Calling the format method is not required.

Write process

For volatile layers, DataFrame rows are grouped by the mt_partition column to create the partition data. The data is uploaded into the volatile layer using the write engine and afterwards published using the Publish API. The DataFrame can also include additional metadata columns.

Multiple rows for the same partition are allowed only for the Avro and Parquet formats. For the other formats, duplicate partition IDs cause an error.
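The grouping rule can be pictured with a small plain-Java sketch. It does not use Spark or the connector, and it is not the connector's implementation. The class PartitionGroupingSketch, the SimpleRow type, and the allowMultipleRows flag are hypothetical names introduced only for this illustration; the flag stands for "the format is Avro or Parquet".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of grouping by mt_partition; not connector code. */
final class PartitionGroupingSketch {

  /** Hypothetical stand-in for a DataFrame row: a partition ID plus one value. */
  static final class SimpleRow {
    final String mtPartition;
    final String value;

    SimpleRow(String mtPartition, String value) {
      this.mtPartition = mtPartition;
      this.value = value;
    }
  }

  /**
   * Groups row values by partition ID, keeping partitions in first-seen order.
   * When allowMultipleRows is false, a second row for the same partition is rejected.
   */
  static Map<String, List<String>> groupByPartition(
      List<SimpleRow> rows, boolean allowMultipleRows) {
    Map<String, List<String>> groups = new LinkedHashMap<>();
    for (SimpleRow row : rows) {
      List<String> group = groups.computeIfAbsent(row.mtPartition, k -> new ArrayList<>());
      if (!group.isEmpty() && !allowMultipleRows) {
        throw new IllegalArgumentException("Duplicate mt_partition value: " + row.mtPartition);
      }
      group.add(row.value);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<SimpleRow> rows = Arrays.asList(
        new SimpleRow("partition-1", "a"),
        new SimpleRow("partition-1", "b"),
        new SimpleRow("partition-2", "c"));
    // Multiple rows per partition allowed, as for Avro and Parquet
    System.out.println(groupByPartition(rows, true));
    // prints {partition-1=[a, b], partition-2=[c]}
  }
}
```

Calling groupByPartition(rows, false) with the same input throws an IllegalArgumentException, which mirrors the behavior described for formats that accept only one row per partition.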

Metadata fields

The following metadata columns are available for volatile layers:

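| Column name           | Data type | Meaning                          | Required |
|-----------------------|-----------|----------------------------------|----------|
| mt_partition          | String    | ID of partition in HERE platform | Yes      |
| mt_timestamp          | Long      | Timestamp of creation            | No       |
| mt_checksum           | String    | Checksum of payload              | No       |
| mt_crc                | String    | CRC of payload                   | No       |
| mt_dataSize           | Long      | Size of payload                  | No       |
| mt_compressedDataSize | Long      | Compressed size of payload       | No       |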

Note: Restrictions of metadata

The type of each metadata column must match the table above; otherwise, the write throws an IllegalArgumentException. Extra columns that start with the mt_ prefix but are not listed in the table are ignored during the write operation.
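As a rough illustration of these two rules, the following plain-Java sketch checks a schema, given as a map from column name to type name, against the metadata columns listed for volatile layers. It is only a model of the described behavior, not the connector's validation code. The class MetadataColumnCheckSketch and the method checkMetadataColumns are hypothetical names, and the type names are simple strings rather than Spark data types.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model of the metadata column rules; not connector code. */
final class MetadataColumnCheckSketch {

  /** Column names and types of the volatile layer metadata columns. */
  static final Map<String, String> EXPECTED_TYPES = new LinkedHashMap<>();

  static {
    EXPECTED_TYPES.put("mt_partition", "String");
    EXPECTED_TYPES.put("mt_timestamp", "Long");
    EXPECTED_TYPES.put("mt_checksum", "String");
    EXPECTED_TYPES.put("mt_crc", "String");
    EXPECTED_TYPES.put("mt_dataSize", "Long");
    EXPECTED_TYPES.put("mt_compressedDataSize", "Long");
  }

  /**
   * Returns the known metadata columns found in the schema.
   * A known column with the wrong type is rejected; unknown mt_ columns
   * and ordinary data columns are left out of the result.
   */
  static Map<String, String> checkMetadataColumns(Map<String, String> schema) {
    Map<String, String> metadata = new LinkedHashMap<>();
    for (Map.Entry<String, String> column : schema.entrySet()) {
      String expected = EXPECTED_TYPES.get(column.getKey());
      if (expected == null) {
        continue; // data column or unknown mt_ column: not treated as metadata
      }
      if (!expected.equals(column.getValue())) {
        throw new IllegalArgumentException(
            column.getKey() + " must be " + expected + " but is " + column.getValue());
      }
      metadata.put(column.getKey(), column.getValue());
    }
    return metadata;
  }

  public static void main(String[] args) {
    Map<String, String> schema = new LinkedHashMap<>();
    schema.put("text", "String");
    schema.put("mt_partition", "String");
    schema.put("mt_timestamp", "Long");
    schema.put("mt_custom", "String"); // unknown mt_ column, ignored
    System.out.println(checkMetadataColumns(schema));
    // prints {mt_partition=String, mt_timestamp=Long}
  }
}
```

A schema that declares mt_timestamp as String instead of Long makes checkMetadataColumns throw an IllegalArgumentException.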

Write Data in Protobuf Format

The following snippets demonstrate how to manipulate a DataFrame (Dataset<Row> in Java) and write it as a Protobuf-encoded data file:

Scala

import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, first, lit}
import org.apache.spark.sql.types.IntegerType

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-protobuf").getOrCreate()

// catalogHrn: HRN of a catalog that contains the layer layerId
// layerId: ID of a volatile layer whose Protobuf schema has the required
// fields 'text' (String) and 'count' (Integer)
val catalogHrn = HRN("hrn:here:data:::catalog-1")
val layerId = "volatile-protobuf-layer"
// loadDataFrame is not defined in this listing; it stands for your own loading code
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// ------------------------
// |   text|partition_name|
// ------------------------
// |value-1|   partition-1|
// |value-2|   partition-2|
// |value-1|   partition-1|
// ------------------------

// Count the rows per 'text' value and use the partition name as mt_partition
val computationDataFrame = inputDataFrame
  .groupBy("text")
  .agg(count("text").as("count"), first("partition_name").as("mt_partition"))

// Cast 'count' to Integer and add the optional mt_timestamp column
val outDataFrame = computationDataFrame
  .withColumn("count", col("count").cast(IntegerType))
  .withColumn("mt_timestamp", lit(System.currentTimeMillis()))
outDataFrame.show()
// ------------------------------------------
// |   text|count|mt_partition| mt_timestamp|
// ------------------------------------------
// |value-2|    1| partition-2|1569598607978|
// |value-1|    2| partition-1|1569598607978|
// ------------------------------------------

outDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-protobuf").getOrCreate();

// catalogHrn: HRN of a catalog that contains the layer layerId
// layerId: ID of a volatile layer whose Protobuf schema has the required
// fields 'text' (String) and 'count' (Integer)
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-protobuf-layer";
Dataset<Row> inputDataset = loadDataFrame(sparkSession);
inputDataset.show();
// ------------------------
// |   text|partition_name|
// ------------------------
// |value-1|   partition-1|
// |value-2|   partition-2|
// |value-1|   partition-1|
// ------------------------

// Some computations
Dataset<Row> computationDataset =
    inputDataset
        .groupBy("text")
        .agg(count("text").as("count"), first("partition_name").as("mt_partition"));

// Casting column type and adding extra mt_timestamp column
Dataset<Row> outDataset =
    computationDataset
        .withColumn("count", col("count").cast("int"))
        .withColumn("mt_timestamp", lit(System.currentTimeMillis()));
outDataset.show();
// ------------------------------------------
// |   text|count|mt_partition| mt_timestamp|
// ------------------------------------------
// |value-2|    1| partition-2|1569598607978|
// |value-1|    2| partition-1|1569598607978|
// ------------------------------------------

JavaLayerDataFrameWriter.create(outDataset).writeLayer(catalogHrn, layerId).save();

sparkSession.stop();

Note: Restrictions

The mt_partition metadata column must have unique values; otherwise, the write throws an IllegalArgumentException. The layer must have the application/x-protobuf content type.

Write Data in Avro and Parquet Formats

The following snippets demonstrate how to write a DataFrame (Dataset<Row> in Java) as an Avro-encoded or Parquet-encoded data file:

Scala

import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()

// catalogHrn: HRN of a catalog that contains the layer layerId
// layerId: ID of a volatile layer
val catalogHrn = HRN("hrn:here:data:::catalog-1")
val layerId = "volatile-parquet-layer"
// loadDataFrame is not defined in this listing; it stands for your own loading code
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
// ---------------------------
// |tileId|index|mt_partition|
// ---------------------------
// |     5|    1| partition-1|
// |     6|    2| partition-1|
// |     7|   10| partition-2|
// |     8|   20| partition-2|
// ---------------------------

// Rows that share an mt_partition value end up in the same platform partition
inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.IntegerType;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();

// catalogHrn: HRN of a catalog that contains the layer layerId
// layerId: ID of a volatile layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-parquet-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();

Note: Usage specifics

Avro and Parquet files can store multiple rows, so the DataFrame can contain duplicate values in the mt_partition column. Rows are grouped by mt_partition, and each group is saved in a single platform partition. The layer must have the application/x-parquet or application/x-avro-binary content type, matching the format being written.

Write Data in Raw Format

The following snippets demonstrate how to write a DataFrame as a file with an arbitrary format. In this example, the input DataFrame contains a data column that holds the message as a byte array.

Note: Restrictions

The mt_partition metadata column must have unique values; otherwise, the write throws an IllegalArgumentException. The layer must have the application/octet-stream content type.

Scala

import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{Row, SparkSession}
// VolatileDataConverter, VolatileRowMetadata, and GroupedData must also be
// imported from the connector; their package is not shown in this listing

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-raw").getOrCreate()

// catalogHrn: HRN of a catalog that contains the layer layerId
// layerId: ID of a volatile layer
val catalogHrn = HRN("hrn:here:data:::catalog-1")
val layerId = "volatile-raw-layer"
// loadDataFrame is not defined in this listing; it stands for your own loading code
val inputDataFrame = loadDataFrame(sparkSession)
// -------------------
// |mt_partition|data|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new VolatileDataConverter {
    override def serializeGroup(rowMetadata: VolatileRowMetadata,
                                rows: Iterator[Row]): GroupedData[VolatileRowMetadata] = {
      // There is one row per partition ID, so only the first element of the iterator is processed
      val bytes = "serializeGroup=>".getBytes ++ rows.next().getAs[Array[Byte]]("data")
      GroupedData(rowMetadata, bytes)
    }
  })
  .save()

sparkSession.stop()

Java

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
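import org.apache.spark.sql.types.IntegerType;
// VolatileDataConverter, VolatileRowMetadata, GroupedData, Iterator, and ArrayUtils
// must also be imported; their packages are not shown in this listing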

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-raw").getOrCreate();

// catalogHrn: HRN of a catalog that contains the layer layerId
// layerId: ID of a volatile layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-raw-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
// -------------------
// |mt_partition|data|
// -------------------
// | partition-1|[31]|
// | partition-2|[32]|
// | partition-3|[33]|
// -------------------

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new VolatileDataConverter() {
          @Override
          public GroupedData<VolatileRowMetadata> serializeGroup(
              VolatileRowMetadata rowMetadata, Iterator<Row> rows) {
            byte[] bytes =
                ArrayUtils.addAll("serializeGroup=>".getBytes(), rows.next().getAs("data"));
            return new GroupedData<>(rowMetadata, bytes);
          }
        })
    .save();

sparkSession.stop();

Note: Usage specifics

The connector groups rows by the mt_partition column value. For the Raw format, you must implement a data converter from the DataConverter trait/interface and set it with the withDataConverter method; calling the format method is not required. The layer must have the application/octet-stream content type.
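The payload built inside serializeGroup in the example is just a fixed prefix followed by the bytes of the data column. The following plain-Java sketch reproduces that byte concatenation on its own, without Spark, the connector, or ArrayUtils. The class RawPayloadSketch and the method prefixPayload are hypothetical names, and the explicit UTF-8 charset is an assumption of this sketch; the listings above call getBytes() with the platform default charset.

```java
import java.nio.charset.StandardCharsets;

/** Plain-Java illustration of the payload built in the Raw example; not connector code. */
final class RawPayloadSketch {

  /** Returns the UTF-8 bytes of the prefix followed by the payload bytes. */
  static byte[] prefixPayload(String prefix, byte[] payload) {
    byte[] prefixBytes = prefix.getBytes(StandardCharsets.UTF_8);
    byte[] result = new byte[prefixBytes.length + payload.length];
    System.arraycopy(prefixBytes, 0, result, 0, prefixBytes.length);
    System.arraycopy(payload, 0, result, prefixBytes.length, payload.length);
    return result;
  }

  public static void main(String[] args) {
    // 0x31 is the single byte shown as [31] for partition-1 in the example table
    byte[] bytes = prefixPayload("serializeGroup=>", new byte[] {0x31});
    System.out.println(new String(bytes, StandardCharsets.UTF_8));
    // prints serializeGroup=>1
  }
}
```

Naming the charset explicitly keeps the written bytes identical across machines, which matters when other applications read the partition later.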
