KAFKA-20623: Update RPC for streams group topology description plugin (2/3) #22552

Open
frankvicky wants to merge 2 commits into apache:trunk from
frankvicky:KAFKA-20623-2-update-rpc

@frankvicky frankvicky commented Jun 12, 2026

Contributor

JIRA: KAFKA-20623
This is part of KIP-1331.
This PR shouldn't be merged before #22551.

Adds the StreamsGroupTopologyDescriptionUpdate RPC handler stacked on
the heartbeat extension (1/3). The push pipeline runs through
TopologyDescriptionManager.handleSetTopology: validate (group, member), convert wire payload, call plugin.setTopology, persist
StoredDescriptionTopologyEpoch on success or
FailedDescriptionTopologyEpoch on permanent failure, and centralise
back-off state mutations in a single whenComplete.

Push pipeline on the manager

TopologyDescriptionManager gains the push entry points:

  • preCheckTopologyDescriptionUpdate: synchronous structural validation
    that rejects the request with UNSUPPORTED_VERSION when no plugin is
    configured, and with INVALID_REQUEST for an empty MemberId / GroupId
    or a null TopologyDescription.
  • handleSetTopology: runs validateStreamsGroupMember first (so a fenced
    caller gets UNKNOWN_MEMBER_ID rather than an INVALID_REQUEST from a
    payload conversion failure), then converts the wire payload to the
    broker-side POJO, then calls the plugin. Plugin success writes a
    metadata record advancing StoredDescriptionTopologyEpoch; a
    StreamsTopologyDescriptionPermanentFailureException (or a synchronous
    throw) writes FailedDescriptionTopologyEpoch; any other exception is
    treated as transient and writes no record.
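
As a rough illustration of the two entry points above, the pre-check and the outcome classification could be sketched as follows. This is not the PR's code: PushError, RecordWritten, PermanentFailureException and PushPipelineSketch are hypothetical stand-ins, and the sketch assumes the plugin error reaches the classifier unwrapped.

```java
// Hypothetical stand-ins for illustration; the real types in the PR may differ.
enum PushError { NONE, UNSUPPORTED_VERSION, INVALID_REQUEST }

enum RecordWritten { STORED_EPOCH, FAILED_EPOCH, NONE }

// Stand-in for StreamsTopologyDescriptionPermanentFailureException.
class PermanentFailureException extends RuntimeException {
    PermanentFailureException(String message) {
        super(message);
    }
}

final class PushPipelineSketch {
    private PushPipelineSketch() { }

    // Synchronous structural validation, following the order described above.
    static PushError preCheck(boolean pluginConfigured, String groupId,
                              String memberId, Object topologyDescription) {
        if (!pluginConfigured) {
            return PushError.UNSUPPORTED_VERSION;
        }
        if (groupId == null || groupId.isEmpty()
                || memberId == null || memberId.isEmpty()
                || topologyDescription == null) {
            return PushError.INVALID_REQUEST;
        }
        return PushError.NONE;
    }

    // Maps the plugin result to the record the description says is written.
    static RecordWritten classify(Throwable pluginError, boolean thrownSynchronously) {
        if (pluginError == null) {
            return RecordWritten.STORED_EPOCH;   // success advances the stored epoch
        }
        if (thrownSynchronously || pluginError instanceof PermanentFailureException) {
            return RecordWritten.FAILED_EPOCH;   // permanent failure or synchronous throw
        }
        return RecordWritten.NONE;               // transient: no record
    }
}
```

For example, classify(new RuntimeException("timeout"), false) yields NONE under these assumptions, so a transient plugin error leaves both epochs untouched.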

Back-off mutation point

All back-off mutations on the push path are folded into a single
whenComplete, driven by a BackoffAction holder populated by each terminal
branch. The default is ARM; only the success and permanent-failure
branches set CLEAR. A post-plugin write failure therefore re-arms the
back-off, and the next heartbeat sees the drift and re-solicits an
idempotent re-push, matching the KIP-1331 invariant.
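
The holder pattern described above might look roughly like the sketch below. It is a simplified model, not the PR's implementation: BackoffSketch, PermanentPushFailure and the recordWriteSucceeds flag are hypothetical, and the record write is reduced to a boolean so that the post-plugin write failure case can be shown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

enum BackoffAction { ARM, CLEAR }

// Hypothetical stand-in for the permanent-failure exception.
class PermanentPushFailure extends RuntimeException { }

final class BackoffSketch {
    private BackoffSketch() { }

    // Returns the back-off action applied by the single whenComplete.
    static CompletableFuture<BackoffAction> push(CompletableFuture<Void> pluginCall,
                                                 boolean recordWriteSucceeds) {
        // Holder defaults to ARM; only terminal branches flip it to CLEAR.
        AtomicReference<BackoffAction> action = new AtomicReference<>(BackoffAction.ARM);
        AtomicReference<BackoffAction> applied = new AtomicReference<>();
        return pluginCall
            .<Void>handle((ok, error) -> {
                Throwable cause = error instanceof CompletionException ? error.getCause() : error;
                boolean terminal = cause == null || cause instanceof PermanentPushFailure;
                if (!terminal) {
                    return null; // transient failure: no record, holder stays ARM
                }
                if (!recordWriteSucceeds) {
                    // Post-plugin write failure: holder is never set to CLEAR.
                    throw new IllegalStateException("record write failed");
                }
                action.set(BackoffAction.CLEAR);
                return null;
            })
            // The only place where back-off state is mutated.
            .whenComplete((ignored, error) -> applied.set(action.get()))
            .handle((ignored, error) -> applied.get());
    }
}
```

Because every branch only writes to the holder, the back-off state itself is touched in one place, which is what lets a failed record write fall back to ARM without extra handling.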

Wire ↔ POJO converter

StreamsGroupTopologyDescriptionConverter translates
StreamsGroupTopologyDescriptionUpdateRequestData.TopologyDescription into
the broker-side StreamsGroupTopologyDescription POJO. Subtopology node
ordering and string-collection iteration order are preserved via
LinkedHashSet, so a downstream pretty-printer (e.g. the future
kafka-streams-groups.sh --topology command) can reproduce the source
ordering.

Wire-level node types (SOURCE=1, PROCESSOR=2, SINK=3) map to the
sealed Node hierarchy. GlobalStore shape is validated; a malformed
payload throws InvalidRequestException. The POJO itself drops
Set.copyOf in favour of Collections.unmodifiableSet to preserve the
converter's insertion order — production callers only ever construct
nodes from a fresh LinkedHashSet so this is not a defensive-copy
regression.
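
To make the ordering argument concrete, here is a minimal sketch of the type mapping and the insertion-order-preserving wrapper. SketchNode, SourceNode, ProcessorNode, SinkNode and ConverterSketch are hypothetical names, the node fields are simplified, and IllegalArgumentException stands in for InvalidRequestException; the real converter and POJO will differ.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the sealed Node hierarchy (requires Java 17+).
sealed interface SketchNode permits SourceNode, ProcessorNode, SinkNode {
    String name();
}

record SourceNode(String name, Set<String> topics) implements SketchNode { }

record ProcessorNode(String name, Set<String> stores) implements SketchNode { }

record SinkNode(String name, Set<String> topics) implements SketchNode { }

final class ConverterSketch {
    private ConverterSketch() { }

    // Wire-level node types from the description: SOURCE=1, PROCESSOR=2, SINK=3.
    static SketchNode toNode(int wireType, String name, List<String> values) {
        // An unmodifiable view over a fresh LinkedHashSet keeps insertion order,
        // whereas Set.copyOf gives no iteration-order guarantee.
        Set<String> ordered = Collections.unmodifiableSet(new LinkedHashSet<>(values));
        switch (wireType) {
            case 1:
                return new SourceNode(name, ordered);
            case 2:
                return new ProcessorNode(name, ordered);
            case 3:
                return new SinkNode(name, ordered);
            default:
                // The PR throws InvalidRequestException for malformed payloads.
                throw new IllegalArgumentException("Unknown node type: " + wireType);
        }
    }
}
```

The view is only safe because the backing LinkedHashSet is created inside toNode and never escapes, which mirrors the "fresh LinkedHashSet" argument made for production callers.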

Coordinator interface and service entry

  • GroupCoordinator.streamsGroupTopologyDescriptionUpdate(context, request)
    interface method added.
  • GroupCoordinatorService.streamsGroupTopologyDescriptionUpdate(...)
    short-circuits on a non-active coordinator with
    COORDINATOR_NOT_AVAILABLE, then delegates to
    topologyDescriptionManager.preCheckTopologyDescriptionUpdate(...) and
    handleSetTopology(...). Unhandled exceptions are translated by
    handleOperationException into the wire error response.
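
The control flow of the service entry can be sketched as below. This is an assumption-heavy outline: WireError and ServiceEntrySketch are hypothetical, the pre-check is modelled as returning an error code rather than throwing, and the mapping of unhandled exceptions to UNKNOWN_SERVER_ERROR is a placeholder for whatever handleOperationException actually does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical error codes for illustration only.
enum WireError { NONE, COORDINATOR_NOT_AVAILABLE, INVALID_REQUEST, UNKNOWN_SERVER_ERROR }

final class ServiceEntrySketch {
    private ServiceEntrySketch() { }

    static CompletableFuture<WireError> update(boolean coordinatorActive,
                                               Supplier<WireError> preCheck,
                                               Supplier<CompletableFuture<WireError>> handleSetTopology) {
        // Short-circuit before touching the manager when the coordinator is not active.
        if (!coordinatorActive) {
            return CompletableFuture.completedFuture(WireError.COORDINATOR_NOT_AVAILABLE);
        }
        // Synchronous structural validation runs next.
        WireError structural = preCheck.get();
        if (structural != WireError.NONE) {
            return CompletableFuture.completedFuture(structural);
        }
        // Delegate to the push pipeline; translate unhandled exceptions into a
        // wire error (placeholder for handleOperationException).
        return handleSetTopology.get().exceptionally(t -> WireError.UNKNOWN_SERVER_ERROR);
    }
}
```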

Coordinator shard + GMM

  • GroupMetadataManager.streamsGroupSetTopologyDescriptionEpoch(groupId, pushedEpoch, permanentFailure):
    writes a StreamsGroupMetadata record advancing
    StoredDescriptionTopologyEpoch (on success) or
    FailedDescriptionTopologyEpoch (on permanent failure).
  • GroupCoordinatorShard.streamsGroupSetTopologyDescriptionEpoch(...)
    exposes the method to the runtime write-operation scheduler.

…on plugin (1/3)

Wires the plugin reference into GroupCoordinatorService and adds the
heartbeat-path gate that asks streams clients to push their topology
description. Plugin lookup is resolved internally from
groupCoordinatorConfig (no BrokerServer change).
… (2/3)

Adds the StreamsGroupTopologyDescriptionUpdate RPC handler stacked on the
heartbeat extension (1/3). The push pipeline runs through
TopologyDescriptionManager.handleSetTopology: validate (group, member),
convert wire payload, call plugin.setTopology, persist
StoredDescriptionTopologyEpoch on success or
FailedDescriptionTopologyEpoch on permanent failure, and centralise
back-off state mutations in a single whenComplete.
Labels

core Kafka Broker group-coordinator triage PRs from the community
