Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ private <VR> KTable<K, VR> doTransformValues(final ValueTransformerWithKeySuppli
final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new KTableTransformValues<>(
this,
transformerSupplier,
queryableStoreName);
queryableStoreName,
stateStoreNames);

final ProcessorParameters<K, VR, ?, ?> processorParameters =
unsafeCastProcessorParametersToCompletelyDifferentType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,23 @@ class KTableTransformValues<K, V, VOut> implements KTableProcessorSupplier<K, V,
private final KTableImpl<K, ?, V> parent;
private final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VOut> transformerSupplier;
private final String queryableName;
private final String[] stateStoreNames;
private boolean sendOldValues = false;

KTableTransformValues(final KTableImpl<K, ?, V> parent,
final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VOut> transformerSupplier,
final String queryableName) {
this(parent, transformerSupplier, queryableName, new String[0]);
}

KTableTransformValues(final KTableImpl<K, ?, V> parent,
final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VOut> transformerSupplier,
final String queryableName,
final String[] stateStoreNames) {
this.parent = Objects.requireNonNull(parent, "parent");
this.transformerSupplier = Objects.requireNonNull(transformerSupplier, "transformerSupplier");
this.queryableName = queryableName;
this.stateStoreNames = Objects.requireNonNull(stateStoreNames, "stateStoreNames");
}

@Override
Expand All @@ -72,7 +81,11 @@ public KTableValueGetter<K, VOut> get() {

@Override
public String[] storeNames() {
return parentValueGetterSupplier.storeNames();
final String[] parentStoreNames = parentValueGetterSupplier.storeNames();
final String[] storeNames = new String[parentStoreNames.length + stateStoreNames.length];
System.arraycopy(parentStoreNames, 0, storeNames, 0, parentStoreNames.length);
System.arraycopy(stateStoreNames, 0, storeNames, parentStoreNames.length, stateStoreNames.length);
return storeNames;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,21 @@
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand Down Expand Up @@ -67,6 +72,56 @@ public class KTableKTableInnerJoinTest {
private Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());


@Test
public void shouldJoinAfterUnmaterializedTransformValuesWithExtraStore() {
final StreamsBuilder builder = new StreamsBuilder();
final StoreBuilder<KeyValueStore<Integer, String>> transformerStore = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("transformer-store"),
Serdes.Integer(),
Serdes.String()
);
builder.addStateStore(transformerStore);

final KTable<Integer, String> transformed = builder
.table(topic1, consumed, materialized)
.transformValues(
() -> new ValueTransformerWithKey<>() {
@Override
public void init(final ProcessorContext context) {
context.getStateStore(transformerStore.name());
}

@Override
public String transform(final Integer readOnlyKey, final String value) {
return value;
}

@Override
public void close() {}
},
Materialized.with(Serdes.Integer(), Serdes.String()),
transformerStore.name()
);

transformed
.join(transformed, (value1, value2) -> value1)
.toStream()
.to(output);

try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
final TestInputTopic<Integer, String> inputTopic =
driver.createInputTopic(
topic1,
Serdes.Integer().serializer(),
Serdes.String().serializer(),
Instant.ofEpochMilli(0L),
Duration.ZERO
);

inputTopic.pipeInput(1, "A");
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testJoin(final boolean withHeaders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,19 @@ public void shouldGetStoreNamesFromParentIfNotMaterialized() {
assertThat(storeNames, is(new String[]{"store1", "store2"}));
}

@Test
public void shouldGetStoreNamesFromParentAndExtraStoresIfNotMaterialized() {
final KTableTransformValues<String, String, String> transformValues =
new KTableTransformValues<>(parent, new ExclamationValueTransformerSupplier(), null, new String[]{"store3", "store4"});

when(parent.valueGetterSupplier()).thenReturn(parentGetterSupplier);
when(parentGetterSupplier.storeNames()).thenReturn(new String[]{"store1", "store2"});

final String[] storeNames = transformValues.view().storeNames();

assertThat(storeNames, is(new String[]{"store1", "store2", "store3", "store4"}));
}

@Test
public void shouldGetQueryableStoreNameIfMaterialized() {
final KTableTransformValues<String, String, String> transformValues =
Expand Down