From 4f06f5b523562e841db9aef7a361b83241e5933f Mon Sep 17 00:00:00 2001 From: wilmerdooley Date: Fri, 12 Jun 2026 14:54:40 +0000 Subject: [PATCH] KAFKA-19753: Fix duplicated topic metrics in FetchMetricsManager for topic names containing dots Signed-off-by: wilmerdooley --- .../internals/FetchMetricsManager.java | 135 +++++++++++++++--- .../internals/FetchMetricsManagerTest.java | 121 ++++++++++++++++ 2 files changed, 233 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java index 646e33333a40c..362f5481ee3c6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java @@ -114,7 +114,8 @@ void recordRecordsFetched(int records) { void recordBytesFetched(String topic, int bytes) { String name = topicBytesFetchedMetricName(topic); - maybeRecordDeprecatedBytesFetched(name, topic, bytes); + maybeRecordDeprecatedBytesFetched(topic, bytes); + maybeRemoveDeprecatedBytesFetched(topic); Sensor bytesFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic)) .withAvg(metricsRegistry.topicFetchSizeAvg) @@ -126,7 +127,8 @@ void recordBytesFetched(String topic, int bytes) { void recordRecordsFetched(String topic, int records) { String name = topicRecordsFetchedMetricName(topic); - maybeRecordDeprecatedRecordsFetched(name, topic, records); + maybeRecordDeprecatedRecordsFetched(topic, records); + maybeRemoveDeprecatedRecordsFetched(topic); Sensor recordsFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic)) .withAvg(metricsRegistry.topicRecordsPerRequestAvg) @@ -139,7 +141,8 @@ void recordPartitionLag(TopicPartition tp, long lag) { this.recordsLag.record(lag); String name = partitionRecordsLagMetricName(tp); - maybeRecordDeprecatedPartitionLag(name, tp, lag); + maybeRecordDeprecatedPartitionLag(tp, lag); + maybeRemoveDeprecatedPartitionLag(tp); Sensor recordsLag = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) .withValue(metricsRegistry.partitionRecordsLag) @@ -154,7 +157,8 @@ void recordPartitionLead(TopicPartition tp, long lead) { this.recordsLead.record(lead); String name = partitionRecordsLeadMetricName(tp); - maybeRecordDeprecatedPartitionLead(name, tp, lead); + maybeRecordDeprecatedPartitionLead(tp, lead); + maybeRemoveDeprecatedPartitionLead(tp); Sensor recordsLead = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition())))) .withValue(metricsRegistry.partitionRecordsLead) @@ -184,15 +188,17 @@ void maybeUpdateAssignment(SubscriptionState subscription) { metrics.removeSensor(partitionRecordsLeadMetricName(tp)); metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp)); // Remove deprecated metrics. - metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp))); - metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp))); - metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp)); + metrics.removeSensor(deprecatedPartitionRecordsLagMetricName(tp)); + metrics.removeSensor(deprecatedPartitionRecordsLeadMetricName(tp)); + if (!newAssignedPartitions.contains(deprecatedTopicPartition(tp))) { + metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp)); + } } } for (TopicPartition tp : newAssignedPartitions) { if (!this.assignedPartitions.contains(tp)) { - maybeRecordDeprecatedPreferredReadReplica(tp, subscription); + maybeRemoveDeprecatedPreferredReadReplica(tp); MetricName metricName = partitionPreferredReadReplicaMetricName(tp); metrics.addMetricIfAbsent( @@ -203,15 +209,23 @@ void maybeUpdateAssignment(SubscriptionState subscription) { } } + for (TopicPartition tp : newAssignedPartitions) { + maybeRecordDeprecatedPreferredReadReplica(tp, subscription, newAssignedPartitions); + } + this.assignedPartitions = newAssignedPartitions; this.assignmentId = newAssignmentId; } } @Deprecated // To be removed in Kafka 5.0 release. - private void maybeRecordDeprecatedBytesFetched(String name, String topic, int bytes) { - if (shouldReportDeprecatedMetric(topic)) { - Sensor deprecatedBytesFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic)) + private void maybeRecordDeprecatedBytesFetched(String topic, int bytes) { + if (shouldReportDeprecatedMetric(topic) && + metrics.getSensor(topicBytesFetchedMetricName(topic.replace('.', '_'))) == null) { + Sensor deprecatedBytesFetched = new SensorBuilder( + metrics, + deprecatedTopicBytesFetchedMetricName(topic), + () -> topicTags(topic)) .withAvg(metricsRegistry.topicFetchSizeAvg) .withMax(metricsRegistry.topicFetchSizeMax) .withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal) @@ -221,9 +235,13 @@ private void maybeRecordDeprecatedBytesFetched(String name, String topic, int by } @Deprecated // To be removed in Kafka 5.0 release. - private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int records) { - if (shouldReportDeprecatedMetric(topic)) { - Sensor deprecatedRecordsFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic)) + private void maybeRecordDeprecatedRecordsFetched(String topic, int records) { + if (shouldReportDeprecatedMetric(topic) && + metrics.getSensor(topicRecordsFetchedMetricName(topic.replace('.', '_'))) == null) { + Sensor deprecatedRecordsFetched = new SensorBuilder( + metrics, + deprecatedTopicRecordsFetchedMetricName(topic), + () -> topicTags(topic)) .withAvg(metricsRegistry.topicRecordsPerRequestAvg) .withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal) .build(); @@ -232,9 +250,13 @@ private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int } @Deprecated // To be removed in Kafka 5.0 release. - private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, long lag) { - if (shouldReportDeprecatedMetric(tp.topic())) { - Sensor deprecatedRecordsLag = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp)) + private void maybeRecordDeprecatedPartitionLag(TopicPartition tp, long lag) { + if (shouldReportDeprecatedMetric(tp.topic()) && + metrics.getSensor(partitionRecordsLagMetricName(deprecatedTopicPartition(tp))) == null) { + Sensor deprecatedRecordsLag = new SensorBuilder( + metrics, + deprecatedPartitionRecordsLagMetricName(tp), + () -> topicPartitionTags(tp)) .withValue(metricsRegistry.partitionRecordsLag) .withMax(metricsRegistry.partitionRecordsLagMax) .withAvg(metricsRegistry.partitionRecordsLagAvg) @@ -245,9 +267,13 @@ private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, l } @Deprecated // To be removed in Kafka 5.0 release. - private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, double lead) { - if (shouldReportDeprecatedMetric(tp.topic())) { - Sensor deprecatedRecordsLead = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp)) + private void maybeRecordDeprecatedPartitionLead(TopicPartition tp, double lead) { + if (shouldReportDeprecatedMetric(tp.topic()) && + metrics.getSensor(partitionRecordsLeadMetricName(deprecatedTopicPartition(tp))) == null) { + Sensor deprecatedRecordsLead = new SensorBuilder( + metrics, + deprecatedPartitionRecordsLeadMetricName(tp), + () -> topicPartitionTags(tp)) .withValue(metricsRegistry.partitionRecordsLead) .withMin(metricsRegistry.partitionRecordsLeadMin) .withAvg(metricsRegistry.partitionRecordsLeadAvg) @@ -258,8 +284,11 @@ private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, } @Deprecated // To be removed in Kafka 5.0 release. - private void maybeRecordDeprecatedPreferredReadReplica(TopicPartition tp, SubscriptionState subscription) { - if (shouldReportDeprecatedMetric(tp.topic())) { + private void maybeRecordDeprecatedPreferredReadReplica( + TopicPartition tp, + SubscriptionState subscription, + Set assignedPartitions) { + if (shouldReportDeprecatedMetric(tp.topic()) && !assignedPartitions.contains(deprecatedTopicPartition(tp))) { MetricName metricName = deprecatedPartitionPreferredReadReplicaMetricName(tp); metrics.addMetricIfAbsent( metricName, @@ -269,6 +298,41 @@ private void maybeRecordDeprecatedPreferredReadReplica(TopicPartition tp, Subscr } } + @Deprecated // To be removed in Kafka 5.0 release. + private void maybeRemoveDeprecatedBytesFetched(String topic) { + if (!shouldReportDeprecatedMetric(topic)) { + metrics.removeSensor(deprecatedTopicBytesFetchedMetricName(topic)); + } + } + + @Deprecated // To be removed in Kafka 5.0 release. + private void maybeRemoveDeprecatedRecordsFetched(String topic) { + if (!shouldReportDeprecatedMetric(topic)) { + metrics.removeSensor(deprecatedTopicRecordsFetchedMetricName(topic)); + } + } + + @Deprecated // To be removed in Kafka 5.0 release. + private void maybeRemoveDeprecatedPartitionLag(TopicPartition tp) { + if (!shouldReportDeprecatedMetric(tp.topic())) { + metrics.removeSensor(deprecatedPartitionRecordsLagMetricName(tp)); + } + } + + @Deprecated // To be removed in Kafka 5.0 release. + private void maybeRemoveDeprecatedPartitionLead(TopicPartition tp) { + if (!shouldReportDeprecatedMetric(tp.topic())) { + metrics.removeSensor(deprecatedPartitionRecordsLeadMetricName(tp)); + } + } + + @Deprecated // To be removed in Kafka 5.0 release. + private void maybeRemoveDeprecatedPreferredReadReplica(TopicPartition tp) { + if (!shouldReportDeprecatedMetric(tp.topic())) { + metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp)); + } + } + private static String topicBytesFetchedMetricName(String topic) { return "topic." + topic + ".bytes-fetched"; } @@ -277,6 +341,16 @@ private static String topicRecordsFetchedMetricName(String topic) { return "topic." + topic + ".records-fetched"; } + @Deprecated + private static String deprecatedTopicBytesFetchedMetricName(String topic) { + return deprecatedMetricName(topicBytesFetchedMetricName(topic.replace('.', '_'))); + } + + @Deprecated + private static String deprecatedTopicRecordsFetchedMetricName(String topic) { + return deprecatedMetricName(topicRecordsFetchedMetricName(topic.replace('.', '_'))); + } + private static String partitionRecordsLeadMetricName(TopicPartition tp) { return tp + ".records-lead"; } @@ -285,6 +359,21 @@ private static String partitionRecordsLagMetricName(TopicPartition tp) { return tp + ".records-lag"; } + @Deprecated + private static String deprecatedPartitionRecordsLeadMetricName(TopicPartition tp) { + return deprecatedMetricName(partitionRecordsLeadMetricName(deprecatedTopicPartition(tp))); + } + + @Deprecated + private static String deprecatedPartitionRecordsLagMetricName(TopicPartition tp) { + return deprecatedMetricName(partitionRecordsLagMetricName(deprecatedTopicPartition(tp))); + } + + @Deprecated + private static TopicPartition deprecatedTopicPartition(TopicPartition tp) { + return new TopicPartition(tp.topic().replace('.', '_'), tp.partition()); + } + private static String deprecatedMetricName(String name) { return name + ".deprecated"; } @@ -315,4 +404,4 @@ static Map topicPartitionTags(TopicPartition tp) { mkEntry("partition", String.valueOf(tp.partition()))); } -} \ No newline at end of file +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java index f01d0f65046a9..bcfc48c25635b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java @@ -38,6 +38,8 @@ import static org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicPartitionTags; import static org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicTags; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class FetchMetricsManagerTest { @@ -202,6 +204,63 @@ public void testRecordsFetchedTopic() { assertEquals(6, metricValue(metricsRegistry.topicRecordsConsumedTotal, deprecatedTags), EPSILON); } + @Test + public void testRecordsFetchedTopicWithPeriodBeforeUnderscoreTopic() { + String dottedTopic = "my.topic"; + String underscoreTopic = "my_topic"; + Map dottedTags = Map.of("topic", dottedTopic); + Map underscoreTags = Map.of("topic", underscoreTopic); + int initialMetricsSize = metrics.metrics().size(); + + metricsManager.recordRecordsFetched(dottedTopic, 1); + assertEquals(6, metrics.metrics().size() - initialMetricsSize); + assertNotNull(metrics.getSensor("topic.my.topic.records-fetched")); + assertNotNull(metrics.getSensor("topic.my_topic.records-fetched.deprecated")); + assertNull(metrics.getSensor("topic.my_topic.records-fetched")); + + metricsManager.recordRecordsFetched(underscoreTopic, 2); + assertEquals(6, metrics.metrics().size() - initialMetricsSize); + assertNotNull(metrics.getSensor("topic.my_topic.records-fetched")); + assertNull(metrics.getSensor("topic.my_topic.records-fetched.deprecated")); + assertEquals(1, metricValue(metricsRegistry.topicRecordsConsumedTotal, dottedTags), EPSILON); + assertEquals(2, metricValue(metricsRegistry.topicRecordsConsumedTotal, underscoreTags), EPSILON); + } + + @Test + public void testRecordsFetchedTopicWithUnderscoreBeforePeriodTopic() { + String dottedTopic = "my.topic"; + String underscoreTopic = "my_topic"; + Map dottedTags = Map.of("topic", dottedTopic); + Map underscoreTags = Map.of("topic", underscoreTopic); + int initialMetricsSize = metrics.metrics().size(); + + metricsManager.recordRecordsFetched(underscoreTopic, 2); + assertEquals(3, metrics.metrics().size() - initialMetricsSize); + assertNotNull(metrics.getSensor("topic.my_topic.records-fetched")); + assertNull(metrics.getSensor("topic.my_topic.records-fetched.deprecated")); + + metricsManager.recordRecordsFetched(dottedTopic, 1); + assertEquals(6, metrics.metrics().size() - initialMetricsSize); + assertNotNull(metrics.getSensor("topic.my.topic.records-fetched")); + assertNull(metrics.getSensor("topic.my_topic.records-fetched.deprecated")); + assertEquals(1, metricValue(metricsRegistry.topicRecordsConsumedTotal, dottedTags), EPSILON); + assertEquals(2, metricValue(metricsRegistry.topicRecordsConsumedTotal, underscoreTags), EPSILON); + } + + @Test + public void testRecordsFetchedTopicWithoutPeriodDoesNotRegisterDeprecatedSensor() { + String topicName = "my_topic"; + Map tags = Map.of("topic", topicName); + int initialMetricsSize = metrics.metrics().size(); + + metricsManager.recordRecordsFetched(topicName, 2); + + assertEquals(3, metrics.metrics().size() - initialMetricsSize); + assertNotNull(metrics.getSensor("topic.my_topic.records-fetched")); + assertNull(metrics.getSensor("topic.my_topic.records-fetched.deprecated")); + assertEquals(2, metricValue(metricsRegistry.topicRecordsConsumedTotal, tags), EPSILON); + } + @Test @SuppressWarnings("deprecation") public void testPartitionLag() { @@ -297,6 +356,68 @@ public void testPartitionLead() { assertEquals(15, metricValue(metricsRegistry.partitionRecordsLeadAvg, deprecatedTags), EPSILON); } + @Test + public void testPartitionMetricsWithPeriodBeforeUnderscoreTopic() { + TopicPartition dottedTp = new TopicPartition("my.topic", 0); + TopicPartition underscoreTp = new TopicPartition("my_topic", 0); + Map dottedTags = Map.of( + "topic", dottedTp.topic(), + "partition", String.valueOf(dottedTp.partition())); + Map underscoreTags = Map.of( + "topic", underscoreTp.topic(), + "partition", String.valueOf(underscoreTp.partition())); + int initialMetricsSize = metrics.metrics().size(); + + metricsManager.recordPartitionLag(dottedTp, 4); + metricsManager.recordPartitionLead(dottedTp, 8); + assertEquals(12, metrics.metrics().size() - initialMetricsSize); + assertNotNull(metrics.getSensor("my.topic-0.records-lag")); + assertNotNull(metrics.getSensor("my_topic-0.records-lag.deprecated")); + assertNotNull(metrics.getSensor("my.topic-0.records-lead")); + assertNotNull(metrics.getSensor("my_topic-0.records-lead.deprecated")); + + metricsManager.recordPartitionLag(underscoreTp, 2); + metricsManager.recordPartitionLead(underscoreTp, 6); + assertEquals(12, metrics.metrics().size() - initialMetricsSize); + assertNotNull(metrics.getSensor("my_topic-0.records-lag")); + assertNull(metrics.getSensor("my_topic-0.records-lag.deprecated")); + assertNotNull(metrics.getSensor("my_topic-0.records-lead")); + assertNull(metrics.getSensor("my_topic-0.records-lead.deprecated")); + assertEquals(4, metricValue(metricsRegistry.partitionRecordsLag, dottedTags), EPSILON); + assertEquals(2, metricValue(metricsRegistry.partitionRecordsLag, underscoreTags), EPSILON); + assertEquals(8, metricValue(metricsRegistry.partitionRecordsLead, dottedTags), EPSILON); + assertEquals(6, metricValue(metricsRegistry.partitionRecordsLead, underscoreTags), EPSILON); + } + + @Test + public void testMaybeUpdateAssignmentWithPeriodBeforeUnderscoreTopic() { + TopicPartition dottedTp = new TopicPartition("my.topic", 0); + TopicPartition underscoreTp = new TopicPartition("my_topic", 0); + Map dottedTags = Map.of( + "topic", dottedTp.topic(), + "partition", String.valueOf(dottedTp.partition())); + Map underscoreTags = Map.of( + "topic", underscoreTp.topic(), + "partition", String.valueOf(underscoreTp.partition())); + + SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); + subscriptionState.assignFromUser(Set.of(dottedTp)); + subscriptionState.updatePreferredReadReplica(dottedTp, 1, () -> 0L); + metricsManager.maybeUpdateAssignment(subscriptionState); + assertEquals(1, readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, dottedTags), EPSILON); + assertEquals(1, readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, underscoreTags), EPSILON); + + subscriptionState.assignFromUser(Set.of(dottedTp, underscoreTp)); + subscriptionState.updatePreferredReadReplica(underscoreTp, 2, () -> 0L); + metricsManager.maybeUpdateAssignment(subscriptionState); + assertEquals(2, readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, underscoreTags), EPSILON); + + subscriptionState.assignFromUser(Set.of(underscoreTp)); + subscriptionState.updatePreferredReadReplica(underscoreTp, 2, () -> 0L); + metricsManager.maybeUpdateAssignment(subscriptionState); + assertEquals(2, readReplicaMetricValue(metricsRegistry.partitionPreferredReadReplica, underscoreTags), EPSILON); + } + @Test @SuppressWarnings("deprecation") public void testMaybeUpdateAssignment() {