Read and write to Object store layer using Hadoop FS support in Spark

Objectives: Understand how to use Hadoop FS Support to read and write data to the Object store layer using Spark.

Complexity: Easy

Time to complete: 30 min

Dependencies: Organize your work in projects

Source code: Download

The example in this tutorial demonstrates how to use the Hadoop FS Support component provided by the Data Client Library. This component lets you access data stored in the Object store layer with standard tools such as Apache Spark, with minimal custom code.
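
As the application code later in this tutorial shows, data in the layer is addressed with Hadoop-style blobfs:// paths of the form blobfs://<catalog HRN>:<layer ID>/<key prefix>. The following is a minimal Scala sketch of such a path; the catalog HRN is a placeholder, not a real catalog:

// Minimal sketch: how an Object store layer is addressed through Hadoop FS Support.
// The catalog HRN below is a placeholder, not a real catalog.
val catalogHrn = "hrn:here:data::myrealm:my-catalog"
val layerId = "parquet"
val path = s"blobfs://$catalogHrn:$layerId/parquet-dir"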

The tutorial includes the following steps:

  1. Creating a catalog with an Object store layer.
  2. Writing an application that generates test data in the Parquet format, writes the test data to the Object store layer using the hadoop-fs-support library, and reads it back from the layer using the same library.
  3. Accessing the data stored in the Object store layer using the OLP CLI.

As a preparation step, you need to create a catalog that contains an Object store layer.

Set up the Maven Project

Create the following folder structure for the project:

hadoop-fs-support-spark-pipeline
└── src
    └── main
        ├── java
        ├── resources
        └── scala

You can do this with a single bash command:

mkdir -p hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}

Create a catalog

You need to create a catalog. You can do this by following the steps outlined in Organize your work in projects, using the OLP Command Line Interface (CLI).

Use a unique identifier for the catalog, for example {{YOUR_USERNAME}}-hadoop-fs-support-spark-pipeline.

Create a file called hadoop-fs-support-spark-pipeline.json with the contents below. The catalog identifier itself is passed to the CLI command in the next step.

{
  "id": "hadoop-fs-support-spark-pipeline-catalog",
  "name": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "summary": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "description": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "tags": ["Hadoop FS Support", "Object store"],
  "layers": [
    {
      "id": "parquet",
      "name": "parquet-layer",
      "summary": "Simulated data.",
      "description": "Simulated parquet data to demonstrate usability of Object store layer",
      "tags": ["Hadoop FS Support", "Object store"],
      "layerType": "objectstore",
      "volume": {
        "volumeType": "durable"
      }
    }
  ]
}

Replace {{YOUR_CATALOG_ID}} below with your chosen identifier and {{YOUR_PROJECT_HRN}} with the HRN of your project from Organize your work in projects, and then run the following command:

#!/usr/bin/env bash
set -o nounset -o errexit -o xtrace

### [catalog]
olp catalog create {{YOUR_CATALOG_ID}} \
    "Tutorial for reading and writing data to Object store layer using Hadoop FS Support ({{YOUR_USERNAME}})" \
    --config hadoop-fs-support-spark-pipeline.json \
    --scope {{YOUR_PROJECT_HRN}}
### [catalog]

Set up the pipeline configuration

Create a file named pipeline-config.conf and populate it with the contents below, replacing {{YOUR_CATALOG_HRN}} with the HRN of the catalog you created in the catalog creation step above.

This tutorial can also be run outside of the pipeline environment. In that case, the only difference is that in your application code you need to set the value of catalogHrn to the HRN you received from the catalog creation step above, as shown in the sketch after the configuration below.

pipeline.config {

  output-catalog {hrn = "hrn:here:data::olp-here:dummy"}

  input-catalogs {
      objectStoreCatalog { hrn = "{{YOUR_CATALOG_HRN}}" }
  }
}
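
The following is a minimal Scala sketch of that difference, with {{YOUR_CATALOG_HRN}} again standing in for the HRN returned by the catalog creation step:

import com.here.platform.pipeline.PipelineContext

// Inside the pipeline environment: resolve the catalog HRN from the pipeline configuration.
val catalogHrnFromPipeline =
  new PipelineContext().config.inputCatalogs("objectStoreCatalog").toString

// Outside the pipeline environment: use the HRN you received when creating the catalog.
val catalogHrnStandalone = "{{YOUR_CATALOG_HRN}}"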

Set up the Project in Maven

To develop an application that runs on a pipeline with Spark, use sdk-batch-bom as the parent POM of your application:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom</artifactId>
    <version>2.24.8</version>
    <relativePath/>
</parent>

Adjust the dependencies for Scala and Java:

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>hadoop-fs-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

Implement the Application

This application writes Parquet data to the Object store layer using Hadoop FS Support and then runs a simple Spark SQL query that selects the index column of the rows in which the tileId column has an even value.

Scala
Java
/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory

object HadoopFsSupportSparkScala {

  def main(args: Array[String]): Unit = {

    val logger = LoggerFactory.getLogger(HadoopFsSupportSparkScala.getClass)

    val pipelineContext = new PipelineContext
    val catalogHrn = pipelineContext.config.inputCatalogs("objectStoreCatalog").toString
    val layerId = "parquet"

    val sparkSession =
      SparkSession
        .builder()
        .appName("HadoopFsSupportSparkScala")
        .getOrCreate()

    val dataFrame = generateDataFrame(sparkSession)

    // Write the data frame as parquet files under a directory in the object store layer
    val parquetDir = "parquet-dir"
    logger.info(s"Writing parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
    dataFrame.write
      .parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")

    // Read the parquet data from the object store layer, selecting the index column of rows where the tileId column has an even value
    logger.info(s"Reading parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
    val parquetData: DataFrame =
      sparkSession.read
        .parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
        .select("index")
        .where("tileId % 2 == 0")
        .sort("index")
    val parquetArray: Array[Row] = parquetData.collect()

    printDfArray(parquetArray)

  }

  // Generate test data in csv format
  private def generateDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._
    val csvData: Dataset[String] = sparkSession.sparkContext.parallelize("""
                                                                           |index,tileId
                                                                           |0,0
                                                                           |1,1
                                                                           |2,3
                                                                           |3,6
                                                                           |4,4
                                                                           |5,5
                                                                           |6,7
                                                                           |7,2
                                                                           |8,8
                                                                           |9,10
  """.stripMargin.lines.toList).toDS

    sparkSession.read
      .option("header", value = true)
      .option("timestampFormat", "MM/dd/yyyy")
      .option("inferSchema", value = true)
      .csv(csvData)
  }

  // Print data frame
  private def printDfArray(rowArray: Array[Row]): Unit =
    for (row <- rowArray) {
      for (i <- 0 until row.size) {
        println(row.get(i))
      }
    }

}

/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.pipeline.PipelineContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopFsSupportSparkJava {

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(HadoopFsSupportSparkJava.class);

    PipelineContext pipelineContext = new PipelineContext();
    String catalogHrn =
        pipelineContext.getConfig().getInputCatalogs().get("objectStoreCatalog").toString();
    String layerId = "parquet";

    SparkSession sparkSession =
        SparkSession.builder().appName("HadoopFsSupportSparkJava").getOrCreate();

    Dataset<Row> dataFrame = generateDataFrame(sparkSession);

    // Write the data frame as parquet files under a directory in the object store layer
    String parquetDir = "parquet-dir";
    logger.info(
        "Writing parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
    dataFrame.write().parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);

    // Read parquet data from the object store layer, selecting the index column of rows where the
    // tileId column has an even value
    logger.info(
        "Reading parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
    Dataset<Row> parquetData =
        sparkSession
            .read()
            .parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir)
            .select("index")
            .where("tileId % 2 == 0")
            .sort("index");
    List<Row> parquetArray = parquetData.collectAsList();

    printDfArray(parquetArray);
  }

  // Generate test data in csv format
  private static Dataset<Row> generateDataFrame(SparkSession sparkSession) {
    String csvString =
        String.join(
            "\n",
            "index,tileId",
            "0,0",
            "1,1",
            "2,3",
            "3,6",
            "4,4",
            "5,5",
            "6,7",
            "7,2",
            "8,8",
            "9,10");

    ArrayList<String> data = new ArrayList<>(Arrays.asList(csvString.split("\n")));

    Dataset<String> csvData = sparkSession.createDataset(data, Encoders.STRING());

    return sparkSession
        .read()
        .option("header", true)
        .option("timestampFormat", "MM/dd/yyyy")
        .option("inferSchema", true)
        .csv(csvData);
  }

  // Print data frame
  private static void printDfArray(List<Row> rowArray) {
    for (Row row : rowArray) {
      for (int i = 0; i < row.length(); i++) {
        System.out.println(row.get(i));
      }
    }
  }
}

Run the Application

To run the application locally, execute the following command:

Run Scala
Run Java

mvn compile exec:java \
    -Dexec.mainClass=HadoopFsSupportSparkScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dspark.master=local[*] \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

mvn compile exec:java \
    -Dexec.mainClass=HadoopFsSupportSparkJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dspark.master=local[*] \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

Pay attention to the set of -Dhere.platform.data-client.request-signer.credentials.here-account.* parameters. These parameters pass data from the credentials.properties file and {{YOUR_PROJECT_HRN}} to the Data Client Library. For more details about Data Client Library initialization, see Set Your Credentials via Java System Properties.
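
With the test data generated above, the filter tileId % 2 == 0 keeps the rows with indices 0, 3, 4, 7, 8, and 9. Assuming the generated data is unchanged, the application output should therefore end with those index values, printed one per line:

0
3
4
7
8
9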

Access the data using CLI

Using the HRN of the catalog that you created in the catalog creation step, you can list the contents of the Object store layer:

olp catalog layer object list {{YOUR_CATALOG_HRN}} parquet --key parquet-dir

The CLI output should be similar to the following (this is sample output, not exact values):

name                                    keyType                       lastModified                                      size                
parquet-dir/_SUCCESS                    object                        2021-01-12T19:23:47Z                              0                   
parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet object                        2021-01-12T19:23:39Z                              353                 
...

You can download the data stored in the Object store layer under a specific key:

olp catalog layer object get {{YOUR_CATALOG_HRN}} parquet --key parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet
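
If you want to inspect a downloaded part file locally, a minimal Scala sketch using a local Spark session could look like the following; the file name is the sample name from the listing above and will differ in your run:

import org.apache.spark.sql.SparkSession

// Minimal local check (sketch): read a downloaded parquet part file and print its rows.
val spark = SparkSession.builder().appName("InspectDownload").master("local[*]").getOrCreate()
val df = spark.read.parquet("part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet")
df.show()
spark.stop()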

Further Information

For more details on the topics covered in this tutorial, refer to the Data Client Library documentation.
