Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1521,8 +1521,15 @@ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
* any records from these partitions until they have been resumed using {@link #resume(Collection)}.
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* <p/>
* The pause state is preserved across a rebalance for partitions that remain assigned to this
* consumer, but it is not preserved for partitions revoked from this consumer
* (e.g., the paused state is lost when the partition is reassigned to another member).
* The eager rebalance protocol revokes all partitions on every rebalance,
* so under that protocol no pause/resume state is preserved; the cooperative rebalance protocol and
* the consumer group protocol (KIP-848) only revoke partitions that are no longer assigned, so the pause/resume
* state of partitions that remain assigned to this consumer is preserved.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit: We should use the classic vs consumer language here to align with the config. I would also give example of an assignor for each sub-mode of the classic one. I don't think people really understand eager vs cooperative.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, updated

*
* Note: Rebalance will not preserve the pause/resume state.
* @param partitions The partitions which should be paused
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,71 @@ public void testPause(GroupProtocol groupProtocol) {
assertTrue(consumer.paused().isEmpty());
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testPauseFlagPreservedForRetainedPartitionOnManualAssignmentChange(GroupProtocol groupProtocol) {
consumer = newConsumer(groupProtocol, groupId);

// Manually assign two partitions and pause one of them.
consumer.assign(List.of(tp0, tp1));
consumer.pause(Set.of(tp0));
assertEquals(Set.of(tp0), consumer.paused());

// Change the assignment while keeping tp0 assigned.
consumer.assign(List.of(tp0));
assertEquals(Set.of(tp0), consumer.assignment());
assertEquals(Set.of(tp0), consumer.paused(), "The pause state of the partition that remains assigned must be preserved");

// A partition that is (re)added to the assignment starts unpaused.
consumer.assign(List.of(tp0, tp1));
assertEquals(Set.of(tp0), consumer.paused());
}

/**
* This behaviour is common to the Classic (cooperative) and Consumer protocols: paused partitions
* remain paused after a rebalance as long as they remain assigned to the same consumer.
* It is tested separately for each consumer because the rebalance-related calls differ.
* See {@link org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumerTest#testPauseFlagPreservedForRetainedPartitionAcrossRebalance()}.
*/
@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testPauseFlagPreservedForRetainedPartitionAcrossRebalance(GroupProtocol groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);

Map<String, Integer> tpCounts = new HashMap<>();
tpCounts.put(topic, 1);
tpCounts.put(topic2, 1);
tpCounts.put(topic3, 1);
initMetadata(client, tpCounts);
Node node = metadata.fetch().nodes().get(0);

ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);

// Initial subscription and rebalance assigning tp0 and t2p0.
consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, Set.of(topic, topic2), assignor, Arrays.asList(tp0, t2p0), null);
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);
assertEquals(Set.of(tp0, t2p0), consumer.assignment());

// Pause tp0, the partition that will be retained across the rebalance.
consumer.pause(Set.of(tp0));
assertEquals(Set.of(tp0), consumer.paused());

// Change the subscription so that t2p0 is revoked while tp0 is retained and t3p0 is added.
consumer.subscribe(Arrays.asList(topic, topic3), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, Set.of(topic, topic3), assignor, Arrays.asList(tp0, t3p0), coordinator);
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);

// tp0 is retained across the rebalance, so its pause state must be preserved.
// The newly added t3p0 starts unpaused.
assertEquals(Set.of(tp0, t3p0), consumer.assignment());
assertEquals(Set.of(tp0), consumer.paused(), "Partition that remain assigned should keep the pause state");
}

@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testConsumerJmxPrefix(GroupProtocol groupProtocol) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PartitionsAssignedEvent;
import org.apache.kafka.clients.consumer.internals.events.PartitionsRemovedEvent;
import org.apache.kafka.clients.consumer.internals.events.PausePartitionsEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
Expand Down Expand Up @@ -926,6 +927,47 @@ public void testCommitSyncAllConsumed() {
assertFalse(capturedEvent.offsets().isPresent(), "Expected empty optional offsets");
}

/**
* This behaviour is common to the Classic (cooperative) and Consumer protocols: paused partitions
* remain paused after a rebalance as long as they remain assigned to the same consumer.
* It is tested separately for each consumer because the rebalance-related calls differ.
* See {@link org.apache.kafka.clients.consumer.KafkaConsumerTest#testPauseFlagPreservedForRetainedPartitionAcrossRebalance(GroupProtocol)}.
*/
@Test
public void testPauseFlagPreservedForRetainedPartitionAcrossRebalance() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(
mock(FetchBuffer.class),
mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions);
completeTopicSubscriptionChangeEventSuccessfully();
completePausePartitionsEventSuccessfully();

final String topicName = "topic1";
final TopicPartition tp0 = new TopicPartition(topicName, 0);
final TopicPartition tp1 = new TopicPartition(topicName, 1);
final TopicPartition tp2 = new TopicPartition(topicName, 2);

consumer.subscribe(singleton(topicName), mock(ConsumerRebalanceListener.class));

// Simulate rebalance that reconciled a new assignment of tp0 and tp1
subscriptions.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp1), Set.of(tp0, tp1));
assertEquals(Set.of(tp0, tp1), consumer.assignment());

// Pause tp0, the partition that will be retained across the rebalance.
consumer.pause(Set.of(tp0));
assertEquals(Set.of(tp0), consumer.paused());

// Reconcile a new assignment that retains tp0, revokes tp1, and adds tp2.
subscriptions.assignFromSubscribedAwaitingCallback(Set.of(tp0, tp2), Set.of(tp2));

// tp0 is retained across the rebalance, so its pause state must be preserved.
// The newly added tp2 starts unpaused.
assertEquals(Set.of(tp0, tp2), consumer.assignment());
assertEquals(Set.of(tp0), consumer.paused(), "Partition that remain assigned should keep the pause state");
}

@Test
public void testAutoCommitSyncDisabled() {
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
Expand Down Expand Up @@ -2429,6 +2471,15 @@ private void completeTopicSubscriptionChangeEventSuccessfully() {
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
}

private void completePausePartitionsEventSuccessfully() {
doAnswer(invocation -> {
PausePartitionsEvent event = invocation.getArgument(0);
event.partitions().forEach(consumer.subscriptions()::pause);
event.future().complete(null);
return null;
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(PausePartitionsEvent.class));
}

private void completeTopicPatternSubscriptionChangeEventSuccessfully() {
doAnswer(invocation -> {
TopicPatternSubscriptionChangeEvent event = invocation.getArgument(0);
Expand Down
Loading