diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 2b1d5c775f274..f0439e88ad6e6 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -11049,7 +11049,7 @@ class KafkaApisTest extends Logging { val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() .setMemberId("member") - future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1)) + future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1, -1, -1)) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) assertEquals(streamsGroupHeartbeatResponse, response.data) } @@ -11119,7 +11119,7 @@ class KafkaApisTest extends Logging { val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() .setMemberId("member") - future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1)) + future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1, -1, -1)) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) assertEquals(streamsGroupHeartbeatResponse, response.data) } @@ -11351,7 +11351,7 @@ class KafkaApisTest extends Logging { val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() .setMemberId("member") - future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1)) + future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1)) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) assertEquals(streamsGroupHeartbeatResponse, response.data) verify(autoTopicCreationManager).createStreamsInternalTopics(any(), any(), anyLong()) @@ -11400,7 +11400,7 @@ class KafkaApisTest extends 
Logging { val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData() .setMemberId("member") - future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1)) + future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1)) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) assertEquals(Errors.NONE.code, response.data.errorCode()) assertEquals(null, response.data.errorMessage()) @@ -11451,7 +11451,7 @@ class KafkaApisTest extends Logging { .setStatusDetail("Internal topics are missing: [test-topic]") )) - future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1)) + future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1)) val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest) assertEquals(Errors.NONE.code, response.data.errorCode()) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 2911736a01001..49be42f949ebe 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -98,9 +98,11 @@ import org.apache.kafka.coordinator.common.runtime.PartitionWriter; import org.apache.kafka.coordinator.group.GroupCoordinatorShard.DeletedTopic; import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.streams.StreamsGroupDescribeResult; import 
org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; +import org.apache.kafka.coordinator.group.streams.TopologyDescriptionManager; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.TopicsDelta; @@ -250,6 +252,9 @@ public GroupCoordinatorService build() { String logPrefix = String.format("GroupCoordinator id=%d", nodeId); LogContext logContext = new LogContext(String.format("[%s] ", logPrefix)); + Optional streamsGroupTopologyDescriptionPlugin = + Optional.ofNullable(config.streamsGroupTopologyDescriptionPlugin(Map.of())); + CoordinatorShardBuilderSupplier supplier = () -> new GroupCoordinatorShard.Builder(config, groupConfigManager) .withAuthorizerPlugin(authorizerPlugin); @@ -297,7 +302,9 @@ public GroupCoordinatorService build() { groupConfigManager, persister, timer, - partitionMetadataClient + partitionMetadataClient, + streamsGroupTopologyDescriptionPlugin, + time ); } } @@ -352,6 +359,14 @@ public GroupCoordinatorService build() { */ private final PartitionMetadataClient partitionMetadataClient; + /** + * The broker-level component that owns the streams-group topology description plugin + * (KIP-1331): plugin reference, per-group push back-off, and the three entry points + * the service delegates into — heartbeat post-processing, the push RPC, and the + * pre-tombstone hook on DeleteGroups. + */ + private final TopologyDescriptionManager topologyDescriptionManager; + /** * The number of partitions of the __consumer_offsets topics. This is provided * when the component is started. 
@@ -382,7 +397,9 @@ public GroupCoordinatorService build() { GroupConfigManager groupConfigManager, Persister persister, Timer timer, - PartitionMetadataClient partitionMetadataClient + PartitionMetadataClient partitionMetadataClient, + Optional streamsGroupTopologyDescriptionPlugin, + Time time ) { this.log = logContext.logger(GroupCoordinatorService.class); this.config = config; @@ -397,6 +414,10 @@ public GroupCoordinatorService build() { .map(ConsumerGroupPartitionAssignor::name) .collect(Collectors.toSet()); this.partitionMetadataClient = partitionMetadataClient; + this.topologyDescriptionManager = new TopologyDescriptionManager( + streamsGroupTopologyDescriptionPlugin, + time + ); } /** @@ -608,6 +629,8 @@ public CompletableFuture streamsGroupHeartbeat( new StreamsGroupHeartbeatResult( new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), Map.of(), + -1, + -1, -1 ) ); @@ -624,15 +647,19 @@ public CompletableFuture streamsGroupHeartbeat( .setErrorCode(apiError.error().code()) .setErrorMessage(apiError.message()), Map.of(), + -1, + -1, -1 ) ); } + final String heartbeatGroupId = request.groupId(); return runtime.scheduleWriteOperation( "streams-group-heartbeat", - topicPartitionFor(request.groupId()), + topicPartitionFor(heartbeatGroupId), coordinator -> coordinator.streamsGroupHeartbeat(context, request) + ).thenApply(result -> topologyDescriptionManager.maybeSetTopologyDescriptionRequired(result, heartbeatGroupId) ).exceptionally(exception -> handleOperationException( "streams-group-heartbeat", request, @@ -643,6 +670,8 @@ public CompletableFuture streamsGroupHeartbeat( .setErrorCode(error.code()) .setErrorMessage(message), Map.of(), + -1, + -1, -1 ), log diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 1f09b2908d91f..c5673613fe4aa 100644 --- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2325,7 +2325,13 @@ private CoordinatorResult stream response.setStatus(returnedStatus); - return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated, updatedTopology.topologyEpoch())); + return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult( + response, + internalTopicsToBeCreated, + updatedTopology.topologyEpoch(), + group.storedDescriptionTopologyEpoch(), + group.failedDescriptionTopologyEpoch() + )); } /** @@ -4466,7 +4472,13 @@ private CoordinatorResult stream if (instanceId == null) { StreamsGroupMember member = group.getMemberOrThrow(memberId); log.info("[GroupId {}][MemberId {}] Member {} left the streams group.", groupId, memberId, memberId); - return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch())); + return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult( + response, + Map.of(), + group.currentTopologyEpoch(), + group.storedDescriptionTopologyEpoch(), + group.failedDescriptionTopologyEpoch() + )); } else { StreamsGroupMember member = group.staticMember(instanceId); throwIfStaticMemberIsUnknown(member, instanceId); @@ -4478,7 +4490,13 @@ private CoordinatorResult stream } else { log.info("[GroupId {}][MemberId {}] Static member {} with instance id {} left the streams group.", group.groupId(), memberId, memberId, instanceId); - return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch())); + return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult( + response, + Map.of(), + group.currentTopologyEpoch(), + group.storedDescriptionTopologyEpoch(), + group.failedDescriptionTopologyEpoch() + )); } } } @@ -4547,7 +4565,13 
@@ private CoordinatorResult stream return new CoordinatorResult<>( List.of(record), - new StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch()) + new StreamsGroupHeartbeatResult( + response, + Map.of(), + group.currentTopologyEpoch(), + group.storedDescriptionTopologyEpoch(), + group.failedDescriptionTopologyEpoch() + ) ); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java index b6066b41168c9..b5ccb9faffbd7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResult.java @@ -26,16 +26,27 @@ /** * A simple record to hold the result of a StreamsGroupHeartbeat request. * - * @param data The data to be returned to the client. - * @param creatableTopics The internal topics to be created. - * @param currentTopologyEpoch The topology epoch the group is operating at after this heartbeat, or -1 if the - * group has no topology yet. The service layer uses this to decide whether to set - * TopologyDescriptionRequired on the response (KIP-1331). + *

The three epoch fields let the service layer decide, without re-reading the group, + * whether to set {@code TopologyDescriptionRequired} on the response: a push is needed + * when the stored epoch lags the current epoch and the same epoch has not already been + * recorded as a permanent failure. All three are -1 for failure-fast paths that do not + * resolve a group. + * + * @param data The data to be returned to the client. + * @param creatableTopics The internal topics to be created. + * @param currentTopologyEpoch The topology epoch the group is operating at after this heartbeat, + * or -1 if the group has no topology yet. + * @param storedDescriptionTopologyEpoch The most recent topology epoch successfully stored by the topology + * description plugin, or -1 if none. + * @param failedDescriptionTopologyEpoch The most recent topology epoch the plugin permanently rejected, + * or -1 if none. */ public record StreamsGroupHeartbeatResult( StreamsGroupHeartbeatResponseData data, Map creatableTopics, - int currentTopologyEpoch + int currentTopologyEpoch, + int storedDescriptionTopologyEpoch, + int failedDescriptionTopologyEpoch ) { public StreamsGroupHeartbeatResult { diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoff.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoff.java new file mode 100644 index 0000000000000..c8852af6e5671 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoff.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.utils.Time; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * In-memory per-group back-off that throttles broker re-solicitation of a topology + * description push. An entry is armed when the broker decides to set + * {@code TopologyDescriptionRequired=true} on a heartbeat or after a transient plugin + * failure; consecutive arms at the same topology epoch double the window from + * {@value #INITIAL_DELAY_MS} ms up to {@value #MAX_DELAY_MS} ms. Successful pushes, + * permanent plugin failures, and topology-epoch advances clear the entry. + */ +public class StreamsGroupTopologyDescriptionBackoff { + + static final long INITIAL_DELAY_MS = 30_000L; + static final long MAX_DELAY_MS = 3_600_000L; + + private final Time time; + private final ConcurrentHashMap state = new ConcurrentHashMap<>(); + + record Entry(int topologyEpoch, long currentDelayMs, long nextAttemptMs) { } + + public StreamsGroupTopologyDescriptionBackoff(Time time) { + this.time = time; + } + + /** + * @return true if a back-off window is in effect for the given group at the given + * topology epoch and the broker should suppress soliciting another push. 
+     */
+    public boolean isActive(String groupId, int topologyEpoch) {
+        final Entry current = state.get(groupId);
+        // Nothing recorded for the group, or recorded for another epoch: no window applies.
+        if (current == null || current.topologyEpoch() != topologyEpoch) {
+            return false;
+        }
+        // The clock is only consulted once a matching entry has been found.
+        return time.milliseconds() < current.nextAttemptMs();
+    }
+
+    /**
+     * Arms a fresh {@link #INITIAL_DELAY_MS} window for the group unless an unexpired
+     * window for the same topology epoch is already recorded. The check and the write
+     * run inside a single {@code compute} call on the backing map, so they are applied
+     * as one step per group.
+     *
+     * @param groupId       the group whose back-off entry is inspected.
+     * @param topologyEpoch the topology epoch the caller wants to solicit a push for.
+     * @return true if this call installed a new entry; false if an unexpired entry for
+     *         the same epoch was found and left untouched.
+     */
+    public boolean armIfNotActive(String groupId, int topologyEpoch) {
+        final long nowMs = time.milliseconds();
+        // Single-element array: a slot the lambda can write and this method can read back.
+        final boolean[] installed = {false};
+        state.compute(groupId, (ignoredKey, current) -> {
+            final boolean windowRunning = current != null
+                && current.topologyEpoch() == topologyEpoch
+                && nowMs < current.nextAttemptMs();
+            if (windowRunning) {
+                return current;
+            }
+            // NOTE(review): an expired entry for the same epoch is replaced by a fresh
+            // INITIAL_DELAY_MS window, dropping any delay built up by armOrExtend --
+            // confirm this reset is intended.
+            installed[0] = true;
+            return new Entry(topologyEpoch, INITIAL_DELAY_MS, nowMs + INITIAL_DELAY_MS);
+        });
+        return installed[0];
+    }
+
+    /**
+     * Records a back-off window for the group, growing it on repeated calls. When an
+     * entry for the same topology epoch exists (expired or not) its delay is doubled,
+     * capped at {@link #MAX_DELAY_MS}; otherwise the delay starts at
+     * {@link #INITIAL_DELAY_MS}. The new window always starts from the current time.
+     *
+     * @param groupId       the group whose back-off entry is written.
+     * @param topologyEpoch the topology epoch the window applies to.
+     */
+    public void armOrExtend(String groupId, int topologyEpoch) {
+        final long nowMs = time.milliseconds();
+        state.compute(groupId, (ignoredKey, current) -> {
+            final boolean sameEpoch = current != null && current.topologyEpoch() == topologyEpoch;
+            final long delayMs = sameEpoch
+                ? Math.min(current.currentDelayMs() * 2, MAX_DELAY_MS)
+                : INITIAL_DELAY_MS;
+            return new Entry(topologyEpoch, delayMs, nowMs + delayMs);
+        });
+    }
+
+    /**
+     * Removes whatever back-off entry is recorded for the group; a no-op when there is
+     * none.
+     *
+     * @param groupId the group whose entry is dropped.
+     */
+    public void clear(String groupId) {
+        state.remove(groupId);
+    }
+
+    // Visible for testing.
+ Entry entry(String groupId) { + return state.get(groupId); + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java new file mode 100644 index 0000000000000..7dcbdd6e17e1a --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/TopologyDescriptionManager.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin; + +import java.util.Optional; + +/** + * Broker-level component that owns everything tied to the streams-group topology + * description plugin: the configured plugin reference, the per-group re-solicitation + * back-off, and the heartbeat-path gate that asks clients to push their topology. 
+ * + *

<p>This class is broker-level (one instance per {@code GroupCoordinatorService}); the
+ * back-off map is keyed by {@code groupId} and shared across all partitions hosted on the
+ * broker. State here is intentionally non-timeline and non-replayed: it is rebuilt from
+ * scratch on broker restart, and the persisted {@code StoredDescriptionTopologyEpoch} /
+ * {@code FailedDescriptionTopologyEpoch} fields on each streams group drive convergence
+ * after a restart.
+ */
+public class TopologyDescriptionManager {
+    /** The configured topology description plugin; empty when the broker has none. */
+    private final Optional plugin;
+    /** Per-group re-solicitation back-off, driven by the clock given to the constructor. */
+    private final StreamsGroupTopologyDescriptionBackoff backoff;
+
+    /**
+     * Creates the manager together with its own back-off instance.
+     *
+     * @param plugin the topology description plugin, or empty if none is configured.
+     * @param time   the clock handed to the back-off.
+     */
+    public TopologyDescriptionManager(
+        Optional plugin,
+        Time time
+    ) {
+        this.backoff = new StreamsGroupTopologyDescriptionBackoff(time);
+        this.plugin = plugin;
+    }
+
+    /**
+     * @return true if this manager was constructed with a plugin, false if the optional
+     *         was empty.
+     */
+    public boolean isPluginConfigured() {
+        return !plugin.isEmpty();
+    }
+
+    /**
+     * Post-processes a successful streams group heartbeat result by deciding whether the
+     * broker should set {@code TopologyDescriptionRequired=true} on the response, and
+     * arming the per-group back-off when it does.
+     *
+     * 

<p>The flag is set only when every one of these holds: a plugin is configured, the
+     * response carries no error code, the result reports a non-negative current topology
+     * epoch that matches neither the stored nor the failed description epoch, the response
+     * has no {@code STALE_TOPOLOGY} status, and {@code armIfNotActive} on the back-off
+     * reports that no window was running for that epoch. In every other case the result
+     * is handed back without modification.
+     *
+     * @param result  the heartbeat result to inspect; its response data is updated in place.
+     * @param groupId the group id used as the back-off key.
+     * @return the same {@code result} instance that was passed in.
+     */
+    public StreamsGroupHeartbeatResult maybeSetTopologyDescriptionRequired(
+        StreamsGroupHeartbeatResult result,
+        String groupId
+    ) {
+        if (plugin.isPresent()) {
+            final StreamsGroupHeartbeatResponseData data = result.data();
+            if (data.errorCode() == Errors.NONE.code()) {
+                final int epoch = result.currentTopologyEpoch();
+                final boolean pushOutstanding = epoch >= 0
+                    && result.storedDescriptionTopologyEpoch() != epoch
+                    && result.failedDescriptionTopologyEpoch() != epoch
+                    && !responseHasStaleTopology(data);
+                // The back-off is consulted last, so a window is armed only for a
+                // heartbeat that is otherwise eligible to carry the flag.
+                if (pushOutstanding && backoff.armIfNotActive(groupId, epoch)) {
+                    data.setTopologyDescriptionRequired(true);
+                }
+            }
+        }
+        return result;
+    }
+
+    // Visible for testing.
+ StreamsGroupTopologyDescriptionBackoff backoff() { + return backoff; + } + + private static boolean responseHasStaleTopology(StreamsGroupHeartbeatResponseData response) { + if (response.status() == null) { + return false; + } + byte staleCode = Status.STALE_TOPOLOGY.code(); + return response.status().stream().anyMatch(s -> s.statusCode() == staleCode); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 23c12223af05e..6074c4699fb10 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -92,6 +92,7 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder; +import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.streams.StreamsGroupDescribeResult; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; @@ -474,6 +475,8 @@ public void testStreamsGroupHeartbeatWhenNotStarted() throws ExecutionException, new StreamsGroupHeartbeatResult( new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()), Map.of(), + -1, + -1, -1 ), future.get() @@ -506,6 +509,8 @@ public void testStreamsGroupHeartbeat() throws ExecutionException, InterruptedEx new StreamsGroupHeartbeatResult( new StreamsGroupHeartbeatResponseData(), Map.of(), + -1, + -1, -1 ) )); @@ -515,7 +520,7 @@ public void testStreamsGroupHeartbeat() throws ExecutionException, InterruptedEx request ); - assertEquals(new 
StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), -1), future.get(5, TimeUnit.SECONDS)); + assertEquals(new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), -1, -1, -1), future.get(5, TimeUnit.SECONDS)); } private static Stream testStreamsGroupHeartbeatWithExceptionSource() { @@ -575,6 +580,8 @@ public void testStreamsGroupHeartbeatWithException( .setErrorCode(expectedErrorCode) .setErrorMessage(expectedErrorMessage), Map.of(), + -1, + -1, -1 ), future.get(5, TimeUnit.SECONDS) @@ -597,6 +604,8 @@ public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Except .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("TaskOffsets are not supported yet."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -612,6 +621,8 @@ public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Except .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("TaskEndOffsets are not supported yet."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -627,6 +638,8 @@ public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Except .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("WarmupTasks are not supported yet."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -642,6 +655,8 @@ public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Except .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("Regular expressions for source topics are not supported yet."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -676,6 +691,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("MemberId can't be empty."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -691,6 +708,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I 
.setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("MemberId can't be empty."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -707,6 +726,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("GroupId can't be empty."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -723,6 +744,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("GroupId can't be empty."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -740,6 +763,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("RebalanceTimeoutMs must be provided in first request."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -758,6 +783,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("ActiveTasks must be empty when (re-)joining."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -777,6 +804,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("StandbyTasks must be empty when (re-)joining."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -797,6 +826,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("WarmupTasks must be empty when (re-)joining."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -818,6 +849,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("Topology must be non-null when (re-)joining."), Map.of(), + -1, + -1, -1 ), 
service.streamsGroupHeartbeat( @@ -840,6 +873,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("RackId can't be empty."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -859,6 +894,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("InstanceId can't be null."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -882,6 +919,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("MemberEpoch is -3, but must be greater than or equal to -2."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -901,6 +940,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.INVALID_REQUEST.code()) .setErrorMessage("Topology can only be provided when (re-)joining."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -924,6 +965,8 @@ public void testStreamsHeartbeatRequestValidation() throws ExecutionException, I .setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code()) .setErrorMessage("Changelog topic changelog_topic_with_fixed_partition must have an undefined partition count, but it is set to 3."), Map.of(), + -1, + -1, -1 ), service.streamsGroupHeartbeat( @@ -5921,6 +5964,7 @@ private static class GroupCoordinatorServiceBuilder { private Persister persister = new NoOpStatePersister(); private MetadataImage metadataImage = null; private PartitionMetadataClient partitionMetadataClient = null; + private Optional streamsGroupTopologyDescriptionPlugin = Optional.empty(); GroupCoordinatorService build() { return build(false); @@ -5933,6 +5977,7 @@ GroupCoordinatorService build(boolean serviceStartup) { .build(); } + MockTimer mockTimer = new MockTimer(); var service = new GroupCoordinatorService( logContext, 
config, @@ -5940,8 +5985,10 @@ GroupCoordinatorService build(boolean serviceStartup) { metrics, configManager, persister, - new MockTimer(), - partitionMetadataClient + mockTimer, + partitionMetadataClient, + streamsGroupTopologyDescriptionPlugin, + mockTimer.time() ); if (serviceStartup) { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTopologyDescriptionTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTopologyDescriptionTest.java new file mode 100644 index 0000000000000..0ff151109fa2f --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTopologyDescriptionTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData; +import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.internals.LogContext; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; +import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; +import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin; +import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; +import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; +import org.apache.kafka.server.share.persister.NoOpStatePersister; +import org.apache.kafka.server.util.timer.MockTimer; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import static org.apache.kafka.coordinator.common.runtime.TestUtil.requestContext; +import static org.apache.kafka.coordinator.group.GroupConfigManagerTest.createConfigManager; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the heartbeat post-processing on {@link GroupCoordinatorService} that sets + * {@code TopologyDescriptionRequired} on the response when a topology description plugin + * is configured and the 
stored/failed epochs lag the current topology epoch. + */ +public class GroupCoordinatorServiceTopologyDescriptionTest { + + private static final TopicPartition GROUP_TP = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0); + + @SuppressWarnings("unchecked") + private static CoordinatorRuntime mockRuntime() { + return (CoordinatorRuntime) mock(CoordinatorRuntime.class); + } + + private static GroupCoordinatorService buildService( + CoordinatorRuntime runtime, + Optional plugin, + boolean startup + ) { + MockTimer timer = new MockTimer(); + MockTime time = timer.time(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 600000L, 24), + runtime, + new GroupCoordinatorMetrics(), + createConfigManager(), + new NoOpStatePersister(), + timer, + null, + plugin, + time + ); + if (startup) { + service.startup(() -> 1); + } + return service; + } + + @Test + public void testHeartbeatSetsTopologyDescriptionRequiredWhenStoredLags() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + StreamsGroupTopologyDescriptionPlugin plugin = mock(StreamsGroupTopologyDescriptionPlugin.class); + when(runtime.scheduleWriteOperation( + eq("streams-group-heartbeat"), + eq(GROUP_TP), + any() + )).thenReturn(CompletableFuture.completedFuture( + new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), 5, -1, -1))); + + GroupCoordinatorService service = buildService(runtime, Optional.of(plugin), true); + StreamsGroupHeartbeatResult result = service.streamsGroupHeartbeat( + requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), validHeartbeatRequest() + ).get(5, TimeUnit.SECONDS); + + assertTrue(result.data().topologyDescriptionRequired()); + } + + @Test + public void testHeartbeatSkipsFlagWhenStoredMatchesCurrent() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + StreamsGroupTopologyDescriptionPlugin plugin = 
mock(StreamsGroupTopologyDescriptionPlugin.class); + when(runtime.scheduleWriteOperation( + eq("streams-group-heartbeat"), + eq(GROUP_TP), + any() + )).thenReturn(CompletableFuture.completedFuture( + new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), 5, 5, -1))); + + GroupCoordinatorService service = buildService(runtime, Optional.of(plugin), true); + StreamsGroupHeartbeatResult result = service.streamsGroupHeartbeat( + requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), validHeartbeatRequest() + ).get(5, TimeUnit.SECONDS); + + assertFalse(result.data().topologyDescriptionRequired()); + } + + @Test + public void testHeartbeatSkipsFlagWhenFailedAtCurrentEpoch() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + StreamsGroupTopologyDescriptionPlugin plugin = mock(StreamsGroupTopologyDescriptionPlugin.class); + when(runtime.scheduleWriteOperation( + eq("streams-group-heartbeat"), + eq(GROUP_TP), + any() + )).thenReturn(CompletableFuture.completedFuture( + new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), 5, -1, 5))); + + GroupCoordinatorService service = buildService(runtime, Optional.of(plugin), true); + StreamsGroupHeartbeatResult result = service.streamsGroupHeartbeat( + requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), validHeartbeatRequest() + ).get(5, TimeUnit.SECONDS); + + assertFalse(result.data().topologyDescriptionRequired()); + } + + @Test + public void testHeartbeatSkipsFlagWhenStaleTopologyStatusPresent() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + StreamsGroupTopologyDescriptionPlugin plugin = mock(StreamsGroupTopologyDescriptionPlugin.class); + StreamsGroupHeartbeatResponseData responseData = new StreamsGroupHeartbeatResponseData() + .setStatus(List.of(new StreamsGroupHeartbeatResponseData.Status() + .setStatusCode(Status.STALE_TOPOLOGY.code()) + .setStatusDetail("behind"))); + when(runtime.scheduleWriteOperation( + eq("streams-group-heartbeat"), + 
eq(GROUP_TP), + any() + )).thenReturn(CompletableFuture.completedFuture( + new StreamsGroupHeartbeatResult(responseData, Map.of(), 5, -1, -1))); + + GroupCoordinatorService service = buildService(runtime, Optional.of(plugin), true); + StreamsGroupHeartbeatResult result = service.streamsGroupHeartbeat( + requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), validHeartbeatRequest() + ).get(5, TimeUnit.SECONDS); + + assertFalse(result.data().topologyDescriptionRequired()); + } + + @Test + public void testHeartbeatNeverSetsFlagWithoutPlugin() throws Exception { + CoordinatorRuntime runtime = mockRuntime(); + when(runtime.scheduleWriteOperation( + eq("streams-group-heartbeat"), + eq(GROUP_TP), + any() + )).thenReturn(CompletableFuture.completedFuture( + new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), 5, -1, -1))); + + GroupCoordinatorService service = buildService(runtime, Optional.empty(), true); + StreamsGroupHeartbeatResult result = service.streamsGroupHeartbeat( + requestContext(ApiKeys.STREAMS_GROUP_HEARTBEAT), validHeartbeatRequest() + ).get(5, TimeUnit.SECONDS); + + assertFalse(result.data().topologyDescriptionRequired()); + } + + private static StreamsGroupHeartbeatRequestData validHeartbeatRequest() { + return new StreamsGroupHeartbeatRequestData() + .setGroupId("foo") + .setMemberId(Uuid.randomUuid().toString()) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setTopology(new StreamsGroupHeartbeatRequestData.Topology()); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java index 72e2d9d11f794..448145c54faae 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java +++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java @@ -195,7 +195,7 @@ public void testStreamsGroupHeartbeat() { StreamsGroupHeartbeatRequestData request = new StreamsGroupHeartbeatRequestData(); CoordinatorResult result = new CoordinatorResult<>( List.of(), - new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), -1) + new StreamsGroupHeartbeatResult(new StreamsGroupHeartbeatResponseData(), Map.of(), -1, -1, -1) ); when(groupMetadataManager.streamsGroupHeartbeat( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResultTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResultTest.java index e08fcd36d2e2f..826bcf2a72c73 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResultTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupHeartbeatResultTest.java @@ -30,30 +30,43 @@ public class StreamsGroupHeartbeatResultTest { @Test - public void testThreeArgConstructorPreservesTopologyEpoch() { + public void testConstructorPreservesEpochs() { StreamsGroupHeartbeatResult result = new StreamsGroupHeartbeatResult( - new StreamsGroupHeartbeatResponseData(), Map.of(), 7); + new StreamsGroupHeartbeatResponseData(), Map.of(), 7, 5, 3); assertEquals(7, result.currentTopologyEpoch()); + assertEquals(5, result.storedDescriptionTopologyEpoch()); + assertEquals(3, result.failedDescriptionTopologyEpoch()); } @Test public void testCurrentTopologyEpochIsPartOfEquality() { - // Records derive equals from all components; two results with different topology epochs are unequal. 
StreamsGroupHeartbeatResult a = new StreamsGroupHeartbeatResult( - new StreamsGroupHeartbeatResponseData(), Map.of(), 1); + new StreamsGroupHeartbeatResponseData(), Map.of(), 1, -1, -1); StreamsGroupHeartbeatResult b = new StreamsGroupHeartbeatResult( - new StreamsGroupHeartbeatResponseData(), Map.of(), 2); + new StreamsGroupHeartbeatResponseData(), Map.of(), 2, -1, -1); assertNotEquals(a, b); StreamsGroupHeartbeatResult c = new StreamsGroupHeartbeatResult( - new StreamsGroupHeartbeatResponseData(), Map.of(), 1); + new StreamsGroupHeartbeatResponseData(), Map.of(), 1, -1, -1); assertEquals(a, c); } + @Test + public void testStoredAndFailedEpochsArePartOfEquality() { + StreamsGroupHeartbeatResult a = new StreamsGroupHeartbeatResult( + new StreamsGroupHeartbeatResponseData(), Map.of(), 1, 1, -1); + StreamsGroupHeartbeatResult differentStored = new StreamsGroupHeartbeatResult( + new StreamsGroupHeartbeatResponseData(), Map.of(), 1, 0, -1); + StreamsGroupHeartbeatResult differentFailed = new StreamsGroupHeartbeatResult( + new StreamsGroupHeartbeatResponseData(), Map.of(), 1, 1, 0); + assertNotEquals(a, differentStored); + assertNotEquals(a, differentFailed); + } + @Test public void testCreatableTopicsMapIsImmutable() { StreamsGroupHeartbeatResult result = new StreamsGroupHeartbeatResult( - new StreamsGroupHeartbeatResponseData(), Map.of(), -1); + new StreamsGroupHeartbeatResponseData(), Map.of(), -1, -1, -1); assertThrows(UnsupportedOperationException.class, () -> result.creatableTopics().put("t", null)); } @@ -61,7 +74,7 @@ public void testCreatableTopicsMapIsImmutable() { @Test public void testNullDataIsRejected() { assertThrows(NullPointerException.class, - () -> new StreamsGroupHeartbeatResult(null, Map.of(), -1)); + () -> new StreamsGroupHeartbeatResult(null, Map.of(), -1, -1, -1)); assertTrue(true); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoffTest.java 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoffTest.java new file mode 100644 index 0000000000000..b2226bd748a96 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTopologyDescriptionBackoffTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams; + +import org.apache.kafka.common.utils.MockTime; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class StreamsGroupTopologyDescriptionBackoffTest { + + @Test + public void testFirstArmUsesInitialDelay() { + MockTime time = new MockTime(); + StreamsGroupTopologyDescriptionBackoff backoff = new StreamsGroupTopologyDescriptionBackoff(time); + backoff.armOrExtend("g", 1); + assertTrue(backoff.isActive("g", 1)); + time.sleep(StreamsGroupTopologyDescriptionBackoff.INITIAL_DELAY_MS - 1); + assertTrue(backoff.isActive("g", 1)); + time.sleep(1); + assertFalse(backoff.isActive("g", 1)); + } + + @Test + public void testConsecutiveArmsDoubleTheWindowUpToMax() { + MockTime time = new MockTime(); + StreamsGroupTopologyDescriptionBackoff backoff = new StreamsGroupTopologyDescriptionBackoff(time); + + long expected = StreamsGroupTopologyDescriptionBackoff.INITIAL_DELAY_MS; + backoff.armOrExtend("g", 1); + assertEquals(expected, backoff.entry("g").currentDelayMs()); + + // Each consecutive arm at the same epoch doubles, until we hit the cap. 
+ for (int i = 0; i < 20; i++) { + backoff.armOrExtend("g", 1); + expected = Math.min(expected * 2, StreamsGroupTopologyDescriptionBackoff.MAX_DELAY_MS); + assertEquals(expected, backoff.entry("g").currentDelayMs(), + "iteration " + i); + } + assertEquals(StreamsGroupTopologyDescriptionBackoff.MAX_DELAY_MS, + backoff.entry("g").currentDelayMs()); + } + + @Test + public void testDifferentEpochResetsTheWindow() { + MockTime time = new MockTime(); + StreamsGroupTopologyDescriptionBackoff backoff = new StreamsGroupTopologyDescriptionBackoff(time); + backoff.armOrExtend("g", 1); + backoff.armOrExtend("g", 1); // doubled + long doubled = backoff.entry("g").currentDelayMs(); + assertTrue(doubled > StreamsGroupTopologyDescriptionBackoff.INITIAL_DELAY_MS); + backoff.armOrExtend("g", 2); + assertEquals(StreamsGroupTopologyDescriptionBackoff.INITIAL_DELAY_MS, + backoff.entry("g").currentDelayMs()); + assertEquals(2, backoff.entry("g").topologyEpoch()); + } + + @Test + public void testIsActiveIsScopedToEpoch() { + MockTime time = new MockTime(); + StreamsGroupTopologyDescriptionBackoff backoff = new StreamsGroupTopologyDescriptionBackoff(time); + backoff.armOrExtend("g", 1); + assertTrue(backoff.isActive("g", 1)); + // A query at a different epoch never matches — the broker should re-solicit. 
+ assertFalse(backoff.isActive("g", 2)); + } + + @Test + public void testClearRemovesEntry() { + MockTime time = new MockTime(); + StreamsGroupTopologyDescriptionBackoff backoff = new StreamsGroupTopologyDescriptionBackoff(time); + backoff.armOrExtend("g", 1); + assertNotNull(backoff.entry("g")); + backoff.clear("g"); + assertNull(backoff.entry("g")); + assertFalse(backoff.isActive("g", 1)); + } + + @Test + public void testArmIfNotActiveArmsWhenIdle() { + MockTime time = new MockTime(); + StreamsGroupTopologyDescriptionBackoff backoff = new StreamsGroupTopologyDescriptionBackoff(time); + assertTrue(backoff.armIfNotActive("g", 1)); + assertTrue(backoff.isActive("g", 1)); + assertEquals(StreamsGroupTopologyDescriptionBackoff.INITIAL_DELAY_MS, + backoff.entry("g").currentDelayMs()); + } + + @Test + public void testArmIfNotActiveReturnsFalseWhenAlreadyActive() { + MockTime time = new MockTime(); + StreamsGroupTopologyDescriptionBackoff backoff = new StreamsGroupTopologyDescriptionBackoff(time); + assertTrue(backoff.armIfNotActive("g", 1)); + long armedAt = backoff.entry("g").nextAttemptMs(); + // A second call inside the window does not arm and does not extend the window. + assertFalse(backoff.armIfNotActive("g", 1)); + assertEquals(armedAt, backoff.entry("g").nextAttemptMs()); + } + + @Test + public void testArmIfNotActiveReArmsAfterEpochAdvance() { + MockTime time = new MockTime(); + StreamsGroupTopologyDescriptionBackoff backoff = new StreamsGroupTopologyDescriptionBackoff(time); + assertTrue(backoff.armIfNotActive("g", 1)); + // A query at the new epoch sees no active window and arms a fresh one. + assertTrue(backoff.armIfNotActive("g", 2)); + assertEquals(2, backoff.entry("g").topologyEpoch()); + assertEquals(StreamsGroupTopologyDescriptionBackoff.INITIAL_DELAY_MS, + backoff.entry("g").currentDelayMs()); + } +}