1
votes

I'm doing data enrichment by joining a KStream with a KTable. The KStream contains messages sent by vehicles and the KTable contains vehicle data. The problem is that I want to capture messages from the stream that don't have a corresponding join key in the table. Kafka Streams silently skips records that have no join match. Is there some way to emit those records to a different topic, so they can be processed later?

StreamsBuilder builder = new StreamsBuilder();
final KTable<String, VinMappingInfo> vinMappingTable = builder.table(vinInfoTopic, Consumed.with(Serdes.String(), valueSerde));
KStream<String, VehicleMessage> vehicleStream = builder.stream(sourceTopic);
vehicleStream.join(vinMappingTable, (vehicleMsg, vinInfo) -> {
    log.info("joining {} with vin info {}", vehicleMsg.getPayload().getId(), vinInfo.data.vin);
    vehicleMsg.setVin(vinInfo.data.vin);
    return vehicleMsg;
}, Joined.with(null, null, valueSerde))
        .to(destinationTopic);

final Topology topology = builder.build();
log.info("The topology of connected processor nodes: \n {}", topology.describe());
KafkaStreams streams = new KafkaStreams(topology, config);
streams.cleanUp();
streams.start();

1 Answer

0
votes

You can use a left-join:

stream.leftJoin(table,...);

This ensures that all records from the input stream appear in the output stream. For a stream record with no matching table entry, the ValueJoiner is called as apply(streamValue, null), so the joiner must handle a null table value.
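To send the unmatched records to a different topic, one option is to let the joiner pass them through unenriched and then split the joined stream. Below is a minimal sketch of that idea in plain Java, not a tested topology. The two nested classes are hypothetical stand-ins for the question's VehicleMessage and VinMappingInfo, unmatchedTopic is an assumed topic name, and the sketch assumes the vin field is empty on incoming messages, so a null vin after the join marks a record without a match.

```java
import java.util.function.BiFunction;
import java.util.function.Predicate;

class LeftJoinRouting {

    // Hypothetical stand-in for the question's VehicleMessage.
    static final class VehicleMessage {
        final String id;
        String vin; // assumed to be null until the join fills it in
        VehicleMessage(String id) { this.id = id; }
    }

    // Hypothetical stand-in for the question's VinMappingInfo.
    static final class VinMappingInfo {
        final String vin;
        VinMappingInfo(String vin) { this.vin = vin; }
    }

    // Joiner logic for leftJoin: vinInfo is null when the table has no entry
    // for the record's key, so it must be checked before it is dereferenced.
    static final BiFunction<VehicleMessage, VinMappingInfo, VehicleMessage> JOINER =
            (vehicleMsg, vinInfo) -> {
                if (vinInfo != null) {
                    vehicleMsg.vin = vinInfo.vin;
                }
                return vehicleMsg; // unmatched records pass through unchanged
            };

    // True for records that came out of the left join without a match.
    static final Predicate<VehicleMessage> UNMATCHED = msg -> msg.vin == null;

    // Possible wiring in the topology (shown as comments, not compiled here;
    // unmatchedTopic is an assumed name):
    //
    //   KStream<String, VehicleMessage> joined =
    //           vehicleStream.leftJoin(vinMappingTable, JOINER::apply);
    //   joined.filter((key, msg) -> UNMATCHED.test(msg)).to(unmatchedTopic);
    //   joined.filterNot((key, msg) -> UNMATCHED.test(msg)).to(destinationTopic);
}
```

If incoming messages can already carry a vin, a null check on the field is not a reliable marker. In that case the joiner could return a small wrapper holding the message and an explicit matched flag, and the filters would test that flag.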