Write Index Layer Data
The Data Client Library provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to index layers.
For index layers, data is grouped by the values of the index attributes defined for the layer. When writing to a Protobuf-encoded layer, there must be exactly one Row for each set of index attribute values.
Project Dependencies
If you want to create an application that uses the HERE platform Spark Connector to write data to an index layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
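For orientation, an sbt dependency would look roughly like the following; the exact coordinates and version are an assumption here, so take the authoritative ones from that chapter:

```scala
// build.sbt -- artifact coordinates are an assumption; verify them against the
// "Dependencies for Spark Connector" chapter before use.
libraryDependencies += "com.here.platform.data.client" %% "spark-support" % "<version>"
```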
Write process
For index layers, DataFrame rows are grouped by the index attributes to create the index data. The data is uploaded to the index layer using the write engine and afterwards published using the Publish API. The DataFrame can also include additional metadata.
All supported metadata columns for index layers are listed below:

| Column name | Data type | Meaning | Required |
|---|---|---|---|
| mt_metadata | Map[String, String] | Metadata of the partition | No |
| mt_timestamp | Long | Timestamp of creation (UTC) | No |
| mt_checksum | String | Checksum of the payload | No |
| mt_crc | String | CRC of the payload | No |
| mt_dataSize | Long | Size of the payload | No |
| mt_compressedDataSize | Long | Compressed size of the payload | No |
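The following is a minimal sketch of attaching such metadata columns with plain Spark operations, assuming inputDF is the DataFrame about to be written; the column values shown are illustrative only:

```scala
import org.apache.spark.sql.functions.{lit, typedLit}

// Attach optional mt_* metadata columns before calling writeLayer (sketch only).
val dfWithMetadata = inputDF
  .withColumn("mt_timestamp", lit(System.currentTimeMillis())) // creation timestamp (epoch millis assumed)
  .withColumn("mt_metadata", typedLit(Map("source" -> "example-app"))) // free-form partition metadata
```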
The following snippet demonstrates how to write a DataFrame as an Avro, Parquet, or Protobuf-encoded data file to an index layer of a catalog:
```scala
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.pipeline.PipelineContext

// Resolve the output catalog from the pipeline configuration;
// the layer ID is a placeholder for your index layer.
val pipelineContext = new PipelineContext
val outputCatalogHrn = pipelineContext.config.outputCatalog
val outputLayerId = "index-layer"

// inputDF is the DataFrame to write, prepared earlier in the application.
inputDF
  .writeLayer(outputCatalogHrn, outputLayerId)
  .save()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Placeholder catalog and layer; replace with your own.
HRN outputCatalogHrn = HRN.fromString("hrn:here:data:::output-catalog");
String outputLayerId = "index-layer";

// inputDF is the Dataset<Row> to write, prepared earlier in the application.
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(outputCatalogHrn, outputLayerId)
    .option("olp.connector.metadata-columns", true)
    .save();
```
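The olp.connector.metadata-columns option shown above relates to the mt_* columns from the table: enabling it tells the writer to process those metadata columns; without it, they are presumably ignored during the write.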
Write Data in CSV Format
The following snippet demonstrates how to write a DataFrame as a CSV file to an index layer. In this example, the input DataFrame contains the column field1 as integer and field2 as string.
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-csv").getOrCreate()

// Placeholder catalog and layer; replace with your own.
val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "index-csv-layer"

// loadDataFrame is a user-supplied helper that produces the DataFrame to write.
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true") // write a CSV header line
  .save()

sparkSession.stop()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-csv").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-csv-layer";

// loadDataFrame is a user-supplied helper that produces the Dataset to write.
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true") // write a CSV header line
    .save();

sparkSession.stop();
```
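The loadDataFrame helper used in these snippets is not part of the library. A hypothetical sketch follows; the idx_ prefix and the idx_tile_id column name are assumptions that must be matched to your layer's actual index definition:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: builds a DataFrame with the payload columns used in the
// CSV example plus one index attribute column (all names are assumptions).
def loadDataFrame(sparkSession: SparkSession): DataFrame = {
  import sparkSession.implicits._
  Seq(
    (1, "first", 92259L), // field1, field2, idx_tile_id
    (2, "second", 92259L)
  ).toDF("field1", "field2", "idx_tile_id")
}
```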
Write Data in Text Format
The following snippet demonstrates how to write a DataFrame as a text file to an index layer. In this example, the input DataFrame contains a column data holding the message as a string.
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-text").getOrCreate()

// Placeholder catalog and layer; replace with your own.
val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "index-text-layer"

val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-text").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-text-layer";

Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .save();

sparkSession.stop();
```
Write Data in JSON Format
The following snippet demonstrates how to write a DataFrame as a JSON file to an index layer. In this example, the input DataFrame contains the column intVal as integer and strVal as string.
```scala
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-json").getOrCreate()

// Placeholder catalog and layer; replace with your own.
val catalogHrn = HRN.fromString("hrn:here:data:::catalog-1")
val layerId = "index-json-layer"

val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()

sparkSession.stop()
```
```java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-json").getOrCreate();

HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-json-layer";

Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .save();

sparkSession.stop();
```
The following snippet demonstrates how to write a DataFrame in an arbitrary format to an index layer of a catalog by supplying a custom data converter. In this example, the input DataFrame contains a column data with messages as strings, and the data of multiple rows is simply concatenated together:
```scala
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  IndexDataConverter,
  IndexRowMetadata
}
import org.apache.spark.sql.Row

inputDF
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new IndexDataConverter {
    // Serializes all rows that share the same index attribute values into a
    // single payload: the bytes of each row's "data" column are decoded to
    // text and concatenated.
    override def serializeGroup(
        rowMetadata: IndexRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[IndexRowMetadata] = {
      val joinedText = rows
        .map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)
        .mkString
      GroupedData(rowMetadata, joinedText.getBytes())
    }
  })
  .save()
```
```java
import com.here.platform.data.client.spark.javadsl.IndexDataConverter;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.IndexRowMetadata;
import org.apache.spark.sql.Row;

import java.util.Iterator;

JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new IndexDataConverter() {
          // Serializes all rows that share the same index attribute values into
          // a single payload by concatenating the text of each row's "data" column.
          @Override
          public GroupedData<IndexRowMetadata> serializeGroup(
              IndexRowMetadata rowMetadata, Iterator<Row> rows) {
            StringBuilder builder = new StringBuilder();
            rows.forEachRemaining(row -> builder.append(new String(row.<byte[]>getAs("data"))));
            return new GroupedData<>(rowMetadata, builder.toString().getBytes());
          }
        })
    .save();
```
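A custom data converter like this is the extension point for formats the connector does not serialize natively: serializeGroup receives all rows that share the same index attribute values, together with their index metadata, and returns the single payload to upload for that group.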
Note: For information on RSQL, see RSQL.