Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11049,7 +11049,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data)
}
Expand Down Expand Up @@ -11119,7 +11119,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data)
}
Expand Down Expand Up @@ -11351,7 +11351,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data)
verify(autoTopicCreationManager).createStreamsInternalTopics(any(), any(), anyLong())
Expand Down Expand Up @@ -11400,7 +11400,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.NONE.code, response.data.errorCode())
assertEquals(null, response.data.errorMessage())
Expand Down Expand Up @@ -11451,7 +11451,7 @@ class KafkaApisTest extends Logging {
.setStatusDetail("Internal topics are missing: [test-topic]")
))

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)

assertEquals(Errors.NONE.code, response.data.errorCode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard.DeletedTopic;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.streams.StreamsGroupTopologyDescriptionPlugin;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupDescribeResult;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.coordinator.group.streams.TopologyDescriptionManager;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicsDelta;
Expand Down Expand Up @@ -250,6 +252,9 @@ public GroupCoordinatorService build() {
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));

Optional<StreamsGroupTopologyDescriptionPlugin> streamsGroupTopologyDescriptionPlugin =
Optional.ofNullable(config.streamsGroupTopologyDescriptionPlugin(Map.of()));

CoordinatorShardBuilderSupplier<GroupCoordinatorShard, CoordinatorRecord> supplier = () ->
new GroupCoordinatorShard.Builder(config, groupConfigManager)
.withAuthorizerPlugin(authorizerPlugin);
Expand Down Expand Up @@ -297,7 +302,9 @@ public GroupCoordinatorService build() {
groupConfigManager,
persister,
timer,
partitionMetadataClient
partitionMetadataClient,
streamsGroupTopologyDescriptionPlugin,
time
);
}
}
Expand Down Expand Up @@ -352,6 +359,14 @@ public GroupCoordinatorService build() {
*/
private final PartitionMetadataClient partitionMetadataClient;

/**
* The broker-level component that owns the streams-group topology description plugin
* (KIP-1331): plugin reference, per-group push back-off, and the three entry points
* the service delegates into — heartbeat post-processing, the push RPC, and the
* pre-tombstone hook on DeleteGroups.
*/
private final TopologyDescriptionManager topologyDescriptionManager;

/**
* The number of partitions of the __consumer_offsets topics. This is provided
* when the component is started.
Expand Down Expand Up @@ -382,7 +397,9 @@ public GroupCoordinatorService build() {
GroupConfigManager groupConfigManager,
Persister persister,
Timer timer,
PartitionMetadataClient partitionMetadataClient
PartitionMetadataClient partitionMetadataClient,
Optional<StreamsGroupTopologyDescriptionPlugin> streamsGroupTopologyDescriptionPlugin,
Time time
) {
this.log = logContext.logger(GroupCoordinatorService.class);
this.config = config;
Expand All @@ -397,6 +414,10 @@ public GroupCoordinatorService build() {
.map(ConsumerGroupPartitionAssignor::name)
.collect(Collectors.toSet());
this.partitionMetadataClient = partitionMetadataClient;
this.topologyDescriptionManager = new TopologyDescriptionManager(
streamsGroupTopologyDescriptionPlugin,
time
);
}

/**
Expand Down Expand Up @@ -608,6 +629,8 @@ public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()),
Map.of(),
-1,
-1,
-1
)
);
Expand All @@ -624,15 +647,19 @@ public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
.setErrorCode(apiError.error().code())
.setErrorMessage(apiError.message()),
Map.of(),
-1,
-1,
-1
)
);
}

final String heartbeatGroupId = request.groupId();
return runtime.scheduleWriteOperation(
"streams-group-heartbeat",
topicPartitionFor(request.groupId()),
topicPartitionFor(heartbeatGroupId),
coordinator -> coordinator.streamsGroupHeartbeat(context, request)
).thenApply(result -> topologyDescriptionManager.maybeSetTopologyDescriptionRequired(result, heartbeatGroupId)
).exceptionally(exception -> handleOperationException(
"streams-group-heartbeat",
request,
Expand All @@ -643,6 +670,8 @@ public CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
.setErrorCode(error.code())
.setErrorMessage(message),
Map.of(),
-1,
-1,
-1
),
log
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2325,7 +2325,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream

response.setStatus(returnedStatus);

return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated, updatedTopology.topologyEpoch()));
return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(
response,
internalTopicsToBeCreated,
updatedTopology.topologyEpoch(),
group.storedDescriptionTopologyEpoch(),
group.failedDescriptionTopologyEpoch()
));
}

/**
Expand Down Expand Up @@ -4466,7 +4472,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
if (instanceId == null) {
StreamsGroupMember member = group.getMemberOrThrow(memberId);
log.info("[GroupId {}][MemberId {}] Member {} left the streams group.", groupId, memberId, memberId);
return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch()));
return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult(
response,
Map.of(),
group.currentTopologyEpoch(),
group.storedDescriptionTopologyEpoch(),
group.failedDescriptionTopologyEpoch()
));
} else {
StreamsGroupMember member = group.staticMember(instanceId);
throwIfStaticMemberIsUnknown(member, instanceId);
Expand All @@ -4478,7 +4490,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
} else {
log.info("[GroupId {}][MemberId {}] Static member {} with instance id {} left the streams group.",
group.groupId(), memberId, memberId, instanceId);
return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch()));
return streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult(
response,
Map.of(),
group.currentTopologyEpoch(),
group.storedDescriptionTopologyEpoch(),
group.failedDescriptionTopologyEpoch()
));
}
}
}
Expand Down Expand Up @@ -4547,7 +4565,13 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream

return new CoordinatorResult<>(
List.of(record),
new StreamsGroupHeartbeatResult(response, Map.of(), group.currentTopologyEpoch())
new StreamsGroupHeartbeatResult(
response,
Map.of(),
group.currentTopologyEpoch(),
group.storedDescriptionTopologyEpoch(),
group.failedDescriptionTopologyEpoch()
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,27 @@
/**
* A simple record to hold the result of a StreamsGroupHeartbeat request.
*
* @param data The data to be returned to the client.
* @param creatableTopics The internal topics to be created.
* @param currentTopologyEpoch The topology epoch the group is operating at after this heartbeat, or -1 if the
* group has no topology yet. The service layer uses this to decide whether to set
* TopologyDescriptionRequired on the response (KIP-1331).
* <p>The three epoch fields let the service layer decide, without re-reading the group,
* whether to set {@code TopologyDescriptionRequired} on the response: a push is needed
* when the stored epoch lags the current epoch and the same epoch has not already been
* recorded as a permanent failure. All three are -1 for failure-fast paths that do not
* resolve a group.
*
* @param data The data to be returned to the client.
* @param creatableTopics The internal topics to be created.
* @param currentTopologyEpoch The topology epoch the group is operating at after this heartbeat,
* or -1 if the group has no topology yet.
* @param storedDescriptionTopologyEpoch The most recent topology epoch successfully stored by the topology
* description plugin, or -1 if none.
* @param failedDescriptionTopologyEpoch The most recent topology epoch the plugin permanently rejected,
* or -1 if none.
*/
public record StreamsGroupHeartbeatResult(
StreamsGroupHeartbeatResponseData data,
Map<String, CreatableTopic> creatableTopics,
int currentTopologyEpoch
int currentTopologyEpoch,
int storedDescriptionTopologyEpoch,
int failedDescriptionTopologyEpoch
) {

public StreamsGroupHeartbeatResult {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.streams;

import org.apache.kafka.common.utils.Time;

import java.util.concurrent.ConcurrentHashMap;

/**
* In-memory per-group back-off that throttles broker re-solicitation of a topology
* description push. An entry is armed when the broker decides to set
* {@code TopologyDescriptionRequired=true} on a heartbeat or after a transient plugin
* failure; consecutive arms at the same topology epoch double the window from
* {@value #INITIAL_DELAY_MS} ms up to {@value #MAX_DELAY_MS} ms. Successful pushes,
* permanent plugin failures, and topology-epoch advances clear the entry.
*/
public class StreamsGroupTopologyDescriptionBackoff {

static final long INITIAL_DELAY_MS = 30_000L;
static final long MAX_DELAY_MS = 3_600_000L;

private final Time time;
private final ConcurrentHashMap<String, Entry> state = new ConcurrentHashMap<>();

record Entry(int topologyEpoch, long currentDelayMs, long nextAttemptMs) { }

public StreamsGroupTopologyDescriptionBackoff(Time time) {
this.time = time;
}

/**
* @return true if a back-off window is in effect for the given group at the given
* topology epoch and the broker should suppress soliciting another push.
*/
public boolean isActive(String groupId, int topologyEpoch) {
Entry entry = state.get(groupId);
return entry != null
&& entry.topologyEpoch() == topologyEpoch
&& time.milliseconds() < entry.nextAttemptMs();
}

/**
* Atomic check-and-arm. Returns true if no window was in effect and a new one was
* armed, false if a window was already active and nothing changed. Used on the
* heartbeat path to fold the "check + arm" pair into a single compute so two
* concurrent heartbeats for the same group cannot both arm the back-off.
*/
public boolean armIfNotActive(String groupId, int topologyEpoch) {
final long now = time.milliseconds();
final boolean[] armed = new boolean[]{false};
state.compute(groupId, (key, existing) -> {
if (existing != null
&& existing.topologyEpoch() == topologyEpoch
&& now < existing.nextAttemptMs()) {
return existing;
}
armed[0] = true;
return new Entry(topologyEpoch, INITIAL_DELAY_MS, now + INITIAL_DELAY_MS);
});
return armed[0];
}

/**
* Arm a new back-off window or extend the existing one. If the existing entry is for a
* different topology epoch the window is reset to {@link #INITIAL_DELAY_MS}.
*/
public void armOrExtend(String groupId, int topologyEpoch) {
final long now = time.milliseconds();
state.compute(groupId, (key, existing) -> {
if (existing == null || existing.topologyEpoch() != topologyEpoch) {
return new Entry(topologyEpoch, INITIAL_DELAY_MS, now + INITIAL_DELAY_MS);
}
long nextDelay = Math.min(existing.currentDelayMs() * 2, MAX_DELAY_MS);
return new Entry(topologyEpoch, nextDelay, now + nextDelay);
});
}

/**
* Drop the back-off entry for a group. Called on a successful push, a permanent plugin
* failure, or when the group is removed.
*/
public void clear(String groupId) {
state.remove(groupId);
}

// Visible for testing.
Entry entry(String groupId) {
return state.get(groupId);
}
}
Loading
Loading