
I have a mixed DSL/PAPI topology. The DSL part joins pageviews (from the "pageviews" topic) with the users who generated them (from the "users" topic). When a pageview's user is new, I want to create a new user from the pageview's information and write it to the "users" topic; otherwise, do nothing.

So I'm doing a left join between pageviews and users. If the user comes back null, no user has been created yet for this key, so I create one.

In code, I read pageviews as a stream and users as a table, left-join them so that a new User is produced whenever the joined user is null, and then filter out everything else and send only those new users to "users".

    val builder = new StreamsBuilder()
    // Re-key the pageviews by MerchantUserPartitionKey so they line up with the "users" table.
    val pageviewsTopic: KStream[MerchantUserPartitionKey, Pageview] = builder.stream[Key, Pageview]("pageviews")
      .map((muipk, pageview) => (new MerchantUserPartitionKey(muipk.merchantSiteId, muipk.uid) -> pageview))

    val usersTopic: KTable[MerchantUserPartitionKey, User] = builder.table("users")

    // Left join: `user` is null when the table has no entry for this key yet.
    val joinedPageviewsWithUsers: KStream[MerchantUserPartitionKey, User] =
      pageviewsTopic.leftJoin(
        usersTopic,
        new ValueJoiner[Pageview, User, User] {
          override def apply(pageview: Pageview, user: User): User = {
            logger.info("JOIN PAGEVIEW-user")
            if (user == null) {
              // No user for this key yet: build one from the pageview.
              new User(UUIDUtils.generateRandomId(), pageview.uid /*, some other data */)
            } else {
              logger.info("user already created.")
              null
            }
          }
        })

    // Generate users: keep only the newly created ones and write them back to "users".
    joinedPageviewsWithUsers
      .filter((key, user) => user != null)
      .to("users")

The generated DSL topology looks like this:

  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [pageviews])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000006
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000006 (stores: [])
      --> KSTREAM-SINK-0000000005
      <-- KSTREAM-MAP-0000000001
    Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-MAP-0000000001-repartition)
      <-- KSTREAM-FILTER-0000000006
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-MAP-0000000001-repartition])
      --> KSTREAM-LEFTJOIN-0000000008
    Processor: KSTREAM-LEFTJOIN-0000000008 (stores: [users-STATE-STORE-0000000002])
      --> KSTREAM-FILTER-0000000009
      <-- KSTREAM-SOURCE-0000000007
    Processor: KSTREAM-FILTER-0000000009 (stores: [])
      --> KSTREAM-SINK-0000000010
      <-- KSTREAM-LEFTJOIN-0000000008
    Source: KSTREAM-SOURCE-0000000003 (topics: [users])
      --> KTABLE-SOURCE-0000000004
    Sink: KSTREAM-SINK-0000000010 (topic: users)
      <-- KSTREAM-FILTER-0000000009
    Processor: KTABLE-SOURCE-0000000004 (stores: [users-STATE-STORE-0000000002])
      --> none
      <-- KSTREAM-SOURCE-0000000003

However, when I run this with several pageviews that have the same key, a new user is written to "users" every time, because the join always returns null. It looks as if the store is never updated with the data newly written to the "users" topic, even though the topology shows the join using users-STATE-STORE-0000000002.

Do I need to do something extra to get the data into the store? Or is this a Kafka Streams anti-pattern (writing to the same topic you are joining against)?

UPDATE with more info:

  • Keys are not null.
  • The ValueJoiner code is executed (the log lines are printed), but the user value is always null.
  • Users are written to the "users" topic. Given the logic, this happens every time the ValueJoiner runs: the table-side value is always null, so a new user is inserted into "users" each time.
Do you have any null keys? Did you verify that apply is called? Did you verify if data is written into topic "users" ? - Matthias J. Sax
Answered in an update to clarify (basically "no", "yes", "yes") - xmar
Can you check the lag for topic "users": did the KTable catch up? Can you use Interactive Queries to check whether the table was updated? - Matthias J. Sax
Sorry, I already rebuilt the cluster and reworked this with PAPI. However, from your answer I understand this is supported, is it? Regarding lag on the "users" topic: how does the consumption of this topic (the one feeding the state store) show up when checking the application's consumer group? Also, I was not using IQ here. What was your hypothesis? - xmar
Yes, this should work as expected. Not sure at the moment what the problem might have been. If you read a topic as a KTable, you can monitor its lag like any other topic (application.id == group.id). - Matthias J. Sax

1 Answer


When a stream in one sub-topology looks up into a table that is in another sub-topology, the regular produce/consume delays apply: a record written to the table's topic is not visible in the table until it has been produced to Kafka and consumed back. This happens, for example, when you define streams or tables directly from topics. If you can use more meaningful operators such as through() (which writes to a topic but lets the topology know the data is still used within it), Kafka Streams can be aware of that relationship.
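To make the delay argument concrete, here is a toy, in-memory model in Scala. It uses no Kafka APIs at all, and the names JoinRaceModel, roundTripJoin and localStoreDedup are hypothetical, chosen only for illustration. roundTripJoin mimics the DSL topology above: the table the joiner sees is frozen for the duration of a burst of pageviews, because the users it creates still have to travel through the "users" topic before they reach the table. localStoreDedup instead checks and updates a local store before emitting, which is roughly what a Processor API step with its own state store can do. Real timing depends on producer batching, commits and how records from the two topics are interleaved, so treat this as an illustration, not a reproduction of Kafka Streams behavior.

```scala
import scala.collection.mutable

// Toy model only: no Kafka APIs are used. Keys are plain String uids.
object JoinRaceModel {
  final case class User(id: Int, uid: String)

  // Mimics the DSL topology: `table` is the state before this burst of
  // pageviews. It is NOT updated during the burst, because users written to
  // "users" are only consumed back into the table later.
  def roundTripJoin(pageviewUids: Seq[String],
                    table: Map[String, User] = Map.empty): Seq[User] = {
    val inFlight = mutable.ArrayBuffer.empty[User] // produced, not yet consumed
    var nextId = 0
    for (uid <- pageviewUids) {
      if (!table.contains(uid)) { // left join returned null
        nextId += 1
        inFlight += User(nextId, uid)
      }
    }
    inFlight.toList
  }

  // Check-and-put against a local store before emitting, so a second
  // pageview with the same key already sees the user created by the first.
  def localStoreDedup(pageviewUids: Seq[String]): Seq[User] = {
    val store = mutable.Map.empty[String, User]
    val emitted = mutable.ArrayBuffer.empty[User]
    var nextId = 0
    for (uid <- pageviewUids) {
      if (!store.contains(uid)) {
        nextId += 1
        val user = User(nextId, uid)
        store(uid) = user // visible to the very next record
        emitted += user
      }
    }
    emitted.toList
  }
}
```

With the burst Seq("u1", "u1", "u1", "u2"), roundTripJoin emits four users (one per pageview, as in the question), while localStoreDedup emits two (one per key). The design choice being illustrated is where the "does this user exist?" state lives: behind a topic round trip, or in a store updated synchronously by the same step that reads it.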