Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ void recordRecordsFetched(int records) {

void recordBytesFetched(String topic, int bytes) {
String name = topicBytesFetchedMetricName(topic);
maybeRecordDeprecatedBytesFetched(name, topic, bytes);
maybeRecordDeprecatedBytesFetched(topic, bytes);
maybeRemoveDeprecatedBytesFetched(topic);

Sensor bytesFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic))
.withAvg(metricsRegistry.topicFetchSizeAvg)
Expand All @@ -126,7 +127,8 @@ void recordBytesFetched(String topic, int bytes) {

void recordRecordsFetched(String topic, int records) {
String name = topicRecordsFetchedMetricName(topic);
maybeRecordDeprecatedRecordsFetched(name, topic, records);
maybeRecordDeprecatedRecordsFetched(topic, records);
maybeRemoveDeprecatedRecordsFetched(topic);

Sensor recordsFetched = new SensorBuilder(metrics, name, () -> Map.of("topic", topic))
.withAvg(metricsRegistry.topicRecordsPerRequestAvg)
Expand All @@ -139,7 +141,8 @@ void recordPartitionLag(TopicPartition tp, long lag) {
this.recordsLag.record(lag);

String name = partitionRecordsLagMetricName(tp);
maybeRecordDeprecatedPartitionLag(name, tp, lag);
maybeRecordDeprecatedPartitionLag(tp, lag);
maybeRemoveDeprecatedPartitionLag(tp);

Sensor recordsLag = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))))
.withValue(metricsRegistry.partitionRecordsLag)
Expand All @@ -154,7 +157,8 @@ void recordPartitionLead(TopicPartition tp, long lead) {
this.recordsLead.record(lead);

String name = partitionRecordsLeadMetricName(tp);
maybeRecordDeprecatedPartitionLead(name, tp, lead);
maybeRecordDeprecatedPartitionLead(tp, lead);
maybeRemoveDeprecatedPartitionLead(tp);

Sensor recordsLead = new SensorBuilder(metrics, name, () -> mkMap(mkEntry("topic", tp.topic()), mkEntry("partition", String.valueOf(tp.partition()))))
.withValue(metricsRegistry.partitionRecordsLead)
Expand Down Expand Up @@ -184,15 +188,17 @@ void maybeUpdateAssignment(SubscriptionState subscription) {
metrics.removeSensor(partitionRecordsLeadMetricName(tp));
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
// Remove deprecated metrics.
metrics.removeSensor(deprecatedMetricName(partitionRecordsLagMetricName(tp)));
metrics.removeSensor(deprecatedMetricName(partitionRecordsLeadMetricName(tp)));
metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp));
metrics.removeSensor(deprecatedPartitionRecordsLagMetricName(tp));
metrics.removeSensor(deprecatedPartitionRecordsLeadMetricName(tp));
if (!newAssignedPartitions.contains(deprecatedTopicPartition(tp))) {
metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp));
}
}
}

for (TopicPartition tp : newAssignedPartitions) {
if (!this.assignedPartitions.contains(tp)) {
maybeRecordDeprecatedPreferredReadReplica(tp, subscription);
maybeRemoveDeprecatedPreferredReadReplica(tp);

MetricName metricName = partitionPreferredReadReplicaMetricName(tp);
metrics.addMetricIfAbsent(
Expand All @@ -203,15 +209,23 @@ void maybeUpdateAssignment(SubscriptionState subscription) {
}
}

for (TopicPartition tp : newAssignedPartitions) {
maybeRecordDeprecatedPreferredReadReplica(tp, subscription, newAssignedPartitions);
}

this.assignedPartitions = newAssignedPartitions;
this.assignmentId = newAssignmentId;
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedBytesFetched(String name, String topic, int bytes) {
if (shouldReportDeprecatedMetric(topic)) {
Sensor deprecatedBytesFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic))
private void maybeRecordDeprecatedBytesFetched(String topic, int bytes) {
if (shouldReportDeprecatedMetric(topic) &&
metrics.getSensor(topicBytesFetchedMetricName(topic.replace('.', '_'))) == null) {
Sensor deprecatedBytesFetched = new SensorBuilder(
metrics,
deprecatedTopicBytesFetchedMetricName(topic),
() -> topicTags(topic))
.withAvg(metricsRegistry.topicFetchSizeAvg)
.withMax(metricsRegistry.topicFetchSizeMax)
.withMeter(metricsRegistry.topicBytesConsumedRate, metricsRegistry.topicBytesConsumedTotal)
Expand All @@ -221,9 +235,13 @@ private void maybeRecordDeprecatedBytesFetched(String name, String topic, int by
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int records) {
if (shouldReportDeprecatedMetric(topic)) {
Sensor deprecatedRecordsFetched = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicTags(topic))
private void maybeRecordDeprecatedRecordsFetched(String topic, int records) {
if (shouldReportDeprecatedMetric(topic) &&
metrics.getSensor(topicRecordsFetchedMetricName(topic.replace('.', '_'))) == null) {
Sensor deprecatedRecordsFetched = new SensorBuilder(
metrics,
deprecatedTopicRecordsFetchedMetricName(topic),
() -> topicTags(topic))
.withAvg(metricsRegistry.topicRecordsPerRequestAvg)
.withMeter(metricsRegistry.topicRecordsConsumedRate, metricsRegistry.topicRecordsConsumedTotal)
.build();
Expand All @@ -232,9 +250,13 @@ private void maybeRecordDeprecatedRecordsFetched(String name, String topic, int
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, long lag) {
if (shouldReportDeprecatedMetric(tp.topic())) {
Sensor deprecatedRecordsLag = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp))
private void maybeRecordDeprecatedPartitionLag(TopicPartition tp, long lag) {
if (shouldReportDeprecatedMetric(tp.topic()) &&
metrics.getSensor(partitionRecordsLagMetricName(deprecatedTopicPartition(tp))) == null) {
Sensor deprecatedRecordsLag = new SensorBuilder(
metrics,
deprecatedPartitionRecordsLagMetricName(tp),
() -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLag)
.withMax(metricsRegistry.partitionRecordsLagMax)
.withAvg(metricsRegistry.partitionRecordsLagAvg)
Expand All @@ -245,9 +267,13 @@ private void maybeRecordDeprecatedPartitionLag(String name, TopicPartition tp, l
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp, double lead) {
if (shouldReportDeprecatedMetric(tp.topic())) {
Sensor deprecatedRecordsLead = new SensorBuilder(metrics, deprecatedMetricName(name), () -> topicPartitionTags(tp))
private void maybeRecordDeprecatedPartitionLead(TopicPartition tp, double lead) {
if (shouldReportDeprecatedMetric(tp.topic()) &&
metrics.getSensor(partitionRecordsLeadMetricName(deprecatedTopicPartition(tp))) == null) {
Sensor deprecatedRecordsLead = new SensorBuilder(
metrics,
deprecatedPartitionRecordsLeadMetricName(tp),
() -> topicPartitionTags(tp))
.withValue(metricsRegistry.partitionRecordsLead)
.withMin(metricsRegistry.partitionRecordsLeadMin)
.withAvg(metricsRegistry.partitionRecordsLeadAvg)
Expand All @@ -258,8 +284,11 @@ private void maybeRecordDeprecatedPartitionLead(String name, TopicPartition tp,
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRecordDeprecatedPreferredReadReplica(TopicPartition tp, SubscriptionState subscription) {
if (shouldReportDeprecatedMetric(tp.topic())) {
private void maybeRecordDeprecatedPreferredReadReplica(
TopicPartition tp,
SubscriptionState subscription,
Set<TopicPartition> assignedPartitions) {
if (shouldReportDeprecatedMetric(tp.topic()) && !assignedPartitions.contains(deprecatedTopicPartition(tp))) {
MetricName metricName = deprecatedPartitionPreferredReadReplicaMetricName(tp);
metrics.addMetricIfAbsent(
metricName,
Expand All @@ -269,6 +298,41 @@ private void maybeRecordDeprecatedPreferredReadReplica(TopicPartition tp, Subscr
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRemoveDeprecatedBytesFetched(String topic) {
if (!shouldReportDeprecatedMetric(topic)) {
metrics.removeSensor(deprecatedTopicBytesFetchedMetricName(topic));
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRemoveDeprecatedRecordsFetched(String topic) {
if (!shouldReportDeprecatedMetric(topic)) {
metrics.removeSensor(deprecatedTopicRecordsFetchedMetricName(topic));
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRemoveDeprecatedPartitionLag(TopicPartition tp) {
if (!shouldReportDeprecatedMetric(tp.topic())) {
metrics.removeSensor(deprecatedPartitionRecordsLagMetricName(tp));
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRemoveDeprecatedPartitionLead(TopicPartition tp) {
if (!shouldReportDeprecatedMetric(tp.topic())) {
metrics.removeSensor(deprecatedPartitionRecordsLeadMetricName(tp));
}
}

@Deprecated // To be removed in Kafka 5.0 release.
private void maybeRemoveDeprecatedPreferredReadReplica(TopicPartition tp) {
if (!shouldReportDeprecatedMetric(tp.topic())) {
metrics.removeMetric(deprecatedPartitionPreferredReadReplicaMetricName(tp));
}
}

private static String topicBytesFetchedMetricName(String topic) {
return "topic." + topic + ".bytes-fetched";
}
Expand All @@ -277,6 +341,16 @@ private static String topicRecordsFetchedMetricName(String topic) {
return "topic." + topic + ".records-fetched";
}

@Deprecated
private static String deprecatedTopicBytesFetchedMetricName(String topic) {
return deprecatedMetricName(topicBytesFetchedMetricName(topic.replace('.', '_')));
}

@Deprecated
private static String deprecatedTopicRecordsFetchedMetricName(String topic) {
return deprecatedMetricName(topicRecordsFetchedMetricName(topic.replace('.', '_')));
}

private static String partitionRecordsLeadMetricName(TopicPartition tp) {
return tp + ".records-lead";
}
Expand All @@ -285,6 +359,21 @@ private static String partitionRecordsLagMetricName(TopicPartition tp) {
return tp + ".records-lag";
}

@Deprecated
private static String deprecatedPartitionRecordsLeadMetricName(TopicPartition tp) {
return deprecatedMetricName(partitionRecordsLeadMetricName(deprecatedTopicPartition(tp)));
}

@Deprecated
private static String deprecatedPartitionRecordsLagMetricName(TopicPartition tp) {
return deprecatedMetricName(partitionRecordsLagMetricName(deprecatedTopicPartition(tp)));
}

@Deprecated
private static TopicPartition deprecatedTopicPartition(TopicPartition tp) {
return new TopicPartition(tp.topic().replace('.', '_'), tp.partition());
}

private static String deprecatedMetricName(String name) {
return name + ".deprecated";
}
Expand Down Expand Up @@ -315,4 +404,4 @@ static Map<String, String> topicPartitionTags(TopicPartition tp) {
mkEntry("partition", String.valueOf(tp.partition())));
}

}
}
Loading