Use Kafka Metrics for Connectivity Status

The Data Client Library gives you programmatic access to Kafka consumer metrics, which you can use to assess connectivity status.

Usage

You can access the underlying Kafka consumer metrics via SubscriptionControl, which you can obtain either directly or through a Subscription instance.

Scala

// Create QueryApi
val queryApi = DataClient().queryApi(catalogHrn)

// Define Kafka consumer settings assigning an id for the consumer (if no id is specified, a random UUID is assigned)
val consumerSettings =
  ConsumerSettings(groupName = "test-consumer", consumerId = "consumer-id")

// Subscribe to a layer
val subscription: Future[Subscription] = queryApi.subscribe("stream-layer", consumerSettings)

// Retrieve Kafka metrics
val kafkaMetrics: Future[Map[String, String]] =
  subscription.flatMap(s => s.subscriptionControl.getKafkaMetrics())

Java

// Create QueryApi
QueryApi queryApi = DataClient.get(myActorSystem).queryApi(catalogHrn);

// Define Kafka consumer settings assigning an id for the consumer (if no id is specified, a
// random UUID is assigned)
ConsumerSettings consumerSettings =
    new ConsumerSettings.Builder()
        .withGroupName("test-consumer")
        .withConsumerId("consumer-id")
        .build();

// Subscribe to a layer
CompletionStage<Subscription> subscriptionFuture =
    queryApi.subscribe("stream-layer", consumerSettings);

// Retrieve Kafka metrics (an empty list requests all available metrics)
// Requires: import scala.collection.JavaConverters; import java.util.ArrayList;
subscriptionFuture.thenApply(
    subscription -> {
      // getKafkaMetrics takes a Scala immutable List of metric names
      scala.collection.immutable.List<String> metricNames =
          JavaConverters.asScalaBufferConverter(new ArrayList<String>()).asScala().toList();
      return subscription.getSubscriptionControl().getKafkaMetrics(metricNames);
    });

Instead of retrieving all available metrics, you can request a specific subset by adding each metric name to the list passed to getKafkaMetrics (see Scala API Reference or Java API Reference).

Connectivity Status

You can find an overview of all Kafka metrics in the official Apache Kafka Documentation.

To assess consumer connectivity status, you can use the following global connectivity metrics:

Metric name               Description
connection-count          The current number of active connections.
connection-creation-rate  New connections established per second in the window.
connection-close-rate     Connections closed per second in the window.
io-ratio                  The fraction of time the I/O thread spent doing I/O.
io-time-ns-avg            The average length of time for I/O per select call in nanoseconds.
io-wait-ratio             The fraction of time the I/O thread spent waiting.
io-wait-time-ns-avg       The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.
select-rate               Number of times the I/O layer checked for new I/O to perform per second.
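
As an illustration of how these metrics might be interpreted, the following minimal sketch classifies a metrics snapshot by its connection-count value. It assumes the snapshot is a map from metric name to string value, as the Map[String, String] in the Scala example suggests. The ConnectivityCheck class, its Status enum, and the "more than zero connections means connected" rule are hypothetical and not part of the Data Client Library.

```java
import java.util.Map;

/** Hypothetical helper, not part of the Data Client Library. */
final class ConnectivityCheck {

  enum Status { CONNECTED, DISCONNECTED, UNKNOWN }

  /**
   * Classifies a metrics snapshot using connection-count only.
   * Assumption: values are numeric strings such as "1.0".
   */
  static Status assess(Map<String, String> metrics) {
    String raw = metrics.get("connection-count");
    if (raw == null) {
      return Status.UNKNOWN; // metric was not reported
    }
    try {
      double connections = Double.parseDouble(raw.trim());
      if (Double.isNaN(connections)) {
        return Status.UNKNOWN; // metric present but has no usable value
      }
      // Heuristic: at least one active connection counts as connected
      return connections > 0 ? Status.CONNECTED : Status.DISCONNECTED;
    } catch (NumberFormatException e) {
      return Status.UNKNOWN; // value is not a number
    }
  }
}
```

A real health check would probably also watch connection-close-rate and io-wait-ratio over time, since a single connection-count reading says nothing about how stable the connection is.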

Note

The metrics listed above are exposed under the MBean name kafka.consumer:type=consumer-metrics,client-id=([-.\w]+).
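
Because the metrics are exposed as MBeans, they can also be read through standard JMX. The sketch below uses only JDK classes and assumes the Kafka consumer runs in the same JVM with its JMX reporter active. The KafkaJmxProbe class and its method name are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Hypothetical helper that reads consumer metrics from the local JVM via JMX. */
final class KafkaJmxProbe {

  /** Returns the given attribute for every consumer-metrics MBean, keyed by client-id. */
  static Map<String, Object> readConsumerMetric(String attribute) {
    try {
      MBeanServer server = ManagementFactory.getPlatformMBeanServer();
      // The wildcard matches any client-id, mirroring the ([-.\w]+) part of the MBean name
      ObjectName pattern =
          new ObjectName("kafka.consumer:type=consumer-metrics,client-id=*");
      Map<String, Object> values = new HashMap<>();
      for (ObjectName name : server.queryNames(pattern, null)) {
        values.put(name.getKeyProperty("client-id"), server.getAttribute(name, attribute));
      }
      return values; // empty when no consumer is registered in this JVM
    } catch (JMException e) {
      throw new IllegalStateException("Could not read attribute " + attribute, e);
    }
  }
}
```

Calling KafkaJmxProbe.readConsumerMetric("connection-count") in a JVM without a registered consumer returns an empty map.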

If your application uses Flink together with Kafka, you can access the Kafka producer and consumer metrics exported through the Prometheus reporter and use them to create dashboards in Grafana. For more information, see Flink Metrics.

Note

To enable reporting of Kafka metrics in Flink, set the configuration value here.platform.data-client.enable-flink-kafka-metrics to true (see Configuration).
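
As a sketch, assuming your application supplies Data Client Library settings through a Typesafe Config file such as application.conf (an assumption; the Configuration page is authoritative on where settings are read from), the entry could look like this:

```
# Assumed location: application.conf on the application's classpath
here.platform.data-client.enable-flink-kafka-metrics = true
```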
