From 4c06aa3dc3ba91e3d12e87cbff6e4ac12a17daae Mon Sep 17 00:00:00 2001 From: claimundefine Date: Thu, 27 Jun 2024 19:18:00 -0400 Subject: [PATCH 1/2] Add tombstoned metric to JMX --- .../kafka/schemaregistry/metrics/MetricsContainer.java | 9 +++++++++ .../schemaregistry/storage/KafkaStoreMessageHandler.java | 1 + .../kafka/schemaregistry/metrics/MetricsTest.java | 4 ++++ 3 files changed, 14 insertions(+) diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/metrics/MetricsContainer.java b/core/src/main/java/io/confluent/kafka/schemaregistry/metrics/MetricsContainer.java index d42de547724..25089228f52 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/metrics/MetricsContainer.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/metrics/MetricsContainer.java @@ -57,6 +57,7 @@ public class MetricsContainer { public static final String METRIC_NAME_API_FAILURE_COUNT = "api-failure-count"; public static final String METRIC_NAME_REGISTERED_COUNT = "registered-count"; public static final String METRIC_NAME_DELETED_COUNT = "deleted-count"; + public static final String METRIC_NAME_TOMBSTONED_COUNT = "tombstoned-count"; public static final String METRIC_NAME_AVRO_SCHEMAS_CREATED = "avro-schemas-created"; public static final String METRIC_NAME_AVRO_SCHEMAS_DELETED = "avro-schemas-deleted"; public static final String METRIC_NAME_JSON_SCHEMAS_CREATED = "json-schemas-created"; @@ -73,6 +74,7 @@ public class MetricsContainer { private final SchemaRegistryMetric schemasCreated; private final SchemaRegistryMetric schemasDeleted; + private final SchemaRegistryMetric schemasTombstoned; private final SchemaRegistryMetric customSchemaProviders; private final SchemaRegistryMetric apiCallsSuccess; private final SchemaRegistryMetric apiCallsFailure; @@ -127,6 +129,9 @@ public MetricsContainer(SchemaRegistryConfig config, String kafkaClusterId) { new CumulativeCount()); this.schemasDeleted = createMetric(METRIC_NAME_DELETED_COUNT, "Number of deleted schemas", new CumulativeCount()); + this.schemasTombstoned = createMetric(METRIC_NAME_TOMBSTONED_COUNT, + "Number of tombstoned schemas", + new CumulativeCount()); this.avroSchemasCreated = createMetric(METRIC_NAME_AVRO_SCHEMAS_CREATED, "Number of registered Avro schemas", new CumulativeCount()); @@ -216,6 +221,10 @@ public SchemaRegistryMetric getSchemasDeleted(String type) { return getSchemaTypeMetric(type, false); } + public SchemaRegistryMetric getSchemasTombstoned() { + return schemasTombstoned; + } + private SchemaRegistryMetric getSchemaTypeMetric(String type, boolean isRegister) { switch (type) { case AvroSchema.TYPE: diff --git a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreMessageHandler.java b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreMessageHandler.java index 6ec5d182dac..7b886749e35 100644 --- a/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreMessageHandler.java +++ b/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStoreMessageHandler.java @@ -191,6 +191,7 @@ private void handleSchemaUpdate(SchemaKey schemaKey, } } else { lookupCache.schemaTombstoned(schemaKey, oldSchemaValue); + updateMetrics(metricsContainer.getSchemasTombstoned(), null); } } diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java index ea73f05a32d..00fb2b502e3 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java @@ -33,6 +33,7 @@ import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_DELETED_COUNT; import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_MASTER_SLAVE_ROLE; import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_REGISTERED_COUNT; +import static io.confluent.kafka.schemaregistry.metrics.MetricsContainer.METRIC_NAME_TOMBSTONED_COUNT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -63,6 +64,8 @@ public void testSchemaCreatedCount() throws Exception { new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_AVRO_SCHEMAS_CREATED); ObjectName schemasDeleted = new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_DELETED_COUNT); + ObjectName schemasTombstoned = + new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_TOMBSTONED_COUNT); ObjectName avroDeleted = new ObjectName("kafka.schema.registry:type=" + METRIC_NAME_AVRO_SCHEMAS_DELETED); @@ -93,6 +96,7 @@ public void testSchemaCreatedCount() throws Exception { assertEquals((double) schemaCount, mBeanServer.getAttribute(schemasCreated, METRIC_NAME_REGISTERED_COUNT)); assertEquals((double) schemaCount, mBeanServer.getAttribute(avroCreated, METRIC_NAME_AVRO_SCHEMAS_CREATED)); assertEquals((double) schemaCount, mBeanServer.getAttribute(schemasDeleted, METRIC_NAME_DELETED_COUNT)); + assertEquals((double) schemaCount, mBeanServer.getAttribute(schemasTombstoned, METRIC_NAME_TOMBSTONED_COUNT)); assertEquals((double) schemaCount, mBeanServer.getAttribute(avroDeleted, METRIC_NAME_AVRO_SCHEMAS_DELETED)); } From 8ad718f9642d634cd4cbd3067645fba1e2fe5e70 Mon Sep 17 00:00:00 2001 From: claimundefine Date: Thu, 27 Jun 2024 20:54:02 -0400 Subject: [PATCH 2/2] Add tombstoning schemas to test --- .../confluent/kafka/schemaregistry/metrics/MetricsTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java b/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java index 00fb2b502e3..73f49aed183 100644 --- a/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java +++ b/core/src/test/java/io/confluent/kafka/schemaregistry/metrics/MetricsTest.java @@ -93,6 +93,12 @@ public void testSchemaCreatedCount() throws Exception { subject, i.toString())); } + // Tombstoning schemas should not modify create count. + for (Integer i = 1; i < schemaIdCounter; i++) { + assertEquals(i, service.deleteSchemaVersion(RestService.DEFAULT_REQUEST_PROPERTIES, + subject, i.toString(), true)); + } + assertEquals((double) schemaCount, mBeanServer.getAttribute(schemasCreated, METRIC_NAME_REGISTERED_COUNT)); assertEquals((double) schemaCount, mBeanServer.getAttribute(avroCreated, METRIC_NAME_AVRO_SCHEMAS_CREATED)); assertEquals((double) schemaCount, mBeanServer.getAttribute(schemasDeleted, METRIC_NAME_DELETED_COUNT));