Write Index Layer Data

The Data Client Library provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to index layers.

For index layers, data is grouped by the values of the index attributes defined for the layer. If you write to a Protobuf-Encoded layer, there must be exactly one Row for each combination of index attribute values.
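For example, if a layer defines the index attributes tileId and eventTime, you could enforce the one-row constraint with a deduplication step before writing. The following sketch is illustrative only: the idx_ column prefix and the attribute names are assumptions for this example, so use the column names that match your layer's index definition.

import org.apache.spark.sql.DataFrame

// A minimal sketch (assumed column names): keep exactly one row per
// combination of index attribute values before writing to a
// Protobuf-Encoded index layer.
def oneRowPerIndex(events: DataFrame): DataFrame =
  events.dropDuplicates("idx_tileId", "idx_eventTime")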

Project Dependencies

If you want to create an application that uses the HERE platform Spark Connector to write data to an index layer, add the required dependencies to your project as described in the chapter Dependencies for Spark Connector.
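For instance, with sbt the declaration might look like the following sketch; the module name spark-support and the version placeholder are assumptions here, so take the exact coordinates from that chapter:

// build.sbt -- illustrative only; replace <version> with the release you use
libraryDependencies += "com.here.platform.data.client" %% "spark-support" % "<version>"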

Write Process

For index layers, DataFrame rows are grouped by the index attributes to create the index data. The data is uploaded to the index layer using the write engine and afterwards published using the Publish API. The DataFrame can also include additional metadata columns.

Metadata fields

The metadata columns available for index layers are:

Column name            Data Type            Meaning                          Required
mt_metadata            Map[String, String]  Metadata of the partition        No
mt_timestamp           Long                 Timestamp of creation (UTC)      No
mt_checksum            String               Checksum of the payload          No
mt_crc                 String               CRC of the payload               No
mt_dataSize            Long                 Size of the payload              No
mt_compressedDataSize  Long                 Compressed size of the payload   No
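As an illustration, the following sketch attaches two of these optional columns to an input DataFrame before writing; the binary payload column named data and the choice of a SHA-256 checksum are assumptions for this example:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, sha2}

// A minimal sketch (assumed payload column "data"): add optional
// metadata columns that are written alongside the payload.
def withWriteMetadata(df: DataFrame): DataFrame =
  df.withColumn("mt_timestamp", lit(System.currentTimeMillis())) // creation time (UTC)
    .withColumn("mt_checksum", sha2(col("data"), 256)) // checksum of the payload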

Write Data as Avro, Parquet, or Protobuf-Encoded Files

The following snippet demonstrates how to write a DataFrame as an Avro, Parquet, or Protobuf-Encoded data file to an index layer of a catalog:

Scala
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
// val inputDF: DataFrame (Input data stored as a DataFrame)
// val outputCatalogHrn: HRN (HRN of the output catalog that contains the layer $outputLayerId)
// val outputLayerId: String (ID of the output index layer. If protobuf, the schema should match the non-indexed columns of $inputDF)
inputDF
  .writeLayer(outputCatalogHrn, outputLayerId)
  .save()
Java
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Dataset<Row> inputDF (Input data stored as a DataFrame)
// HRN outputCatalogHrn (HRN of a catalog that contains the layer $outputLayerId)
// String outputLayerId (ID of the output index layer. If protobuf, the schema should match the
// non-indexed columns of $inputDF)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(outputCatalogHrn, outputLayerId)
    .option("olp.connector.metadata-columns", true)
    .save();

Write Data in CSV Format

The following snippet demonstrates how to write a DataFrame as a CSV file to an index layer. In this example, the input DataFrame contains a column field1 of type integer and a column field2 of type string.

Scala
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-csv").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
// loadDataFrame is a helper (not shown) that provides the input DataFrame
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .option("header", "true")
  .save()
sparkSession.stop()
Java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-csv").getOrCreate();

// catalogHrn: HRN of the catalog that contains the layer $layerId
// layerId: ID of an index layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-csv-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .option("header", "true")
    .save();
sparkSession.stop();

Write Data in Text Format

The following snippet demonstrates how to write a DataFrame as a text file to an index layer. In this example, the input DataFrame contains a column data that holds the message as a string.

Scala
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-text").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()
sparkSession.stop()
Java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-text").getOrCreate();

// catalogHrn: HRN of the catalog that contains the layer $layerId
// layerId: ID of an index layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-text-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .save();
sparkSession.stop();

Write Data in JSON Format

The following snippet demonstrates how to write a DataFrame as a JSON file to an index layer. In this example, the input DataFrame contains a column intVal of type integer and a column strVal of type string.

Scala
import com.here.platform.data.client.spark.LayerDataFrameWriter._
import org.apache.spark.sql.SparkSession
val sparkSession: SparkSession =
  SparkSession.builder().master("local").appName("write-json").getOrCreate()

// val catalogHrn: HRN (HRN of a catalog that contains the layer $layerId)
// val layerId: String (ID of an index layer)
val inputDataFrame = loadDataFrame(sparkSession)

inputDataFrame
  .writeLayer(catalogHrn, layerId)
  .save()
sparkSession.stop()
Java
import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession sparkSession =
    SparkSession.builder().master("local").appName("write-json").getOrCreate();

// catalogHrn: HRN of the catalog that contains the layer $layerId
// layerId: ID of an index layer
HRN catalogHrn = HRN.fromString("hrn:here:data:::catalog-1");
String layerId = "index-json-layer";
Dataset<Row> inputDataFrame = loadDataFrame(sparkSession);

JavaLayerDataFrameWriter.create(inputDataFrame)
    .writeLayer(catalogHrn, layerId)
    .save();
sparkSession.stop();

Write Data in Other Formats

The following snippet demonstrates how to write a DataFrame in an arbitrary custom format to an index layer of a catalog. In this example, the input DataFrame contains a column data with messages as strings, and the data of multiple rows is simply concatenated:

Scala
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.scaladsl.{
  GroupedData,
  IndexDataConverter,
  IndexRowMetadata
}
import org.apache.spark.sql.Row
// val inputDF: DataFrame (Input data stored as a DataFrame)
// val catalogHrn: HRN (HRN of the output catalog that contains the layer $layerId)
// val layerId: String (ID of the output index layer)
inputDF
  .writeLayer(catalogHrn, layerId)
  .withDataConverter(new IndexDataConverter {
    override def serializeGroup(
        rowMetadata: IndexRowMetadata,
        rows: Iterator[Row]
    ): GroupedData[IndexRowMetadata] = {
      val joinedText = rows
        .map(_.getAs[Array[Byte]]("data").map(_.toChar).mkString)
        .mkString
      GroupedData(rowMetadata, joinedText.getBytes())
    }
  })
  .save()
Java
import com.here.platform.data.client.spark.javadsl.IndexDataConverter;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.data.client.spark.scaladsl.GroupedData;
import com.here.platform.data.client.spark.scaladsl.IndexRowMetadata;
import java.util.Iterator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Dataset<Row> inputDF (Input data stored as a Dataset<Row>)
// HRN catalogHrn (HRN of the output catalog that contains the layer $layerId)
// String layerId (ID of the output index layer)
JavaLayerDataFrameWriter.create(inputDF)
    .writeLayer(catalogHrn, layerId)
    .withDataConverter(
        new IndexDataConverter() {
          @Override
          public GroupedData<IndexRowMetadata> serializeGroup(
              IndexRowMetadata rowMetadata, Iterator<Row> rows) {
            StringBuilder builder = new StringBuilder();
            rows.forEachRemaining(row -> builder.append(new String(row.<byte[]>getAs("data"))));
            String joinedText = builder.toString();
            return new GroupedData<>(rowMetadata, joinedText.getBytes());
          }
        })
    .save();

Note

For information on RSQL, see RSQL.
