Use Spark Connector to migrate CSV data to Protobuf
Objectives: Understand how to use the Spark Connector to upload your data from CSV files within a directory to a Protobuf layer.
Complexity: Beginner
Time to complete: 30 min
Prerequisites: Use Spark connector to read and write data
Source code: Download
The example in this tutorial demonstrates how to use the Spark Connector provided by the Data Client Library to upload CSV data stored in a directory into a layer that uses the Protobuf format. A typical scenario is a data processing application where you want to reduce bandwidth consumption, since Protobuf encodes the same data more compactly than CSV.
In the main part of the tutorial, we cover the following steps:
- Reading the CSV data from the directory into a single DataFrame;
- Restructuring the DataFrame to be compatible with the layer schema;
- Uploading the resulting DataFrame to the layer.
As a preparation step, you need to create the CSV files in the resources/csv
folder and create the output layer corresponding to your CSV schema. Let's assume the CSV data located in the resources/csv
folder is in the following format:
tileId,tag,latitude,longitude
123456789,806553341,33.5230503,-113.0826719
123456789,806553341,33.5231036,-113.08299199999999
123456789,806553341,33.523157,-113.0833119
123456789,806553341,33.5232106,-113.0836317
123456789,806553341,33.5232646,-113.0839515
123456789,806553341,33.5233187,-113.08427119999999
123456789,806553341,33.5233725,-113.08459099999999
....
The Data Client Library's Spark connector does not allow the same partition ID to appear in multiple DataFrame rows. To upload the data to a Protobuf layer, you therefore need to group it by partition ID. So instead of having this schema:
(partition, tag, lat, lon)
it needs to be restructured to become:
(partition, Array[(tag, lat, lon)])
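The full application later in this tutorial performs exactly this regrouping with Spark SQL's groupBy and collect_list functions. As a minimal, self-contained sketch (using two hard-coded sample rows from the CSV excerpt above; the object name is only for illustration), the restructuring looks like this:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

object RegroupSketch extends App {
  val spark = SparkSession.builder().appName("regroup-sketch").master("local[*]").getOrCreate()
  import spark.implicits._

  // Flat rows: (partition, tag, lat, lon)
  val flat = Seq(
    ("123456789", "806553341", 33.5230503, -113.0826719),
    ("123456789", "806553341", 33.5231036, -113.0829920)
  ).toDF("tileId", "tag", "lat", "lon")

  // Regrouped rows: (partition, Array[(tag, lat, lon)]) -- one row per partition ID
  val grouped = flat
    .select(struct($"tag", $"lat", $"lon").as("shape_point"), $"tileId".as("mt_partition"))
    .groupBy("mt_partition")
    .agg(collect_list($"shape_point").as("shape_points"))

  grouped.show(truncate = false)
  spark.stop()
}
After this step each row corresponds to exactly one partition. The column name mt_partition is the one the Spark connector uses for the partition ID, as you will see in the full application below.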
Create corresponding layer schema
First generate the schema project, then deploy it with the following proto file:
syntax = "proto3";

package com.here.platform.data.csv.v1;

// Declare any dependent resources in the main POM file and add the import statements here:
// import "com/company/dependentGroupId/filename.proto";

// MainProtobufMessage is a placeholder; this value must match the package/message name
// in the mainMessage tag of the layer-manifest-plugin in the schema_ds module.
message MainProtobufMessage {
  repeated TaggedPoint shape_points = 1;
}

message TaggedPoint {
  string tag = 1;
  double lat = 2;
  double lon = 3;
}
For information about creating and deploying a schema, refer to creating and deploying a schema.
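Purely for illustration, the snippet below shows what one encoded partition conceptually corresponds to, using the Java bindings that the schema project generates from this proto file. The package and class names are assumptions based on the proto package above and default code generation; the tutorial itself does not require this step, because the Spark connector performs the encoding for you.
import com.here.platform.data.csv.v1.{MainProtobufMessage, TaggedPoint} // assumed location of the generated bindings

object BuildSamplePayload extends App {
  // Each layer partition holds one MainProtobufMessage whose repeated
  // shape_points field carries the grouped (tag, lat, lon) rows.
  val payload: Array[Byte] = MainProtobufMessage
    .newBuilder()
    .addShapePoints(
      TaggedPoint.newBuilder().setTag("806553341").setLat(33.5230503).setLon(-113.0826719))
    .build()
    .toByteArray

  println(s"Encoded sample partition: ${payload.length} bytes")
}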
Set up the Maven project
Create the following folder structure for the project:
csv-to-protobuf
└── src
    └── main
        ├── java
        ├── resources
        └── scala
You can do this with a single bash
command:
mkdir -p csv-to-protobuf/src/main/{java,resources,scala}
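By the end of the tutorial, the project will roughly look as follows. The source file names are assumptions (any names work, as long as they match the class names you choose), and the CSV file names are placeholders:
csv-to-protobuf
├── pipeline-config.conf
├── csv-to-protobuf-output.json
├── pom.xml
└── src
    └── main
        ├── java
        │   └── SparkSqlUploadCsvJava.java
        ├── resources
        │   └── csv
        │       ├── data-1.csv
        │       └── ...
        └── scala
            └── SparkSqlUploadCsvScala.scala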
Create a file named pipeline-config.conf
, and populate it with the following contents, replacing {{YOUR_OUTPUT_CATALOG_HRN}}
with the HRN of the output catalog containing the Protobuf layer.
pipeline.config {
  output-catalog { hrn = "{{YOUR_OUTPUT_CATALOG_HRN}}" }
  input-catalogs { }
}
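The application does not parse this file directly; instead, it obtains the output catalog HRN through the PipelineContext class, exactly as the upload application below does. A minimal sketch of that lookup (the object name is only for illustration; the file path is supplied via the -Dpipeline-config.file option shown in the run commands at the end of this tutorial):
import com.here.hrn.HRN
import com.here.platform.pipeline.PipelineContext

object ResolveOutputCatalog extends App {
  // Reads the pipeline configuration referenced by -Dpipeline-config.file.
  val pipelineContext = new PipelineContext()

  // The HRN configured under output-catalog in pipeline-config.conf.
  val outputCatalog: HRN = pipelineContext.config.outputCatalog
  println(s"Output catalog: $outputCatalog")
}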
The POM for this example is identical to that in the Path Matching in Spark Tutorial.
Parent POM:
<parent>
  <groupId>com.here.platform</groupId>
  <artifactId>sdk-batch-bom_2.12</artifactId>
  <version>2.51.5</version>
  <relativePath/>
</parent>
Dependencies:
<dependencies>
  <dependency>
    <groupId>com.here.platform.pipeline</groupId>
    <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.platform.data.client</groupId>
    <artifactId>spark-support_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>com.here.hrn</groupId>
    <artifactId>hrn_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.compat.version}</artifactId>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
  </dependency>
</dependencies>
Create the output catalog
You must create an output catalog whose layer references the previously deployed schema and uses the Protobuf format. You can perform these steps by following the tutorial Organize your work in projects, using the OLP Command Line Interface (CLI). Use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-csv-to-protobuf-output.
Create a file called csv-to-protobuf-output.json
with the contents below, replacing {{DEPLOYED_SCHEMA_HRN}}
with the deployed schema HRN.
Note:
All timestamps are in UTC milliseconds since epoch (Jan 1, 1970 00:00:00 UTC). If you run your application in another time zone, verify that the timestamp is converted to UTC before you query or upload data. In Java or Scala you can do the conversion with this call: Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis()
{
  "id": "csv-to-protobuf-output",
  "name": "Uploaded CSV data (From tutorial)",
  "summary": "Catalog for Tutorial CSV to Protobuf",
  "description": "Archive of simulated road topology data.",
  "tags": ["Tutorial", "Converted"],
  "layers": [
    {
      "id": "versioned-layer-protobuf-data",
      "name": "versioned-layer-protobuf-data",
      "summary": "CSV converted data to protobuf.",
      "description": "CSV converted data to protobuf.",
      "contentType": "application/x-protobuf",
      "layerType": "versioned",
      "volume": {
        "volumeType": "durable"
      },
      "partitioning": {
        "scheme": "generic"
      },
      "schema": {
        "hrn": "{{DEPLOYED_SCHEMA_HRN}}"
      }
    }
  ]
}
Implement the CSV upload application
This application reads the CSV files from the resources/csv directory, transforms the resulting data to match the Protobuf layer schema, and writes it to the Protobuf layer in the previously created catalog. The column named mt_partition carries the partition ID, and the remaining columns must correspond to the fields of the layer's Protobuf schema (here, the repeated shape_points field holding tag, lat, and lon).
Scala:
import org.slf4j.LoggerFactory
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.functions._
import com.here.platform.data.client.spark.LayerDataFrameWriter.DataFrameExt

object SparkSqlUploadCsvScala extends App {

  private val logger = LoggerFactory.getLogger(SparkSqlUploadCsvScala.getClass)

  private val pipelineContext = new PipelineContext()
  private val outputCatalog = pipelineContext.config.outputCatalog
  private val outputLayerId = "versioned-layer-protobuf-data"

  val spark = SparkSession
    .builder()
    .appName("spark-sql-csv-upload-to-protobuf")
    .getOrCreate()

  // Read all CSV files from src/main/resources/csv into a single DataFrame.
  val csvPath = getClass.getResource("/csv").getPath
  val csvFiles = csvPath + "/*.csv"

  val csvSchema = new StructType()
    .add(StructField("tileId", DataTypes.StringType))
    .add(StructField("tag", DataTypes.StringType))
    .add(StructField("lat", DataTypes.DoubleType))
    .add(StructField("lon", DataTypes.DoubleType))

  val csvDf = spark.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
    .schema(csvSchema)
    .load(csvFiles)

  csvDf.show()

  import spark.implicits._

  // Group the rows by tile ID so that each partition (mt_partition) contains
  // one array of (tag, lat, lon) structs, matching the Protobuf layer schema.
  val df = csvDf
    .select(
      struct($"tag", $"lat", $"lon").as("shape_point"),
      $"tileId".as("mt_partition")
    )
    .groupBy("mt_partition")
    .agg(collect_list($"shape_point").as("shape_points"))

  df.printSchema()
  df.show()

  // Upload the grouped DataFrame to the versioned Protobuf layer.
  df.writeLayer(outputCatalog, outputLayerId).save()

  logger.info(s"Finished uploading CSV data to Protobuf to $outputCatalog")

  spark.stop()
}
Java:
import static org.apache.spark.sql.functions.*;

import com.here.hrn.HRN;
import com.here.platform.data.client.spark.javadsl.JavaLayerDataFrameWriter;
import com.here.platform.pipeline.PipelineContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkSqlUploadCsvJava {

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(SparkSqlUploadCsvJava.class);

    PipelineContext pipelineContext = new PipelineContext();
    HRN outputCatalog = pipelineContext.getConfig().getOutputCatalog();
    String outputLayerId = "versioned-layer-protobuf-data";

    SparkSession spark =
        SparkSession.builder().appName("spark-sql-csv-upload-to-protobuf").getOrCreate();

    // Read all CSV files from src/main/resources/csv into a single DataFrame.
    String csvPath = SparkSqlUploadCsvJava.class.getResource("/csv").getPath();
    String csvFiles = csvPath + "/*.csv";

    StructType csvSchema =
        new StructType()
            .add(new StructField("tileId", DataTypes.StringType, false, Metadata.empty()))
            .add(new StructField("tag", DataTypes.StringType, false, Metadata.empty()))
            .add(new StructField("lat", DataTypes.DoubleType, false, Metadata.empty()))
            .add(new StructField("lon", DataTypes.DoubleType, false, Metadata.empty()));

    Dataset<Row> csvDf =
        spark
            .read()
            .format("com.databricks.spark.csv")
            .option("header", "false")
            .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
            .schema(csvSchema)
            .load(csvFiles);

    csvDf.show();

    // Group the rows by tile ID so that each partition (mt_partition) contains
    // one array of (tag, lat, lon) structs, matching the Protobuf layer schema.
    Dataset<Row> df =
        csvDf
            .select(
                struct(col("tag"), col("lat"), col("lon")).as("shape_point"),
                col("tileId").as("mt_partition"))
            .groupBy("mt_partition")
            .agg(collect_list(col("shape_point")).as("shape_points"));

    df.printSchema();
    df.show();

    // Upload the grouped DataFrame to the versioned Protobuf layer.
    JavaLayerDataFrameWriter.create(df).writeLayer(outputCatalog, outputLayerId).save();

    logger.info(String.format("Finished uploading CSV data to Protobuf to %s", outputCatalog));

    spark.stop();
  }
}
Compile and run locally
To run the application locally, execute one of the following commands:
Scala:
mvn compile exec:java \
-Dexec.mainClass=SparkSqlUploadCsvScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
Java:
mvn compile exec:java \
-Dexec.mainClass=SparkSqlUploadCsvJava \
-Dpipeline-config.file=pipeline-config.conf \
-Dpipeline-job.file=pipeline-job.conf \
-Dexec.cleanupDaemonThreads=false \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
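Once the upload has completed, you can optionally verify the result by reading the layer back with the Spark connector, as described in the prerequisite tutorial Use Spark connector to read and write data. The sketch below assumes the readLayer API from that tutorial and queries the sample partition 123456789; run it with the same pipeline and credential options as the commands above:
import com.here.hrn.HRN
import com.here.platform.data.client.spark.LayerDataFrameReader.SparkSessionExt
import org.apache.spark.sql.SparkSession

object VerifyUpload extends App {
  val spark = SparkSession.builder().appName("verify-csv-to-protobuf").getOrCreate()

  // Use the same output catalog HRN as in pipeline-config.conf.
  val outputCatalog = HRN.fromString("{{YOUR_OUTPUT_CATALOG_HRN}}")
  val outputLayerId = "versioned-layer-protobuf-data"

  // Query the partition written for tile 123456789 (sample value from the CSV excerpt above).
  val df = spark
    .readLayer(outputCatalog, outputLayerId)
    .query("""mt_partition=in=("123456789")""")
    .load()

  df.printSchema()
  df.show(truncate = false)

  spark.stop()
}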
For additional details on the topics covered in this tutorial, you can refer to the following sources: