Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,8 @@ class BrokerServer(
.withPersister(persister)
.withAuthorizerPlugin(authorizerPlugin.toJava)
.withPartitionMetadataClient(partitionMetadataClient)
.withStreamsGroupTopologyDescriptionPlugin(
Optional.ofNullable(config.groupCoordinatorConfig.streamsGroupTopologyDescriptionPlugin(java.util.Map.of())))
Comment on lines +715 to +716

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure to follow why we need withStreamsGroupTopologyDescriptionPlugin. The build already gets the groupCoordinatorConfig. Can't we rely on it internally?

.build()
}

Expand Down
10 changes: 5 additions & 5 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11049,7 +11049,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data)
}
Expand Down Expand Up @@ -11119,7 +11119,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, util.Map.of(), -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data)
}
Expand Down Expand Up @@ -11351,7 +11351,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(streamsGroupHeartbeatResponse, response.data)
verify(autoTopicCreationManager).createStreamsInternalTopics(any(), any(), anyLong())
Expand Down Expand Up @@ -11400,7 +11400,7 @@ class KafkaApisTest extends Logging {
val streamsGroupHeartbeatResponse = new StreamsGroupHeartbeatResponseData()
.setMemberId("member")

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)
assertEquals(Errors.NONE.code, response.data.errorCode())
assertEquals(null, response.data.errorMessage())
Expand Down Expand Up @@ -11451,7 +11451,7 @@ class KafkaApisTest extends Logging {
.setStatusDetail("Internal topics are missing: [test-topic]")
))

future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1))
future.complete(new StreamsGroupHeartbeatResult(streamsGroupHeartbeatResponse, missingTopics, -1, -1, -1))
val response = verifyNoThrottling[StreamsGroupHeartbeatResponse](requestChannelRequest)

assertEquals(Errors.NONE.code, response.data.errorCode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -58,24 +59,24 @@ public sealed interface Node {
public record Source(String name, Set<String> topics, Set<String> successors) implements Node {
public Source {
Objects.requireNonNull(name, "name");
topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set.copyOf doesn't guarantee the order, so I change to unmodifiableSet

successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
}

public record Processor(String name, Set<String> stores, Set<String> successors) implements Node {
public Processor {
Objects.requireNonNull(name, "name");
stores = Set.copyOf(Objects.requireNonNull(stores, "stores"));
successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
stores = Collections.unmodifiableSet(Objects.requireNonNull(stores, "stores"));
successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
}

public record Sink(String name, Optional<String> topic, Set<String> successors) implements Node {
public Sink {
Objects.requireNonNull(name, "name");
Objects.requireNonNull(topic, "topic");
successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
successors = Collections.unmodifiableSet(Objects.requireNonNull(successors, "successors"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateRequestData;
import org.apache.kafka.common.message.StreamsGroupTopologyDescriptionUpdateResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
Expand Down Expand Up @@ -98,6 +100,27 @@ CompletableFuture<StreamsGroupHeartbeatResult> streamsGroupHeartbeat(
StreamsGroupHeartbeatRequestData request
);

/**
* Persist the full topology description pushed by a Streams group member.
*
* <p>The broker forwards the description to the configured
* {@code StreamsGroupTopologyDescriptionPlugin}. On success the broker writes a
* metadata record advancing {@code StoredDescriptionTopologyEpoch}. On a permanent
* plugin failure it writes {@code FailedDescriptionTopologyEpoch} to stop re-soliciting
* at the same epoch. On a transient failure no record is written and the broker arms
* an in-memory back-off.
*
* @param context The request context.
* @param request The StreamsGroupTopologyDescriptionUpdateRequest data.
*
* @return A future yielding the response. The error code is set to indicate any error
* encountered during the execution.
*/
CompletableFuture<StreamsGroupTopologyDescriptionUpdateResponseData> streamsGroupTopologyDescriptionUpdate(
AuthorizableRequestContext context,
StreamsGroupTopologyDescriptionUpdateRequestData request
);

/**
* Heartbeat to a Share Group.
*
Expand Down
Loading
Loading