
I built a very simple example that joins two streams whose topics have a plain key/value structure (Integer/String), and it works perfectly.

May I ask how I can do something like this:

SELECT * FROM stream1, stream2
WHERE stream1.key = stream2.key AND (stream1.key > 50 AND stream1.key < 100) AND (stream2.key > 50 AND stream2.key < 100)

Does Kafka Streams allow something like this?

Ultimately, what I want to do is join and filter two streams whose keys are GenericRecords, roughly like this:

SELECT * FROM stream1, stream2
WHERE stream1.genericRecordkey.someId = stream2.genericRecordkey.someId

My test example:

public void joinKStreamToKStreamWhereKeyValueIsIntegerString() throws Exception {
    // Use the test method's name to build unique topic/app/group names.
    String uniqueKey = new Object() {}.getClass().getEnclosingMethod().getName();

    long timestamp = new Date().getTime();

    String firstTopic = String.format("%1$s_1_%2$s", uniqueKey, timestamp);
    String secondTopic = String.format("%1$s_2_%2$s", uniqueKey, timestamp);
    String outputTopic = String.format("%1$s_output_%2$s", uniqueKey, timestamp);
    String appIdConfig = String.format("%1$s_app_id_%2$s", uniqueKey, timestamp);
    String groupIdConfig = String.format("%1$s_group_id_%2$s", uniqueKey, timestamp);

    List<KeyValue<Integer, String>> ikv1 = Arrays.asList(
            new KeyValue<>(1, "Bruce Eckel"),
            new KeyValue<>(2, "Robert Lafore"),
            new KeyValue<>(3, "Andrew Tanenbaum")
    );

    List<KeyValue<Integer, String>> ikv2 = Arrays.asList(
            new KeyValue<>(3, "Modern Operating System"),
            new KeyValue<>(1, "Thinking in Java"),
            new KeyValue<>(3, "Computer Architecture"),
            new KeyValue<>(4, "Programming in Scala")
    );

    List<KeyValue<Integer, String>> expectedResults = Arrays.asList(
            new KeyValue<>(3, "Andrew Tanenbaum/Modern Operating System"),
            new KeyValue<>(1, "Bruce Eckel/Thinking in Java"),
            new KeyValue<>(3, "Andrew Tanenbaum/Computer Architecture")
    );

    Integer partitions = 1;
    Integer replication = 1;
    Properties topicConfig = new Properties();

    TopicUtils.createTopic(firstTopic, partitions, replication, topicConfig);
    TopicUtils.createTopic(secondTopic, partitions, replication, topicConfig);
    TopicUtils.createTopic(outputTopic, partitions, replication, topicConfig);

    final Serde<String> stringSerde = Serdes.String();
    final Serde<Integer> integerSerde = Serdes.Integer();

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appIdConfig);
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
    streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
    streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
    streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    // The commit interval for flushing records to state stores and downstream must be lower than
    // this integration test's timeout (30 secs) to ensure we observe the expected processing results.
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use a temporary directory for storing state, which will be automatically removed after the test.
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

    KStreamBuilder builder = new KStreamBuilder();

    KStream<Integer, String> firstStream = builder.stream(integerSerde, stringSerde, firstTopic);
    KStream<Integer, String> secondStream = builder.stream(integerSerde, stringSerde, secondTopic);

    // Inner join within a 5-second window, concatenating the two values.
    KStream<Integer, String> outputStream = firstStream.join(
            secondStream,
            (l, r) -> l + "/" + r,
            JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),
            integerSerde, stringSerde, stringSerde);

    outputStream.to(integerSerde, stringSerde, outputTopic);

    KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
    streams.start();

    // Produce the test records to both input topics.
    Properties pCfg1 = new Properties();
    pCfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
    pCfg1.put(ProducerConfig.ACKS_CONFIG, "all");
    pCfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
    pCfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    pCfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    IntegrationTestUtils.produceKeyValuesSynchronously(firstTopic, ikv1, pCfg1);

    Properties pCfg2 = new Properties();
    pCfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
    pCfg2.put(ProducerConfig.ACKS_CONFIG, "all");
    pCfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
    pCfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    pCfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    IntegrationTestUtils.produceKeyValuesSynchronously(secondTopic, ikv2, pCfg2);

    // Consume the joined records from the output topic and compare against the expected results.
    Properties consumerConfig = new Properties();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
    consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    List<KeyValue<Integer, String>> actualResults =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedResults.size());

    streams.close();

    assertThat(actualResults).containsExactlyElementsOf(expectedResults);
}

I hope I explained it well; thanks for any help.


1 Answer


You can just apply a filter before you do the join.

outputStream = firstStream.filter(...).join(secondStream.filter(...), ...);
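For your Integer-keyed test, the key-range predicate from your first SQL snippet could look roughly like this (a minimal sketch against the same 0.10.x API and the firstStream/secondStream/serde variables from your test; the 50..100 range is just the one from your example):

KStream<Integer, String> filteredFirst =
        firstStream.filter((key, value) -> key > 50 && key < 100);
KStream<Integer, String> filteredSecond =
        secondStream.filter((key, value) -> key > 50 && key < 100);

// The join itself stays exactly as in your test; only the filtered streams are wired in.
KStream<Integer, String> filteredJoin = filteredFirst.join(
        filteredSecond,
        (l, r) -> l + "/" + r,
        JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),
        integerSerde, stringSerde, stringSerde);

Note that the stream1.key = stream2.key part of your WHERE clause is what the join already does: records are matched by key, so you only need to express the range conditions as filters.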

If you want to join on stream1.genericRecordkey.someId, you will need to extract someId first and set it as the key:

firstStream.selectKey((k, v) -> v.someId).join(secondStream.selectKey((k, v) -> v.someId), ...);
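Putting that together for the GenericRecord case, a sketch could look like the following. Everything here is a placeholder for your setup, not names from your code: firstGenericStream/secondGenericStream, the field names someId and someName, and genericAvroSerde (e.g. a GenericAvroSerde configured against your schema registry) are all assumptions.

// Assumption: the record values are Avro GenericRecords with a field "someId";
// the extracted id is used as a String key here.
KStream<String, GenericRecord> firstById =
        firstGenericStream.selectKey((key, value) -> value.get("someId").toString());
KStream<String, GenericRecord> secondById =
        secondGenericStream.selectKey((key, value) -> value.get("someId").toString());

KStream<String, String> joinedById = firstById.join(
        secondById,
        (l, r) -> l.get("someName") + "/" + r.get("someName"),  // hypothetical value fields
        JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),
        Serdes.String(),       // key serde for the new someId key
        genericAvroSerde,      // value serde of this stream (assumed GenericRecord serde)
        genericAvroSerde);     // value serde of the other stream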

For more details, check out the docs: http://docs.confluent.io/current/streams/developer-guide.html