When joining a KStream with a GlobalKTable, you can use parts of the key and value of the KStream, but it must end up matching the entire GlobalKTable key, so, unfortunately, you can't do what you stated above with a join.
But you should still be able to do something close to this even using the DSL. If you used KStream.transformValues with a ValueTransformerWithKeySupplier, you could scan the statestore and extract the records you want based on a substring contained in the stream record. Additionally, you don't necessarily need to scan the entire store but use a range query instead.
EDIT: Here's some code that I got working to demonstrate what I am getting at.
@SuppressWarnings("unchecked")
public class MultiResultJoinExample {
public static void main(String[] args) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mult-partial-key-join-results");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
final StreamsBuilder builder = new StreamsBuilder();
final String storeName = "kv-store";
final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(),
Serdes.String());
builder.addStateStore(keyValueStoreBuilder);
final KStream<String, String> streamToJoinAgainst = builder.stream("to-join-input", Consumed.with(Serdes.String(), Serdes.String() ));
streamToJoinAgainst.transformValues(new StoringValueTransformer(storeName), storeName);
final KStream<String, String> streamNeedingJoin = builder.stream("need-join-input", Consumed.with(Serdes.String(), Serdes.String()));
streamNeedingJoin.flatTransformValues(new FlatMapJoiningTransformer(storeName), storeName).to("output", Produced.with(Serdes.String(), Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
streams.start();
}
static final class FlatMapJoiningTransformer implements ValueTransformerWithKeySupplier<String, String, Iterable<String>> {
final String storeName;
public FlatMapJoiningTransformer(String storeName) {
this.storeName = storeName;
}
@Override
public ValueTransformerWithKey<String, String, Iterable<String>> get() {
return new ValueTransformerWithKey<String, String, Iterable<String>>() {
private KeyValueStore<String, String> kvStore;
@Override
public void init(ProcessorContext<Void, Void> context) {
kvStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
}
@Override
public Iterable<String> transform(String readOnlyKey, String value) {
List<String> results = new ArrayList<>();
final String patternToMatch = readOnlyKey.substring(4, 7);
try (KeyValueIterator<String, String> iter = kvStore.all()) {
while(iter.hasNext()) {
final KeyValue<String, String> kv = iter.next();
if (kv.key.contains(patternToMatch) || kv.value.contains(patternToMatch)){
results.add(kv.value + " - " + value);
}
}
}
return results;
}
@Override
public void close() {
}
};
}
}
static final class StoringValueTransformer implements ValueTransformerWithKeySupplier<String, String, String> {
final String storeName;
public StoringValueTransformer(String storeName) {
this.storeName = storeName;
}
@Override
public ValueTransformerWithKey<String, String, String> get() {
return new ValueTransformerWithKey<String, String, String>(){
private KeyValueStore<String, String> kvStore;
@Override
public void init(ProcessorContext<Void, Void> context) {
kvStore = (KeyValueStore<String, String>)context.getStateStore(storeName);
}
@Override
public String transform(String readOnlyKey, String value) {
kvStore.putIfAbsent(readOnlyKey, value);
return value;
}
@Override
public void close() {
//no-op
}
};
}
}
}
HTH,
Bill