Skip to content

KAFKA-20623: Wire topology description plugin into GroupCoordinatorService#22545

Draft
frankvicky wants to merge 1 commit into
apache:trunkfrom
frankvicky:KAFKA-20623
Draft

KAFKA-20623: Wire topology description plugin into GroupCoordinatorService#22545
frankvicky wants to merge 1 commit into
apache:trunkfrom
frankvicky:KAFKA-20623

Conversation

@frankvicky

@frankvicky frankvicky commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

JIRA: KAFKA-20623 This PR is a part of KIP-1331.

Builder plumbing

  • GroupCoordinatorService.Builder accepts an
    Optional<StreamsGroupTopologyDescriptionPlugin> and constructs the
    per-group back-off
    state at build time so the shard-builder supplier closure can wire a
    removal listener
    back into it.
  • BrokerServer reads
    groupCoordinatorConfig.streamsGroupTopologyDescriptionPlugin(...)
    and passes the
    resulting Optional through. When the config is unset the feature is
    fully disabled.

Heartbeat post-processing

  • StreamsGroupHeartbeatResult now carries
    storedDescriptionTopologyEpoch and
    failedDescriptionTopologyEpoch alongside currentTopologyEpoch, so
    the service layer
    can decide whether to set TopologyDescriptionRequired=true without
    re-reading the
    group.
  • maybeSetTopologyDescriptionRequired applies the KIP-1331 gating:
    plugin present,
    response has no error code, c
    initial delay; a successful push or a permanent plugin failure clears
    the entry.
  • The back-off is per broker (service-level), not per partition. To
    avoid leaking entries
    for groups that vanish, GroupMetadataManager now exposes a
    Consumer<String>
    streams-group removal listener (wired through
    GroupCoordinatorShard.Builder). The
    listener fires from removeGroup whenever the removed group is a
    streams group, and
    from the STREAMS branch of onUnloaded; the service registers it as
    backoff::clear.

StreamsGroupTopologyDescriptionUpdate handler

  • Synchronous pre-checks reject the request fast when the coordinator is
    inactive
    (COORDINATOR_NOT_AVAILABLE), no plugin is configured
    (UNSUPPORTED_VERSION), or
    the request is structurally malformed (INVALID_REQUEST for empty
    MemberId,
    empty GroupId, or null TopologyDescription).
  • The chain runs validateStreamsGroupMember first (so a fenced
    caller gets
    UNKNOWN_MEMBER_ID rather than an INVALID_REQUEST from a payload
    conversion
    failure), then converts the wire payload to the broker-side POJO, then
    calls the
    plugin.
  • Plugin success writes a metadata record advancing
    StoredDescriptionTopologyEpoch;
    a StreamsTopologyDescriptionPermanentFailureException (or a
    synchronous throw)
    writes FailedDescriptionTopologyEpoch; any other exception is
    treated as transient
    and writes no record.
  • The client-visible error is always
    STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED with
    the plugin's message; the permanent-vs-transient split is
    broker-internal.
  • All back-off state mutations happen in a single whenComplete, driven
    by a
    BackoffAction holder populated by each terminal branch. A
    post-plugin write failure
    leaves the action at ARM, so the next heartbeat sees the drift and
    re-solicits an
    idempotent re-push, matching the KIP-1331 invariant.

DeleteGroups integration

  • Before tombstoning, the service identifies retained group ids that are
    streams groups
    with a non-default StoredDescriptionTopologyEpoch
    (streamsGroupsWithStoredTopologyDescription)
    and calls plugin.deleteTopology for each in parallel.
  • Per-group plugin failure surfaces as GROUP_DELETION_FAILED with the
    cause string in
    ErrorMessage; the group is held back from tombstoning so the caller
    can retry.
    Mixed batches are honoured: groups that succeed proceed to tombstone,
    groups that fail
    return the per-group error.
  • For DeleteGroups requests at version < 3 (pre-KIP-1331), the error is
    downgraded to
    UNKNOWN_SERVER_ERROR with no message, matching the convention used
    by KIP-1043 for
    new error codes that older request versions cannot interpret.

Wire ↔ POJO converter

  • StreamsGroupTopologyDescriptionConverter translates
    StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription
    into the
    broker-side StreamsGroupTopologyDescription POJO. Subtopology node
    ordering and
    string-collection iteration order are preserved via LinkedHashSet,
    so a downstream
    pretty-printer (e.g. the future kafka-streams-groups.sh --topology
    command) can
    reproduce the source ordering.
  • Wire-level node types (SOURCE=1, PROCESSOR=2, SINK=3) map to the
    sealed Node
    hierarchy. GlobalStore shape is validated; a malformed payload
    throws
    InvalidRequestException.

Reviewers: David Jacot david.jacot@gmail.com, Lucas Brutschy
lbrutschy@confluent.io

@github-actions github-actions Bot added triage PRs from the community core Kafka Broker group-coordinator labels Jun 11, 2026
Objects.requireNonNull(name, "name");
topics = Set.copyOf(Objects.requireNonNull(topics, "topics"));
successors = Set.copyOf(Objects.requireNonNull(successors, "successors"));
topics = Collections.unmodifiableSet(Objects.requireNonNull(topics, "topics"));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set.copyOf doesn't guarantee the order, so I change to unmodifiableSet

Comment on lines +715 to +716
.withStreamsGroupTopologyDescriptionPlugin(
Optional.ofNullable(config.groupCoordinatorConfig.streamsGroupTopologyDescriptionPlugin(java.util.Map.of())))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure to follow why we need withStreamsGroupTopologyDescriptionPlugin. The build already gets the groupCoordinatorConfig. Can't we rely on it internally?

Group removed = groups.get(groupId, Long.MAX_VALUE);
groups.remove(groupId);
if (removed != null && removed.type() == Group.GroupType.STREAMS) {
streamsGroupRemovalListener.accept(groupId);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's discuss this further. I would like to understand the rational. I think that we should try to avoid this kind of coupling from the shard to the service. It should be the other way around.

@github-actions github-actions Bot removed the triage PRs from the community label Jun 12, 2026
));
}

/**

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to move this to a separate class (like TopologyDescriptionManager)

@frankvicky frankvicky marked this pull request as draft June 12, 2026 14:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants