Write Volatile Layer Data
The spark-support module provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to volatile layers.
Project Dependencies
If you want to create an application that uses the HERE platform Spark Connector to write data to a volatile layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
The Spark Connector provides write functionality for the following formats:
- Protobuf
- Avro
- Parquet
- Raw
- JSON
- Text
- CSV
Note
Usage specifics
For the Raw format, a data converter must be implemented from the DataConverter trait/interface and set using the withDataConverter method. Note that the format method is not required.
Write process
For volatile layers, DataFrame rows are grouped by the mt_partition column to create the partition data. The data is uploaded into the volatile layer using the write engine and afterwards published using the Publish API. The DataFrame can also include additional metadata columns.
Note that having multiple rows for the same partition is allowed only for the Avro and Parquet formats. For the other formats, this throws an error.
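As a plain-Java sketch outside Spark (with hypothetical row data, not the connector API), the grouping step behaves like this: rows that share an mt_partition value are combined into one platform partition.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingSketch {
    // Sketch of the grouping step: each row is {mt_partition, payload};
    // rows with the same mt_partition value form one platform partition.
    // (Multiple rows per partition are valid only for Avro and Parquet.)
    public static Map<String, List<String>> groupByPartition(List<String[]> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                r -> r[0],                 // group key: mt_partition
                LinkedHashMap::new,        // keep first-seen partition order
                Collectors.mapping(r -> r[1], Collectors.toList())));
    }
}
```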
All provided metadata columns for volatile layers:
Column name | Data Type | Meaning | Required |
mt_partition | String | ID of the partition in the HERE platform | Yes |
mt_timestamp | Long | Timestamp of creation | No |
mt_checksum | String | Checksum of the payload | No |
mt_crc | String | CRC of the payload | No |
mt_dataSize | Long | Size of the payload | No |
mt_compressedDataSize | Long | Compressed size of the payload | No |
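Before writing, you can sanity-check your schema's metadata columns against the table above. The following plain-Java helper is illustrative only (not part of the connector API); it maps each mt_ column name to its expected Spark SQL type name and reports mismatches.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MetadataCheck {
    // Expected Spark SQL type names for the mt_ metadata columns,
    // taken from the table above.
    static final Map<String, String> EXPECTED = Map.of(
            "mt_partition", "string",
            "mt_timestamp", "long",
            "mt_checksum", "string",
            "mt_crc", "string",
            "mt_dataSize", "long",
            "mt_compressedDataSize", "long");

    // Returns the known mt_ columns whose type does not match the table.
    public static List<String> invalidMetadataColumns(Map<String, String> schema) {
        return schema.entrySet().stream()
                .filter(e -> EXPECTED.containsKey(e.getKey())
                        && !EXPECTED.get(e.getKey()).equals(e.getValue()))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }
}
```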
Note
Restrictions of metadata
Metadata column types must match the table above; otherwise an IllegalArgumentException is thrown. Extra fields that start with the mt_ prefix but are not listed above are ignored during the write operation.
Write Data in Protobuf Format
The following snippet demonstrates how to manipulate and write a DataFrame[Row] (Dataset<Row>) as Protobuf-encoded data:
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, first, lit}
import org.apache.spark.sql.types.IntegerType
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-protobuf").getOrCreate()
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
val computationDataFrame = inputDataFrame
.groupBy("text")
.agg(count("text").as("count"), first("partition_name").as("mt_partition"))
val outDataFrame = computationDataFrame
.withColumn("count", col("count").cast(IntegerType))
.withColumn("mt_timestamp", lit(System.currentTimeMillis))
outDataFrame.show()
outDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()
import static org.apache.spark.sql.functions.*;
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-protobuf").getOrCreate();
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-protobuf-layer";
Dataset<Row> inputDataset = loadDataFrame(sparkSession);
inputDataset.show();
Dataset<Row> computationDataset =
inputDataset
.groupBy("text")
.agg(count("text").as("count"), first("partition_name").as("mt_partition"));
Dataset<Row> outDataset =
computationDataset
.withColumn("count", col("count").cast(DataTypes.IntegerType))
.withColumn("mt_timestamp", lit(System.currentTimeMillis()));
outDataset.show();
JavaLayerDataFrameWriter.create(outDataset).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();
Note
Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the application/x-protobuf content type.
Write Data in Avro or Parquet Format
The following snippet demonstrates how to write a DataFrame[Row] (Dataset<Row>) as Avro- or Parquet-encoded data:
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.{DataFrame, SparkSession}
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate()
val inputDataFrame: DataFrame = loadDataFrame(sparkSession)
inputDataFrame.show()
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-avro-or-parquet").getOrCreate();
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-parquet-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();
Note
Usage specifics
Avro and Parquet are columnar storage formats; therefore, the DataFrame can have duplicate values in the mt_partition column. Rows are grouped by mt_partition, and each group is saved in a single platform partition. The layer must have the application/x-avro-binary or application/x-parquet content type, according to the format.
Write Data in Raw Format
The following snippet demonstrates how to write a DataFrame as a file with an arbitrary format. In this example, the input DataFrame contains a column data with the message as a string.
Note
Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the application/octet-stream content type.
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-raw").getOrCreate()
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame
.writeLayer(catalogHrn, layerId)
.withDataConverter(new VolatileDataConverter {
override def serializeGroup(rowMetadata: VolatileRowMetadata,
rows: Iterator[Row]): GroupedData[VolatileRowMetadata] = {
val bytes = "serializeGroup=>".getBytes ++ rows.next().getAs[Array[Byte]]("data")
GroupedData(rowMetadata, bytes)
}
})
.save()
sparkSession.stop()
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import java.util.Iterator;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-raw").getOrCreate();
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-raw-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame)
.writeLayer(catalogHrn, layerId)
.withDataConverter(
new VolatileDataConverter() {
@Override
public GroupedData<VolatileRowMetadata> serializeGroup(
VolatileRowMetadata rowMetadata, Iterator<Row> rows) {
byte[] bytes =
ArrayUtils.addAll("serializeGroup=>".getBytes(), rows.next().getAs("data"));
return new GroupedData<>(rowMetadata, bytes);
}
})
.save();
sparkSession.stop();
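The converter above prefixes each payload with the bytes of "serializeGroup=>". A plain-Java sketch of that byte concatenation, outside Spark and without the connector types:

```java
import java.nio.charset.StandardCharsets;

public class ConcatSketch {
    // Mirrors the byte concatenation done in serializeGroup above:
    // the prefix bytes followed by the row's "data" payload bytes.
    public static byte[] prefixPayload(byte[] payload) {
        byte[] prefix = "serializeGroup=>".getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[prefix.length + payload.length];
        System.arraycopy(prefix, 0, out, 0, prefix.length);
        System.arraycopy(payload, 0, out, prefix.length, payload.length);
        return out;
    }
}
```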
Write Data in JSON Format
The following snippet demonstrates how to write a DataFrame as JSON-encoded data. In this example, the input DataFrame contains a column intVal as integer and a column strVal as string.
Note
Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the application/json content type.
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-json").getOrCreate()
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-json").getOrCreate();
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();
Write Data in Text Format
The following snippet demonstrates how to write a DataFrame in text format. In this example, the input DataFrame contains a column data with the message as a string.
Note
Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the text/plain content type. When writing data, the text data source supports only a single column per row.
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-text").getOrCreate()
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame
.writeLayer(catalogHrn, layerId)
.save()
sparkSession.stop()
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-text").getOrCreate();
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-text-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame).writeLayer(catalogHrn, layerId).save();
sparkSession.stop();
Write Data in CSV Format
The following snippet demonstrates how to write a DataFrame in CSV format. In this example, the input DataFrame contains a column field1 as integer and a column field2 as string.
Note
Restrictions
The metadata mt_partition column must have unique values; otherwise an IllegalArgumentException is thrown. The layer must have the text/csv content type.
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
SparkSession.builder().master("local").appName("write-csv").getOrCreate()
val inputDataFrame = loadDataFrame(sparkSession)
inputDataFrame
.writeLayer(catalogHrn, layerId)
.option("header", "true")
.save()
sparkSession.stop()
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession =
SparkSession.builder().master("local").appName("write-csv").getOrCreate();
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "volatile-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);
JavaLayerDataFrameWriter.create(inputDataFrame)
.writeLayer(catalogHrn, layerId)
.option("header", "true")
.save();
sparkSession.stop();
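With option("header", "true"), the written CSV payload starts with a header line of column names followed by one line per row. A plain-Java sketch of that layout, using the example's field1 and field2 columns (the render helper is illustrative, not part of the connector):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CsvSketch {
    // Sketch of the payload layout produced with option("header", "true"):
    // one header line of column names, then one comma-separated line per row.
    public static String render(List<String> header, List<List<String>> rows) {
        Stream<String> lines = Stream.concat(
                Stream.of(String.join(",", header)),
                rows.stream().map(r -> String.join(",", r)));
        return lines.collect(Collectors.joining("\n"));
    }
}
```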