Write interactive map layer data

The spark-support module provides the class LayerDataFrameWriter, a custom Spark DataFrameWriter for writing DataFrames to interactive map layers.

Project dependencies

If you want to create an application that uses the HERE platform Spark Connector to write data to an interactive map layer, add the required dependencies to your project as described in chapter Dependencies for Spark Connector.

Formats

The spark connector provides write functionality for an interactive map layer specific format. Note that interactive map layer format/schema is based on GeoJSON and therefore inherently flexible.

Write process

For interactive map layers, DataFrame rows are published directly to the interactive map layer using the Publish API. Objects are "upserted" into the layer, meaning they will be updated if they already exist and created if they don't.

Data fields

DataFrames to be written to an interactive map layer must follow this structure:

Column name Data Type Meaning
mt_id String OID of the object
geometry ROW Object's geometry, first field will contain type, second field will contain coordinates
properties MAP Object's properties in reduced format
custom_members MAP Non-standard top-level fields in reduced format
mt_tags ARRAY The object's tags

Note that additional columns can be present, but will be ignored.

Also note

Note

Restrictions for String values in properties and custom members maps
If you want to write a String value to either the properties or custom_members maps, make sure that double quotes are at the beginning and end of the String value, as part of the value! This is unfortunately necessary in order for JSON serialization to work correctly during writing.

Write data to interactive map layer

The following snippet demonstrates how to create and write a DataFrame[Row](Dataset<Row>) to an interactive map layer:

Scala
Java
import com.here.platform.data.client.model.geojson.{Feature, Geometry, Point}
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt
import com.here.platform.data.client.spark.internal.InteractiveMapPartitionHelper
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{Row, SparkSession}

import scala.collection.JavaConverters._
import scala.collection.mutable
val numObjectsInX = 360
val numObjectsInY = 18

val newRows = new mutable.MutableList[Row]

val schema = InteractiveMapPartitionHelper.writeSchema

log.info("Generating test data, " + (numObjectsInX * numObjectsInY) + " objects")
for (x <- 0 until numObjectsInX)
  for (y <- 0 until numObjectsInY) {
    val oid = "X" + x + "_Y" + y
    val coords = List[Double](
      ((360.0 / numObjectsInX.toDouble) * x) - 180.0,
      ((180.0 / numObjectsInY.toDouble) * y) - 90.0 // Stay in Mercator's bounds.
    )
    val geo: Geometry = new Point.Builder().withCoordinates(coords.map(Double.box).asJava).build

    // NOTE: You MUST ensure that a properties Map is set when creating Features this way!
    // It may be empty, but it absolutely MUST be there.
    val properties = mutable.Map[String, Any]()
    properties += ("info" -> ("Testobject #" + (y * numObjectsInY + x).toString))
    properties += ("row" -> y)
    properties += ("col" -> x)

    val feature = new Feature.Builder()
      .withId(oid)
      .withGeometry(geo)
      .withProperties(properties.asJava)
      .withCustomMember("foo", "bar")
      .build

    newRows += InteractiveMapPartitionHelper.toRow(feature, schema)
  }

log.info("Creating dataframe from test data.")
val writeDF = sparkSession.createDataFrame(
  sparkSession.sparkContext.parallelize(newRows),
  schema
)

log.info("Writing test data to layer " + layerId)
writeDF
  .writeLayer(catalogHrn, layerId)
  .option("olp.connector.write-batch-size", 1000)
  .save()
import com.here.platform.data.client.model.geojson.Feature;
import com.here.platform.data.client.model.geojson.Geometry;
import com.here.platform.data.client.model.geojson.Point;
import com.here.platform.data.client.spark.internal.InteractiveMapPartitionHelper;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

final int numObjectsInX = 360;
final int numObjectsInY = 18;

List<Row> newRows = new ArrayList<>(numObjectsInX * numObjectsInY);

// Create data.
// It's easier to create the Features and make Rows out of them than to create the rows
// directly.
log.info("Generating test data, " + (numObjectsInX * numObjectsInY) + " objects");
StructType schema = InteractiveMapPartitionHelper.writeSchema();
for (int x = 0; x < numObjectsInX; x++) {
  for (int y = 0; y < numObjectsInY; y++) {
    String oid = "X" + x + "_Y" + y;

    List<Double> coordinates = new ArrayList<>(2);
    coordinates.add(((360.0 / (double) numObjectsInX) * x) - 180.0);
    coordinates.add(((180.0 / (double) numObjectsInY) * y) - 90.0);
    Geometry geo = new Point.Builder().withCoordinates(coordinates).build();

    // NOTE: You MUST ensure that a properties Map is set when creating Features this way!
    // It may be empty, but it absolutely MUST be there.
    Map<String, Object> properties = new HashMap<>();
    properties.put("info", new StringBuilder("Testobject #").append(y * numObjectsInY + x));
    properties.put("row", y);
    properties.put("col", x);

    Feature f =
        new Feature.Builder().withGeometry(geo).withId(oid).withProperties(properties).build();

    Row row = InteractiveMapPartitionHelper.toRow(f, schema);
    newRows.add(row);
  }
}

// Create a dataframe from the created rows.
log.info("Creating dataframe from test data.");
Dataset<Row> writeDF =
    sparkSession
        .createDataFrame(
            JavaSparkContext.fromSparkContext(sparkSession.sparkContext()).parallelize(newRows),
            schema)
        .drop("mt_datahub");

// Now do the actual writing.
log.info("Writing test data to layer " + layerId);
JavaLayerDataFrameWriter.create(writeDF)
    .writeLayer(catalogHrn, layerId)
    .option("olp.connector.write-batch-size", 1000)
    .save();

long res = writeDF.count();

results matching ""

    No results matching ""