From 91b0f5aaa872c79b6dffa7a61b098aec31752e83 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Wed, 10 Jun 2026 14:28:28 -0400 Subject: [PATCH 1/3] doc & tests --- .../kafka/clients/consumer/KafkaConsumer.java | 9 ++- .../clients/consumer/KafkaConsumerTest.java | 65 +++++++++++++++++++ .../internals/AsyncKafkaConsumerTest.java | 51 +++++++++++++++ 3 files changed, 124 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6a939d4d95ea6..c51a72358a375 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1521,8 +1521,15 @@ public Map> listTopics(Duration timeout) { * any records from these partitions until they have been resumed using {@link #resume(Collection)}. * Note that this method does not affect partition subscription. In particular, it does not cause a group * rebalance when automatic assignment is used. + *

+ * The pause state is preserved across a rebalance for partitions that remain assigned to this + * consumer, but it is not preserved for partitions revoked from this consumer + * (e.g., the paused state is lost when the partition is reassigned to another member). + * The eager rebalance protocol revokes all partitions on every rebalance, + * so under that protocol no pause/resume state is preserved; the cooperative rebalance protocol and + * the consumer group protocol (KIP-848) only revoke partitions that are no longer assigned, so the pause/resume + * state of partitions that remain assigned to this consumer is preserved. * - * Note: Rebalance will not preserve the pause/resume state. * @param partitions The partitions which should be paused * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer */ diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index c720ccc051c24..b28a2ce9d45ee 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -915,6 +915,71 @@ public void testPause(GroupProtocol groupProtocol) { assertTrue(consumer.paused().isEmpty()); } + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testPauseFlagPreservedForRetainedPartitionOnManualAssignmentChange(GroupProtocol groupProtocol) { + consumer = newConsumer(groupProtocol, groupId); + + // Manually assign two partitions and pause one of them. + consumer.assign(List.of(tp0, tp1)); + consumer.pause(Set.of(tp0)); + assertEquals(Set.of(tp0), consumer.paused()); + + // Change the assignment while keeping tp0 assigned. 
+ consumer.assign(List.of(tp0)); + assertEquals(Set.of(tp0), consumer.assignment()); + assertEquals(Set.of(tp0), consumer.paused(), "The pause state of the partition that remains assigned must be preserved"); + + // A partition that is (re)added to the assignment starts unpaused. + consumer.assign(List.of(tp0, tp1)); + assertEquals(Set.of(tp0), consumer.paused()); + } + + /** + * This behaviour is common to the Classic (cooperative) and Consumer protocols: paused partitions + * remain paused after a rebalance as long as they remain assigned to the same consumer. + * It is tested separately for each consumer because the rebalance-related calls differ. + * See {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumerTest#testPauseFlagPreservedForRetainedPartitionAcrossRebalance()}. + */ + @ParameterizedTest + @EnumSource(value = GroupProtocol.class, names = "CLASSIC") + public void testPauseFlagPreservedForRetainedPartitionAcrossRebalance(GroupProtocol groupProtocol) { + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + + Map tpCounts = new HashMap<>(); + tpCounts.put(topic, 1); + tpCounts.put(topic2, 1); + tpCounts.put(topic3, 1); + initMetadata(client, tpCounts); + Node node = metadata.fetch().nodes().get(0); + + ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor(); + consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId); + + // Initial subscription and rebalance assigning tp0 and t2p0. 
+ consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer)); + Node coordinator = prepareRebalance(client, node, Set.of(topic, topic2), assignor, Arrays.asList(tp0, t2p0), null); + consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)); + consumer.poll(Duration.ZERO); + assertEquals(Set.of(tp0, t2p0), consumer.assignment()); + + // Pause tp0, the partition that will be retained across the rebalance. + consumer.pause(Set.of(tp0)); + assertEquals(Set.of(tp0), consumer.paused()); + + // Change the subscription so that t2p0 is revoked while tp0 is retained and t3p0 is added. + consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer)); + prepareRebalance(client, node, Set.of(topic, topic3), assignor, Arrays.asList(tp0, t3p0), coordinator); + consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)); + consumer.poll(Duration.ZERO); + + // tp0 is retained across the rebalance, so its pause state must be preserved. + // The newly added t3p0 starts unpaused. 
+ assertEquals(Set.of(tp0, t3p0), consumer.assignment()); + assertEquals(Set.of(tp0), consumer.paused(), "Partition that remain assigned should keep the pause state"); + } + @ParameterizedTest @EnumSource(GroupProtocol.class) public void testConsumerJmxPrefix(GroupProtocol groupProtocol) throws Exception { diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index c6833d6aee488..95566f2f5b3eb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -51,6 +51,7 @@ import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent; import org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent; import org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent; +import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent; import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent; import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent; import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent; @@ -926,6 +927,47 @@ public void testCommitSyncAllConsumed() { assertFalse(capturedEvent.offsets().isPresent(), "Expected empty optional offsets"); } + /** + * This behaviour is common to the Classic (cooperative) and Consumer protocols: paused partitions + * remain paused after a rebalance as long as they remain assigned to the same consumer. + * It is tested separately for each consumer because the rebalance-related calls differ. + * See {@link org.apache.kafka.clients.consumer.KafkaConsumerTest#testPauseFlagPreservedForRetainedPartitionAcrossRebalance(GroupProtocol)}. 
+ */ + @Test + public void testPauseFlagPreservedForRetainedPartitionAcrossRebalance() { + SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); + consumer = newConsumer( + mock(FetchBuffer.class), + mock(ConsumerInterceptors.class), + mock(ConsumerRebalanceListenerInvoker.class), + subscriptions); + completeTopicSubscriptionChangeEventSuccessfully(); + completePausePartitionsEventSuccessfully(); + + final String topicName = "topic1"; + final TopicPartition tp0 = new TopicPartition(topicName, 0); + final TopicPartition tp1 = new TopicPartition(topicName, 1); + final TopicPartition tp2 = new TopicPartition(topicName, 2); + + consumer.subscribe(singleton(topicName), mock(ConsumerRebalanceListener.class)); + + // Simulate rebalance that reconciled a new assignment of tp0 and tp1 + subscriptions.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1), Set.of(tp0, tp1)); + assertEquals(Set.of(tp0, tp1), consumer.assignment()); + + // Pause tp0, the partition that will be retained across the rebalance. + consumer.pause(Set.of(tp0)); + assertEquals(Set.of(tp0), consumer.paused()); + + // Reconcile a new assignment that retains tp0, revokes tp1, and adds tp2. + subscriptions.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp2), Set.of(tp2)); + + // tp0 is retained across the rebalance, so its pause state must be preserved. + // The newly added tp2 starts unpaused. 
+ assertEquals(Set.of(tp0, tp2), consumer.assignment()); + assertEquals(Set.of(tp0), consumer.paused(), "Partition that remain assigned should keep the pause state"); + } + @Test public void testAutoCommitSyncDisabled() { SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE); @@ -2429,6 +2471,15 @@ private void completeTopicSubscriptionChangeEventSuccessfully() { }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class)); } + private void completePausePartitionsEventSuccessfully() { + doAnswer(invocation -> { + PausePartitionsEvent event = invocation.getArgument(0); + event.partitions().forEach(consumer.subscriptions()::pause); + event.future().complete(null); + return null; + }).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(PausePartitionsEvent.class)); + } + private void completeTopicPatternSubscriptionChangeEventSuccessfully() { doAnswer(invocation -> { TopicPatternSubscriptionChangeEvent event = invocation.getArgument(0); From 65a195ddc6052261d23d7f28e5dbaa6d2e3ce086 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Fri, 12 Jun 2026 16:20:39 -0400 Subject: [PATCH 2/3] update --- .../kafka/clients/consumer/KafkaConsumer.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index c51a72358a375..039af92de17f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1523,12 +1523,17 @@ public Map> listTopics(Duration timeout) { * rebalance when automatic assignment is used. *

* The pause state is preserved across a rebalance for partitions that remain assigned to this - * consumer, but it is not preserved for partitions revoked from this consumer - * (e.g., the paused state is lost when the partition is reassigned to another member). - * The eager rebalance protocol revokes all partitions on every rebalance, - * so under that protocol no pause/resume state is preserved; the cooperative rebalance protocol and - * the consumer group protocol (KIP-848) only revoke partitions that are no longer assigned, so the pause/resume - * state of partitions that remain assigned to this consumer is preserved. + * consumer, but it is lost for partitions that are revoked. Which partitions are revoked depends + * on the group protocol in use (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}): + *

* * @param partitions The partitions which should be paused * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer From fd28aec4dbdf729679d5fca1ba1320f0d6812763 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Fri, 12 Jun 2026 16:27:11 -0400 Subject: [PATCH 3/3] clarify assignor --- .../org/apache/kafka/clients/consumer/KafkaConsumer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 039af92de17f8..514e24f48aa40 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1526,9 +1526,10 @@ public Map> listTopics(Duration timeout) { * consumer, but it is lost for partitions that are revoked. Which partitions are revoked depends * on the group protocol in use (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}): *
<ul>
- * <li>Classic group protocol: eager assignors (e.g., {@link RangeAssignor},
+ * <li>Classic group protocol: the behavior depends on the assignor configured in
+ * {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}: eager assignors (e.g., {@link RangeAssignor},
 * {@link RoundRobinAssignor}) revoke all partitions on every rebalance (pause state is
- * not preserved); cooperative assignor (e.g., {@link CooperativeStickyAssignor}) only revokes
+ * not preserved); cooperative assignors (e.g., {@link CooperativeStickyAssignor}) only revoke the
 * partitions that are reassigned to another consumer (pause state preserved for partitions that remain
 * assigned)</li>
 * <li>Consumer group protocol (KIP-848): only revokes partitions that are reassigned to another consumer