EMR Spark Cluster

In the EMR Spark approach, all Spark jobs are executed on an Amazon EMR cluster.

Requirements

  • The machine must have a public IPv4 address so that the access rules in the AWS firewall can be created.
  • The user must have permissions on their AWS account to create IAM roles and policies. The "DevOps" role is recommended.

Deployment Steps

Activate the SDK conda environment:

conda activate olp-sdk-for-python-1.12-env

Initialize the EMR workspace, which contains the global properties as well as your key pair.

For Linux/MacOS:

emr-init

For Windows:

emr-init.lnk

Edit the generated emr.env (or emr.bat on Windows) file, providing your AWS and repository credentials.

For Linux/MacOS:

nano ~/.here/emr/emr.env

Sample emr.env file:

#!/usr/bin/env bash

# Credentials variables
export DEFAULT_AWS_ACCESS_KEY="your AWS access key"
export DEFAULT_AWS_ACCESS_KEY_SECRET="your AWS access key secret"
export DEFAULT_HERE_USER="your HERE maven repository user"
export DEFAULT_HERE_PASSWORD="your HERE maven repository password"

# Environment variables
export DEFAULT_EMR_CORES="2"
export DEFAULT_EMR_VERSION="emr-5.24.0"
export DEFAULT_EMR_MASTER_TYPE="m4.large"
export DEFAULT_EMR_WORKER_TYPE="m4.2xlarge"
export DEFAULT_TAG_TEAM="My Team"
export DEFAULT_TAG_PROJECT="My Project"
export DEFAULT_TAG_OWNER="Me"
export DEFAULT_TAG_ENV="PoC"
export DEFAULT_AWS_REGION="cn-northwest-1"

For Windows:

notepad.exe %USERPROFILE%\.here\emr\emr.bat

Sample emr.bat file:

REM Credentials variables
set DEFAULT_AWS_ACCESS_KEY=your-AWS-access-key
set DEFAULT_AWS_ACCESS_KEY_SECRET=your-AWS-access-key-secret
set DEFAULT_HERE_USER=your-HERE-maven-repository-user
set DEFAULT_HERE_PASSWORD=your-HERE-maven-repository-password

REM Environment variables
set DEFAULT_EMR_CORES=2
set DEFAULT_EMR_VERSION=emr-5.24.0
set DEFAULT_EMR_MASTER_TYPE=m4.large
set DEFAULT_EMR_WORKER_TYPE=m4.2xlarge
set DEFAULT_TAG_TEAM=My-Team
set DEFAULT_TAG_PROJECT=My-Project
set DEFAULT_TAG_OWNER=Me
set DEFAULT_TAG_ENV=PoC
set DEFAULT_AWS_REGION=cn-northwest-1

Provision the EMR cluster:

For Linux/MacOS:

emr-provision -ns <custom-single-word>

For Windows:

emr-provision.lnk -ns <custom-single-word>

Note

<custom-single-word> is a suffix added to AWS resource names to avoid collisions. It must contain only alphanumeric characters and hyphens.
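For example, with a hypothetical suffix mylab:

emr-provision -ns mylab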

After successful provisioning, you should see a message similar to:

Apply complete! Resources: 20 added, 0 changed, 0 destroyed.

Outputs:

emr_master_public_dns = ec2-3-16-25-189.cn-northwest-1.compute.amazonaws.com.cn

Environment up and running, fully operational!

Access your Livy session list here:

>> http://ec2-3-16-25-189.cn-northwest-1.compute.amazonaws.com.cn:8998

Access the YARN Resource Manager here:

>> http://ec2-3-16-25-189.cn-northwest-1.compute.amazonaws.com.cn:8088

You can use this bucket to upload and process data

>> s3://spark-emrlab-bucket-lab
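The bucket name in your output will include your chosen suffix. As a quick check that your AWS credentials and the bucket work, you can upload an input file with boto3. This is a minimal sketch; the bucket name is taken from the sample output above and the file names are illustrative, so substitute your own:

import boto3

# Upload a local file to the provisioned bucket so your Spark jobs can read it.
# Replace the bucket name with the one printed in your provisioning output.
s3 = boto3.client("s3")
s3.upload_file("events.csv", "spark-emrlab-bucket-lab", "input/events.csv")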

Within Jupyter, create a notebook, select one of the Python 3 kernels, and add the following cells. In the properties spark.driver.extraJavaOptions and spark.executor.extraJavaOptions, replace the placeholders with the corresponding values from your credentials.properties file:

Cell 1

%load_ext sparkmagic.magics

Cell 2

%%spark config
{
  "driverMemory": "2G",
  "executorMemory": "4G",
  "executorCores": 2,
  "conf": {
    "spark.driver.extraJavaOptions": "-Dcom.here.platform.analytics.ds.schema.olp-artifact-service.env.artifact-prod.url=https://artifact.api.platform.hereolp.cn/v1 -Dhere.platform.data-client.endpoint-locator.discovery-service-env=here-cn,-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-endpoint-url=<here.account.token.endpoint> -Dhere.platform.data-client.request-signer.credentials.here-account.here-client-id=<here.account.clientid> -Dhere.platform.data-client.request-signer.credentials.here-account.here-access-key-id=<here.access.key.id> -Dhere.platform.data-client.request-signer.credentials.here-account.here-access-key-secret=<here.access.key.secret>",
    "spark.executor.extraJavaOptions": "-Dcom.here.platform.analytics.ds.schema.olp-artifact-service.env.artifact-prod.url=https://artifact.api.platform.hereolp.cn/v1 -Dhere.platform.data-client.endpoint-locator.discovery-service-env=here-cn,-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-endpoint-url=<here.account.token.endpoint> -Dhere.platform.data-client.request-signer.credentials.here-account.here-client-id=<here.account.clientid> -Dhere.platform.data-client.request-signer.credentials.here-account.here-access-key-id=<here.access.key.id> -Dhere.platform.data-client.request-signer.credentials.here-account.here-access-key-secret=<here.access.key.secret>",
    "spark.scheduler.mode": "FAIR",
    "spark.executor.instances": 2,
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.executorIdleTimeout": "60s",
    "spark.dynamicAllocation.cachedExecutorIdleTimeout": "60s",
    "spark.dynamicAllocation.minExecutors": 2,
    "spark.dynamicAllocation.maxExecutors": 4,
    "spark.dynamicAllocation.initialExecutors": 2,
    "spark.jars.ivySettings": "/var/lib/spark/.here/ivy.settings.xml",
    "spark.driver.userClassPathFirst": "false",
    "spark.executor.userClassPathFirst": "false",
    "spark.jars.packages": "com.here.olp.util:mapquad:4.0.13,com.here.platform.location:location-compilation-core_2.11:0.20.184,com.here.platform.location:location-core_2.11:0.20.184,com.here.platform.location:location-inmemory_2.11:0.20.184,com.here.platform.location:location-integration-here-commons_2.11:0.20.184,com.here.platform.location:location-integration-optimized-map_2.11:0.20.184,com.here.platform.location:location-data-loader-standalone_2.11:0.20.184,com.here.platform.location:location-spark_2.11:0.20.184,com.here.platform.location:location-compilation-here-map-content_2.11:0.20.184,com.here.platform.location:location-examples-utils_2.11:0.4.115,com.here.schema.sdii:sdii_archive_v1_java:2.0.1,com.here.sdii:sdii_message_v3_java:4.0.1,com.here.sdii:sdii_message_list_v3_java:4.0.1,com.here.schema.rib:lane-attributes_v2_scala:2.33.0,com.here.schema.rib:road-traffic-pattern-attributes_v2_scala:2.33.0,com.here.schema.rib:advanced-navigation-attributes_v2_scala:2.33.0,com.here.schema.rib:cartography_v2_scala:2.33.0,com.here.schema.rib:adas-attributes_v2_scala:2.33.0,com.typesafe.akka:akka-actor_2.11:2.5.11,com.beachape:enumeratum_2.11:1.5.13,com.github.ben-manes.caffeine:caffeine:2.6.2,com.github.cb372:scalacache-caffeine_2.11:0.24.3,com.github.cb372:scalacache-core_2.11:0.24.3,com.github.os72:protoc-jar:3.6.0,com.google.protobuf:protobuf-java:3.6.1,com.iheart:ficus_2.11:1.4.3,com.typesafe:config:1.3.3,org.apache.logging.log4j:log4j-api-scala_2.11:11.0,org.typelevel:cats-core_2.11:1.4.0,org.typelevel:cats-kernel_2.11:1.4.0,org.apache.logging.log4j:log4j-api:2.8.2,com.here.platform.data.client:spark-support_2.11:0.5.30,com.here.platform.data.client:data-client_2.11:0.5.30,com.here.platform.data.client:client-core_2.11:0.5.30,com.here.platform.data.client:hrn_2.11:0.1.614,com.here.platform.data.client:data-engine_2.11:0.5.30,com.here.platform.data.client:blobstore-client_2.11:0.5.30,com.here.account:here-oauth-client:0.4.13,com.here.platform.analytics:spark-ds-connector-deps_2.11:0.6.15,com.here.platform.analytics:spark-ds-connector_2.11:0.6.15",
    "spark.jars.excludes": "com.google.protobuf:protobuf-java,com.here.*:*_proto,org.json4s:*,org.apache.spark:spark-core_2.11,org.apache.spark:spark-sql_2.11,org.apache.spark:spark-streaming_2.11,org.apache.spark:spark-launcher_2.11,org.apache.spark:spark-network-shuffle_2.11,org.apache.spark:spark-unsafe_2.11,org.apache.spark:spark-network-common_2.11,org.apache.spark:spark-tags_2.11,org.scala-lang:scala-library,org.scala-lang:scala-compiler,org.scala-lang.modules:scala-parser-combinators_2.11,org.scala-lang.modules:scala-java8-compat_2.11,org.scala-lang:scala-reflect,org.scala-lang:scalap,com.fasterxml.jackson.core:jackson-*"
  }
}
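For reference, a platform credentials.properties file typically contains the entries shown below (all values are placeholders). Map here.token.endpoint.url to <here.account.token.endpoint>, here.client.id to <here.account.clientid>, here.access.key.id to <here.access.key.id>, and here.access.key.secret to <here.access.key.secret>:

here.user.id = <your user ID>
here.client.id = <your client ID>
here.access.key.id = <your access key ID>
here.access.key.secret = <your access key secret>
here.token.endpoint.url = <your token endpoint URL>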

Cell 3

# For a Scala Spark session
%spark add -s scala-spark -l scala -u <PUT YOUR LIVY ENDPOINT HERE> -k

# For a PySpark session
%spark add -s pyspark -l python -u <PUT YOUR LIVY ENDPOINT HERE> -k
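For example, with the sample provisioning output above, the Livy endpoint would be http://ec2-3-16-25-189.cn-northwest-1.compute.amazonaws.com.cn:8998, so the Scala session command becomes:

%spark add -s scala-spark -l scala -u http://ec2-3-16-25-189.cn-northwest-1.compute.amazonaws.com.cn:8998 -k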

Note

On EMR, it is necessary to explicitly provide the credentials for reading platform data in the notebook. You will need your access key ID and access key secret to submit your job.

Use Your Credentials

Scala

%%spark
val accessKeyId = "<Your Access Key ID>"
val accessKeySecret = "<Your Access Key Secret>"
// Use the exact same value as below:
val tokenEndPointUrl = "https://elb.cn-northwest-1.account.hereapi.cn/oauth2/token"
val layerHRN = "<Some Layer HRN>"

val df = spark.read.option("partitions", 900)
            .option("parallelism", 4)
            .option("tokenendpointurl", tokenEndPointUrl)
            .option("accesskeyid", accessKeyId)
            .option("accesskeysecret", accessKeySecret)
            .ds(layerHRN)

PySpark

%%spark
accessKeyId = "<Your Access Key ID>"
accessKeySecret = "<Your Access Key Secret>"
# Use the exact same value as below:
tokenEndPointUrl = "https://elb.cn-northwest-1.account.hereapi.cn/oauth2/token"
layerHRN = "<Some Layer HRN>"

# Wrap the chained calls in parentheses so the multi-line expression is valid Python
df = (spark.read.format("com.here.platform.analytics.ds")
          .option("partitions", 900)
          .option("parallelism", 4)
          .option("tokenendpointurl", tokenEndPointUrl)
          .option("accesskeyid", accessKeyId)
          .option("accesskeysecret", accessKeySecret)
          .option("layerhrn", layerHRN)
          .load())
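Once the DataFrame is loaded, you can verify it with the standard Spark API, for example:

%%spark
# Inspect the loaded layer: print the schema and a few sample rows
df.printSchema()
df.show(5)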

Start coding your job!

After finishing your job, destroy the cluster to avoid being charged for unused infrastructure.

For Linux/MacOS:

emr-deprovision

For Windows:

emr-deprovision.lnk

Deep Debugging

By default, internet access is restricted to the Livy and YARN Resource Manager endpoints. If you want to explore the cluster logs and access the internal node machines, you need to open an SSH tunnel and connect through it. When you deploy a new cluster, a script is created for you that opens the SSH tunnel.

For Linux/MacOS:

cd ~/.here/emr
./emr-tunnel.sh

For Windows:

cd %USERPROFILE%\.here\emr
emr-tunnel.bat

Next, you will need to install the FoxyProxy extension in your web browser.

Then, depending on your web browser, load the FoxyProxy configuration that we provide at:

  • For Chrome: ~/anaconda3/envs/<your_env>/lib/olp-emr/util/foxy-proxy-chrome.xml
  • For Firefox: ~/anaconda3/envs/<your_env>/lib/olp-emr/util/foxy-proxy-firefox.json

Next, you can activate FoxyProxy for all URLs or based on URL patterns (see the FoxyProxy documentation for instructions). You will then be able to access internal machine endpoints via your web browser.

Tutorial Notebooks

The tutorial notebooks for EMR are located in the folder:

For Linux/MacOS: $HOME/olp-sdk-for-python-1.12/tutorial-notebooks/emr.

For Windows: %USERPROFILE%\olp-sdk-for-python-1.12\tutorial-notebooks\emr.

To get an overview of all tutorial notebooks, start with the Getting Started notebook located at:

For Linux/MacOS: $HOME/olp-sdk-for-python-1.12/tutorial-notebooks/GettingStarted.ipynb

For Windows: %USERPROFILE%\olp-sdk-for-python-1.12\tutorial-notebooks\GettingStarted.ipynb

Use Data SDK for Java and Scala Jars on AWS EMR Jupyter Notebooks

If you want to use the Data SDK for Java and Scala jars in EMR Notebooks, you can configure the existing EMR cluster created in the Deployment Steps above.

Start an EMR Notebooks instance and, within it, create a notebook, select one of the required kernels, and add the following cells:

Cell 1

%load_ext sparkmagic.magics

Cell 2

%manage_spark

Next, click the create session tab in the output widget of the commands above, then paste the JSON below into the Properties input textbox.

Note

Replace the placeholders in the properties spark.driver.extraJavaOptions and spark.executor.extraJavaOptions with the corresponding values from your credentials.properties file. Check the status of the Spark context and verify the Livy session by going to {EMR_Master_Node_IP}:8998.

{
  "driverMemory": "2G",
  "executorMemory": "4G",
  "executorCores": 2,
  "conf": {
    "spark.driver.extraJavaOptions": "-Dcom.here.platform.analytics.ds.schema.olp-artifact-service.env.artifact-prod.url=https://artifact.api.platform.hereolp.cn/v1 -Dhere.platform.data-client.endpoint-locator.discovery-service-env=here-cn,-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-endpoint-url=<here.account.token.endpoint> -Dhere.platform.data-client.request-signer.credentials.here-account.here-client-id=<here.account.clientid> -Dhere.platform.data-client.request-signer.credentials.here-account.here-access-key-id=<here.access.key.id> -Dhere.platform.data-client.request-signer.credentials.here-account.here-access-key-secret=<here.access.key.secret>",
    "spark.executor.extraJavaOptions": "-Dcom.here.platform.analytics.ds.schema.olp-artifact-service.env.artifact-prod.url=https://artifact.api.platform.hereolp.cn/v1 -Dhere.platform.data-client.endpoint-locator.discovery-service-env=here-cn,-Dhere.platform.data-client.request-signer.credentials.here-account.here-token-endpoint-url=<here.account.token.endpoint> -Dhere.platform.data-client.request-signer.credentials.here-account.here-client-id=<here.account.clientid> -Dhere.platform.data-client.request-signer.credentials.here-account.here-access-key-id=<here.access.key.id> -Dhere.platform.data-client.request-signer.credentials.here-account.here-access-key-secret=<here.access.key.secret>",
    "spark.scheduler.mode": "FAIR",
    "spark.executor.instances": 2,
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.executorIdleTimeout": "60s",
    "spark.dynamicAllocation.cachedExecutorIdleTimeout": "60s",
    "spark.dynamicAllocation.minExecutors": 2,
    "spark.dynamicAllocation.maxExecutors": 4,
    "spark.dynamicAllocation.initialExecutors": 2,
    "spark.jars.ivySettings": "/var/lib/spark/.here/ivy.settings.xml",
    "spark.driver.userClassPathFirst": "false",
    "spark.executor.userClassPathFirst": "false",
    "spark.jars.packages": "com.here.olp.util:mapquad:4.0.13,com.here.platform.location:location-compilation-core_2.11:0.20.184,com.here.platform.location:location-core_2.11:0.20.184,com.here.platform.location:location-inmemory_2.11:0.20.184,com.here.platform.location:location-integration-here-commons_2.11:0.20.184,com.here.platform.location:location-integration-optimized-map_2.11:0.20.184,com.here.platform.location:location-data-loader-standalone_2.11:0.20.184,com.here.platform.location:location-spark_2.11:0.20.184,com.here.platform.location:location-compilation-here-map-content_2.11:0.20.184,com.here.platform.location:location-examples-utils_2.11:0.4.115,com.here.schema.sdii:sdii_archive_v1_java:2.0.1,com.here.sdii:sdii_message_v3_java:4.0.1,com.here.sdii:sdii_message_list_v3_java:4.0.1,com.here.schema.rib:lane-attributes_v2_scala:2.33.0,com.here.schema.rib:road-traffic-pattern-attributes_v2_scala:2.33.0,com.here.schema.rib:advanced-navigation-attributes_v2_scala:2.33.0,com.here.schema.rib:cartography_v2_scala:2.33.0,com.here.schema.rib:adas-attributes_v2_scala:2.33.0,com.typesafe.akka:akka-actor_2.11:2.5.11,com.beachape:enumeratum_2.11:1.5.13,com.github.ben-manes.caffeine:caffeine:2.6.2,com.github.cb372:scalacache-caffeine_2.11:0.24.3,com.github.cb372:scalacache-core_2.11:0.24.3,com.github.os72:protoc-jar:3.6.0,com.google.protobuf:protobuf-java:3.6.1,com.iheart:ficus_2.11:1.4.3,com.typesafe:config:1.3.3,org.apache.logging.log4j:log4j-api-scala_2.11:11.0,org.typelevel:cats-core_2.11:1.4.0,org.typelevel:cats-kernel_2.11:1.4.0,org.apache.logging.log4j:log4j-api:2.8.2,com.here.platform.data.client:spark-support_2.11:0.5.30,com.here.platform.data.client:data-client_2.11:0.5.30,com.here.platform.data.client:client-core_2.11:0.5.30,com.here.platform.data.client:hrn_2.11:0.1.614,com.here.platform.data.client:data-engine_2.11:0.5.30,com.here.platform.data.client:blobstore-client_2.11:0.5.30,com.here.account:here-oauth-client:0.4.13,com.here.platform.analytics:spark-ds-connector-deps_2.11:0.6.15,com.here.platform.analytics:spark-ds-connector_2.11:0.6.15",
    "spark.jars.excludes": "com.google.protobuf:protobuf-java,com.here.*:*_proto,org.json4s:*,org.apache.spark:spark-core_2.11,org.apache.spark:spark-sql_2.11,org.apache.spark:spark-streaming_2.11,org.apache.spark:spark-launcher_2.11,org.apache.spark:spark-network-shuffle_2.11,org.apache.spark:spark-unsafe_2.11,org.apache.spark:spark-network-common_2.11,org.apache.spark:spark-tags_2.11,org.scala-lang:scala-library,org.scala-lang:scala-compiler,org.scala-lang.modules:scala-parser-combinators_2.11,org.scala-lang.modules:scala-java8-compat_2.11,org.scala-lang:scala-reflect,org.scala-lang:scalap,com.fasterxml.jackson.core:jackson-*"
  }
}
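To confirm that the session was created, you can also query Livy's REST API directly (GET /sessions is part of the standard Livy API; the host below is the sample master node from the provisioning output, so substitute your own):

import requests

# List the active Livy sessions running on the EMR master node (port 8998)
resp = requests.get("http://ec2-3-16-25-189.cn-northwest-1.compute.amazonaws.com.cn:8998/sessions")
print(resp.json())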

Thank you for choosing the HERE Data SDK for Python. After the setup, please consider filling out this short 1-minute survey to help us improve the setup experience.

