Interfacing with the Pipeline Process

The HERE platform pipeline service manages user pipelines, running each one on the runtime environment (either the Flink or Spark framework) chosen for its executable Pipeline Version.

This article describes the parameters and configuration files passed by the pipeline to the Pipeline Version submitted as a job.

You can control the parameters and the content of configuration files by configuring the details of each Pipeline Version via the pipeline REST API, the OLP CLI, or interactively through the platform's Portal GUI.

Running Locally, the Same Interface

When running a pipeline locally, for example on a development machine, you have to mimic the interface described in this document by passing parameters and files manually.

The HERE platform SDK uses Java and Scala Maven archetypes to create a basic pipeline project. This project contains sample files and scripts that simplify developing and running pipelines locally.

For local development, you may either include pipeline-config.conf and pipeline-job.conf in the process classpath or specify their location (path) on the development machine via the pipeline-config.file and pipeline-job.file system properties. Details in these files will change when moving to a production environment.
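The two ways of supplying the files locally could be resolved along the following lines. This is a minimal sketch, not the SDK's implementation: the class LocalPipelineFiles and its locate method are hypothetical, and the lookup order (system property first, then classpath) is an assumption. Only the file names and system property names come from this document.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

// Hypothetical helper for local runs; not part of the HERE platform SDK.
final class LocalPipelineFiles {

    // Returns the location of a configuration file. An explicit system
    // property (for example -Dpipeline-config.file=/path/to/file) wins;
    // otherwise the file is looked up on the classpath by name.
    static Optional<URL> locate(String systemProperty, String fileName) {
        String explicitPath = System.getProperty(systemProperty);
        if (explicitPath != null) {
            Path path = Paths.get(explicitPath);
            if (!Files.isRegularFile(path)) {
                return Optional.empty();
            }
            try {
                return Optional.of(path.toUri().toURL());
            } catch (MalformedURLException e) {
                return Optional.empty();
            }
        }
        ClassLoader loader = LocalPipelineFiles.class.getClassLoader();
        return Optional.ofNullable(loader.getResource(fileName));
    }

    public static void main(String[] args) {
        System.out.println(locate("pipeline-config.file", "pipeline-config.conf"));
        System.out.println(locate("pipeline-job.file", "pipeline-job.conf"));
    }
}
```

An empty result means the file was not supplied in either way, which is expected for pipeline-job.conf when running a stream pipeline.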

Entry and Exit Points

You specify the class that represents the entry point of the pipeline in the Pipeline Template. This class is used to select the entry point when a job is submitted to Spark or Flink. A new object is created by the JVM and execution starts at the main method.

Stream processing pipeline applications don't usually terminate on their own, but they may be terminated. Batch processing pipelines naturally terminate when processing is complete. When the pipeline application terminates, it returns an exit code. If the exit code is 0, the application is considered to have terminated successfully. If the exit code is non-zero, the application is considered to have terminated due to errors, and the Pipeline service may report this event in the job description or may try to submit the job again.
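One way to make the success and failure cases explicit is to map the outcome of the pipeline body to the process exit code in the main method. The following is a minimal sketch under that assumption. The class ExitCodes and its run method are hypothetical names, and the choice of 1 as the failure code is arbitrary (any non-zero value signals an error).

```java
import java.util.concurrent.Callable;

// Hypothetical wrapper; the names are illustrative only.
final class ExitCodes {
    static final int SUCCESS = 0;
    static final int FAILURE = 1; // any non-zero value signals an error

    // Runs the pipeline body and maps the outcome to an exit code.
    static int run(Callable<Void> pipelineBody) {
        try {
            pipelineBody.call();
            return SUCCESS;
        } catch (Exception e) {
            // Log the cause before the process ends.
            e.printStackTrace();
            return FAILURE;
        }
    }

    public static void main(String[] args) {
        int exitCode = run(() -> {
            // Pipeline logic would go here.
            return null;
        });
        System.exit(exitCode);
    }
}
```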

System Properties

The following JVM system properties are set by the Pipeline API when a pipeline is submitted as a new job. They can be obtained via the System.getProperties() method, or the equivalent.

  • olp.pipeline.id: Identifier of the Pipeline, as defined in the Pipeline API
  • olp.pipeline.version.id: Identifier of the Pipeline Version, as defined in the Pipeline API
  • olp.deployment.id: Identifier of the Job, as defined in the Pipeline API
  • olp.realm: The customer realm

The following additional property paths are used by the platform:

  • env.api.lookup.host
  • akka.*
  • here.platform.*
  • com.here.*

In addition to these, other properties are set by the system to configure the runtime environment. These include Spark or Flink configuration parameters associated with the Pipeline Version configuration that you selected. The actual details are specific to the environment chosen and its version. Because such details may change, they are considered implementation-specific and left to your determination.

System properties specified in this section are visible from the main user process only. These system properties are not necessarily replicated to the JVMs that run in worker nodes of the cluster.
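As an illustration, the four documented identifiers can be collected in the main user process with the standard System.getProperty call. This is a minimal sketch: the class PipelineIdentity and the "unknown" fallback value are hypothetical choices, made so that the same code also runs locally where the properties are not set.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; only the property names come from this document.
final class PipelineIdentity {
    static final String[] KEYS = {
        "olp.pipeline.id",
        "olp.pipeline.version.id",
        "olp.deployment.id",
        "olp.realm"
    };

    // Collects the documented properties in a stable order. A property that
    // is not set (for example in a local run) is reported as "unknown".
    static Map<String, String> read() {
        Map<String, String> values = new LinkedHashMap<>();
        for (String key : KEYS) {
            values.put(key, System.getProperty(key, "unknown"));
        }
        return values;
    }

    public static void main(String[] args) {
        read().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because these properties are not necessarily replicated to worker JVMs, a pipeline that needs the values on workers would read them once in the main process and pass them along explicitly.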

Pipeline Configuration

Configuration of the pipeline, as specified in the corresponding Pipeline Version, is passed via a file named pipeline-config.conf. This file is added to the classpath of the main user process.

Note

The format of the file is HOCON, a superset of JSON and Java properties. It can be parsed by the open-source Typesafe Config library of Lightbend.

If the pipeline is implemented using the Data Processing Library, parsing is handled automatically by the pipeline-runner package. This package also provides an application main to easily interface with the Pipeline service and to implement pipelines.

Example content of pipeline-config.conf:


    pipeline.config {
        billing-tag = "test-billing-tag"
        output-catalog { hrn = "hrn:here:data:::example-output" }
        input-catalogs {
            test-input-1 { hrn = "hrn:here:data:::example1" }
            test-input-2 { hrn = "hrn:here:data:::example2" }
            test-input-3 { hrn = "hrn:here:data:::example3" }
        }
    }

Where:

  • billing-tag specifies an optional tag to group billing entries for the pipeline.
  • output-catalog specifies the HRN that identifies the output catalog of the pipeline.
  • input-catalogs specifies one or more input catalogs for the pipeline. For each input catalog, its fixed identifier is provided together with the HRN of the actual catalog.

Pipeline implementations may bind to and distinguish between multiple input catalogs via the fixed identifiers. Fixed identifiers are defined in a Pipeline Template. An HRN is defined for each Pipeline Version so that the same Pipeline Template may be reused in multiple setups.

Batch Pipeline Job

Batch pipelines perform a specific job and then terminate. Stream pipelines don't perform any specific, time-constrained job and are instead continuously running. This section applies to batch pipelines only.

Batch pipelines process one or more versioned layers of one or more input catalogs to produce one or more versioned layers in the single output catalog. Versioned layers form partitioned, consistent snapshots of datasets. When the pipeline is in the SCHEDULED state, the Pipeline API scheduler manages running the pipeline. It monitors the configured input catalogs and triggers a job submission to Spark when it detects that new data is published in any of the input catalogs.

The job passed to the pipeline describes which version of each input catalog should be processed. These are usually the most recent versions of each catalog.

A simple, but correct, batch pipeline may fetch all the data stored in all the relevant input layers at the version described in the job, perform some processing, and publish the result to its own output layers, generating a new version of the output catalog. This event, in turn, may trigger other batch pipelines that use that catalog as input. The job description always contains a version number for each input catalog, whether that catalog has changed or not.

Complete reprocessing of the input is usually a resource-intensive operation. The scheduler also keeps track of previous jobs and which version of each input catalog was used at that time. This additional information is also included in the job description (see the since-version parameter). Instead of reprocessing the whole input, a more advanced pipeline may process just what has changed since the previous run and simply update the output instead of regenerating it from scratch. Implementing such incremental processing is nontrivial. The Data Processing Library included in the HERE platform provides significant support to solve this problem so that users can focus on their business logic while the library takes care of the complexity of incremental processing.

The job description is passed via a file named pipeline-job.conf. This file is also in HOCON format and is added to the classpath of the main user process. The file is optional and present only for batch processing pipelines.

Example content of pipeline-job.conf:

    pipeline.job.catalog-versions {
        output-catalog { base-version = 42 }
        input-catalogs {
            test-input-1 {
                processing-type = "no_changes"
                version = 19
            }
            test-input-2 {
                processing-type = "changes"
                since-version = 70
                version = 75
            }
            test-input-3 {
                processing-type = "reprocess"
                version = 314159
            }
        }
    }

Where:

  • base-version of output-catalog indicates the already-existing version of the catalog on top of which new data should be published. This parameter will not be necessary in the future and will be removed. It is currently required to commit updated data to the output catalog.
  • input-catalogs contains, for each input, the most up-to-date version of that input. This is the version that should be processed. It also includes information that specifies what has changed since the last time the job ran. Catalogs can be distinguished via the same identifiers present in the pipeline configuration file.
  • processing-type describes what has changed in each input since the last successful run. The value can be no_changes, changes, or reprocess.

    • no_changes indicates that the input catalog has not changed since the last run.
    • changes indicates that the input catalog has changed. A second parameter, since-version, is included to indicate which version of that catalog was processed in the last run.
    • reprocess does not specify whether that input catalog has changed or not. The pipeline is requested to reprocess that whole catalog instead of attempting any kind of incremental processing. This may be due to an explicit user request or to a system condition, such as the first time a pipeline runs.
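The three processing types can be modelled as a small value class that states what each input entry implies. This is a minimal sketch for illustration only: the class InputCatalogJob, its fields, and the wording returned by plan are hypothetical, and reading the values out of pipeline-job.conf is left out. The sample values in main are taken from the example file above.

```java
import java.util.Locale;
import java.util.OptionalLong;

// Hypothetical model of one entry under input-catalogs in pipeline-job.conf.
final class InputCatalogJob {
    enum ProcessingType { NO_CHANGES, CHANGES, REPROCESS }

    final ProcessingType type;
    final long version;
    final OptionalLong sinceVersion;

    InputCatalogJob(String processingType, long version, OptionalLong sinceVersion) {
        // Maps "no_changes", "changes", "reprocess" to the enum constants.
        this.type = ProcessingType.valueOf(processingType.toUpperCase(Locale.ROOT));
        this.version = version;
        this.sinceVersion = sinceVersion;
        if (type == ProcessingType.CHANGES && !sinceVersion.isPresent()) {
            throw new IllegalArgumentException("changes requires since-version");
        }
    }

    // Describes the work implied by this entry.
    String plan() {
        switch (type) {
            case NO_CHANGES:
                return "unchanged at version " + version;
            case CHANGES:
                return "incremental from version " + sinceVersion.getAsLong()
                        + " to " + version;
            default:
                return "full reprocess of version " + version;
        }
    }

    public static void main(String[] args) {
        System.out.println(new InputCatalogJob("no_changes", 19, OptionalLong.empty()).plan());
        System.out.println(new InputCatalogJob("changes", 75, OptionalLong.of(70)).plan());
        System.out.println(new InputCatalogJob("reprocess", 314159, OptionalLong.empty()).plan());
    }
}
```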

Note

As of HERE platform Release 2.3, all of the parameters of the pipeline-job.conf file are optional. If the details are not specified, the pipeline will, by default, pick the latest input catalog versions and reprocess.

User Configuration

Both of the previous files are provided by the pipeline, which also defines their content. Users may also provide an additional custom configuration file to configure the Pipeline Version accordingly. The content of the file, which should follow the Java Properties file format, is fully defined by users. For example:

    # Configuration file for myexample pipeline
    myexample.threads = 5
    myexample.language = en_US
    myexample.processing.window = 300
    myexample.processing.mode = stateful
    myexample.processing.filterInvalid = true

The Pipeline API passes this as the application.properties file to the pipeline by adding it to the classpath of the main JVM process and making it available for use by the Typesafe Config library.
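Since the file follows the Java Properties format, it can also be read with the standard java.util.Properties class. The following minimal sketch assumes the file is reachable on the classpath as application.properties, as described above. The class UserSettings and its methods are hypothetical names. Note that java.util.Properties takes values literally, so any quotation marks in the file become part of the value.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical reader for user-defined settings; names are illustrative.
final class UserSettings {

    // Loads application.properties from the classpath, or returns an
    // empty set of properties when the file is not present.
    static Properties fromClasspath() {
        Properties properties = new Properties();
        ClassLoader loader = UserSettings.class.getClassLoader();
        try (InputStream in = loader.getResourceAsStream("application.properties")) {
            if (in != null) {
                properties.load(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    // Parses text in Java Properties format; convenient for tests.
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    // Reads an integer setting, using the fallback when the key is absent.
    static int intSetting(Properties properties, String key, int fallback) {
        String raw = properties.getProperty(key);
        return raw == null ? fallback : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties properties = fromClasspath();
        System.out.println(intSetting(properties, "myexample.threads", 1));
        System.out.println(Boolean.parseBoolean(
                properties.getProperty("myexample.processing.filterInvalid", "false")));
    }
}
```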
