0
votes

I have a topic name which is push-processing-KSTREAM-PEEK-0000000014-repartition and this is internal topic by kafka. I did not create this topic and I am using .peek() method after repartition and using peek method 3-4 times.

My question is I can read from topic topic read push-processing-KSTREAM-PEEK-0000000014-repartition, but I can not read when I say topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning.

This internal topic is created because of peek method right?

Or is it related with other repartition streams code, but its name is KSTREEAM-PEEK?

It has 50 partitions. Because of peek is stateless operation, it should not create internal topics right but why is it name is related with peek and why I can not read from beginning?

Any idea please/

Here is first topology:

   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [appconnect_deviceIds_exported_for_push])
      --> KSTREAM-FLATMAP-0000000004
    Processor: KSTREAM-FLATMAP-0000000004 (stores: [])
      --> KSTREAM-PEEK-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000005 (stores: [])
      --> KSTREAM-FILTER-0000000007
      <-- KSTREAM-FLATMAP-0000000004
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000005
    Sink: KSTREAM-SINK-0000000006 (topic: KSTREAM-PEEK-0000000005-repartition)
      <-- KSTREAM-FILTER-0000000007

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000008 (topics: [KSTREAM-PEEK-0000000005-repartition])
      --> KSTREAM-JOIN-0000000009
    Source: KSTREAM-SOURCE-0000000028 (topics: [KSTREAM-PEEK-0000000025-repartition])
      --> KSTREAM-JOIN-0000000029
    Processor: KSTREAM-JOIN-0000000009 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> KSTREAM-MAP-0000000010
      <-- KSTREAM-SOURCE-0000000008
    Processor: KSTREAM-JOIN-0000000029 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> KSTREAM-PEEK-0000000030
      <-- KSTREAM-SOURCE-0000000028
    Processor: KSTREAM-MAP-0000000010 (stores: [])
      --> KSTREAM-PEEK-0000000011
      <-- KSTREAM-JOIN-0000000009
    Processor: KSTREAM-PEEK-0000000030 (stores: [])
      --> KSTREAM-MAP-0000000031
      <-- KSTREAM-JOIN-0000000029
    Processor: KSTREAM-MAP-0000000031 (stores: [])
      --> KSTREAM-SINK-0000000032
      <-- KSTREAM-PEEK-0000000030
    Processor: KSTREAM-PEEK-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000012
      <-- KSTREAM-MAP-0000000010
    Source: KSTREAM-SOURCE-0000000002 (topics: [appconnect_device_stream])
      --> KTABLE-SOURCE-0000000003
    Sink: KSTREAM-SINK-0000000012 (topic: appconnect_devices_exported_for_push)
      <-- KSTREAM-PEEK-0000000011
    Sink: KSTREAM-SINK-0000000032 (topic: appconnect_devices_exported_for_push)
      <-- KSTREAM-MAP-0000000031
    Processor: KTABLE-SOURCE-0000000003 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> none
      <-- KSTREAM-SOURCE-0000000002

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000013 (topics: [appconnect_userIds_exported_for_push])
      --> KSTREAM-FLATMAP-0000000017
    Processor: KSTREAM-FLATMAP-0000000017 (stores: [])
      --> KSTREAM-PEEK-0000000018
      <-- KSTREAM-SOURCE-0000000013
    Processor: KSTREAM-PEEK-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000020
      <-- KSTREAM-FLATMAP-0000000017
    Processor: KSTREAM-FILTER-0000000020 (stores: [])
      --> KSTREAM-SINK-0000000019
      <-- KSTREAM-PEEK-0000000018
    Sink: KSTREAM-SINK-0000000019 (topic: KSTREAM-PEEK-0000000018-repartition)
      <-- KSTREAM-FILTER-0000000020

  Sub-topology: 3
    Source: KSTREAM-SOURCE-0000000021 (topics: [KSTREAM-PEEK-0000000018-repartition])
      --> KSTREAM-JOIN-0000000022
    Processor: KSTREAM-JOIN-0000000022 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
      --> KSTREAM-PEEK-0000000023
      <-- KSTREAM-SOURCE-0000000021
    Processor: KSTREAM-PEEK-0000000023 (stores: [])
      --> KSTREAM-MAP-0000000024
      <-- KSTREAM-JOIN-0000000022
    Processor: KSTREAM-MAP-0000000024 (stores: [])
      --> KSTREAM-PEEK-0000000025
      <-- KSTREAM-PEEK-0000000023
    Processor: KSTREAM-PEEK-0000000025 (stores: [])
      --> KSTREAM-FILTER-0000000027
      <-- KSTREAM-MAP-0000000024
    Processor: KSTREAM-FILTER-0000000027 (stores: [])
      --> KSTREAM-SINK-0000000026
      <-- KSTREAM-PEEK-0000000025
    Source: KSTREAM-SOURCE-0000000015 (topics: [appconnect_user_stream])
      --> KTABLE-SOURCE-0000000016
    Sink: KSTREAM-SINK-0000000026 (topic: KSTREAM-PEEK-0000000025-repartition)
      <-- KSTREAM-FILTER-0000000027
    Processor: KTABLE-SOURCE-0000000016 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
      --> none
      <-- KSTREAM-SOURCE-0000000015

That is step two,

   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-PEEK-0000000014-repartition])
      --> KSTREAM-JOIN-0000000018
    Processor: KSTREAM-JOIN-0000000018 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
      --> KSTREAM-FILTER-0000000019
      <-- KSTREAM-SOURCE-0000000017
    Processor: KSTREAM-FILTER-0000000019 (stores: [])
      --> KSTREAM-SINK-0000000020
      <-- KSTREAM-JOIN-0000000018
    Source: KSTREAM-SOURCE-0000000001 (topics: [appconnect_push_processing_submissions])
      --> KTABLE-SOURCE-0000000002
    Sink: KSTREAM-SINK-0000000020 (topic: appconnect_push_send_bulk)
      <-- KSTREAM-FILTER-0000000019
    Processor: KTABLE-SOURCE-0000000002 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
      --> none
      <-- KSTREAM-SOURCE-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [appconnect_devices_exported_for_push])
      --> KSTREAM-MAP-0000000007
    Processor: KSTREAM-MAP-0000000007 (stores: [])
      --> KSTREAM-PEEK-0000000008
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-PEEK-0000000008 (stores: [])
      --> KSTREAM-FILTER-0000000010
      <-- KSTREAM-MAP-0000000007
    Processor: KSTREAM-FILTER-0000000010 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-PEEK-0000000008
    Sink: KSTREAM-SINK-0000000009 (topic: KSTREAM-PEEK-0000000008-repartition)
      <-- KSTREAM-FILTER-0000000010

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000011 (topics: [KSTREAM-PEEK-0000000008-repartition])
      --> KSTREAM-LEFTJOIN-0000000012
    Processor: KSTREAM-LEFTJOIN-0000000012 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
      --> KSTREAM-KEY-SELECT-0000000013
      <-- KSTREAM-SOURCE-0000000011
    Processor: KSTREAM-KEY-SELECT-0000000013 (stores: [])
      --> KSTREAM-PEEK-0000000014
      <-- KSTREAM-LEFTJOIN-0000000012
    Processor: KSTREAM-PEEK-0000000014 (stores: [])
      --> KSTREAM-FILTER-0000000016
      <-- KSTREAM-KEY-SELECT-0000000013
    Processor: KSTREAM-FILTER-0000000016 (stores: [])
      --> KSTREAM-SINK-0000000015
      <-- KSTREAM-PEEK-0000000014
    Source: KSTREAM-SOURCE-0000000005 (topics: [appconnect_user_stream])
      --> KTABLE-SOURCE-0000000006
    Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-PEEK-0000000014-repartition)
      <-- KSTREAM-FILTER-0000000016
    Processor: KTABLE-SOURCE-0000000006 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
      --> none
      <-- KSTREAM-SOURCE-0000000005

And all of my these operations use same KEY. I have 5 brokers and 50 partitions for all topics. I have 10 concurrency and I scaled my app to 5. But like I said I am doing repartition and transfer data 3-4 times on a same key. That means all my values related flatMap, map operations go to same partition. 1 or 2 times I am using different key so messages distributed to different partitions, just 1-2 times. Does this affect my performance? Or I should definitely distribute on different partitions to increase my performance.

So basically is kafka showing better performance when performing join or repartition operation with 3-4 times using only a partition between the topics, because kafka will read from only and only a partition and actually knows where to read and read immediately all the data because the data on the physically parallel on the disk (I mean ssd or hdd). Or my second scenario; I should definitely use more partitions to read parallel between the partitions?

And I also think that using peek slows my process.

1
Sorry, I have edited my question and added topologyAlpcan Yıldız

1 Answers

1
votes

The peek() operation is unrelated. Looking at the topology description you posted you program (partly) is as follows:

KStream inputUser = builder.stream().flatMap().peek().filter();
KStream inputDevice = builder.stream().flatMap().peek().filter();
inputUser.join(inputDevice,...)

(Would be easier if you would post your code in the question, too).

Because you call flatMap() Kafka Streams assumes that you change the key, and hence, calling join() triggers the data repartitioning. The repartition topic name is generated by upstream operatore (I am not 100% sure why PEEK is picked instead of FILTER to be fair.)

And all of my these operations use same KEY.

For this case, you might want to use flatMapValues() instead of flatMap(). For this case, Kafka Streams knows that the key did not change and thus it would not create a repartition topic.

Similarly, you might want to use mapValues() instead of map() if the key does not change to avoid unnecessary repartitioning.

My question is I can read from topic "topic read push-processing-KSTREAM-PEEK-0000000014-repartition" but I can not read when I say "topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning"

I am not sure what you mean by this. What does

when I say "topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning"

mean? Do you refer to the command line tool bin/kafka-consumer.sh? In general, yes, you can read from a repartition topic, but I am not sure why this would be useful?