Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1521,8 +1521,21 @@ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
* any records from these partitions until they have been resumed using {@link #resume(Collection)}.
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* <p/>
* The pause state is preserved across a rebalance for partitions that remain assigned to this
* consumer, but it is lost for partitions that are revoked. Which partitions are revoked depends
* on the group protocol in use (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}):
* <ul>
* <li>Classic group protocol: the behavior depends on the assignor configured in
* {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}: eager assignors (e.g., {@link RangeAssignor},
* {@link RoundRobinAssignor}) revoke all partitions on every rebalance (pause state is
* not preserved); cooperative assignors (e.g., {@link CooperativeStickyAssignor}) only revoke the
* partitions that are reassigned to another consumer (pause state preserved for partitions that remain
* assigned)</li>
* <li>Consumer group protocol (KIP-848): only revokes partitions that are reassigned to another consumer
* (pause state preserved for partitions that remain assigned)</li>
* </ul>
*
* Note: Rebalance will not preserve the pause/resume state.
* @param partitions The partitions which should be paused
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,71 @@ public void testPause(GroupProtocol groupProtocol) {
assertTrue(consumer.paused().isEmpty());
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testPauseFlagPreservedForRetainedPartitionOnManualAssignmentChange(GroupProtocol groupProtocol) {
consumer = newConsumer(groupProtocol, groupId);

// Manually assign two partitions and pause one of them.
consumer.assign(List.of(tp0, tp1));
consumer.pause(Set.of(tp0));
assertEquals(Set.of(tp0), consumer.paused());

// Change the assignment while keeping tp0 assigned.
consumer.assign(List.of(tp0));
assertEquals(Set.of(tp0), consumer.assignment());
assertEquals(Set.of(tp0), consumer.paused(), "The pause state of the partition that remains assigned must be preserved");

// A partition that is (re)added to the assignment starts unpaused.
consumer.assign(List.of(tp0, tp1));
assertEquals(Set.of(tp0), consumer.paused());
}

/**
* This behaviour is common to the Classic (cooperative) and Consumer protocols: paused partitions
* remain paused after a rebalance as long as they remain assigned to the same consumer.
* It is tested separately for each consumer because the rebalance-related calls differ.
* See {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumerTest#testPauseFlagPreservedForRetainedPartitionAcrossRebalance()}.
*/
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testPauseFlagPreservedForRetainedPartitionAcrossRebalance(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);

Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
tpCounts.put(topic3, 1);
initMetadata(client, tpCounts);
Node node = metadata.fetch().nodes().get(0);

ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);

// Initial subscription and rebalance assigning tp0 and t2p0.
consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, Set.of(topic, topic2), assignor, Arrays.asList(tp0, t2p0), null);
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);
assertEquals(Set.of(tp0, t2p0), consumer.assignment());

// Pause tp0, the partition that will be retained across the rebalance.
consumer.pause(Set.of(tp0));
assertEquals(Set.of(tp0), consumer.paused());

// Change the subscription so that t2p0 is revoked while tp0 is retained and t3p0 is added.
consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, Set.of(topic, topic3), assignor, Arrays.asList(tp0, t3p0), coordinator);
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);

// tp0 is retained across the rebalance, so its pause state must be preserved.
// The newly added t3p0 starts unpaused.
assertEquals(Set.of(tp0, t3p0), consumer.assignment());
assertEquals(Set.of(tp0), consumer.paused(), "Partition that remain assigned should keep the pause state");
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testConsumerJmxPrefix(GroupProtocol groupProtocol) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
import org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
Expand Down Expand Up @@ -926,6 +927,47 @@ public void testCommitSyncAllConsumed() {
assertFalse(capturedEvent.offsets().isPresent(), "Expected empty optional offsets");
}

/**
* This behaviour is common to the Classic (cooperative) and Consumer protocols: paused partitions
* remain paused after a rebalance as long as they remain assigned to the same consumer.
* It is tested separately for each consumer because the rebalance-related calls differ.
* See {@link org.apache.kafka.clients.consumer.KafkaConsumerTest#testPauseFlagPreservedForRetainedPartitionAcrossRebalance(GroupProtocol)}.
*/
@Test
public void testPauseFlagPreservedForRetainedPartitionAcrossRebalance() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(
mock(FetchBuffer.class),
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions);
completeTopicSubscriptionChangeEventSuccessfully();
completePausePartitionsEventSuccessfully();

final String topicName = "topic1";
final TopicPartition tp0 = new TopicPartition(topicName, 0);
final TopicPartition tp1 = new TopicPartition(topicName, 1);
final TopicPartition tp2 = new TopicPartition(topicName, 2);

consumer.subscribe(singleton(topicName), mock(ConsumerRebalanceListener.class));

// Simulate rebalance that reconciled a new assignment of tp0 and tp1
subscriptions.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1), Set.of(tp0, tp1));
assertEquals(Set.of(tp0, tp1), consumer.assignment());

// Pause tp0, the partition that will be retained across the rebalance.
consumer.pause(Set.of(tp0));
assertEquals(Set.of(tp0), consumer.paused());

// Reconcile a new assignment that retains tp0, revokes tp1, and adds tp2.
subscriptions.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp2), Set.of(tp2));

// tp0 is retained across the rebalance, so its pause state must be preserved.
// The newly added tp2 starts unpaused.
assertEquals(Set.of(tp0, tp2), consumer.assignment());
assertEquals(Set.of(tp0), consumer.paused(), "Partition that remain assigned should keep the pause state");
}

@Test
public void testAutoCommitSyncDisabled() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
Expand Down Expand Up @@ -2429,6 +2471,15 @@ private void completeTopicSubscriptionChangeEventSuccessfully() {
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
}

private void completePausePartitionsEventSuccessfully() {
doAnswer(invocation -> {
PausePartitionsEvent event = invocation.getArgument(0);
event.partitions().forEach(consumer.subscriptions()::pause);
event.future().complete(null);
return null;
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(PausePartitionsEvent.class));
}

private void completeTopicPatternSubscriptionChangeEventSuccessfully() {
doAnswer(invocation -> {
TopicPatternSubscriptionChangeEvent event = invocation.getArgument(0);
Expand Down
Loading