Read and write to Object store layer using Hadoop FS support in Spark
Objectives: Understand how to use Hadoop FS Support to read and write data to the Object store layer using Spark.
Complexity: Easy
Time to complete: 30 min
Dependencies: Organize your work in projects
Source code: Download
The example in this tutorial demonstrates how to use the Hadoop FS Support component provided by the Data Client Library. It lets you access data stored in the Object store layer with standard tools such as Apache Spark and minimal custom code.
The tutorial has the following steps:
- Creating a catalog with an Object store layer.
- Writing an application that generates test data in the parquet format, writes the test data to the Object store layer using the hadoop-fs-support library, and reads it back from the Object store layer using the same library.
- Accessing the data stored in the Object store layer using the CLI.
As a preparation step, you will need to create a catalog with a layer of the Object store type.
Set up the Maven Project
Create the following folder structure for the project:
hadoop-fs-support-spark-pipeline
└── src
└── main
├── java
└── resources
└── scala
You can do this with a single bash command:
mkdir -p hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}
Create catalog
You will need to create your catalog. You can do this by following the steps outlined in Organize your work in projects, using the OLP Command Line Interface (CLI).
Use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-hadoop-fs-support-spark-pipeline.
Create a file called hadoop-fs-support-spark-pipeline.json with the contents below, replacing {{YOUR_CATALOG_ID}} with the chosen identifier.
{
"id": "hadoop-fs-support-spark-pipeline-catalog",
"name": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"summary": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"description": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
"tags": ["Hadoop FS Support", "Object store"],
"layers": [
{
"id": "parquet",
"name": "parquet-layer",
"summary": "Simulated data.",
"description": "Simulated parquet data to demonstrate usability of Object store layer",
"tags": ["Hadoop FS Support", "Object store"],
"layerType": "objectstore",
"volume": {
"volumeType": "durable"
}
}
]
}
Replace {{YOUR_CATALOG_ID}} below with your own identifier. Also replace {{YOUR_PROJECT_HRN}} with the HRN of your project from Organize your work in projects, and then run the following command:
#!/usr/bin/env bash
set -o nounset -o errexit -o xtrace
olp catalog create {{YOUR_CATALOG_ID}} \
"Tutorial for reading and writing data to Object store layer using Hadoop FS Support ({{YOUR_USERNAME}})" \
--config hadoop-fs-support-spark-pipeline.json \
--scope {{YOUR_PROJECT_HRN}}
Set up the pipeline configuration
Create a file named pipeline-config.conf and populate it with the contents below, replacing {{YOUR_CATALOG_HRN}} with the HRN of the catalog you created above.
This tutorial can also be run outside of a pipeline environment. In that case, the only difference is that in your application code you need to set the value of catalogHrn to the HRN you received when creating the catalog in the step above; a sketch of this adjustment follows the configuration snippet.
pipeline.config {
output-catalog {hrn = "hrn:here:data::olp-here:dummy"}
input-catalogs {
objectStoreCatalog { hrn = "{{YOUR_CATALOG_HRN}}" }
}
}
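The following is a minimal sketch of that adjustment for the Scala version of the application; the object name and the Try-based fallback are illustrative assumptions, not part of the tutorial source code:
import com.here.platform.pipeline.PipelineContext

import scala.util.Try

object CatalogHrnResolver {
  // Use the pipeline configuration when it is available; otherwise fall back to the
  // HRN printed by `olp catalog create`. Replace the placeholder with your catalog HRN.
  def resolveCatalogHrn(): String =
    Try(new PipelineContext().config.inputCatalogs("objectStoreCatalog").toString)
      .getOrElse("{{YOUR_CATALOG_HRN}}")
}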
Set up the Project in Maven
To develop an application that runs on pipelines with Spark, use the sdk-batch-bom as the parent POM for your application:
<parent>
<groupId>com.here.platform</groupId>
<artifactId>sdk-batch-bom</artifactId>
<version>2.24.8</version>
<relativePath/>
</parent>
Adjust the dependencies for Scala and Java:
<dependencies>
<dependency>
<groupId>com.here.platform.pipeline</groupId>
<artifactId>pipeline-interface_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>com.here.platform.data.client</groupId>
<artifactId>hadoop-fs-support_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>
Implement the Application
This application writes parquet data to the Object store layer using Hadoop FS Support and then runs a simple Spark SQL query that selects the index column of the rows whose tileId column has an even value. The Scala implementation is shown first, followed by the Java version.
import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory
object HadoopFsSupportSparkScala {
def main(args: Array[String]): Unit = {
val logger = LoggerFactory.getLogger(HadoopFsSupportSparkScala.getClass)
val pipelineContext = new PipelineContext
val catalogHrn = pipelineContext.config.inputCatalogs("objectStoreCatalog").toString
val layerId = "parquet"
val sparkSession =
SparkSession
.builder()
.appName("HadoopFsSupportSparkScala")
.getOrCreate()
val dataFrame = generateDataFrame(sparkSession)
val parquetDir = "parquet-dir"
logger.info(s"Writing parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
dataFrame.write
.parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
logger.info(s"Reading parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
val parquetData: DataFrame =
sparkSession.read
.parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
.select("index")
.where("tileId % 2 == 0")
.sort("index")
val parquetArray: Array[Row] = parquetData.collect()
printDfArray(parquetArray)
}
private def generateDataFrame(sparkSession: SparkSession): DataFrame = {
import sparkSession.implicits._
val csvData: Dataset[String] = sparkSession.sparkContext.parallelize("""
|index,tileId
|0,0
|1,1
|2,3
|3,6
|4,4
|5,5
|6,7
|7,2
|8,8
|9,10
""".stripMargin.lines.toList).toDS
sparkSession.read
.option("header", value = true)
.option("timestampFormat", "MM/dd/yyyy")
.option("inferSchema", value = true)
.csv(csvData)
}
private def printDfArray(rowArray: Array[Row]): Unit =
for (row <- rowArray) {
for (i <- 0 until row.size) {
println(row.get(i))
}
}
}
The same application implemented in Java:
import com.here.platform.pipeline.PipelineContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HadoopFsSupportSparkJava {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger(HadoopFsSupportSparkJava.class);
PipelineContext pipelineContext = new PipelineContext();
String catalogHrn =
pipelineContext.getConfig().getInputCatalogs().get("objectStoreCatalog").toString();
String layerId = "parquet";
SparkSession sparkSession =
SparkSession.builder().appName("HadoopFsSupportSparkJava").getOrCreate();
Dataset<Row> dataFrame = generateDataFrame(sparkSession);
String parquetDir = "parquet-dir";
logger.info(
"Writing parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
dataFrame.write().parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
logger.info(
"Reading parquet files at: blobfs://" + catalogHrn + ":" + parquetDir + "/" + parquetDir);
Dataset<Row> parquetData =
sparkSession
.read()
.parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir)
.select("index")
.where("tileId % 2 == 0")
.sort("index");
List<Row> parquetArray = parquetData.collectAsList();
printDfArray(parquetArray);
}
private static Dataset<Row> generateDataFrame(SparkSession sparkSession) {
String csvString =
String.join(
"\n",
"index,tileId",
"0,0",
"1,1",
"2,3",
"3,6",
"4,4",
"5,5",
"6,7",
"7,2",
"8,8",
"9,10");
ArrayList<String> data = new ArrayList<>(Arrays.asList(csvString.split("\n")));
Dataset<String> csvData = sparkSession.createDataset(data, Encoders.STRING());
return sparkSession
.read()
.option("header", true)
.option("timestampFormat", "MM/dd/yyyy")
.option("inferSchema", true)
.csv(csvData);
}
private static void printDfArray(List<Row> rowArray) {
for (Row row : rowArray) {
for (int i = 0; i < row.length(); i++) {
System.out.println(row.get(i));
}
}
}
}
Run the Application
To run the Scala version of the application locally, execute the following command:
mvn compile exec:java \
-Dexec.mainClass=HadoopFsSupportSparkScala \
-Dpipeline-config.file=pipeline-config.conf \
-Dspark.master=local[*] \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
To run the Java version, execute:
mvn compile exec:java \
-Dexec.mainClass=HadoopFsSupportSparkJava \
-Dpipeline-config.file=pipeline-config.conf \
-Dspark.master=local[*] \
-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
Pay attention to the set of -Dhere.platform.data-client.request-signer.credentials.here-account.* params. We specify these parameters to pass data from the credentials.properties file and {{YOUR_PROJECT_HRN}} to the Data Client Library. For more details about Data Client Library initialization, see Set Your Credentials via Java System Properties.
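If the run succeeds, the query keeps the rows whose tileId is even, so printDfArray prints the selected index values one per line, similar to:
0
3
4
7
8
9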
Access the data using the CLI
Using the HRN of the catalog that you created in the catalog creation step, you can list the contents of the Object store layer:
olp catalog layer object list {{YOUR_CATALOG_HRN}} parquet --key parquet-dir
The CLI output should be similar to the following (this is sample output, not exact values):
name keyType lastModified size
parquet-dir/_SUCCESS object 2021-01-12T19:23:47Z 0
parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet object 2021-01-12T19:23:39Z 353
...
You can also download an object stored in the Object store layer by referencing its key:
olp catalog layer object get {{YOUR_CATALOG_HRN}} parquet --key parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet
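If you want to inspect a downloaded parquet object locally, a minimal sketch using a local SparkSession could look like the following; the object name InspectParquet and the file path are placeholders for whatever file name the CLI wrote to your working directory:
import org.apache.spark.sql.SparkSession

object InspectParquet {
  def main(args: Array[String]): Unit = {
    // Local Spark session used only for inspecting the downloaded file.
    val spark = SparkSession.builder()
      .appName("InspectParquet")
      .master("local[*]")
      .getOrCreate()

    // Replace with the name of the file downloaded by `olp catalog layer object get`.
    spark.read.parquet("path/to/downloaded-file.snappy.parquet").show()

    spark.stop()
  }
}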
For more details on the topics covered in this tutorial, you can refer to the following sources: