Interface with the Data API via Spark

When you use this processing library, you implement Compilers, mix in the InputLayers and OutputLayers traits. These interfaces define data structures for the catalog and layer IDs. By implementing these interfaces, you tell the processing library which input catalogs and input layers each compiler requires. The processing library queries the input catalogs accordingly and provides the metadata to the compiler. In addition, the processing library requires each compiler to specify the output layers that it produces.

The Data Processing Library uses this information to implement incremental publishing via the Publisher, where output layers specified in each compiler are queried before publishing, to detect:

  • which partitions contain new payloads
  • which partitions are unchanged
  • which partitions are deleted

Since there is only one output catalog, it is not necessary to specify its identifier in application.conf or when you run the application. However, partition keys contain a catalog identified and you have to use the Default.OutCatalogId value when your compiler products output keys.

Input layers and catalogs can be shared across DriverTasks; the processing library optimizes this by querying the Data API only once. However, each layer of the output catalog can be produced by one task only. This is a requirement to implement incremental publishing: the Publisher collects all the metadata of a layer candidate to be published in one single place to perform the comparison, the conditional upload, and the multipart commit properly.

It is a mistake to have two or more tasks specify the same output layer, as the layer would be fully overwritten by the second task, resulting in an invalid output.

The following sections describe important internals of the Catalog and Publisher, which the Driver operates. While you do not need to directly operate these internals, this information is necessary to understand how input catalogs are accessed and how payloads are pushed to the output catalog.

Query a Catalog

The package contains APIs to access catalogs via Spark RDDs.

The Catalog trait provides convenient access to a Data API catalog via Spark.

The following operations are supported, via the Data Client Library:

  • Snapshot queries: construct an RDD of all the metadata of the catalog at a given version, eventually restricted to a set of layers.
  • Change queries: construct an RDD of changed metadata between two versions, eventually restricted to a set of layers.
  • Commit: regroup (Spark coalesce) an RDD of (updated) metadata into a fixed number of parts, upload them in parallel, and perform a multipart commit.
  • Configuration: easily access catalog configuration.

Most queries, except for configuration and queries for the latest version, are performed in parallel by Spark worker nodes. This ensures that metadata is not concentrated in the Driver, preventing a bottleneck that would hinder scalability.

Publish and Commit to a Catalog

The package provides higher-level functionality to publish the output payloads at the end of data processing and then commit the result. The Publisher class provides two methods to publish the output of compilation: full snapshot publishing and incremental publishing. In both cases, the Publisher requires the following input:

  • an RDD of output keys and payloads, which are the candidates to be published
  • an RDD of metadata for the output catalog, which contains what was already published

The Publisher performs the following steps:

  1. Joins the payloads through the keys (partition + layer), that are candidates to be published with the metadata already published.
  2. Calculates the hashes of the payloads.
  3. Discards all the payloads if their hashes correspond to those already published; discarding all output data that has not changed.
  4. Uploads all the payloads that are actually new via an Uploader, generating new metadata entries for them.
  5. Returns an RDD of metadata to be committed to the Catalog.

However, the two publishing methods vary in that:

  • Full snapshot publishing deletes each entry of the output catalog which was not explicitly provided in the input, in the resulting commit.
  • Incremental publishing does not modify partitions that were not provided in the output catalog.

Changes provided to the Publisher, either a new partition content or a delete request, are applied on top of the existing partitions.

The publisher merely performs incremental publishing based on hash differences, it is not an incremental compiler.

In both cases:

  • Payloads of newly-introduced keys are uploaded and committed as new metadata.
  • Payloads of keys already present in the output are discarded if not modified, otherwise they are uploaded and committed as changed metadata.
  • Empty payloads of keys already present in the output become deleted metadata.

It's important to note that all of this processing takes place in Spark worker nodes: hash calculation and comparison, eventual uploading, generation of the metadata to be committed. The RDD with the resulting metadata can then be passed to the Catalog for the actual commit.

State Layer

The library requires the output catalog to have an additional layer configured with generic partitioning scheme, for internal use. Applications cannot publish data to this layer. The layer ID is configurable but, its default name is: state layer.

The state layer is used by stateful compilation patterns to persist some RDDs and retrieve them on the next run.

Typically, within these RDDs, the input-output dependency graph is persisted. This graph specifies which input partition affects which output partition. This information is stored per-DriverTask and is needed in incremental compilation to identify which output partitions are candidates to be recompiled and republished.

Additionally, Fingerprints, that are required to guarantee the correctness of incremental runs, are also stored in this layer.

results matching ""

    No results matching ""