To add to Matthias J. Sax's point, I have attached sample code to show how it can be done.
/**
 * Defines a Kafka Streams topology that reads records from {@code inputTopicList},
 * re-keys each record with the name of the topic it came from (via
 * {@code getTopicDetailsTransformer}) and prints the resulting key/value pairs.
 * The records read from the input topics are also forwarded to {@code outputTopic}.
 *
 * <p>NOTE(review): this snippet only configures {@code props} and defines the
 * topology; nothing here passes them to a {@code KafkaStreams} instance or starts
 * it, so that step still has to be added by the caller for records to flow.
 *
 * @param args command-line arguments (unused)
 */
public static void main(final String[] args) {
    try {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamProcessor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Commit interval of 10 seconds, expressed in milliseconds.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        // Single source stream; the original snippet declared this variable twice,
        // the second time through an undeclared "builder", which does not compile.
        final KStream<String, String> textLines = streamsBuilder.stream(inputTopicList);

        // Replace each record's key with its source topic name and print the pair.
        textLines.transform(getTopicDetailsTransformer::new)
                .foreach((key, value) -> System.out.println(key + ": " + value));

        // This forwards textLines itself, not the re-keyed stream produced above.
        textLines.to(outputTopic);
    } catch (Exception e) {
        System.out.println(e);
    }
}
/**
 * Transformer that replaces each record's key with the value returned by
 * {@link ProcessorContext#topic()} and leaves the record value unchanged.
 */
private static final class getTopicDetailsTransformer implements Transformer<String, String, KeyValue<String, String>> {

    /** Context captured in {@link #init}; read on every call to {@link #transform}. */
    private ProcessorContext context;

    /**
     * Stores the processor context so {@link #transform} can query it later.
     *
     * @param context the context supplied to this transformer
     */
    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    /**
     * Emits the record with the context's topic name as its new key.
     *
     * @param recordKey   the original key (ignored)
     * @param recordValue the record value, passed through unchanged
     * @return a pair of {@code context.topic()} and {@code recordValue}
     */
    @Override
    public KeyValue<String, String> transform(final String recordKey, final String recordValue) {
        // The topic name becomes the key; the original key is dropped.
        return KeyValue.pair(context.topic(), recordValue);
    }

    /** No resources to release. */
    @Override
    public void close() {
        // Not needed.
    }
}