Compose an Incremental Processing Pipeline with DeltaSets
Note
DeltaSets are a new feature of the Data Processing Library and the API might change in future versions. DeltaSets can only be used in Scala, not in Java projects.
DeltaSets are a new distributed processing abstraction provided by the Data Processing Library. Similar to Spark RDDs, DeltaSets provide a functional interface for transforming data in a cluster, with transformations such as `mapReduce` and `filterByKey`. Their main difference from RDDs is that DeltaSet transformations can be computed incrementally if required.
DeltaSets allow you to build custom Compilation Patterns, which means that you can have compilers with as many `resolveFn`, `compileInFn`, and `compileOutFn` functions as required by your particular application.
Design
The main processing abstraction is a `DeltaSet[K, V]`, where `K` is the type of keys and `V` is the type of values. The DeltaSet represents a collection of key-value pairs that is stored and transformed in a Spark cluster. A key can be associated with one value only.
- `K` is often `com.here.platform.data.processing.catalog.Partition.Key`, which is a key identifying a partition inside the platform catalog. However, `K` can be any type that is Serializable and that has an implicit Ordering defined. Examples of these types include strings, integers, and tuples.
- `V` is often `com.here.platform.data.processing.catalog.Partition.Meta`, which identifies data in the platform catalog, or a `com.here.platform.data.processing.blobstore.Payload` with the actual data stored in the catalog. However, `V` can be any type, even an integer or a string.
For example, reading the contents of a platform catalog layer results in a `DeltaSet[Key, Meta]`.
A DeltaSet is always immutable, but transformations can be applied to it, resulting in a transformed DeltaSet. For example, the transformation `mapValues({x => x + 1})` can be used to transform a `DeltaSet[Key, Int]` into a new `DeltaSet[Key, Int]` in which all values are incremented by one.
Once a DeltaSet is transformed into a `DeltaSet[Key, Payload]` containing the desired payloads for the output catalog, you can publish it. This results in a `PublishedSet`, which you can then commit as a new version of the output catalog.
The transformations are always lazy, which means that they are only performed when you commit the output catalog. In other words, a DeltaSet is not evaluated until it is committed to the platform catalog.
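The following minimal sketch illustrates this flow; the upstream DeltaSet and the layer name are assumed for illustration only, and the individual transformations are explained in the sections below.

// Schematic sketch: the upstream DeltaSet and the "outLayer" layer are assumed.
val counts: DeltaSet[Key, Int] = ???
// Declaring transformations does not run anything yet.
val incremented: DeltaSet[Key, Int] = counts.mapValues(_ + 1)
val payloads: DeltaSet[Key, Payload] = incremented.mapValues(i => Payload(BigInt(i).toByteArray))
// Evaluation only happens when the resulting PublishedSet is committed.
val published: PublishedSet = payloads.publish(Set(Layer.Id("outLayer")))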
Example: Copy a Layer
This example shows how to implement a pipeline that copies a layer from one catalog to another catalog.
The simplest way to use DeltaSets is to extend the `DeltaSimpleSetup` trait in the application's `Main` object. To add support for the standard configuration files and command line options for pipelines, in this example you also extend the `PipelineRunner` trait (see Set up and Run the Driver), giving your `Main` object the following skeleton:
import com.here.platform.data.processing.catalog.{Catalog, Layer}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner
object Main extends PipelineRunner with DeltaSimpleSetup {
val applicationVersion: String = "1.0"
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
???
}
}
`DeltaSimpleSetup` requires the `Main` object to implement the `setupSets` method, which defines the processing logic of the pipeline. Processing logic defined using DeltaSets can be structured in many different ways, typically in four phases:
1. Query the `Key` and `Meta` pairs from one or more input catalogs, resulting in a `DeltaSet[Key, Meta]`.
2. Retrieve the payloads corresponding to the metadata, resulting in a `DeltaSet[Key, Payload]`.
3. Transform the data stored in the payloads and rewrite the keys to store the target catalog and the target layer.
4. Publish the transformed payloads, resulting in a `PublishedSet` that is then committed to the output catalog.
In this example, the payloads are copied without modification, so Step 3 only rewrites the keys instead of transforming the data.
1. Query: To query the `Key` and `Meta` pairs from one or more input catalogs, use the `DeltaContext` that the `setupSets` method receives as an argument; among other things, it provides access to the input catalogs.
import com.here.platform.data.processing.catalog.Partition._
val keyMetas: DeltaSet[Key, Meta] =
context.queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
2. Retrieve: To retrieve the `Payload`s corresponding to the metadata, import the DeltaSet transformations from the context and get an instance of the `Retriever` for the corresponding catalog. Then, `keyMetas` can be transformed.
import com.here.platform.data.processing.blobstore.Payload
import context.transformations._
val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
val keyPayloads: DeltaSet[Key, Payload] =
keyMetas.mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
3. Process: To rewrite the keys to store the target catalog and the target layer, use the `mapKeys` operation.
val rewrittenKeys: DeltaSet[Key, Payload] =
keyPayloads.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
),
PreservesPartitioning
)
4. Publish: To publish the transformed payloads, a `DeltaSet[Key, Payload]` provides a `publish` operation, which takes the set of layers to publish to as an argument.
val result: PublishedSet = rewrittenKeys.publish(Set(Layer.Id("outLayerA")))
Iterable(result)
The complete example is shown below:
import com.here.platform.data.processing.catalog.{Catalog, Layer, Partition}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
import com.here.platform.data.processing.driver.runner.pipeline.PipelineRunner
object Main extends PipelineRunner with DeltaSimpleSetup {
val applicationVersion: String = "1.0"
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retriever = context.inRetriever(Catalog.Id("inCatalogA"))
Iterable(
context
.queryCatalogLayer(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))
.mapValuesWithKey((key, meta) => retriever.getPayload(key, meta))
.mapKeys(
OneToOne[Partition.Key, Partition.Key](
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("inCatalogA"), layer = Layer.Id("inLayerA"))
),
PreservesPartitioning
)
.publish(Set(Layer.Id("outLayerA")))
)
}
}
This section explains the transformations currently available on DeltaSets.
Publish Payloads
Every DeltaSet must eventually be turned into a set of payloads that contain the data to be published in the output catalog. The `publish` operation is available on any DeltaSet of type `DeltaSet[Key, Payload]`; it uploads all payloads to the Data API. The result of the `publish` operation is a `PublishedSet` that can only be returned from `setupSets` but cannot be transformed any further. This is shown in the example below.
val payloads: DeltaSet[Key, Payload] = ???
val published: PublishedSet = payloads.publish(Set(Layer.Id("outLayer")))
You cannot `publish` to the same layer multiple times. If you need to individually publish parts of an output layer, use `publishPart` instead.
(Advanced) Multi-part Publish
Use `publishPart` instead of `publish` if you need to upload disjoint parts of an output layer (or a set of output layers) and read back each part individually. Each output key must be deterministically assigned to a single part, through a `PublishedPartMapper`.
In the example below, a `PartMapperByLevel` is used to map keys to different publish parts based on their zoom level. The partitions at zoom level 12 are published first, and then read back to build aggregated partitions at zoom level 11. The `unionPublishedParts` operation is finally used to combine all parts into the final `PublishedSet`:
val intermediate: DeltaSet[Key, Payload] = ???
val partMapper = PartMapperByLevel(Set(12, 11))
val firstPart: PublishedPart =
intermediate.publishPart(Set(Layer.Id("multi-level-layer")),
partMapper,
partMapper.partForLevel(12))
val secondPart: PublishedPart = firstPart
.readBack()
.mapGroup({
case (key, meta) => (key.copy(partition = key.partition.ancestors.head), (key, meta))
}, context.defaultPartitioner)
.mapValues { partitions: Iterable[(Key, Meta)] =>
val payload: Payload = Aggregator.aggregate(partitions)
payload
}
.publishPart(Set(Layer.Id("multi-level-layer")), partMapper, partMapper.partForLevel(11))
context.unionPublishedParts(Seq(firstPart, secondPart))
Transform Values
Use `mapValues` and `mapValuesWithKey` to transform the values inside a DeltaSet. Neither of these operations modifies the keys in the DeltaSet, so they do not need to shuffle data between worker nodes in the cluster when they run. Consequently, these operations are very efficient.
In the following example, the values in a `DeltaSet[Key, Int]` are incremented by one using `mapValues`.
val integers: DeltaSet[Key, Int] = ???
val incrementedIntegers: DeltaSet[Key, Int] = integers.mapValues(_ + 1)
`mapValuesWithKey` is a similar operation, but it also provides the key to the transformation function. See the copy-a-layer example for details on using this operation.
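As a minimal sketch (with an assumed input DeltaSet), the following snippet uses `mapValuesWithKey` to prefix each string value with the name of its partition:

val strings: DeltaSet[Key, String] = ???
val annotated: DeltaSet[Key, String] =
  strings.mapValuesWithKey((key, value) => s"${key.partition}: $value")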
Transform Keys and Values
To transform both keys and values simultaneously, or to transform the key of a key-value pair based on its value, use one of these transformations: `mapUnique`, `mapGroup`, or `mapReduce`. However, if performance is a concern, consider using `mapValues`, `mapValuesWithKey`, or one of the `mapKeys*` operations instead.
`mapUnique` transforms a key-value pair into a new key-value pair, as long as no duplicate keys are produced. If duplicate keys are produced, the transformation fails at run-time. Since data is shuffled between nodes in the cluster, a partitioner must be explicitly provided as an argument.
For example, in the following snippet, keys and values are split into two layers:

- Positive values: set the layer to `positive_values`.
- Negative values: set the layer to `negative_values`.
val deltaSet1: DeltaSet[Key, Int] = ???
val split: DeltaSet[Key, Int] =
deltaSet1.mapUnique(
mapFn = { (key, i) =>
if (i >= 0) {
(key.copy(layer = Layer.Id("positive_values")), i)
} else {
(key.copy(layer = Layer.Id("negative_values")), i)
}
},
partitioning = context.defaultPartitioner
)
`mapGroup` transforms a key-value pair into a new key-value pair. The values of duplicate keys are grouped, such that the resulting DeltaSet assigns each key a collection of values.

For example, in the following snippet, the metadata associated with HERE tile partitions is mapped to and grouped by the parent HERE tile.
val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, Iterable[Meta]] =
deltaSet1.mapGroup(
mapFn = {
case (key, value) =>
(key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
},
partitioning = context.defaultPartitioner
)
`mapReduce` transforms a key-value pair into a new key-value pair, reducing the values of duplicate keys with the reduce function that you provide. This reduce function must combine two values into one. Using `mapReduce` is more efficient than using `mapGroup` and then reducing each collection of values using `mapValues`.

For example, in the following snippet, all integers associated with HERE tile partitions are mapped to their parent HERE tile, and the values of each output HERE tile are reduced to their sum.
val deltaSet1: DeltaSet[Key, Int] = ???
val deltaSet2: DeltaSet[Key, Int] =
deltaSet1.mapReduce(
mapFn = {
case (key, value) =>
(key.copy(partition = key.partition.parent.getOrElse(key.partition)), value)
},
reduceFn = _ + _,
partitioning = context.defaultPartitioner
)
The `mapUnique`, `mapGroup`, and `mapReduce` operations take a key-value mapping function that produces exactly one key. In contrast, the `flatMapUnique`, `flatMapGroup`, and `flatMapReduce` operations take a mapping function that produces zero or more keys.
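For example, the following hedged sketch uses `flatMapUnique` to keep only non-negative values and move them to a `positive_values` layer in a single pass. It assumes that `flatMapUnique` takes the same `mapFn` and `partitioning` parameters as `mapUnique`, with `mapFn` returning zero or more key-value pairs.

val deltaSet1: DeltaSet[Key, Int] = ???
val positiveOnly: DeltaSet[Key, Int] =
  deltaSet1.flatMapUnique(
    mapFn = { (key, i) =>
      if (i >= 0) {
        // keep the value and move it to the "positive_values" layer
        Seq((key.copy(layer = Layer.Id("positive_values")), i))
      } else {
        // drop negative values entirely
        Seq.empty
      }
    },
    partitioning = context.defaultPartitioner
  )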
Transform Keys
DeltaSets provide a set of transformations for modifying the keys in a DeltaSet without taking the values into account. These transformations are very efficient and should be preferred over key and value transformations where possible.
For example, `mapKeys` is an efficient operation that transforms just the keys in a DeltaSet without reading or writing the values. The key transformation must be 1-to-1, that is, every key in the input DeltaSet is mapped to a unique key in the output DeltaSet. To ensure that the transformation is 1-to-1 and to allow efficient incremental processing, `mapKeys` requires you to specify both the key mapping function `mapFn` and the inverse of that function, `inverseFn`:
val partitioner = NameHashPartitioner(10)
val input: DeltaSet[Key, Int] = ???
val incrementedIntegers: DeltaSet[Key, Int] =
input.mapKeys(
OneToOne(
mapFn = key => key.copy(layer = Layer.Id("outLayerA")),
inverseFn = key => key.copy(layer = Layer.Id("inLayerA"))
),
partitioner
)
See the copy-a-layer example for more details on how to use this operation.
If the DeltaSet contains a key `x` for which `inverseFn(mapFn(x)) != x`, the transformation fails at run-time. The inverse function can be called on keys that are not produced by `mapFn`, so it must return a correct result for any key supplied to it. `inverseFn` can be defined as a partial function if it is defined only on a subset of the possible keys.
val partitioner = HashPartitioner[String](context.defaultParallelism)
val input: DeltaSet[Int, String] = ???
val stringKeyed: DeltaSet[String, String] =
input.mapKeys(
OneToOne[Int, String](
mapFn = _.toString,
inverseFn = {
case s: String if s forall Character.isDigit => s.toInt
}
),
partitioner
)
If it is inconvenient or impossible to specify the inverse transformation, consider using the more expensive `mapUnique` transformation explained above.
`flatMapKeys` is a transformation that maps each input key to zero or more output keys (1-to-many). As with `mapKeys`, an inverse function must be passed to show that each output key is the result of exactly one input key.
When multiple input keys are mapped to the same output key, whether by a 1-to-1 or a 1-to-many key mapping function, the set of values can either be grouped or reduced, just like in key and value transformations. DeltaSets provide four transformations to cover all of these combinations: `mapKeysGroup` and `flatMapKeysGroup` group all values in a collection, whereas `mapKeysReduce` and `flatMapKeysReduce` apply a reduce function to all values. See the DirectMToNCompiler migration for an example of how to use `flatMapKeysGroup`.
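As an additional hedged sketch, the following snippet uses `flatMapKeysGroup` to gather, for each HERE tile, the metadata of the tile itself and of all its direct neighbors. The neighbor relation is symmetric, so the same function serves as both the key mapping and its inverse; the exact result collection type is assumed to be `Iterable`, as with `mapGroup`.

val deltaSet1: DeltaSet[Key, Meta] = ???

// Map each key to the keys of its neighborhood; non-tile keys map to themselves.
val toNeighborhood: Key => Iterable[Key] = {
  case key @ Key(_, _, tile: HereTile) =>
    tile.neighbors(1).map(neighbor => key.copy(partition = neighbor))
  case key => Set(key)
}

val neighborhoods: DeltaSet[Key, Iterable[Meta]] =
  deltaSet1.flatMapKeysGroup(
    ManyToMany(toNeighborhood, toNeighborhood),
    context.defaultPartitioner
  )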
Filter Data
`filterByKey` filters key-value pairs from a DeltaSet based only on their keys. This transformation runs very efficiently and does not require any data exchange between nodes in the Spark cluster.
In the following snippet, the `administrative_places` layer of the `hmc` catalog is queried, and only the partition keys whose generic partition name starts with "1469256839" are kept. This corresponds to reading all administrative places in Australia if the `hmc` catalog ID points to the HERE Map Content catalog.
import com.here.platform.data.processing.catalog.Partition.Generic
val filteredInput: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("administrative_places"))
.filterByKey {
case Key(catalog, layer, Generic(name)) =>
name.startsWith("1469256839")
case _ => false
}
For convenience, you can also use Partition Key Filters in a `filterByKey` operation. For example, to keep only those partitions whose partition name is a HERE tile inside a bounding box around Berlin, use the following `filterByKey` operation.
val filteredInput: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
.filterByKey(
BoundingBoxFilter(
south = 50.97656,
west = 11.95313,
north = 51.06445,
east = 12.04102
)
)
Partition Key Filters can also be defined in the configuration file, under the path `here.platform.data-processing.deltasets.partitionKeyFilters`. The partition key filters defined in the configuration file apply to all `query` transformations and to `readBack`.
Join Data
`join` is a DeltaSet transformation that takes two DeltaSets and produces a DeltaSet that contains, for each key contained in both DeltaSets, the pair of values associated with that key in each of the DeltaSets.
val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val pairs: DeltaSet[Key, (Int, String)] = integers join strings
Other kinds of join transformations provided by DeltaSets are:
- `outerJoin` takes two DeltaSets and produces a DeltaSet that contains, for each key contained in either DeltaSet, the pair of values associated with that key in each of the DeltaSets. If a key has no associated value in one of the DeltaSets, the corresponding entry in the pair is set to `None`.
- `leftOuterJoin` takes two DeltaSets and produces a DeltaSet that contains, for each key contained in the left DeltaSet, the pair of values associated with that key in each of the DeltaSets. If a key has no associated value in the right DeltaSet, the corresponding entry in the pair is set to `None`.
The following example shows a common use case of joins: two layers, `road_attributes` and `topology_geometry`, are queried from the catalog `hmc` (HERE Map Content). The keys in both resulting DeltaSets are rewritten to contain the same catalog and layer of the output catalog. Then, the `outerJoin` is computed, resulting in a DeltaSet that contains the metadata for both layers. In this way, the contents of the partitions can be correlated.
val topology: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("topology_geometry"))
),
context.defaultPartitioner
)
val roadAttributes: DeltaSet[Key, Meta] =
context
.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("road_attributes"))
.mapKeys(
OneToOne(
_.copy(catalog = Default.OutCatalogId, layer = Layer.Id("outLayerA")),
_.copy(catalog = Catalog.Id("hmc"), layer = Layer.Id("road_attributes"))
),
context.defaultPartitioner
)
val pairs: DeltaSet[Key, (Option[Meta], Option[Meta])] =
topology outerJoin roadAttributes
Note that `join` is a stateful transformation, while `outerJoin` and `leftOuterJoin` are not, which can make the latter two more efficient.
Union Data
`disjointUnion` is a DeltaSet transformation that takes two input DeltaSets and produces an output DeltaSet that contains every key-value pair contained in either of the input DeltaSets. The operation throws an exception if a key is contained in both input DeltaSets.
In the following snippet, you split an input DeltaSet into two DeltaSets with tiles at zoom levels 10 and 12, respectively. Then, each DeltaSet is transformed separately into strings using `mapValues`, and the union of the results is stored in the variable `combined`.
val input: DeltaSet[Key, Meta] = ???
val tilesAt10: DeltaSet[Key, String] =
input
.filterByKey {
case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 10
case _ => false
}
.mapValues(???)
val tilesAt12: DeltaSet[Key, String] =
input
.filterByKey {
case Key(_, _, t: HereTile) => t.quad.getZoomLevel == 12
case _ => false
}
.mapValues(???)
val combined: DeltaSet[Key, String] =
tilesAt10 disjointUnion tilesAt12
In the snippet above, both input DeltaSets have exactly the same type. They can also have different value types, as in the following snippet. In this case, the value type of the output DeltaSet is a common supertype of the value types of the inputs. In the following snippet, the union of a `DeltaSet[Key, Int]` and a `DeltaSet[Key, String]` is typed as `DeltaSet[Key, Any]`. This works because, in Scala, `Any` is a supertype of both `String` and `Int`.
val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???
val union: DeltaSet[Key, Any] = integers disjointUnion strings
The context also allows you to construct the disjoint union of two or more DeltaSets at once:
val integers1: DeltaSet[Key, Int] = ???
val integers2: DeltaSet[Key, Int] = ???
val integers3: DeltaSet[Key, Int] = ???
val union: DeltaSet[Key, Int] =
context.disjointUnion(List(integers1, integers2, integers3))
Dynamically Resolve References
`mapValuesWithResolver` can be used to transform a DeltaSet of key-meta pairs, the subjects, and to dynamically access other partitions, the references, during the transformation. It is an alternative to static reference resolution using `resolveReferences` or a `RefTreeCompiler`, which requires pre-computing all required references up front. In contrast, dynamic reference resolution is more flexible, requires fewer lines of code, and can be faster than static reference resolution, especially for complex reference structures.
`mapValuesWithResolver` is similar to `mapValuesWithKey`; however, the mapping function that is applied to each subject takes three arguments: the key and metadata of the subject, as well as a `Resolver` that determines the metadata for any key that may be referenced by the subject. Using the metadata of the subject and the references, you can retrieve the corresponding payloads.
The `Resolver` uses one or more `ResolutionStrategy`s to find the metadata that corresponds to a key. One such strategy is `DirectQuery`, which directly requests the metadata via the Data API; this is simple but requires one network query for each metadata object resolved. The next section presents three other resolution strategies that are more efficient because they download large sets of metadata at once.
In the following snippet, you resolve references from partitions in layer `A` of catalog `inA` to partitions in layer `B` of catalog `inA`. Each partition in layer `A` contains the name of the partition in layer `B` that it references. Typically, for example when processing HERE Map Content, the partition name is stored together with other data. However, for the purpose of this example, we assume that there is no other information in the partitions of layer `A`.
First, query layer `A` into the `subjects` DeltaSet. Then, call `mapValuesWithResolver` on it, passing a mapping function and a `strategies` parameter, which is set to `DirectQuery`.
Inside the mapping function:
- We retrieve the partition in layer `A` using the `retriever`.
- We convert the content of the partition to a string, `referenceName`, and construct a `Key` object referencing the `referenceName` partition in layer `B`.
- We use the resolver to retrieve the metadata corresponding to the reference.
- If the referenced partition does not exist, we throw an exception.
- Otherwise, we retrieve that partition.
val retriever = context.inRetriever(Catalog.Id("inA"))
val subjects: DeltaSet[Key, Meta] =
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("A"))
subjects.mapValuesWithResolver(
mapFn = {
case (resolver, key, meta) =>
val referenceName = new String(retriever.getPayload(key, meta).content)
val referenceKey = Key(Catalog.Id("inA"), Layer.Id("B"), Generic(referenceName))
resolver.resolve(referenceKey) match {
case None => throw new Exception("Partition does not exist!")
case Some(referenceMeta) =>
val referencePartition = retriever.getPayload(referenceKey, referenceMeta)
???
}
},
strategies = List(DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"))))
)
Resolution Strategies
Four resolution strategies are currently available:
- `DirectQuery` -- Given a catalog and a set of layers, this strategy directly retrieves the metadata individually for each key via the Data API. The result of each request is cached per executor. The size of the cache can be configured by passing an argument to the constructor of `DirectQuery`; the default is 10000 metadata objects (around 3 MB).
- `Broadcast` -- Given a DeltaSet containing metadata, this strategy sends a complete copy of the metadata to each Spark executor, making sure that the whole metadata is available on each executor without further network requests. Internally, this strategy uses a Spark broadcast variable. Depending on the amount of metadata, this may require a large amount of memory in each executor. The memory required for storing the metadata is roughly 300 bytes per partition in the DeltaSet.
- `BackwardResolution` -- Suppose you have a DeltaSet containing references and a function that maps each reference to a set of subject partitions. `BackwardResolution` exposes these references to the resolver when processing the subject partition, without any further network queries. For example, in the following snippet, each partition of layer `A` is grouped with all of its children in layer `B`.
BackwardResolution(
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), { (key, meta) =>
Set(key.copy(layer = Layer.Id("A"), partition = key.partition.parent.get))
}
)
The predefined backward resolution strategy `BackwardResolution.toSamePartition` groups each subject partition with the reference partition of the same name. Similarly, `BackwardResolution.toNeighbors` groups each subject tile with all of its neighbor tiles at a given depth.
- `ForwardResolution` -- Suppose you have a DeltaSet containing references and a function that maps each subject to a set of reference partitions. `ForwardResolution` exposes these references to the resolver when processing the subject partition, without any further network queries. This strategy is the inverse of `BackwardResolution`. `ForwardResolution` takes the key and value types of the subject DeltaSet as type parameters. The following snippet shows how to group each subject tile with all of its neighbor tiles.
ForwardResolution[Key, Meta](
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
case (key @ Key(_, _, tile: HereTile), meta) =>
tile
.neighbors(1)
.map(tile => key.copy(layer = Layer.Id("B"), partition = tile))
}
)
`mapValuesWithResolver` takes a list of resolution strategies, allowing them to be combined sequentially. Consider, for example, the resolution strategy list in the following snippet. When processing partitions of layer `A` and resolving references to tiles in layer `B`, all direct neighbors of the currently processed tile are available without a network request. To resolve other tiles in layer `B` (either farther away or at a different zoom level), a network request is performed. References to layer `C` are always resolved using network requests, while references to layer `D` are resolved using a broadcast.
val strategies =
List(
BackwardResolution.toNeighbors(
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")),
Catalog.Id("inA"),
Layer.Id("A")
),
DirectQuery(Catalog.Id("inA"), Set(Layer.Id("B"), Layer.Id("C"))),
Broadcast(context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("D")))
)
`mapValuesWithResolver` can be applied to a `DeltaSet[Key, Meta]`, in which case the partitioner of the subject DeltaSet is also used to partition the references. If `mapValuesWithResolver` is applied to a DeltaSet with a different key or value type, a partitioner for the references must be provided. For example, in this snippet, a `DeltaSet[HereTile, String]` is transformed.
val deltaSet: DeltaSet[HereTile, String] = ???
deltaSet.mapValuesWithResolver(
(r, k, s) => ???,
Seq(
ForwardResolution(
context.queryCatalogLayer(Catalog.Id("inA"), Layer.Id("B")), {
case (tile, string) =>
tile
.neighbors(1)
.map(tile => Key(Catalog.Id("inA"), Layer.Id("B"), tile))
}
)),
HashPartitioner(12)
)
Statically Resolve References
`resolveReferences` is a transformation that implements the same functionality as the first step of a RefTreeCompiler: given a reference relation between catalog partitions, the transformation groups each partition in a DeltaSet with its references. The reference relation is defined by:

- a RefTree, which specifies the types of references that may exist between partitions
- a resolve function, which computes the concrete set of partition keys referenced by a partition

For more information, see RefTreeCompiler.
In the following snippet, `resolveReferences` is used to group each key-meta pair in `deltaSet1` whose partition name is a HERE tile with the key-meta pairs of all of its neighbors.
import com.here.platform.data.processing.compiler.reftree._
val deltaSet1: DeltaSet[Key, Meta] = ???
val deltaSet2: DeltaSet[Key, (Meta, Map[Key, Meta])] =
deltaSet1.resolveReferences(
RefTree(
Subject((Catalog.Id("inCatalogA"), Layer.Id("inLayerA")),
Ref(RefTree.RefName("neighbor"),
(Catalog.Id("inCatalogA"), Layer.Id("inLayerA"))))),
resolveFn = {
case (Key(catalog, layer, partition: HereTile), meta) =>
val neighbor = partition.neighbors(radius = 1) - partition
Map(RefTree.RefName("neighbor") -> neighbor.map(neighborTile =>
Key(catalog, layer, neighborTile)))
case _ => Map.empty
}
)
Note
`resolveReferences` behaves differently than the reference resolution step in a `RefTreeCompiler` in one detail: if Partition Key Filters are defined in the configuration file, they apply to both references and subjects. In a `RefTreeCompiler`, only the subjects are filtered.
Read Back Published Data
For a `PublishedSet`, `readBack` is the only transformation that you can apply. It turns the `PublishedSet`, which is the result of publishing partitions to a layer, into a DeltaSet containing the key and metadata of all partitions contained in that layer after the publishing. This way, partitions that were published in an earlier processing step can be read back and used in the following steps.
In the code snippet below, we use `readBack` to read an `intermediate` result after it has been published to an output catalog layer. We then combine the result of `readBack` with a layer from the input catalog `hmc` (HERE Map Content) using `disjointUnion` and resolve references between these layers.
val intermediate: DeltaSet[Key, Payload] = ???
val intermediatePublished: PublishedSet = intermediate.publish(Set(Layer.Id("intermediate")))
val intermediateAndTopology: DeltaSet[Key, Meta] =
intermediatePublished.readBack() disjointUnion
context.queryCatalogLayer(Catalog.Id("hmc"), Layer.Id("topology_geometry"))
val references: DeltaSet[Key, (Meta, Map[Key, Meta])] =
intermediateAndTopology.resolveReferences(
RefTree(
Subject((Default.OutCatalogId, Layer.Id("intermediate")),
Ref(RefTree.RefName("intermediate_to_topology"),
(Catalog.Id("hmc"), Layer.Id("topology_geometry"))))),
resolveFn = ???
)
Convert RDDs to DeltaSets
`toDeltaSet` is an operation that converts a Spark RDD into a DeltaSet. This can be used, for example, to ingest data from other sources using Spark and to integrate it into a processing pipeline that uses DeltaSets. The RDD must contain key-value pairs and may not contain more than one pair with the same key. A partitioner to repartition the RDD must be passed to `toDeltaSet`. Unless the RDD is already partitioned with the given partitioner, repartitioning the RDD causes a shuffle.
The resulting DeltaSet does not contain any information about changes since the last run of the pipeline. Even in an incremental run of the pipeline, downstream DeltaSet transformations will therefore process all data in the DeltaSet.
In the following snippet, we show how to ingest a CSV file into a DeltaSet via a Spark RDD. We use the `SparkContext`, which is accessible via the `DeltaContext`, to read a CSV file, `airports.csv`. Then, we convert the RDD into key-value form, where the first column of the CSV file serves as the key. Finally, we use `toDeltaSet` to convert the RDD into a DeltaSet.
import org.apache.spark.rdd.RDD
val sc = context.driverContext.spark
val rowsByFirstColumn: RDD[(Key, Array[String])] =
sc.textFile("airports.csv").map { x =>
val columns = x.split(",").map(_.trim)
(Key('outCatalog, 'outLayer, Generic(columns(0))), columns.drop(1))
}
val deltaSet: DeltaSet[Key, Array[String]] =
rowsByFirstColumn.toDeltaSet(context.defaultPartitioner)
Spark Partitions and Shuffles
The data in a DeltaSet is always partitioned according to a specific partitioner that assigns each key in the DeltaSet to a Spark partition and, thereby, to a node in the cluster where the data resides. The partitioner is always preserved during DeltaSet transformations unless a new partitioner is explicitly specified in a transformation. In particular, the `repartition` transformation does nothing but change the partitioner and repartition the data according to the new partitioner.
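As a hedged sketch (assuming `repartition` takes the new partitioner as its only argument), the following snippet repartitions a DeltaSet by partition name into 10 Spark partitions, which causes a shuffle:

val deltaSet1: DeltaSet[Key, Meta] = ???
val repartitioned: DeltaSet[Key, Meta] = deltaSet1.repartition(NameHashPartitioner(10))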
All transformations that potentially transform the keys in a DeltaSet or change the partitioner require repartitioning the data and may, therefore, move data between the nodes in the cluster, which is called shuffling. Shuffling is an expensive operation and should be avoided where possible. See the transformation property table below to check which DeltaSet transformations may need to shuffle data.
Partitioning Strategies
Each transformation that shuffles data requires you to explicitly provide a partitioning strategy for the resulting keys. This partitioning strategy can be either a partitioner or the special value `PreservesPartitioning`.
When you use a partitioner, the transformation uses that partitioner to repartition the result. If there are no particular performance requirements for your transformation, you can use the `defaultPartitioner` field of the `DeltaContext`.
When a transformation changes the keys in a DeltaSet but each key remains in the same Spark partition, you can use `PreservesPartitioning`. This strategy prevents the transformation from shuffling data altogether, resulting in significant performance improvements. If the transformation cannot preserve the partitioning, an exception is thrown at runtime.
In the copy-a-layer example, we use `mapKeys` to change the catalog and layer of the keys in a DeltaSet. The DeltaSet is partitioned with the `defaultPartitioner`, a `PartitionNamePartitioner`, which groups all catalog partitions with the same name into the same Spark partition, irrespective of catalog and layer. Consequently, all catalog partitions remain within the same Spark partition and we can use `PreservesPartitioning`.
The `PreservesPartitioning` partitioning strategy can even be used when the key types of the upstream and downstream DeltaSets are not the same, as long as the upstream DeltaSet is partitioned by a partitioner that is general enough to also handle the downstream key type. This advanced feature allows you, for example, to map a DeltaSet with `Partition.Key` keys to a DeltaSet with `Partition.Name` keys without causing a Spark shuffle:
val catalog: Catalog.Id = ???
val layer: Layer.Id = ???
context
.queryCatalogLayer(catalog, layer)
.mapKeys(
OneToOne[Key, Name](_.partition, Key(catalog, layer, _)),
PreservesPartitioning
)
Here, `PreservesPartitioning` can be used because the default partitioner used by `queryCatalogLayer` is a `PartitionNamePartitioner`, which supports both the upstream key type, `Partition.Key`, and the downstream key type, `Partition.Name`.
Laziness and Persist Data
Whenever a DeltaSet is used in two or more transformations, its result should be persisted in the memory or on the disk of the Spark workers, to avoid computing the result more than once. This is explained in more detail for RDDs in the RDD Persistence policy and applies equally to DeltaSets. Use the `persist` transformation to persist a DeltaSet for reuse, as in the following example:
import org.apache.spark.storage.StorageLevel
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled = deltaSet1
.mapValues(_ * 2)
.persist(StorageLevel.MEMORY_AND_DISK_2)
val reuse1 =
doubled
.mapValues(x => Payload(BigInt(x).toByteArray))
.publish(Set(Layer.Id("outLayer1")))
val reuse2 =
doubled
.mapValues(x => Payload(BigInt(x / 2).toByteArray))
.publish(Set(Layer.Id("outLayer2")))
Performance Properties
The internal implementations of the different transformations have varying costs in time and space, which cannot be seen directly from the outside. In particular, two properties make certain transformations more expensive than others: shuffling and statefulness.

A transformation that shuffles data moves data between nodes in the cluster, which uses bandwidth and slows down the computation.
A transformation that is stateful has to store auxiliary information in the `state` layer of the output catalog to allow incremental computations. This means that extra storage is consumed in the output catalog, extra time is required to compute the state, and additional RAM on the nodes of the cluster is required to persist the state until the end of the computation.
Using stateful and/or shuffling operations is inevitable in most pipelines; however, the following table can help you avoid stateful and shuffling transformations wherever possible.
| Operation | Shuffles? | Stateful? |
|---|---|---|
| detectChanges | no | yes |
| disjointUnion | no | no |
| filterByKey | no | no |
| flatMapGroup | yes¹ | yes |
| flatMapKeys | yes¹ | no |
| flatMapKeysGroup | yes¹ | no |
| flatMapKeysReduce | yes¹ | no |
| flatMapReduce | yes¹ | yes |
| flatMapUnique | yes¹ | yes |
| join | no | yes |
| leftOuterJoin | no | no |
| mapGroup | yes¹ | yes |
| mapKeys | yes¹ | no |
| mapKeysGroup | yes¹ | no |
| mapKeysReduce | yes¹ | no |
| mapReduce | yes¹ | yes |
| mapUnique | yes¹ | yes |
| mapValues | no | no |
| mapValuesWithKey | no | no |
| mapValuesWithResolver | yes | yes |
| outerJoin | no | no |
| persist | no | no |
| publish | no | no |
| publishPart | no | no |
| readBack | no | no |
| repartition | yes | no |
| resolveReferences | yes | yes |
| toDeltaSet | yes | no |
¹ Shuffling can be avoided if `PreservesPartitioning` is used as the partitioning strategy.
Stateful transformations incur this overhead, so avoid them wherever possible. For example, you can often use the stateless `outerJoin` instead of a stateful `join`, or transform keys and values separately with the stateless `mapKeysGroup` and `mapValues` transformations instead of using a stateful `mapGroup`.
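For example, the following sketch (with assumed input DeltaSets) replaces a stateful `join` with the stateless `outerJoin` and handles keys that are present in only one of the two inputs in a subsequent `mapValues`:

val integers: DeltaSet[Key, Int] = ???
val strings: DeltaSet[Key, String] = ???

// Stateless alternative to `integers join strings`: keys missing on one side
// are kept with a None entry and must be handled downstream.
val pairs: DeltaSet[Key, (Option[Int], Option[String])] = integers outerJoin strings
val combined: DeltaSet[Key, String] = pairs.mapValues {
  case (Some(i), Some(s)) => s"$s=$i"
  case (i, s)             => s"incomplete: $s=$i"
}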
The `forceStateless` configuration option can force any transformation to be stateless. This avoids the overhead incurred by the state, but the transformation is then unable to track dependencies between partitions during an incremental run. In practice, this means that in an incremental run the DeltaSet has to process all upstream partitions, whether they changed or not, unless it can determine that the upstream DeltaSet did not change at all. In the latter case, no processing needs to be done, regardless of the value of `forceStateless`. A notable exception is `detectChanges`, which is still able to process only the changed part of the upstream DeltaSet even when `forceStateless` is set. However, `detectChanges` cannot reduce the set of changed partitions without state, so setting `forceStateless` effectively disables it.
Comparison and Migration
This section compares DeltaSets to the other ways of expressing distributed computation in the Data Processing Library and provides help to migrate to DeltaSets from less flexible interfaces.
Functional Patterns
If you are a previous user of Functional Patterns, you can understand DeltaSets as the way to build custom compilation patterns, which means that you can have compilers with as many `resolveFn`, `compileInFn`, and `compileOutFn` functions as required by your particular application. However, DeltaSets provide many more ways of structuring the computation: for example, you can reuse the result of one `compileInFn` in several `compileOutFn`s, you can join the results of several `compileInFn`s, and you can publish intermediate results to the output catalog.
A MapGroupCompiler, expressed in DeltaSet transformations, corresponds to:

- applying a `compileIn` function to all key-meta pairs in the `inLayers` and grouping the result using `flatMapGroup`
- transforming the groups of intermediate data into payloads using the `compileOut` function
- publishing the result
The full example is shown below. The `???` placeholders identify the places where you define your intermediate data, input layers, and the MapGroupCompiler.
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, MapGroupCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object MapGroupMain extends DeltaSimpleSetup {
case class IntermediateData()
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
???
def constructMapGroupCompiler(retrievers: Map[Catalog.Id, Retriever])
: MapGroupCompiler[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
???
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructMapGroupCompiler(retrievers)
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
val result =
context
.queryCatalogs(
inLayers
)
.flatMapGroup(
Function.untupled(compiler.compileInFn),
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
Iterable(result)
}
}
A RefTreeCompiler, expressed in DeltaSet transformations, corresponds to:

- grouping a set of subject partitions with their references using `resolveReferences`
- applying a `compileIn` function to all subject-reference pairs and grouping the result using `flatMapGroup`
- transforming the groups of intermediate data into payloads using the `compileOut` function
- publishing the result
The full example is shown below. The `???` placeholders identify the places where you define your intermediate data, input layers, and the RefTreeCompiler:
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.reftree.CompileInFnWithRefs
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, RefTreeCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object RefTreeMain extends DeltaSimpleSetup {
case class IntermediateData()
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
???
def constructTestRefTreeCompiler(
retrievers: Map[Catalog.Id, Retriever]): RefTreeCompiler[IntermediateData]
with CompileInFnWithRefs[IntermediateData]
with CompileOut1To1Fn[IntermediateData] =
???
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructTestRefTreeCompiler(retrievers)
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
Iterable(
context
.queryCatalogs(
compiler.inLayers
)
.resolveReferences(
compiler.refStructure,
Function.untupled(compiler.resolveFn)
)
.flatMapGroup(
{ case (k, (v, refs)) => compiler.compileInFn((k, v), refs) },
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
)
}
}
A Direct M:N Compiler, expressed in DeltaSet transformations, corresponds to:

- computing the intermediate data for each input key-value pair using `compileIn` and `mapValuesWithKey`
- mapping all input key-value pairs to the corresponding output keys using `mappingFn` and `flatMapKeysGroup`; here, the inverse of `mappingFn` additionally has to be specified
- transforming the groups of intermediate data into payloads using the `compileOut` function
- publishing the result
The full example is shown below. The `???` placeholders identify the places where you define your intermediate data, input layers, the inverse of `mappingFn`, and the `DirectMToNCompiler`:
import com.here.platform.data.processing.blobstore.Retriever
import com.here.platform.data.processing.catalog._
import com.here.platform.data.processing.compiler.direct.CompileInFn
import com.here.platform.data.processing.compiler.{CompileOut1To1Fn, DirectMToNCompiler}
import com.here.platform.data.processing.driver._
import com.here.platform.data.processing.driver.config.CompleteConfig
import com.here.platform.data.processing.driver.deltasets._
object DirectMToNMain extends DeltaSimpleSetup {
case class IntermediateData()
def inverseMappingFn: Partition.Key => Iterable[Partition.Key] = ???
def inLayers: Map[Catalog.Id, Set[Layer.Id]] =
???
def constructTestDirectMToNCompiler(retrievers: Map[Catalog.Id, Retriever]): DirectMToNCompiler[
IntermediateData] with CompileInFn[IntermediateData] with CompileOut1To1Fn[IntermediateData] =
???
def setupSets(completeConfig: CompleteConfig, context: DeltaContext): Iterable[PublishedSet] = {
import context.transformations._
val retrievers = inLayers.map { case (c, _) => (c, context.inRetriever(c)) }
val compiler = constructTestDirectMToNCompiler(retrievers)
val partitioner =
compiler.outPartitioner(context.defaultParallelism).getOrElse(context.defaultPartitioner)
Iterable(
context
.queryCatalogs(
compiler.inLayers
)
.mapValuesWithKey {
case (k, v) =>
compiler.compileInFn((k, v))
}
.flatMapKeysGroup(
ManyToMany(
compiler.mappingFn,
inverseMappingFn
),
partitioner
)
.mapValuesWithKey(
compiler.compileOutFn
)
.publish(
compiler.outLayers
)
)
}
}
A `Direct1toNCompiler` can be expressed using a similar sequence of transformations, replacing `flatMapKeysGroup` with `flatMapKeys`.
Spark RDD-based Patterns
Like an RDD, a DeltaSet represents data distributed over a cluster of machines that can be transformed using a set of functional operations such as `filterByKey` or `mapValues`. In fact, a DeltaSet internally uses an RDD to represent that data. However, there are three main differences between using DeltaSets and using RDDs directly:
- Key-Value: In contrast to an RDD, a DeltaSet contains only key-value pairs and never contains duplicate keys. That is why, for example, there is no simple `map` operation available for DeltaSets, as it may create duplicate keys.
- Strongly partitioned: Furthermore, a DeltaSet is always partitioned according to a specific partitioner that assigns each key-value pair to one Spark partition, which in turn is stored on a specific machine in the cluster. An `RDD`, on the other hand, can store data without a specific partitioner that defines how the data is partitioned.
- Incremental: Every computation expressed with a DeltaSet can be executed incrementally, without manually keeping track of dependencies as done in the DepCompiler and IncrementalDepCompiler.
Multi-compiler Tasks
A multi-compiler task can chain the effects of several compilers by uploading the result of one compiler to a catalog layer and downloading the content of that catalog layer in another compiler. DeltaSets can also chain the effects of several compilers; however, you can decide whether intermediate results should be published to and read back from the output catalog, as done in a multi-compiler task, or not. For more information on how to emulate a multi-compiler task using DeltaSets, see the `readBack` transformation.
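A minimal sketch of this chaining (with an assumed intermediate layer and placeholder stages) publishes the result of the first stage and reads it back as the input of the second stage:

// First "compiler": produce payloads and publish them to an intermediate layer.
val stageOneResult: DeltaSet[Key, Payload] = ???
val stageOnePublished: PublishedSet = stageOneResult.publish(Set(Layer.Id("intermediate")))

// Second "compiler": read the published partitions back and continue processing.
val stageTwoInput: DeltaSet[Key, Meta] = stageOnePublished.readBack()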
IDs and Configuration
Sometimes it is useful to change the behavior of a DeltaSet transformation from a configuration file, for example, to tune the performance parameters of transformations without recompiling the application. Each transformation on a DeltaSet has a unique ID, which is used to identify the transformation in the configuration file. To configure a transformation from the configuration file, assign an ID to it by calling the `withId` function, which sets the ID of the transformation immediately preceding it.
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1.mapValues(_ * 2).withId("doublingMap")
Transformations to which you do not assign an ID in this way are automatically assigned an ID based on the order in which they appear in the source code. To determine the ID, call the `id` method on the DeltaSet that results from the transformation.
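For example (a minimal sketch with an assumed upstream DeltaSet), you can read the automatically assigned ID back from the transformed DeltaSet:

val deltaSet1: DeltaSet[Key, Int] = ???
val tripled: DeltaSet[Key, Int] = deltaSet1.mapValues(_ * 3)
// The automatically assigned ID of the transformation, e.g. for logging.
val automaticId = tripled.id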
IDs are not only assigned to DeltaSets, but also to PublishedSets. Both share the functionality for identification and configuration through the common super-class `BaseSet`.
To establish modularity within the pipeline, it can be useful to wrap a group of `BaseSet`s into a common namespace. IDs then only need to be unique within this namespace, and the namespace is included in logging messages and in the names of RDDs in the Spark UI. Use `BaseSet.Namespace.enter` to enter a new namespace. Namespaces can be nested.
val deltaSet1: DeltaSet[Key, Int] = ???
BaseSet.Namespace.enter("routingModule") {
val doubled: DeltaSet[Key, Int] = deltaSet1
.mapValues(_ * 2)
.withId("doublingMap")
}
DeltaSets can be configured by adding a section to the pipeline's `application.conf`, which is described in Configuring the Library. You can configure both the defaults that apply to all transformations and each transformation individually. The snippet below shows the default configuration defined in the `here.platform.data-processing.deltasets.default` path. To change the default configuration, copy this snippet into your `application.conf` and change the values accordingly.
here.platform.data-processing.deltasets.default {
intermediateStorageLevel = "MEMORY_AND_DISK"
validationLevel = "SAFETY"
threads = 1
sorting = false
incremental = true
forceStateless = false
}
You can configure a transformation with the ID `id` in the `here.platform.data-processing.deltasets.id` path of the configuration. For example, the following snippet sets the number of threads used by the `doublingMap` transformation.
here.platform.data-processing.deltasets.doublingMap {
threads = 3
}
Configuration files adhere to the namespaces of the transformations. For example, `here.platform.data-processing.deltasets.namespace1` can be used to configure all transformations created in the namespace `namespace1`. If a transformation with ID `id1` is created inside `namespace1`, its configuration is constructed by applying the settings defined in `here.platform.data-processing.deltasets.default`, `here.platform.data-processing.deltasets.namespace1`, and `here.platform.data-processing.deltasets.namespace1.id1`, in this order.
The configuration read from the configuration file can also be overridden programmatically in the application code. Programmatic overrides take precedence over all settings defined in configuration files.
val deltaSet1: DeltaSet[Key, Int] = ???
val doubled: DeltaSet[Key, Int] = deltaSet1
.mapValues(_ * 2)
.withConfigOverride(
c => c.withIncremental(false)
)