Skip to content

KAFKA-19602: fix(streams): propagate state store names from KTable.transformValues to downstream KTable join#22562

Open
wilmerdooley wants to merge 1 commit into
apache:trunkfrom
wilmerdooley:oss/kafka-19602
Open

KAFKA-19602: fix(streams): propagate state store names from KTable.transformValues to downstream KTable join#22562
wilmerdooley wants to merge 1 commit into
apache:trunkfrom
wilmerdooley:oss/kafka-19602

Conversation

@wilmerdooley

@wilmerdooley wilmerdooley commented Jun 13, 2026

Copy link
Copy Markdown

Change description

When KTable.transformValues is called with extra state stores (via the stateStoreNames parameter) and the result is not materialized, the resulting KTableTransformValues ProcessorSupplier did not expose those extra stores via storeNames(). As a result, a downstream KTable join operator had no way to learn about the extra stores and could not wire them through to the join processor, causing initialization to fail with Processor ... has no access to StateStore ....

This change adds a new constructor to KTableTransformValues that accepts the stateStoreNames and merges them with the parent value getter's storeNames() so the full set of stores is reported. KTableImpl now passes the stateStoreNames it already received to this new constructor. Tests are added in KTableTransformValuesTest and KTableKTableInnerJoinTest to cover both the unit-level storeNames() behavior and the end-to-end join scenario from the issue.

Testing strategy

Unit tests verify that the view() storeNames() method of KTableTransformValues returns the union of parent store names and the extra stateStoreNames when the result is not materialized. A new integration-style test in KTableKTableInnerJoinTest reproduces the exact scenario from KAFKA-19602: a transformValues on a KTable with an extra manually-added state store, followed by a self join, which previously failed during topology initialization.

JIRA: https://issues.apache.org/jira/browse/KAFKA-19602
Reviewers: Uladzislau Blok blokv75@gmail.com

…on KTable with extra store

Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
@github-actions github-actions Bot added triage PRs from the community streams small Small PRs and removed triage PRs from the community labels Jun 13, 2026
@UladzislauBlok

UladzislauBlok commented Jun 13, 2026

Copy link
Copy Markdown
Contributor

Hello
Thanks for patch
Transformer API is deprecated, and we're going to remove it
Can we verify if an issue occurs with .process / .processValues methods

@wilmerdooley

Copy link
Copy Markdown
Author

Thanks, that makes sense.

I checked the code path, and I don't see the same issue with .process / .processValues. Those are KStream APIs (streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java around line 1550 and line 1580), and KStreamImpl connects the declared store names directly through ProcessorToStateConnectorNode (streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java around line 1306 and line 1355), which then calls connectProcessorAndStateStores (streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorToStateConnectorNode.java around line 67).

The failing path here looks specific to KTable.transformValues: downstream KTable joins collect store names from valueGetterSupplier().storeNames() (streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java around line 804), and the unmaterialized transform view goes through KTableTransformValues.view() (streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java around line 59). If a processed KStream is converted back with toTable(), KStreamImpl.toTable() creates a materialized table source (streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java around line 593 and line 650), so the downstream join reads that table store instead.

Given that KTable.transformValues is in the API area being deprecated, how would you prefer we proceed? We could keep this fix on transformValues for its remaining lifetime, or move toward a different location/approach if that fits the removal plan better.

I also noticed the current KTable.transformValues declarations and ValueTransformerWithKey interface are not annotated @Deprecated yet (streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java around line 823, streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java around line 54). If useful, I would be glad to help with those annotations or follow the relevant KIP. Please let me know which direction you prefer.

@wilmerdooley

Copy link
Copy Markdown
Author

Thanks, that helps. I read through KIP-1128 and see the plan to replace KTable.transformValues with the new KTable.processFixedKey on the new Processor API. That method is not in the code yet (KAFKA-17178 is still in progress), so there is nowhere to move this fix to today, and the store-name bug still breaks a downstream KTable join on the released versions while transformValues is around. Happy to keep this small fix available for that window if it is useful, and I understand if you would prefer to handle it as part of the KIP-1128 work instead.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants