Notifications let you know when a catalog, layer, or partition is updated and provide information about which data has changed. The notifications function is useful when you run a stream application on the platform or service outside the platform that depends on the versioned map data. With this function, you no longer need to continually poll for changes because your application automatically receives a notification when the catalog is updated. Upon receiving a notification, your application only needs to request the corresponding data.
You can choose the portion of the catalog for which you receive notifications, based on your specific requirements:
Catalog-level notification to get notified whenever a new version of the catalog is published
Layer-level notification to get notified about changes in a specific layer, or even in a specific geographic area of that layer
This tutorial demonstrates how to create and manage subscriptions to catalog-level and layer-level changes using the OLP CLI.
Once the subscription is created, the tutorial demonstrates how to implement a consumer application using the Data Client Library.
Note: Important
However, your use case might be processing map data to transform or derive information from it. Such processing typically means consuming one or multiple versioned partitioned catalogs and outputting a different versioned partitioned catalog instead. Whenever one input catalog changes, you typically need to run your process to update the output.
In this case, you don't need the notification mechanism described in this tutorial. Instead, use the Data Processing Library and batch processing pipelines, which are designed to run automatically when an input catalog changes. To get started, use the Copy a Catalog Using the Data Processing Library tutorial.
Catalog-Level Notification
This section demonstrates how to subscribe to catalog-level changes and consume the notifications.
When notifications are enabled for a catalog, a stream is automatically created by the HERE platform with the same HERE Resource Name (HRN) as the catalog. Whenever the catalog data is updated, a new version of the catalog is created, and a new record is written to the stream.
The notification record contains the timestamp of the change and the new catalog version number. The version number is particularly useful, as you can use it to request the updated data.
To receive notifications about new catalog version publications, perform the following steps:
Create a source catalog with notifications enabled using the OLP CLI.
Implement a notification listener application using the Data Client Library.
Upload new data into the catalog using the OLP CLI to trigger a notification. In real-life situations, someone else would update the data, but for the purposes of this tutorial, you update it yourself.
Figure 1. Catalog-level notification
Create a catalog with notifications enabled
Using the configuration below, create a catalog that you can then monitor for changes via notifications. When the data in the catalog changes, notifications are sent to subscribers.
The catalog configuration contains a sample versioned layer with the heretile partitioning scheme and tile level 14.
For the sake of simplicity, the "contentType": "text/plain" is used.
The catalog must have notifications enabled. To enable notifications for a new catalog, make sure the catalog configuration file used for catalog creation contains the following property:
"notifications":{"enabled":true}
The complete catalog configuration:
source-catalog.json
{"id":"notifications-source","name":"Catalog Notifications Source Tutorial","summary":"Catalog Notifications Source","description":"When changed, the notifications will be sent to subscribers","notifications":{"enabled":true},"layers":[{"id":"notification-source-versioned-layer","name":"Notifications Source Tutorial layer","summary":"Messages for the notifications tutorial.","description":"Messages for the notifications tutorial.","layerType":"versioned","volume":{"volumeType":"durable"},"partitioning":{"scheme":"heretile","tileLevels":[14]},"contentType":"text/plain"}]}
Replace {{YOUR_USERNAME}} below with your username and run the command using the above json file:
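For example, assuming the configuration above is saved as source-catalog.json and the standard olp catalog create syntax, the catalog can be created with a command similar to the following (the catalog ID and name are only suggestions; adjust them to your naming conventions):

olp catalog create notifications-source-{{YOUR_USERNAME}} "Catalog Notifications Source Tutorial {{YOUR_USERNAME}}" --config source-catalog.json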
In this tutorial, the source versioned catalog contains a sample dataset of average values for temperature and humidity in a certain area. An example of the data:
Temperature,Humidity
18,72
The data is updated every month, and new data is uploaded to the catalog.
Let's create a simple Java/Scala application to receive notifications when the catalog containing the dataset is updated. The application prints the catalog version and timestamp to the console.
Change the {{SOURCE_CATALOG_HRN}} placeholder to the HRN of the source catalog created in the previous chapter.
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.settings.NotificationConsumerSettings;
import java.util.Date;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CatalogLevelNotification {
  private static final HRN CATALOG_HRN = HRN.fromString("{{SOURCE_CATALOG_HRN}}");
  public static final String CONSUMER_GROUP_NAME = "catalog-level-notification";

  public static void main(String[] args) throws Exception {
    // Create queryApi for the source catalog
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(CATALOG_HRN);

    // Specify Kafka consumer group name
    NotificationConsumerSettings consumerSettings =
        new NotificationConsumerSettings.Builder().withGroupName(CONSUMER_GROUP_NAME).build();

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
        // source function that emits catalog-level notifications
        .addSource(queryApi.subscribeToNotifications(consumerSettings))
        // map notification to a human-readable string to be displayed in the console
        .map(
            notification ->
                String.format(
                    "===> BatchPublicationNotification: New catalog version %s, date/time: %s",
                    notification.getCatalogVersion(), new Date(notification.timestamp())))
        // Write to standard output
        .print();

    // Trigger the program execution
    env.execute("Catalog-level notification listener");
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.FlinkDataClient
import com.here.platform.data.client.settings.NotificationConsumerSettings
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import java.util.Date

object CatalogLevelNotificationScala {
  private val CatalogHrn = HRN("{{SOURCE_CATALOG_HRN}}")
  private val ConsumerGroupName = "catalog-level-notification"

  def main(args: Array[String]): Unit = {
    // Create queryApi for the source catalog
    val flinkDataClient = new FlinkDataClient()
    val queryApi = flinkDataClient.queryApi(CatalogHrn)

    // Specify Kafka consumer group name
    val consumerSettings = NotificationConsumerSettings(ConsumerGroupName)

    // Create the context in which a streaming program is executed
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env
      // source function that emits catalog-level notifications
      .addSource(queryApi.subscribeToNotifications(consumerSettings))
      // map BatchPublicationNotification to a human-readable string to be displayed in the console
      .map { notification =>
        s"===> BatchPublicationNotification: New catalog version ${notification.version}, date/time: ${new Date(notification.timestamp)}"
      }
      // Write to standard output
      .print()

    // Trigger the program execution
    env.execute("Catalog-level notification listener")
  }
}
The application uses the Flink Data Client Library's subscribeToNotifications method to subscribe to notifications for the catalog defined by the CATALOG_HRN variable.
The consumer group name is a Kafka Consumer Group name. It is set to catalog-level-notification in this tutorial.
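Because the notification stream is backed by Kafka, the group name follows the usual Kafka consumer group semantics: listeners sharing a group name split the notifications between them, while listeners with different group names each receive every notification. As a sketch, a second, independent listener could reuse the settings builder from the code above with its own group name (the name below is only an example):

// Hypothetical second listener: using a different group name means it also
// receives every notification, independently of the first listener.
NotificationConsumerSettings independentConsumerSettings =
    new NotificationConsumerSettings.Builder()
        .withGroupName("catalog-level-notification-audit")
        .build();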
Run the application locally
To execute the catalog-level notification consumer application, run the following command:
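The exact command depends on how the project is built. Assuming the tutorial project uses Maven for the Java version and sbt for the Scala version, a typical invocation could be:

mvn compile exec:java -Dexec.mainClass=CatalogLevelNotification

or, for Scala:

sbt "runMain CatalogLevelNotificationScala"

Once the application has started and subscribed to the notification stream, the console log contains a line similar to: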
[...] Subscribe to notifications with group name: catalog-level-notifications-consumer-group, offset: LatestOffset and consumer id: c458fd12-c525-4c58-ac71-11a149c8234c
After this message appears, you are subscribed to the notifications.
Once there is new data in the source catalog, you will receive a notification in the console. Note that the current console is busy listening to the notifications, so you may want to open another console instance to proceed with uploading data.
The tutorial contains sample data in the src/main/resources/data folder. The dataset represents the weather summary in Berlin and Munich in August 2021.
Let's upload the data into partition 377894444, which represents Berlin City Centre on zoom level 14. For more information on calculating partition tile IDs and zoom levels, refer to the Calculate partition Tile IDs tutorial.
To upload the data into a catalog, use the OLP CLI.
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv
The command uploads the data, and a new version of the catalog is created. The running listener application then prints a notification with the new catalog version and the time of the change:
===> BatchPublicationNotification: New catalog version 0, date/time: Mon Nov 15 14:46:10 EET 2021
To get another change notification, upload more data.
The application is a typical streaming program and thus runs indefinitely. Hit Ctrl + C in the console to stop the program.
Note that if there were other versioned layers in the catalog, you would receive notifications for changes in any of them.
Layer-Level Notification
Layer-level notifications allow you to get a notification whenever changes occur in a particular layer.
Note
Layer-level notifications are currently available only for versioned layers with HERE Tile partitioning.
Notifications are essentially a data stream: they are written as messages to a stream layer. You receive notifications by subscribing to that layer and consuming the messages as you would consume data from any stream layer on the HERE platform.
To receive notifications when a specific partition or a specific geographic area is updated, perform the following steps:
Create a notification catalog with a stream layer, to which the notifications will be published, using the OLP CLI.
Implement a notification listener application using the Data Client Library. The application is simply a consumer of the data published into the notification-stream-layer layer of the notification catalog created in the previous step.
Subscribe to a layer in the source catalog using the OLP CLI.
Upload new data into the layer you are subscribed to using the OLP CLI. In real-life situations, someone else would update the data, but for the purposes of this tutorial, you update it yourself.
Figure 2. Layer-level notification
Create the notification catalog
For this part of the tutorial, you will need two catalogs:
the source catalog with a versioned layer, created earlier in this tutorial
a notification catalog with a stream layer to receive change notifications
Note that, unlike with catalog-level notifications, this time a stream layer must exist to receive notifications about changes in the versioned layer of the source catalog.
Create the notification catalog using the configuration below. When the source catalog is changed, the notification message will be sent to the stream layer of the notification catalog.
The catalog configuration contains a sample stream layer with the generic partitioning scheme.
For the sake of simplicity, the "contentType": "text/plain" is used.
The complete catalog configuration:
notification-catalog.json
{"id":"notifications-sink","name":"Catalog Notifications Stream Tutorial","summary":"Catalog for Notifications Stream","description":"The catalog containing the notification stream","layers":[{"id":"notification-stream-layer","name":"Notification Stream","summary":"Stream for notification","description":"Stream for notification","layerType":"stream","volume":{"volumeType":"durable"},"partitioning":{"scheme":"generic"},"contentType":"text/plain"}]}
Replace {{YOUR_USERNAME}} below with your username and run the following:
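For example, assuming the configuration above is saved as notification-catalog.json, the notification catalog can be created in the same way as the source catalog:

olp catalog create notifications-sink-{{YOUR_USERNAME}} "Catalog Notifications Stream Tutorial {{YOUR_USERNAME}}" --config notification-catalog.json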
You will receive the following message in the CLI:
Catalog {{NOTIFICATION_CATALOG_HRN}} has been created.
Note
If a billing tag is required in your realm, update the config file by adding the billingTags: ["YOUR_BILLING_TAG"] property to the layer section.
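For example, the layer section of the configuration could then look similar to the following (shortened; all other layer properties stay unchanged):

"layers": [
  {
    "id": "notification-stream-layer",
    "layerType": "stream",
    "billingTags": ["YOUR_BILLING_TAG"],
    ...
  }
]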
Implement a notification consumer application
In this tutorial, the source versioned catalog contains two datasets of average values for temperature and humidity stored in different partitions: Berlin and Munich. The structure of the datasets is the same as in the catalog-level part of this tutorial.
Let's create a simple Java/Scala application to receive notifications when a new message is published to the stream layer. The application prints the notification message containing information about changed partitions.
Change the {{NOTIFICATION_CATALOG_HRN}} placeholder to the HRN of the notification catalog created in the previous chapter.
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.hrn.HRN;
import com.here.platform.data.client.flink.javadsl.FlinkDataClient;
import com.here.platform.data.client.flink.javadsl.FlinkQueryApi;
import com.here.platform.data.client.flink.javadsl.FlinkReadEngine;
import com.here.platform.data.client.javadsl.Partition;
import com.here.platform.data.client.settings.ConsumerSettings;
import com.here.platform.data.client.settings.ConsumerSettings.Builder;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LayerLevelNotification {
  private static final HRN NOTIFICATION_CATALOG_HRN =
      HRN.fromString("{{NOTIFICATION_CATALOG_HRN}}");
  private static final String NOTIFICATION_STREAM_LAYER_ID = "notification-stream-layer";
  public static final String CONSUMER_GROUP_NAME = "layer-level-notification";

  public static void main(String[] args) throws Exception {
    // Create queryApi for the source catalog
    final FlinkDataClient flinkDataClient = new FlinkDataClient();
    FlinkQueryApi queryApi = flinkDataClient.queryApi(NOTIFICATION_CATALOG_HRN);

    // Specify Kafka consumer group name
    ConsumerSettings consumerSettings =
        new Builder().withLatestOffset().withGroupName(CONSUMER_GROUP_NAME).build();

    // Create the context in which a streaming program is executed
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
        // source function that emits catalog-level notifications
        .addSource(queryApi.subscribe(NOTIFICATION_STREAM_LAYER_ID, consumerSettings))
        // read partition bytes and map to human-readable string
        .map(new SampleMapper())
        // Write to standard output
        .print();

    // Trigger the program execution
    env.execute("Layer-level notification listener");
  }

  private static class SampleMapper extends RichMapFunction<Partition, String> {
    private transient FlinkDataClient flinkDataClient;
    private transient FlinkReadEngine flinkReadEngine;

    @Override
    public void open(Configuration parameters) {
      // Create read engine to read the partition bytes
      flinkDataClient = new FlinkDataClient();
      flinkReadEngine = flinkDataClient.readEngine(NOTIFICATION_CATALOG_HRN);
    }

    @Override
    public String map(Partition partition) {
      byte[] dataAsBytes = flinkReadEngine.getDataAsBytes(partition);
      return "===> Received " + new String(dataAsBytes);
    }

    @Override
    public void close() {
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and
      // terminated.
      flinkDataClient.terminate();
    }
  }
}
/*
* Copyright (c) 2018-2023 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

import com.here.hrn.HRN
import com.here.platform.data.client.flink.scaladsl.{FlinkDataClient, FlinkReadEngine}
import com.here.platform.data.client.scaladsl.Partition
import com.here.platform.data.client.settings.{ConsumerSettings, LatestOffset}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object LayerLevelNotificationScala {
  private val NotificationCatalogHrn = HRN("{{NOTIFICATION_CATALOG_HRN}}")
  private val NotificationStreamLayerId = "notification-stream-layer"
  val ConsumerGroupName = "layer-level-notification"

  def main(args: Array[String]): Unit = {
    // Create queryApi for the source catalog
    val flinkDataClient = new FlinkDataClient()
    val queryApi = flinkDataClient.queryApi(NotificationCatalogHrn)

    // Specify Kafka consumer group name
    val consumerSettings = ConsumerSettings(ConsumerGroupName, LatestOffset)

    // Create the context in which a streaming program is executed
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env
      // source function that emits catalog-level notifications
      .addSource(queryApi.subscribe(NotificationStreamLayerId, consumerSettings))
      // read partition bytes and map to human-readable string
      .map(new SampleMapper(NotificationCatalogHrn))
      // Write to standard output
      .print()

    // Trigger the program execution
    env.execute("Layer-level notification listener")
  }

  class SampleMapper(hrn: HRN) extends RichMapFunction[Partition, String] with Serializable {
    @transient private lazy val flinkDataClient: FlinkDataClient = new FlinkDataClient()
    @transient private lazy val flinkReadEngine: FlinkReadEngine =
      // Create read engine to read the partition bytes
      flinkDataClient.readEngine(hrn)

    override def map(partition: Partition): String =
      s"===> Received ${new String(flinkReadEngine.getDataAsBytes(partition))}"

    override def close(): Unit =
      // FlinkDataClient is a heavyweight object that needs to be created once, reused and terminated.
      flinkDataClient.terminate()
  }
}
The application uses the Flink Data Client Library's subscribe method to subscribe to the notification-stream-layer layer of the catalog defined by the NOTIFICATION_CATALOG_HRN variable.
The consumer group name is a Kafka Consumer Group name. It is set to layer-level-notification this time.
Run the application locally
To execute the layer-level notification consumer application, run the following command:
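As before, the exact command depends on your build setup. Assuming Maven for the Java version and sbt for the Scala version, a typical invocation could be:

mvn compile exec:java -Dexec.mainClass=LayerLevelNotification

or, for Scala:

sbt "runMain LayerLevelNotificationScala"

Once the application has started and joined the consumer group, the console log contains a line similar to: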
ConsumerCoordinator:799 - [Consumer clientId=[...], groupId=[...].layer-level-notification] Setting offset for partition [...]
After this message appears, you are subscribed to the notifications.
Once there is new data in the source catalog's notification-source-versioned-layer, you will receive a notification in the console. Note that the current console is busy listening to the notifications, so you may want to open another console instance to proceed with uploading data.
Subscribe to notifications using OLP CLI
You can choose which changes you receive notifications for:
All changes in the layer
Changes to a geographic area
Changes to specific partitions
You can create multiple subscriptions for a catalog, but a subscription always pertains to a single catalog. You can subscribe to either a few versioned layers in a catalog or all its layers. You can also configure the subscription area and notification type to be either the same or different for every layer you subscribe to.
When you create a subscription, you can indicate which changes you want to be notified about.
Subscribe to all changes in the layer using OLP CLI
After you create the subscription with the OLP CLI, you will receive the following message:
Subscription {{ALL_CHANGES_SUBSCRIPTION_ID}} has been created.
Let's update the weather summary dataset by uploading a first entry with the August weather data into partition 377894444, which represents Berlin City Centre on zoom level 14. For more information on calculating partition tile IDs and zoom levels, refer to the Calculate partition Tile IDs tutorial. To upload the data into the catalog, use the OLP CLI:
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv
Once the partition is uploaded, you should see the result in the console:
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":0,"layerId":"notification-source-versioned-layer","partitions":[{"version":0,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637532678413,"subscriptionResultType":"FULL"}
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377782524:src/main/resources/data/august-munich.csv
Once the partition is uploaded, you should see the result in the console:
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":1,"layerId":"notification-source-versioned-layer","partitions":[{"version":1,"partition":"377782524","layer":"notification-source-versioned-layer","dataHandle":"cf101de8-20f8-4dc0-a9a9-b754af6ad3ed","deleted":false}],"timestamp":1637532827487,"subscriptionResultType":"FULL"}
The notification is received for any partition uploaded, be it Berlin, Munich, or anywhere else in the world.
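In a real application, you would typically parse the notification payload instead of only printing it. The message is plain JSON, so a minimal sketch (assuming Jackson is available on the classpath; the field names mirror the payload shown above) could extract the changed partition IDs like this:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;

public class NotificationParser {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Returns the IDs of the partitions reported as changed in a layer-level
  // notification message such as the ones printed above.
  static List<String> changedPartitions(String notificationJson) throws Exception {
    JsonNode root = MAPPER.readTree(notificationJson);
    List<String> partitionIds = new ArrayList<>();
    for (JsonNode partition : root.get("partitions")) {
      // Skip deletions; keep only partitions that still carry data.
      if (!partition.get("deleted").asBoolean()) {
        partitionIds.add(partition.get("partition").asText());
      }
    }
    return partitionIds;
  }
}

For the second notification above, this method would return a list containing only 377782524.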
You can also subscribe to changes within a specific geographic area defined by a WKT (well-known text) geometry. In this case, the WKT geometry is POINT(13.409419 52.520817), the coordinates of the Berlin Television Tower.
olp catalog layer partition put {{SOURCE_CATALOG_HRN}} notification-source-versioned-layer --partitions 377894444:src/main/resources/data/august-berlin.csv
The Berlin Television Tower is located in the same partition 377894444 on zoom level 14, so once the partition is uploaded, you should see the result in the console:
===> Received {"catalogHRN":"{{SOURCE_CATALOG_HRN}}","catalogVersion":2,"layerId":"notification-source-versioned-layer","partitions":[{"version":2,"partition":"377894444","layer":"notification-source-versioned-layer","dataHandle":"d974c61f-45f1-42fa-9ccd-165a4decd236","deleted":false}],"timestamp":1637593998401,"subscriptionResultType":"FULL"}
You can also subscribe to changes in specific partitions only. To try it out:
Create a subscription to the 377894444 partition in Berlin.
Upload data to both the Berlin and Munich partitions.
You should receive only one notification as a result, because only the Berlin partition is covered by the subscription.
Conclusion
In this tutorial, you learned how to create and manage subscriptions to catalog-level and layer-level changes using the OLP CLI. You also learned how to consume subscription notifications using the Data Client Library.
Resources
For more information, refer to the following additional resources:
How to write and read data with Data Client Library: Data Client Library developer guide.