diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 5f0d043c7e460..5dd2ad556e9b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -466,7 +466,8 @@ private KTable doTransformValues(final ValueTransformerWithKeySuppli final KTableProcessorSupplier processorSupplier = new KTableTransformValues<>( this, transformerSupplier, - queryableStoreName); + queryableStoreName, + stateStoreNames); final ProcessorParameters processorParameters = unsafeCastProcessorParametersToCompletelyDifferentType( diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java index 2948af604d187..c564bb1f93ac4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java @@ -40,14 +40,23 @@ class KTableTransformValues implements KTableProcessorSupplier parent; private final ValueTransformerWithKeySupplier transformerSupplier; private final String queryableName; + private final String[] stateStoreNames; private boolean sendOldValues = false; KTableTransformValues(final KTableImpl parent, final ValueTransformerWithKeySupplier transformerSupplier, final String queryableName) { + this(parent, transformerSupplier, queryableName, new String[0]); + } + + KTableTransformValues(final KTableImpl parent, + final ValueTransformerWithKeySupplier transformerSupplier, + final String queryableName, + final String[] stateStoreNames) { this.parent = Objects.requireNonNull(parent, "parent"); this.transformerSupplier = Objects.requireNonNull(transformerSupplier, "transformerSupplier"); this.queryableName = queryableName; + this.stateStoreNames = Objects.requireNonNull(stateStoreNames, "stateStoreNames"); } @Override @@ -72,7 +81,11 @@ public KTableValueGetter get() { @Override public String[] storeNames() { - return parentValueGetterSupplier.storeNames(); + final String[] parentStoreNames = parentValueGetterSupplier.storeNames(); + final String[] storeNames = new String[parentStoreNames.length + stateStoreNames.length]; + System.arraycopy(parentStoreNames, 0, storeNames, 0, parentStoreNames.length); + System.arraycopy(stateStoreNames, 0, storeNames, parentStoreNames.length, stateStoreNames.length); + return storeNames; } }; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java index d336b69a3241f..ae43714bc4f5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java @@ -29,16 +29,21 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.test.TestRecord; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockApiProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.StreamsTestUtils; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -67,6 +72,56 @@ public class KTableKTableInnerJoinTest { private Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); + @Test + public void shouldJoinAfterUnmaterializedTransformValuesWithExtraStore() { + final StreamsBuilder builder = new StreamsBuilder(); + final StoreBuilder> transformerStore = Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("transformer-store"), + Serdes.Integer(), + Serdes.String() + ); + builder.addStateStore(transformerStore); + + final KTable transformed = builder + .table(topic1, consumed, materialized) + .transformValues( + () -> new ValueTransformerWithKey<>() { + @Override + public void init(final ProcessorContext context) { + context.getStateStore(transformerStore.name()); + } + + @Override + public String transform(final Integer readOnlyKey, final String value) { + return value; + } + + @Override + public void close() {} + }, + Materialized.with(Serdes.Integer(), Serdes.String()), + transformerStore.name() + ); + + transformed + .join(transformed, (value1, value2) -> value1) + .toStream() + .to(output); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic inputTopic = + driver.createInputTopic( + topic1, + Serdes.Integer().serializer(), + Serdes.String().serializer(), + Instant.ofEpochMilli(0L), + Duration.ZERO + ); + + inputTopic.pipeInput(1, "A"); + } + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testJoin(final boolean withHeaders) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java index 75a64d580ec8e..a8f38e312f4ec 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java @@ -305,6 +305,19 @@ public void shouldGetStoreNamesFromParentIfNotMaterialized() { assertThat(storeNames, is(new String[]{"store1", "store2"})); } + @Test + public void shouldGetStoreNamesFromParentAndExtraStoresIfNotMaterialized() { + final KTableTransformValues transformValues = + new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null, new String[]{"store3", "store4"}); + + when(parent.valueGetterSupplier()).thenReturn(parentGetterSupplier); + when(parentGetterSupplier.storeNames()).thenReturn(new String[]{"store1", "store2"}); + + final String[] storeNames = transformValues.view().storeNames(); + + assertThat(storeNames, is(new String[]{"store1", "store2", "store3", "store4"})); + } + @Test public void shouldGetQueryableStoreNameIfMaterialized() { final KTableTransformValues transformValues =