Catalog Notification

Catalog notifications let you know when a catalog is updated.

Enable Notifications

To enable notification, the catalog must have notifications enabled in its configuration.

Subscribe to Notifications

To subscribe to a notification stream, you need to call subscribeToNotifications from QueryApi.

Scala
Java
// create queryApi for target catalog
val queryApi = DataClient().queryApi(catalogHrn)

// create subscription configuration
val consumerSettings =
  NotificationConsumerSettings(groupName = "consumer-group-name")

// subscription to notifications
val control: Future[NotificationSubscriptionControl] =
  queryApi.subscribeToNotifications(
    consumerSettings, { notification =>
      // this callback is called each time a new batch publication happens in catalog
      println(s"catalog ${catalogHrn} has a new version ${notification.version}")
    }
  )
// create queryApi for target catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// create subscription configuration
NotificationConsumerSettings consumerSettings =
    new NotificationConsumerSettings.Builder().withGroupName("consumer-group-name").build();

// subscription to notifications
CompletionStage<NotificationSubscriptionControl> controlStage =
    queryApi.subscribeToNotifications(
        consumerSettings,
        notification ->
            // this callback is called each time a new batch publication happens in catalog
            System.out.printf(
                "catalog %s has a new version %d\n",
                catalogHrn, notification.getCatalogVersion()));

For better control, you can subscribe to a Akka stream source.

Scala
Java
// create queryApi for target catalog
val queryApi = DataClient().queryApi(catalogHrn)

// create subscription configuration
val consumerSettings =
  NotificationConsumerSettings(groupName = "consumer-group-name")

// subscription to notifications
val control: Future[NotificationSubscriptionControl] =
  queryApi
    .subscribeToNotifications(consumerSettings)
    .map { subscription =>
      // consume the notification streams
      subscription.notifications
        .runWith(Sink.foreach { notification: BatchPublicationNotification =>
          // this callback is called each time a new batch publication happens in catalog
          println(s"catalog ${catalogHrn} has a new version ${notification.version}")
        })

      subscription.subscriptionControl
    }

// create queryApi for target catalog
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// create subscription configuration
NotificationConsumerSettings consumerSettings =
    new NotificationConsumerSettings.Builder().withGroupName("consumer-group-name").build();

// subscription to notifications
CompletionStage<NotificationSubscriptionControl> controlStage =
    queryApi
        .subscribeToNotifications(consumerSettings)
        .thenApply(
            subscription -> {
              subscription
                  .notifications()
                  .runWith(
                      Sink.foreach(
                          notification ->
                              // this callback is called each time a new batch publication
                              // happens in catalog
                              System.out.printf(
                                  "catalog %s has a new version %d\n",
                                  catalogHrn, notification.getCatalogVersion())),
                      myMaterializer);

              return subscription.subscriptionControl();
            });

To cancel the subscription, call NotificationSubscriptionControl.shutdown.

Scala
Java
control.flatMap(_.shutdown())
controlStage.thenCompose(control -> control.shutdown());

results matching ""

    No results matching ""