Read and write to Object store layer using Hadoop FS support in Spark

Objectives: Understand how to use Hadoop FS Support to read and write data in an Object store layer using Spark.

Complexity: Easy

Time to complete: 30 min

Dependencies: Organize your work in projects

Source code: Download

The example in this tutorial demonstrates how to use the Hadoop FS Support component provided by the Data Client Library. This component lets you access data stored in an Object store layer with standard tools such as Apache Spark, using minimal custom code.

The tutorial has the following steps:

  1. Create a catalog with an Object store layer.
  2. Write an application to generate test data in the Parquet format.
  3. Write the test data to the Object store layer using the hadoop-fs-support library.
  4. Read the test data from the Object store layer using the hadoop-fs-support library.
  5. Access the data stored in the Object store layer using the CLI.

As a preparation step, you must create a catalog with a layer of the Object store type.

Set up the Maven project

Create the following folder structure for the project:

hadoop-fs-support-spark-pipeline
└── src
    └── main
        ├── java
        ├── resources
        └── scala

You can do this with a single bash command:

mkdir -p hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}
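If bash brace expansion is not available (for example, in a Windows shell), the same folders can be created from Java. The following is a minimal sketch using only java.nio.file; the class name ProjectLayout is hypothetical and not part of the tutorial's source code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: a cross-platform equivalent of
//   mkdir -p hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}
final class ProjectLayout {

  // Source folders used by the tutorial's Maven project.
  static final String[] SOURCE_DIRS = {"java", "resources", "scala"};

  // Creates <baseDir>/hadoop-fs-support-spark-pipeline/src/main/{java,resources,scala}
  // and returns the project root. Like `mkdir -p`, existing folders are left as they are.
  static Path create(Path baseDir) throws IOException {
    Path root = baseDir.resolve("hadoop-fs-support-spark-pipeline");
    Path srcMain = root.resolve("src").resolve("main");
    for (String dir : SOURCE_DIRS) {
      Files.createDirectories(srcMain.resolve(dir));
    }
    return root;
  }

  public static void main(String[] args) throws IOException {
    // Create the layout in the current working directory.
    Path root = create(Paths.get(""));
    System.out.println("Created " + root.toAbsolutePath());
  }
}
```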

Create catalog

You must create your catalog. You can do this with the OLP Command Line Interface (CLI) by following the steps outlined in Organize your work in projects.

Use a unique identifier for the catalog, for example, {{YOUR_USERNAME}}-hadoop-fs-support-spark-pipeline.

Create a file called hadoop-fs-support-spark-pipeline.json with the following contents, replacing {{YOUR_CATALOG_ID}} with your chosen identifier.

{
  "id": "{{YOUR_CATALOG_ID}}",
  "name": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "summary": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "description": "Tutorial for reading and writing data to Object store layer using Hadoop FS Support",
  "tags": ["Hadoop FS Support", "Object store"],
  "layers": [
    {
      "id": "parquet",
      "name": "parquet-layer",
      "summary": "Simulated data.",
      "description": "Simulated parquet data to demonstrate usability of Object store layer",
      "tags": ["Hadoop FS Support", "Object store"],
      "layerType": "objectstore",
      "volume": {
        "volumeType": "durable"
      }
    }
  ]
}
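The catalog configuration and the commands in this tutorial contain {{...}} placeholders. If you script the setup, a small helper can fill them in and refuse to continue when one is left unset. This is a minimal sketch; PlaceholderFiller is a hypothetical utility, not part of the HERE SDK or the OLP CLI.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical setup helper (not part of the HERE SDK or OLP CLI):
// replaces {{NAME}} placeholders in a template with supplied values.
final class PlaceholderFiller {

  // Matches placeholders such as {{YOUR_CATALOG_ID}} and captures the name.
  private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([A-Z_]+)\\}\\}");

  // Returns the template with every placeholder replaced; throws if a value is missing,
  // so a forgotten placeholder never reaches the CLI or the pipeline.
  static String fill(String template, Map<String, String> values) {
    Matcher matcher = PLACEHOLDER.matcher(template);
    StringBuffer out = new StringBuffer();
    while (matcher.find()) {
      String value = values.get(matcher.group(1));
      if (value == null) {
        throw new IllegalArgumentException("No value for " + matcher.group());
      }
      // quoteReplacement keeps characters such as '$' or '\' in the value literal.
      matcher.appendReplacement(out, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(out);
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> values = new HashMap<>();
    values.put("YOUR_CATALOG_ID", "jdoe-hadoop-fs-support-spark-pipeline");
    System.out.println(fill("\"id\": \"{{YOUR_CATALOG_ID}}\"", values));
  }
}
```

Failing on a missing value, rather than leaving the placeholder in place, is a deliberate choice: an unreplaced {{YOUR_PROJECT_HRN}} would otherwise only surface later as a CLI or authorization error.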

Replace {{YOUR_CATALOG_ID}} with your own identifier and {{YOUR_USERNAME}} with your user name. Also replace {{YOUR_PROJECT_HRN}} with the HRN of your project from Organize your work in projects, then run the following command:

#!/usr/bin/env bash
set -o nounset -o errexit -o xtrace

### [catalog]
olp catalog create {{YOUR_CATALOG_ID}} \
    "Tutorial for reading and writing data to Object store layer using Hadoop FS Support ({{YOUR_USERNAME}})" \
    --config hadoop-fs-support-spark-pipeline.json \
    --scope {{YOUR_PROJECT_HRN}}
### [catalog]

Set up pipeline configurations

  1. Create a file named pipeline-config.conf and populate it with the following snippet.
  2. Replace {{YOUR_CATALOG_HRN}} with the HRN of the catalog you created earlier, following Organize your work in projects.

This tutorial can also run outside of a pipeline environment. The only difference is that your application code must then set catalogHrn to the HRN you received in the catalog creation step.

pipeline.config {

  output-catalog {hrn = "hrn:here:data::olp-here:dummy"}

  input-catalogs {
      objectStoreCatalog { hrn = "{{YOUR_CATALOG_HRN}}" }
  }
}
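When running outside of a pipeline environment, as described above, the catalog HRN has to come from somewhere other than the pipeline configuration. A minimal sketch, assuming you pass it as a JVM system property: the property name catalogHrn and the class CatalogHrnResolver are hypothetical, and inside a pipeline the tutorial's code reads the HRN from PipelineContext instead.

```java
import java.util.Optional;

// Hypothetical helper for running outside a pipeline: prefer a catalog HRN passed as a
// JVM system property, otherwise fall back to the HRN taken from the pipeline config.
final class CatalogHrnResolver {

  // Illustrative property name; it is not defined by the HERE SDK.
  static final String PROPERTY = "catalogHrn";

  static String resolve(Optional<String> hrnFromPipelineConfig) {
    String override = System.getProperty(PROPERTY);
    if (override != null && !override.trim().isEmpty()) {
      return override.trim();
    }
    return hrnFromPipelineConfig.orElseThrow(
        () -> new IllegalStateException(
            "No catalog HRN: pass -D" + PROPERTY + "=<catalog HRN> or provide a pipeline config"));
  }

  public static void main(String[] args) {
    // In the tutorial, the fallback value would come from PipelineContext.
    Optional<String> fromConfig = args.length > 0 ? Optional.of(args[0]) : Optional.empty();
    System.out.println("Using catalog " + resolve(fromConfig));
  }
}
```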

Set up the project in Maven

To develop an application that runs on pipelines with Spark, use sdk-batch-bom_2.12 as the parent POM for the application:

<parent>
    <groupId>com.here.platform</groupId>
    <artifactId>sdk-batch-bom_2.12</artifactId>
    <version>2.31.7</version>
    <relativePath/>
</parent>

Add the following dependencies for Scala and Java (their versions are managed by the parent BOM):

<dependencies>
    <dependency>
        <groupId>com.here.platform.pipeline</groupId>
        <artifactId>pipeline-interface_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.here.platform.data.client</groupId>
        <artifactId>hadoop-fs-support_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.compat.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
</dependencies>

Implement the application

This application writes Parquet data to the Object store layer using Hadoop FS Support, then runs a simple Spark SQL query that selects the index column of the rows whose tileId value is even.
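Before running Spark, it helps to know what result to expect. The sketch below is a plain-Java model of the same query applied to the tutorial's ten test rows; it does not use Spark, and the class name EvenTileIdQuery is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the tutorial's Spark query (no Spark involved):
// where("tileId % 2 == 0"), then select("index"), then sort("index").
final class EvenTileIdQuery {

  // The same (index, tileId) pairs that generateDataFrame builds from CSV.
  static final int[][] ROWS = {
    {0, 0}, {1, 1}, {2, 3}, {3, 6}, {4, 4}, {5, 5}, {6, 7}, {7, 2}, {8, 8}, {9, 10}
  };

  static List<Integer> evenTileIdIndexes(int[][] rows) {
    List<Integer> indexes = new ArrayList<>();
    for (int[] row : rows) {
      int index = row[0];
      int tileId = row[1];
      if (tileId % 2 == 0) { // filter: tileId % 2 == 0
        indexes.add(index);  // projection: keep only the index column
      }
    }
    Collections.sort(indexes); // ordering: sort by index
    return indexes;
  }

  public static void main(String[] args) {
    System.out.println(evenTileIdIndexes(ROWS)); // prints [0, 3, 4, 7, 8, 9]
  }
}
```

With these test rows, the Spark applications should likewise print 0, 3, 4, 7, 8 and 9, one value per line, because printDfArray prints every column of every row on its own line and only the index column is selected.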

Scala:
/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.pipeline.PipelineContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory

object HadoopFsSupportSparkScala {

  def main(args: Array[String]): Unit = {

    val logger = LoggerFactory.getLogger(HadoopFsSupportSparkScala.getClass)

    val pipelineContext = new PipelineContext
    val catalogHrn = pipelineContext.config.inputCatalogs("objectStoreCatalog").toString
    val layerId = "parquet"

    val sparkSession =
      SparkSession
        .builder()
        .appName("HadoopFsSupportSparkScala")
        .getOrCreate()

    val dataFrame = generateDataFrame(sparkSession)

    // Write the data frame as Parquet files under "parquet-dir" in the Object store layer
    val parquetDir = "parquet-dir"
    logger.info(s"Writing parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
    dataFrame.write
      .parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")

    // Read the Parquet data back from the Object store layer, keep only the rows whose
    // tileId column has an even value, and select their index column, sorted by index
    logger.info(s"Reading parquet files at: blobfs://$catalogHrn:$layerId/$parquetDir")
    val parquetData: DataFrame =
      sparkSession.read
        .parquet(s"blobfs://$catalogHrn:$layerId/$parquetDir")
        .where("tileId % 2 == 0")
        .select("index")
        .sort("index")
    val parquetArray: Array[Row] = parquetData.collect()

    printDfArray(parquetArray)

  }

  // Generate test data from an in-memory CSV string
  private def generateDataFrame(sparkSession: SparkSession): DataFrame = {
    import sparkSession.implicits._
    val csvData: Dataset[String] = sparkSession.sparkContext.parallelize("""
                                                                           |index,tileId
                                                                           |0,0
                                                                           |1,1
                                                                           |2,3
                                                                           |3,6
                                                                           |4,4
                                                                           |5,5
                                                                           |6,7
                                                                           |7,2
                                                                           |8,8
                                                                           |9,10
  """.stripMargin.trim.split("\n").toList).toDS

    sparkSession.read
      .option("header", value = true)
      .option("timestampFormat", "MM/dd/yyyy")
      .option("inferSchema", value = true)
      .csv(csvData)
  }

  // Print data frame
  private def printDfArray(rowArray: Array[Row]): Unit =
    for (row <- rowArray) {
      for (i <- 0 until row.size) {
        println(row.get(i))
      }
    }

}

Java:

/*
 * Copyright (c) 2018-2021 HERE Europe B.V.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import com.here.platform.pipeline.PipelineContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HadoopFsSupportSparkJava {

  public static void main(String[] args) {
    Logger logger = LoggerFactory.getLogger(HadoopFsSupportSparkJava.class);

    PipelineContext pipelineContext = new PipelineContext();
    String catalogHrn =
        pipelineContext.getConfig().getInputCatalogs().get("objectStoreCatalog").toString();
    String layerId = "parquet";

    SparkSession sparkSession =
        SparkSession.builder().appName("HadoopFsSupportSparkJava").getOrCreate();

    Dataset<Row> dataFrame = generateDataFrame(sparkSession);

    // Write the data frame as Parquet files under "parquet-dir" in the Object store layer
    String parquetDir = "parquet-dir";
    logger.info(
        "Writing parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
    dataFrame.write().parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);

    // Read the Parquet data back from the Object store layer, keep only the rows whose
    // tileId column has an even value, and select their index column, sorted by index
    logger.info(
        "Reading parquet files at: blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir);
    Dataset<Row> parquetData =
        sparkSession
            .read()
            .parquet("blobfs://" + catalogHrn + ":" + layerId + "/" + parquetDir)
            .where("tileId % 2 == 0")
            .select("index")
            .sort("index");
    List<Row> parquetArray = parquetData.collectAsList();

    printDfArray(parquetArray);
  }

  // Generate test data from an in-memory CSV string
  private static Dataset<Row> generateDataFrame(SparkSession sparkSession) {
    String csvString =
        String.join(
            "\n",
            "index,tileId",
            "0,0",
            "1,1",
            "2,3",
            "3,6",
            "4,4",
            "5,5",
            "6,7",
            "7,2",
            "8,8",
            "9,10");

    ArrayList<String> data = new ArrayList<>(Arrays.asList(csvString.split("\n")));

    Dataset<String> csvData = sparkSession.createDataset(data, Encoders.STRING());

    return sparkSession
        .read()
        .option("header", true)
        .option("timestampFormat", "MM/dd/yyyy")
        .option("inferSchema", true)
        .csv(csvData);
  }

  // Print data frame
  private static void printDfArray(List<Row> rowArray) {
    for (Row row : rowArray) {
      for (int i = 0; i < row.length(); i++) {
        System.out.println(row.get(i));
      }
    }
  }
}
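Both applications assemble the same blobfs location string several times (for logging, writing, and reading). Building it once and reusing it keeps those uses from drifting apart. This is a hypothetical helper, not part of the Hadoop FS Support API; it only reproduces the blobfs://<catalog HRN>:<layer ID>/<path> pattern used in the code above.

```java
// Hypothetical helper (not part of the Hadoop FS Support API) that reproduces the
// blobfs://<catalog HRN>:<layer ID>/<path> pattern used by the tutorial code.
final class BlobFsPaths {

  static String of(String catalogHrn, String layerId, String path) {
    if (catalogHrn.isEmpty() || layerId.isEmpty()) {
      throw new IllegalArgumentException("catalogHrn and layerId must not be empty");
    }
    return "blobfs://" + catalogHrn + ":" + layerId + "/" + path;
  }

  public static void main(String[] args) {
    // Build the location once, then reuse it for logging, writing and reading.
    String location = of("hrn:here:data::olp-here:dummy", "parquet", "parquet-dir");
    System.out.println("Writing parquet files at: " + location);
    System.out.println("Reading parquet files at: " + location);
  }
}
```

In HadoopFsSupportSparkJava, the resulting string could then be passed to both dataFrame.write().parquet(location) and sparkSession.read().parquet(location).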

Run the application

To run the application locally, use the following command:

Scala:

mvn compile exec:java \
    -Dexec.mainClass=HadoopFsSupportSparkScala \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

Java:

mvn compile exec:java \
    -Dexec.mainClass=HadoopFsSupportSparkJava \
    -Dpipeline-config.file=pipeline-config.conf \
    -Dspark.master="local[*]" \
    -Dhere.platform.data-client.request-signer.credentials.here-account.here-token-scope={{YOUR_PROJECT_HRN}}
    

Note the -Dhere.platform.data-client.request-signer.credentials.here-account.* parameters: they pass data from the credentials.properties file and {{YOUR_PROJECT_HRN}} to the Data Client Library. For more details about initializing the Data Client Library, see Set Your Credentials via Java System Properties.
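With mvn exec:java, the application runs inside the Maven JVM, so -D flags from the command line are visible to it as JVM system properties. The Data Client Library reads these values itself; the hypothetical check below (TokenScopeCheck is not part of the SDK) only makes a forgotten token-scope flag obvious before any Spark work starts.

```java
// Hypothetical fail-fast check: with `mvn exec:java`, -D flags from the command line
// are visible to the application as JVM system properties.
final class TokenScopeCheck {

  static final String TOKEN_SCOPE_PROPERTY =
      "here.platform.data-client.request-signer.credentials.here-account.here-token-scope";

  // Returns the project HRN passed as the token scope, or fails with a hint.
  static String requireTokenScope() {
    String scope = System.getProperty(TOKEN_SCOPE_PROPERTY);
    if (scope == null || scope.trim().isEmpty()) {
      throw new IllegalStateException(
          "Missing -D" + TOKEN_SCOPE_PROPERTY + "=<your project HRN>");
    }
    return scope.trim();
  }

  public static void main(String[] args) {
    System.out.println("Token scope: " + requireTokenScope());
  }
}
```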

Access the data using CLI

Using the HRN of the catalog you created in the catalog creation step, you can list the contents of the Object store layer:

olp catalog layer object list {{YOUR_CATALOG_HRN}} parquet --key parquet-dir

The CLI output should look similar to the following (sample output; your values will differ):

name                                                                             keyType  lastModified          size
parquet-dir/_SUCCESS                                                             object   2021-01-12T19:23:47Z  0
parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet  object   2021-01-12T19:23:39Z  353
...
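In the sample listing, _SUCCESS is an empty marker file (size 0) that Spark's Hadoop-based output committer typically writes when the job finishes, while the part-*.snappy.parquet files hold the data. When you script downloads, you may want only the data files. The following is a hypothetical helper that works on plain key strings; it does not call the CLI.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: keep only Parquet data files from a list of object keys,
// skipping marker files such as _SUCCESS.
final class ParquetParts {

  static List<String> dataFiles(List<String> keys) {
    List<String> result = new ArrayList<>();
    for (String key : keys) {
      String fileName = key.substring(key.lastIndexOf('/') + 1);
      // Hadoop-style conventions treat names starting with '_' or '.' as non-data files.
      boolean hidden = fileName.startsWith("_") || fileName.startsWith(".");
      if (!hidden && fileName.endsWith(".parquet")) {
        result.add(key);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Pass the keys from `olp catalog layer object list` as arguments.
    System.out.println(dataFiles(Arrays.asList(args)));
  }
}
```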

To download the object stored under a specific key in the Object store layer, run:

olp catalog layer object get {{YOUR_CATALOG_HRN}} parquet --key parquet-dir/part-00000-5bf23e5f-53be-4c83-917c-69ca6e042934-c000.snappy.parquet
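A quick sanity check for the downloaded object: a Parquet file begins and ends with the 4-byte magic number PAR1. The sketch below assumes you know the local path of the downloaded file (where the CLI saves it is not covered here), and ParquetMagic is a hypothetical class name. It checks only the magic bytes, not that the rest of the file is valid.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sanity check for a downloaded object: Parquet files begin and end with the
// 4-byte magic number "PAR1".
final class ParquetMagic {

  private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

  static boolean looksLikeParquet(String path) throws IOException {
    try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
      long length = file.length();
      if (length < 2L * MAGIC.length) {
        return false; // too short to hold both magic markers
      }
      byte[] head = new byte[MAGIC.length];
      byte[] tail = new byte[MAGIC.length];
      file.readFully(head);
      file.seek(length - MAGIC.length);
      file.readFully(tail);
      return Arrays.equals(head, MAGIC) && Arrays.equals(tail, MAGIC);
    }
  }

  public static void main(String[] args) throws IOException {
    // Pass the local path of the downloaded file.
    String path = args[0];
    System.out.println(path + (looksLikeParquet(path) ? " looks like Parquet" : " is not Parquet"));
  }
}
```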

Further information

For additional details on the topics covered in this tutorial, you can refer to the following sources:
