Read and write to Object store layer using Hadoop FS support in Spark
Objectives: Understand how to use the Hadoop FS Support to read and write data to the Object store layer using Spark.
Complexity: Beginner
Time to complete: 30 min
Prerequisites: Organize your work in projects
Source code: Download
The example in this tutorial demonstrates how to use the Hadoop FS Support component provided by the Data Client Library. This component lets you access data stored in the Object store layer using standard tools such as Apache Spark, with minimal custom code.
The tutorial has the following steps:
- Create a catalog with the Object store layer.
- Write an application that generates test data in the parquet format.
- Write the test data to the Object store layer, using the hadoop-fs-support library.
- Read the test data from the Object store layer, using the hadoop-fs-support library.
- Access the data stored in the Object store layer, using the CLI.
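Before going through the setup steps, the following minimal sketch previews the idea: with the hadoop-fs-support library on the classpath, an Object store layer is addressed through a blobfs://<catalog-hrn>:<layer-id>/<key-prefix> path, and ordinary Spark reads and writes work against it. The catalog HRN placeholder and the preview-dir key prefix below are illustrative only; the complete application is developed later in this tutorial.
import org.apache.spark.sql.SparkSession

object BlobfsPreview {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("BlobfsPreview").getOrCreate()
    import spark.implicits._

    // Illustrative placeholders: {{YOUR_CATALOG_HRN}} and the "preview-dir" key prefix.
    val path = "blobfs://{{YOUR_CATALOG_HRN}}:parquet/preview-dir"

    // Standard Spark APIs, no Object store specific I/O code.
    Seq(1, 2, 3).toDF("value").write.parquet(path)
    spark.read.parquet(path).show()
  }
}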
As a preparation step, you must create your catalog with the Object store layer type.
Set up the Maven project
Create the following folder structure for the project:
hadoop-fs-support-spark-pipeline
└── src
    └── main
        ├── java
        ├── resources
        └── scala
You can do this with a single bash command:
mkdir -p hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}
Create catalog
You must create your catalog. You can accomplish this by following the steps outlined in Organize your work in projects, using the OLP Command Line Interface (CLI).
Use a unique identifier for the catalog, for example, {{YOUR_USERNAME}}-hadoop-fs-support-spark-pipeline.
Create a file called hadoop-fs-support-spark-pipeline.json with the following contents, replacing {{YOUR_CATALOG_ID}} with your chosen identifier.
{
"id": "hadoop-fs-support-spark-pipeline-catalog",
"name": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"summary": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"description": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"tags": ["Hadoop FS Support", "Object store"],
"layers": [
{
"id": "parquet",
"name": "parquet-layer",
"summary": "Simulated data.",
"description": "Simulated parquet data to demonstrate usability of Object store layer",
"tags": ["Hadoop FS Support", "Object store"],
"layerType": "objectstore",
"volume": {
"volumeType": "durable"
}
}
]
}
Note
If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.
Replace {{YOUR_CATALOG_ID}} with your own identifier. Also, replace {{YOUR_PROJECT_HRN}} with the HRN of your project from Organize your work in projects, then run the following command:
#!/usr/bin/env bash
set -o nounset -o errexit -o xtrace
olp catalog create {{YOUR_CATALOG_ID}} \
"Tutorial for reading and writing data to Object store layer using Hadoop FS Support ({{YOUR_USERNAME}})" \
--config hadoop-fs-support-spark-pipeline.json \
--scope {{YOUR_PROJECT_HRN}}
Set up the pipeline configuration
- Create a file named pipeline-config.conf and populate it with the following snippet.
- Replace {{YOUR_CATALOG_HRN}} with the HRN of the catalog you created in Organize your work in projects.
This tutorial can also be run outside of a pipeline environment. The only difference is that your application code must set the value of catalogHrn to the HRN you received from your catalog creation step.
pipeline.config {
output-catalog {hrn = "hrn:here:data::olp-here:dummy"}
input-catalogs {
objectStoreCatalog { hrn = "{{YOUR_CATALOG_HRN}}" }
}
}
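For reference, the sketch below contrasts the two ways of obtaining catalogHrn mentioned above: reading it from the pipeline configuration through PipelineContext, or setting it directly when running outside of a pipeline environment. {{YOUR_CATALOG_HRN}} is a placeholder for your own HRN, and the object name CatalogHrnResolution is illustrative only.
import com.here.platform.pipeline.PipelineContext

object CatalogHrnResolution {
  // Inside a pipeline (or locally with -Dpipeline-config.file), the HRN of the input
  // catalog registered as "objectStoreCatalog" comes from pipeline-config.conf.
  def fromPipelineConfig(): String =
    new PipelineContext().config.inputCatalogs("objectStoreCatalog").toString

  // Outside of a pipeline environment, set the HRN directly in your application code.
  def hardcoded(): String =
    "{{YOUR_CATALOG_HRN}}"
}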
Set up the project in Maven
To develop an application that runs on pipelines with Spark, use the sdk-batch-bom_2.12 as the parent POM for the application:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom_2.12</artifactId>
<version>2.51.5</version>
<relativePath/>
</parent>
Adjust the dependencies for Scala and Java:
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>hadoop-fs-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
Implement the application
This application writes parquet data to the Object store layer using Hadoop FS Support, and then runs a simple Spark SQL query that selects the index column of the rows whose tileId value is even. The application is shown first in Scala and then in Java.
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory
object HadoopFsSupportSparkScala {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(HadoopFsSupportSparkScala.getClass)
val pipelineContext = new PipelineContext
val catalogHrn = pipelineContext.config.inputCatalogs("objectStoreCatalog").toString
val layerId = "parquet"
val sparkSession =
SparkSession
.builder()
.appName("HadoopFsSupportSparkScala")
.getOrCreate()
val dataFrame = generateDataFrame(sparkSession)
val parquetDir = "parquet-dir"
// Write the generated DataFrame as parquet files, addressing the Object store layer
// through the blobfs://<catalog-hrn>:<layer-id>/<path> scheme provided by Hadoop FS Support.
logger.info(s"Writing parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
dataFrame.write
.parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
// Read the parquet files back, keeping only the index values of rows with an even tileId.
logger.info(s"Reading parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
val parquetData: DataFrame =
sparkSession.read
.parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
.select("index")
.where("tileId % 2 == 0")
.sort("index")
val parquetArray: Array[Row] = parquetData.collect()
printDfArray(parquetArray)
}
private def generateDataFrame(sparkSession: SparkSession): DataFrame = {
import sparkSession.implicits._
val csvData: Dataset[String] = sparkSession.sparkContext.parallelize("""
|index,tileId
|0,0
|1,1
|2,3
|3,6
|4,4
|5,5
|6,7
|7,2
|8,8
|9,10
""".stripMargin.lines.toList).toDS
sparkSession.read
.option("header", value = true)
.option("timestampFormat", "MM/dd/yyyy")
.option("inferSchema", value = true)
.csv(csvData)
}
private def printDfArray(rowArray: Array[Row]): Unit =
for (row <- rowArray) {
for (i <- 0 until row.size) {
println(row.get(i))
}
}
}
import com.here.platform.pipeline.PipelineContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HadoopFsSupportSparkJava {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(HadoopFsSupportSparkJava.class);
PipelineContext pipelineContext = new PipelineContext();
String catalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("objectStoreCatalog").toString();
String layerId = "parquet";
SparkSession sparkSession =
SparkSession.builder().appName("HadoopFsSupportSparkJava").getOrCreate();
Dataset<Row> dataFrame = generateDataFrame(sparkSession);
String parquetDir = "parquet-dir";
logger.info(
"Writing parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
dataFrame.write().parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
// Read the parquet files back, keeping only the index values of rows with an even tileId.
logger.info(
"Reading parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
Dataset<Row> parquetData =
sparkSession
.read()
.parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir)
.select("index")
.where("tileId % 2 == 0")
.sort("index");
List<Row> parquetArray = parquetData.collectAsList();
printDfArray(parquetArray);
}
private static Dataset<Row> generateDataFrame(SparkSession sparkSession) {
String csvString =
String.join(
"\n",
"index,tileId",
"0,0",
"1,1",
"2,3",
"3,6",
"4,4",
"5,5",
"6,7",
"7,2",
"8,8",
"9,10");
ArrayList<String> data = new ArrayList<>(Arrays.asList(csvString.split("\n")));
Dataset<String> csvData = sparkSession.createDataset(data, Encoders.STRING());
return sparkSession
.read()
.option("header", true)
.option("timestampFormat", "MM/dd/yyyy")
.option("inferSchema", true)
.csv(csvData);
}
private static void printDfArray(List<Row> rowArray) {
for (Row row : rowArray) {
for (int i = 0; i < row.length(); i++) {
System.out.println(row.get(i));
}
}
}
}
Run the application
To run the application locally, use one of the following commands (the first runs the Scala version, the second the Java version):
mvn compile exec:java \
-Dexec.mainClass=HadoopFsSupportSparkScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
mvn compile exec:java \
-Dexec.mainClass=HadoopFsSupportSparkJava \
-Dpipeline-config.file=pipeline-config.conf \
-Dspark.master="local[*]" \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
Consider the set of -Dhere.platform.data-client.request-signer.credentials.here-account.* parameters. These parameters pass the data from the credentials.properties file and {{YOUR_PROJECT_HRN}} to the Data Client Library. For more details about initializing the Data Client Library, see Set Your Credentials via Java System Properties.
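As an illustration only: these settings are plain Java system properties, so the here-token-scope flag above is conceptually equivalent to the sketch below. For local runs, keep passing them as -D flags so they are set before the Data Client Library is initialized; {{YOUR_PROJECT_HRN}} remains a placeholder.
object CredentialsViaSystemProperties {
  def main(args: Array[String]): Unit = {
    // Illustrative sketch: the -D flag above sets this same JVM system property.
    // It must be set before the Data Client Library is initialized to have any effect.
    System.setProperty(
      "here.platform.data-client.request-signer.credentials.here-account.here-token-scope",
      "{{YOUR_PROJECT_HRN}}")
  }
}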
Access the data using the CLI
Using the HRN of the catalog that you created in the catalog creation step, you can list the contents of the Object store layer:
olp catalog layer object list {{YOUR_CATALOG_HRN}} parquet --key parquet-dir
The CLI output should be similar to the following (sample output only; your exact values will differ):
name keyType lastModified size
parquet-dir/_SUCCESS object 2021-01-12T19:23:47Z 0
parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet object 2021-01-12T19:23:39Z 353
...
You can download an object stored in the Object store layer by referencing its key:
olp catalog layer object get {{YOUR_CATALOG_HRN}} parquet --key parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet
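Alternatively, a single object can also be read directly with Spark by pointing at its key through the blobfs:// scheme, as in the illustrative sketch below. The part-file name is taken from the sample listing above and will differ in your catalog; {{YOUR_CATALOG_HRN}} is again a placeholder.
import org.apache.spark.sql.SparkSession

object ReadSingleObject {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadSingleObject").getOrCreate()

    // Key copied from the sample CLI listing above; your part-file name will differ.
    val key = "parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet"

    spark.read
      .parquet(s"blobfs://{{YOUR_CATALOG_HRN}}:parquet/$key")
      .show()
  }
}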
For additional details on the topics covered in this tutorial, you can refer to the following sources: