From a9d5ece8a3f1a5feb86daad91754464f93c82165 Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Wed, 8 Mar 2023 12:25:20 +0100 Subject: [PATCH] Add KafkaMetricsRegistry --- README.md | 47 +++++++++ .../skafka/metrics/KafkaMetricsRegistry.scala | 97 +++++++++++++++++++ .../skafka/metrics/MeteredConsumerOf.scala | 23 +++++ .../skafka/metrics/MeteredProducerOf.scala | 19 ++++ .../skafka/metrics/syntax.scala | 14 +++ 5 files changed, 200 insertions(+) create mode 100644 skafka/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala create mode 100644 skafka/src/main/scala/com/evolutiongaming/skafka/metrics/MeteredConsumerOf.scala create mode 100644 skafka/src/main/scala/com/evolutiongaming/skafka/metrics/MeteredProducerOf.scala create mode 100644 skafka/src/main/scala/com/evolutiongaming/skafka/metrics/syntax.scala diff --git a/README.md b/README.md index 08c7e49d..c1cade6c 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,53 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer => } ``` +## Collecting metrics +Both `Producer` and `Consumer` expose metrics, internally collected by the Java client via `clientMetrics` method. +To simplify collection of metrics from multiple clients inside the same VM you can use `KafkaMetricsRegistry`. +It allows 'registering' functions that obtain metrics from different clients, aggregating them into a single list +of metrics when collected. This allows defining clients in different code units with the only requirement of registering +them in `KafkaMetricsRegistry`. The registered functions will be saved in a `Ref` and invoked every time metrics +are collected. +Examples: +1. Manual registration of each client +```scala +import com.evolutiongaming.skafka.consumer.ConsumerOf +import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry +import com.evolutiongaming.skafka.producer.ProducerOf + +val consumerOf: ConsumerOf[F] = ??? +val producerOf: ProducerOf[F] = ??? +val kafkaRegistry: KafkaMetricsRegistry[F] = ??? + +for { + consumer <- consumerOf.apply(consumerConfig) + <- kafkaRegistry.register(consumer.clientMetrics) + producer <- producerOf.apply(producerConfig) + _ <- kafkaRegistry.register(producer.clientMetrics) + + metrics <- kafkaRegistry.collectAll +} yield () +``` +2. Wrapping `ConsumerOf` or `ProducerOf` with a syntax extension +```scala +import com.evolutiongaming.skafka.metrics.syntax._ + +val kafkaRegistry: KafkaMetricsRegistry[F] = ... +val consumerOf: ConsumerOf[F] = ConsumerOf.apply1[F](...).withNativeMetrics(kafkaRegistry) +val producerOf: ProducerOf[F] = ProducerOf.apply1[F](...).withNativeMetrics(kafkaRegistry) + +for { + consumer <- consumerOf.apply(consumerConfig) + producer <- producerOf.apply(producerConfig) + metrics <- kafkaRegistry.collectAll +} yield () +``` +#### Metrics duplication +`KafkaMetricsRegistry` deduplicates metrics by default. It can be turned off by using a different factory method +accepting `allowDuplicates` parameter. +When using it in the default mode it's important to use different `client.id` values for different clients inside a +single VM, otherwise only one of them will be picked (order is not guaranteed). + ## Setup ```scala diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala new file mode 100644 index 00000000..42464312 --- /dev/null +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala @@ -0,0 +1,97 @@ +package com.evolutiongaming.skafka.metrics + +import cats.effect.kernel.Sync +import cats.effect.{Ref, Resource} +import cats.syntax.all._ +import com.evolutiongaming.skafka.ClientMetric + +import java.util.UUID + +/** Allows reporting metrics of multiple Kafka clients inside a single VM. + * + * Example: + * {{{ + * val consumerOf: ConsumerOf[F] = ??? + * val producerOf: ProducerOf[F] = ??? + * val kafkaRegistry: KafkaMetricsRegistry[F] = ??? + * + * for { + * consumer <- consumerOf.apply(consumerConfig) + * _ <- kafkaRegistry.register(consumer.clientMetrics) + * + * producer <- producerOf.apply(producerConfig) + * _ <- kafkaRegistry.register(producer.clientMetrics) + * + * metrics <- kafkaRegistry.collectAll + * } yield () + * }}} + * + * To avoid manually registering each client there are syntax extension, wrapping `ProducerOf` and `ConsumerOf`, + * see `com.evolutiongaming.skafka.metrics.syntax`. + * + * Example: + * {{{ + * val kafkaRegistry: KafkaMetricsRegistry[F] = ... + * val consumerOf: ConsumerOf[F] = ConsumerOf.apply1[F](...).withNativeMetrics(kafkaRegistry) + * val producerOf: ProducerOf[F] = ProducerOf.apply1[F](...).withNativeMetrics(kafkaRegistry) + * + * for { + * consumer <- consumerOf.apply(consumerConfig). + * producer <- producerOf.apply(producerConfig) + * metrics <- kafkaRegistry.collectAll + * } yield () + * }}} + */ +trait KafkaMetricsRegistry[F[_]] { + + /** + * Register a function to obtain a list of client metrics. + * Normally, you would pass [[com.evolutiongaming.skafka.consumer.Consumer.clientMetrics]] or + * [[com.evolutiongaming.skafka.producer.Producer.clientMetrics]] + * + * @return synthetic ID of registered function + */ + def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, UUID] + + /** Collect metrics from all registered functions */ + def collectAll: F[Seq[ClientMetric[F]]] +} + +object KafkaMetricsRegistry { + private final class FromRef[F[_]: Sync](ref: Ref[F, Map[UUID, F[Seq[ClientMetric[F]]]]], allowDuplicates: Boolean) + extends KafkaMetricsRegistry[F] { + override def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, UUID] = { + val acquire: F[UUID] = for { + id <- Sync[F].delay(UUID.randomUUID()) + _ <- ref.update(m => m + (id -> metrics)) + } yield id + + def release(id: UUID): F[Unit] = + ref.update(m => m - id) + + Resource.make(acquire)(id => release(id)) + } + + override def collectAll: F[Seq[ClientMetric[F]]] = + ref.get.flatMap { map: Map[UUID, F[Seq[ClientMetric[F]]]] => + map.values.toList.sequence.map { metrics => + val results: List[ClientMetric[F]] = metrics.flatten + + if (allowDuplicates) { + results + } else { + results + .groupBy(metric => (metric.name, metric.group, metric.tags)) + .map { case (_, values) => values.head } + .toSeq + } + } + } + } + + def ref[F[_]: Sync](allowDuplicates: Boolean): F[KafkaMetricsRegistry[F]] = { + Ref.of[F, Map[UUID, F[Seq[ClientMetric[F]]]]](Map.empty).map(ref => new FromRef[F](ref, allowDuplicates)) + } + + def ref[F[_]: Sync]: F[KafkaMetricsRegistry[F]] = ref[F](allowDuplicates = false) +} diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/MeteredConsumerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/MeteredConsumerOf.scala new file mode 100644 index 00000000..36e81df5 --- /dev/null +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/MeteredConsumerOf.scala @@ -0,0 +1,23 @@ +package com.evolutiongaming.skafka.metrics + +import cats.effect.Resource +import com.evolutiongaming.skafka.FromBytes +import com.evolutiongaming.skafka.consumer.{Consumer, ConsumerConfig, ConsumerOf} + +class MeteredConsumerOf[F[_]](consumerOf: ConsumerOf[F], kafkaMetricsRegistry: KafkaMetricsRegistry[F]) + extends ConsumerOf[F] { + + override def apply[K, V]( + config: ConsumerConfig + )(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]): Resource[F, Consumer[F, K, V]] = { + for { + consumer <- consumerOf.apply[K, V](config) + _ <- kafkaMetricsRegistry.register(consumer.clientMetrics) + } yield consumer + } +} + +object MeteredConsumerOf { + def wrap[F[_]](consumerOf: ConsumerOf[F], kafkaMetricsRegistry: KafkaMetricsRegistry[F]): MeteredConsumerOf[F] = + new MeteredConsumerOf[F](consumerOf, kafkaMetricsRegistry) +} diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/MeteredProducerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/MeteredProducerOf.scala new file mode 100644 index 00000000..440c7a81 --- /dev/null +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/MeteredProducerOf.scala @@ -0,0 +1,19 @@ +package com.evolutiongaming.skafka.metrics + +import cats.effect.Resource +import com.evolutiongaming.skafka.producer.{Producer, ProducerConfig, ProducerOf} + +class MeteredProducerOf[F[_]](producerOf: ProducerOf[F], kafkaMetricsRegistry: KafkaMetricsRegistry[F]) + extends ProducerOf[F] { + override def apply(config: ProducerConfig): Resource[F, Producer[F]] = { + for { + producer <- producerOf.apply(config) + _ <- kafkaMetricsRegistry.register(producer.clientMetrics) + } yield producer + } +} + +object MeteredProducerOf { + def wrap[F[_]](producerOf: ProducerOf[F], kafkaMetricsRegistry: KafkaMetricsRegistry[F]): MeteredProducerOf[F] = + new MeteredProducerOf[F](producerOf, kafkaMetricsRegistry) +} diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/syntax.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/syntax.scala new file mode 100644 index 00000000..0bf51c1d --- /dev/null +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/metrics/syntax.scala @@ -0,0 +1,14 @@ +package com.evolutiongaming.skafka.metrics + +import com.evolutiongaming.skafka.consumer.ConsumerOf +import com.evolutiongaming.skafka.producer.ProducerOf + +object syntax { + implicit final class MeteredProducerOfOps[F[_]](val producerOf: ProducerOf[F]) extends AnyVal { + def withNativeMetrics(registry: KafkaMetricsRegistry[F]): ProducerOf[F] = MeteredProducerOf.wrap(producerOf, registry) + } + + implicit final class MeteredConsumerOfOps[F[_]](val consumerOf: ConsumerOf[F]) extends AnyVal { + def withNativeMetrics(registry: KafkaMetricsRegistry[F]): ConsumerOf[F] = MeteredConsumerOf.wrap(consumerOf, registry) + } +}