Spark Connector Tutorial

Objectives: Understand how to use the Spark Connector to upload your data from CSV files within a directory to a Protobuf layer.

Complexity: Easy

Time to complete: 30 min

Dependencies: Spark connector

Source code: Download

The example in this tutorial demonstrates how to use the Spark Connector provided by the Data Client Library to upload CSV data stored in a directory to a layer that uses the Protobuf format. One possible scenario for doing this is a data processing application for which you want to reduce bandwidth consumption.

In the main part of the tutorial, we will cover the following usages:

  • Reading the CSV input data into a single dataframe;
  • Restructuring the dataframe to be compatible with the layer schema;
  • Uploading the resulting dataframe to the layer.

As a preparation step, you will need to create the output layer corresponding to your CSV schema. Let's assume the CSV data in the input directory is in the following format:

tileId,tag,latitude,longitude
123456789,806553341,33.5230503,-113.0826719
123456789,806553341,33.5231036,-113.08299199999999
123456789,806553341,33.523157,-113.0833119
123456789,806553341,33.5232106,-113.0836317
123456789,806553341,33.5232646,-113.0839515
123456789,806553341,33.5233187,-113.08427119999999
123456789,806553341,33.5233725,-113.08459099999999
....

The Data Client Library's Spark connector does not allow a partition id to appear in more than one dataframe row. This means that in order to upload the data to a Protobuf layer, you need to group it by partition id. So instead of having this schema:

(partition, tag, lat, lon)

it needs to be restructured to become:

(partition, Array[(tag, lat, lon)])
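
The full application below performs this restructuring with Spark SQL. As a preview, the core of the transformation looks like the following sketch, assuming the CSV rows have already been loaded into a dataframe named csvDf and spark.implicits._ is in scope:

import org.apache.spark.sql.functions.{collect_list, struct}

// (partition, tag, lat, lon)  ->  (partition, Array[(tag, lat, lon)])
val grouped = csvDf
  .select(
    struct($"tag", $"lat", $"lon").as("shape_point"),
    $"tileId".as("mt_partition") // the connector takes the partition id from the "mt_partition" column
  )
  .groupBy("mt_partition")
  .agg(collect_list($"shape_point").as("shape_points"))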

Create the Corresponding Layer Schema

First generate the schema project and deploy it with the following proto file:

syntax = "proto3";

package com.here.platform.data.csv.v1;

// Declare any dependent resources in the main POM file and add the import statements here:
//import "com/company/dependentGroupId/filename.proto";

// MainProtobufMessage is a placeholder; this value must match the package/message name in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
    repeated TaggedPoint shape_points = 1;
}

message TaggedPoint {
    string tag = 1;
    double lat = 2;
    double lon = 3;
}
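
To make the mapping concrete, a single uploaded partition decoded with this schema would look roughly like the following in protobuf text format (illustration only, based on the sample rows above):

shape_points { tag: "806553341" lat: 33.5230503 lon: -113.0826719 }
shape_points { tag: "806553341" lat: 33.5231036 lon: -113.082992 }
...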

For information about creating and deploying a schema, please refer to creating and deploying a schema.

Set up the Maven Project

Create the following folder structure for the project:

csv-to-protobuf
└── src
    └── main
        ├── java
        ├── resources
        └── scala

You can do this with a single bash command:

mkdir -p csv-to-protobuf/src/main/{java,resources,scala}
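
The applications below read their input from a csv directory on the classpath (getResource("/csv")), so place the input CSV files under src/main/resources/csv, for example (the source path is a placeholder):

mkdir -p csv-to-protobuf/src/main/resources/csv
cp /path/to/your/input/*.csv csv-to-protobuf/src/main/resources/csv/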

Create the Output Catalog

Create a file named pipeline-config.conf, and populate it with the contents below, replacing {{YOUR_OUTPUT_CATALOG_HRN}} with the HRN of the output catalog containing the Protobuf layer.

pipeline.config {

  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }

  input-catalogs { }
}
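
At runtime, the application does not hard-code this HRN; it resolves it through the pipeline interface. A minimal sketch of what the full code below does, assuming the file is passed via -Dpipeline-config.file:

import com.here.platform.pipeline.PipelineContext

// Resolve the output catalog HRN from pipeline-config.conf
val pipelineContext = new PipelineContext()
val outputCatalog = pipelineContext.config.outputCatalog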

The POM for this example is identical to that in the Path Matching in Spark Tutorial.

Parent POM:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.28.13</version>
    <relativePath/>
</parent>

Dependencies:

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>spark-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.hrn</groupId>
        <artifactId>hrn_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>
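
The ${scala.compat.version} property must resolve to the Scala binary version of the parent BOM (2.12 here). If your POM does not already define it, a properties block along these lines should work (adapt it to your setup):

<properties>
    <scala.compat.version>2.12</scala.compat.version>
</properties>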

You will need to create the output catalog using the previously deployed schema, with the layer format configured as Protobuf. You can perform these steps by following the tutorial Organize your work in projects, using the OLP Command Line Interface (CLI).

You should use a unique identifier name for the catalog, for example {{YOUR_USERNAME}}-csv-to-protobuf-output.

Create a file called csv-to-protobuf-output.json with the contents below, replacing {{DEPLOYED_SCHEMA_HRN}} with the HRN of the deployed schema.

Note:

All timestamps are in UTC milliseconds since the epoch (Jan 1, 1970, 00:00:00 UTC). If you run your application in another timezone, please make sure that the timestamp is converted into UTC before you query or upload data. In Java or Scala you can do the conversion with this function call: Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()

{
  "id": "csv-to-protobuf-output",
  "name": "Uploaded CSV data (From tutorial)",
  "summary": "Catalog for Tutorial CSV to Protobuf",
  "description": "Archive of simulated road topology data.",
  "tags": ["Tutorial", "Converted"],
  "layers": [
    {
      "id": "versioned-layer-protobuf-data",
      "name": "versioned-layer-protobuf-data",
      "summary": "CSV converted data to protobuf.",
      "description": "CSV converted data to protobuf.",
      "contentType": "application/x-protobuf",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "schema": {
        "hrn": "{{DEPLOYED_SCHEMA_HRN}}"
      }
    }
  ]
}
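
If you use the OLP CLI from within your project, the catalog can be created from this configuration file with a command along the following lines (the exact options may differ between CLI versions, so check the CLI documentation):

olp catalog create {{YOUR_USERNAME}}-csv-to-protobuf-output \
    "{{YOUR_USERNAME}}-csv-to-protobuf-output" \
    --config csv-to-protobuf-output.json \
    --scope {{YOUR_PROJECT_HRN}}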

Implement the CSV Upload Application

This application reads the CSV files from the resources directory, performs some transformations on the resulting data to adapt it to the Protobuf layer schema, and writes the result to the Protobuf layer in the previously created catalog.

Scala
Java

/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.slf4j.LoggerFactory
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField

import org.apache.spark.sql.functions._
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt

object SparkSqlUploadCsvScala extends App {
  private val logger = LoggerFactory.getLogger(SparkSqlUploadCsvScala.getClass)

  private val pipelineContext = new PipelineContext()
  private val outputCatalog = pipelineContext.config.outputCatalog
  private val outputLayerId = "versioned-layer-protobuf-data"

  val spark = SparkSession
    .builder()
    .appName("spark-sql-csv-upload-to-protobuf")
    .master("local[*]")
    .getOrCreate()

  val csvPath = getClass.getResource("/csv").getPath
  val csvFiles = csvPath + "/*.csv"
  val csvSchema = new StructType()
    .add(StructField("tileId", DataTypes.StringType))
    .add(StructField("tag", DataTypes.StringType))
    .add(StructField("lat", DataTypes.DoubleType))
    .add(StructField("lon", DataTypes.DoubleType))

  val csvDf = spark.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
    .schema(csvSchema)
    .load(csvFiles)

  csvDf.show()

  /*
        The CSV dataframe is:
            +--------+---------+------------------+-------------------+
            |  tileId|  tag    |               lat|                lon|
            +--------+---------+------------------+-------------------+
            |19300322|806553341|        33.5231036|-113.08299199999999|
            |19300322|806553341|         33.523157|       -113.0833119|
            |19300322|806553341|        33.5232106|       -113.0836317|
            |19300322|806553341|        33.5232646|       -113.0839515|
            |19300322|806553341|        33.5233187|-113.08427119999999|
            |19300322|806553341|        33.5233725|-113.08459099999999|
            |19300322|806553341|33.523426199999996|       -113.0849126|
            |19300322|806553341|33.523480899999996|       -113.0852401|
        ...
   */

  import spark.implicits._

  val df = csvDf
    .select(
      struct($"tag", $"lat", $"lon").as("shape_point"),
      $"tileId".as("mt_partition")
    )
    .groupBy("mt_partition")
    .agg(collect_list($"shape_point").as("shape_points"))

  df.printSchema()
  /*
        The resulting dataframe schema that will be written to the layer is:
        root
        |-- mt_partition: string (nullable = true)
        |-- shape_points: array (nullable = true)
        |    |-- element: struct (containsNull = true)
        |    |    |-- tag: string (nullable = true)
        |    |    |-- lat: double (nullable = true)
        |    |    |-- lon: double (nullable = true)
   */

  df.show()
  /*
        The resulting dataframe data that will be written to the layer is:
        +------------+--------------------+
        |mt_partition|        shape_points|
        +------------+--------------------+
        |    19300322|[[806553341, 33.5...|
        |    19300324|[[806553341, 33.5...|
        +------------+--------------------+
   */

  df.writeLayer(outputCatalog, outputLayerId).save()

  logger.info(s"Finished uplaoding CSV data to Protobuf to $outputCatalog")

  spark.stop()
}



/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.pipeline.PipelineContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkSqlUploadCsvJava {
  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(SparkSqlUploadCsvJava.class);
    PipelineContext pipelineContext = new PipelineContext();
    HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();
    String outputLayerId = "versioned-layer-protobuf-data";
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .appName("spark-sql-csv-upload-to-protobuf")
            .getOrCreate();

    String csvPath = SparkSqlUploadCsvJava.class.getResource("/csv").getPath();
    String csvFiles = csvPath + "/*.csv";
    StructType csvSchema =
        new StructType()
            .add(new StructField("tileId", DataTypes.StringType, false, Metadata.empty()))
            .add(new StructField("tag", DataTypes.StringType, false, Metadata.empty()))
            .add(new StructField("lat", DataTypes.DoubleType, false, Metadata.empty()))
            .add(new StructField("lon", DataTypes.DoubleType, false, Metadata.empty()));
    Dataset<Row> csvDf =
        spark
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "false")
            .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
            .schema(csvSchema)
            .load(csvFiles);
    csvDf.show();
    /*
            The CSV dataframe is:
            +--------+---------+------------------+-------------------+
            |  tileId|  tag    |               lat|                lon|
            +--------+---------+------------------+-------------------+
            |19300322|806553341|        33.5231036|-113.08299199999999|
            |19300322|806553341|         33.523157|       -113.0833119|
            |19300322|806553341|        33.5232106|       -113.0836317|
            |19300322|806553341|        33.5232646|       -113.0839515|
            |19300322|806553341|        33.5233187|-113.08427119999999|
            |19300322|806553341|        33.5233725|-113.08459099999999|
            |19300322|806553341|33.523426199999996|       -113.0849126|
            |19300322|806553341|33.523480899999996|       -113.0852401|
            ...
    */

    Dataset<Row> df =
        csvDf
            .select(
                struct(col("tag"), col("lat"), col("lon")).as("shape_point"),
                col("tileId").as("mt_partition"))
            .groupBy("mt_partition")
            .agg(collect_list(col("shape_point")).as("shape_points"));
    df.printSchema();
    /*
        The resulting dataframe schema that will be written to the layer is:
        root
        |-- mt_partition: string (nullable = true)
        |-- shape_points: array (nullable = true)
        |    |-- element: struct (containsNull = true)
        |    |    |-- tag: string (nullable = true)
        |    |    |-- lat: double (nullable = true)
        |    |    |-- lon: double (nullable = true)
    */

    df.show();
    /*
        The resulting dataframe data that will be written to the layer is:
        +------------+--------------------+
        |mt_partition|        shape_points|
        +------------+--------------------+
        |    19300322|[[806553341, 33.5...|
        |    19300324|[[806553341, 33.5...|
        +------------+--------------------+
    */

    JavaLayerDataFrameWriter.create(df).writeLayer(outputCatalog, outputLayerId).save();

    logger.info(String.format("Finished uplaoding CSV data to Protobuf to %s", outputCatalog));

    spark.stop();
  }
}


Compile and Run Locally

To run the application locally, execute the following command:

Scala
Java

mvn compile exec:java \
    -Dexec.mainClass=SparkSqlUploadCsvScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master=local[*] \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=SparkSqlUploadCsvJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dpipeline-job.file=pipeline-job.conf \
    -Dexec.cleanupDaemonThreads=false \
    -Dspark.master=local[*] \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
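
If the run succeeds, the application ends with the message from its final logger.info call, along the lines of the following (the exact prefix depends on your logging configuration):

INFO SparkSqlUploadCsvScala$ - Finished uploading CSV data to Protobuf to {{YOUR_OUTPUT_CATALOG_HRN}}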
    

Further Information

For more details on the topics covered in this tutorial, you can refer to the following sources:
