Custom metrics can be added to a stream or batch pipeline using accumulators, which are essentially simple counters. This is a feature of the Apache Spark and the Apache Flink frameworks. While similar in concept, the executable code for accumulators is different for a stream or batch pipeline. Coding details can be found here:
The basic use of accumulators is tied to some significant event in the data processing workflow of your pipeline design. The goal is to capture the results of that event as expressed by a numerical value. The accumulator can be configured as a simple counter or as a summation value. This limits the kind of data that can be reported.
Each accumulator is associated with a name and a value. That key value pair (KVP) constitutes the metric that is reported by the pipeline service when the pipeline is being executed. The key is always the name of the metric and should be selected so that it is semantically meaningful. This is how the metric will be identified when reported in Grafana.
Figure 1 illustrates the relationship between pipeline data processing and metric reporting. This process is essentially identical to the reporting mechanism used for standard pipeline metrics. So, Grafana can collect custom metric data along with standard metric data and present it in a dashboard.
When using Spark accumulators there are some caveats that programmers need to be aware of.
- Computations inside transformations are evaluated lazily, so unless an action happens on an RDD the transformations are not executed. As a result of this, accumulators used inside functions like
filter()won't get executed unless some action happen on the RDD.
- Spark guarantees to update accumulators inside actions only once. So even if a task is restarted and the lineage is recomputed, the accumulators will only be updated once.
- Spark does not guarantee this for transformations. So if a task is restarted and the lineage is recomputed, there are chances of undesirable side effects when the accumulators will be updated more than once.
- To be on the safe side, always use accumulators inside actions ONLY.
When using Flink accumulators there are some caveats that programmers need to be aware of.
- Flink pipelines are executed lazily. When the pipeline’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the pipeline’s explicit workflow. The operations are actually executed when the execution is explicitly triggered by an
execute()call on the execution environment. The overall result will be stored in the
JobExecutionResultobject which is returned from the
execute()method of the execution environment. But this only works if the execution waits for the completion of the job.
- Currently the result of accumulators is only available after the overall job has ended.
- All accumulators share a single namespace per job. Thus you can use the same accumulator in different operator functions of your job. Flink will internally merge all accumulators with the same name.
- Flink currently has the following built-in accumulators: IntCounter, LongCounter, DoubleCounter, and Histogram. Each of them uses the standard Accumulator interface. OLP Pipelines do not support Histogram accumulator.
- The simplest accumulator is a counter. You can increment it using the
Accumulator.add(V value)method. At the end of the job, Flink will sum up (merge) all partial results and send the final result to the pipeline service.
Note: Example References
The following references are provided without any warranty or guarantee of functionality in OLP. They are shared here to help fill in some of the gaps in existing Apache documentation. All example are generic for Apache Flink or Apache Spark.
- Spark Accumulators Explained
- Java Code Examples for org.apache.flink.api.common.accumulators.Accumulator
- The Histogram Accumulator in Flink
- Example: Flink Accumulator in Java
- Example: Flink EmptyFieldsCountAccumulator in Java
- Example: Spark NewAccumulators in Scala
- Example: Spark custom-accumulator-v2 in Scala
- Example: Spark Custom Accumulators