0 votes

First of all, I have already found this question: flink program behaves differently in parallelism. It looks like the same issue I'm facing right now, but I think I do need CEP in my scenario, because I have more than 1,000,000 records per hour to analyze, each belonging to a different user key.

So when I run CEP with parallelism 1 everything works fine, even for different user keys, but it is a bit slow, because Flink has to analyze user after user in a single thread. This operation needs to be fast enough to recognize a pattern and then send a notification in less than one minute, for example, which is why I need more than one parallel thread.
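For context, a minimal sketch of how the parallelism can be raised, either for the whole job or for a single operator (the operator and value shown are just placeholders):

    // default parallelism for every operator in the job
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    // or overridden on one specific operator
    someStream.flatMap(new SomeFlatMapFunction()).setParallelism(8);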

In my case I'm working with a RichFlatMapFunction that keeps the previously matched pattern in state in order to recognize the next one and then send the notification. Here is my code:

final DataStream<EventPush> eventsStream = RabbitMQConnector.eventStreamObject(env)
                .flatMap(new RabbitMQPushConsumer())
                .keyBy(k -> k.id); // partition so all events of one user key go to the same subtask

private static SingleOutputStreamOperator<String> getPushToSend(KeyedStream<EventPush, String> stream) {
        return stream.flatMap(new WebPushFlatMapFunction())
                .map(x -> new ObjectMapper().writeValueAsString(x)); // serialize each matched event to JSON
    }

/* The code below belongs to the WebPushFlatMapFunction class, the RichFlatMapFunction that uses ValueState. */

    /* Keyed ValueState holding the previous match per user key:
       (umid, last matched page name, event time in millis, event timestamp).
       pattern, full_pattern and home are page-name constants defined elsewhere in the class. */
    private transient ValueState<Tuple4<String, String, Long, Timestamp>> previous;

    private boolean inTime(long start, long end) {
        // true when the two timestamps are within the two-minute window (and not identical)
        final long difference = (end > start) ? (end - start) : (start - end);
        final long time_frame = 120000L;
        return difference > 0 && time_frame >= difference;
    }

    @Override
    public void flatMap(EventPush value, Collector<EventPush> out) {
        final String pageName = value.pageName.trim();
        Tuple4<String, String, Long, Timestamp> prev;
        try {
            prev = previous.value();
            // step 1: the first page of the pattern was seen, remember it in state
            if (pageName.equalsIgnoreCase(pattern)) {
                LOG.info("umid " + value.idsUmid + " match (" + pattern + ") at: " + value.timestamp);
                previous.update(new Tuple4<>(value.idsUmid, pageName, value.timestamp.getTime(), value.timestamp));
            }
            if (prev != null) {
                if (inTime(value.timestamp.getTime(), prev.f2)) {
                    // step 3: full_pattern was matched before and the user is now on home -> emit the notification
                    if ((prev.f1 != null && !prev.f1.equals("")) && prev.f1.equals(full_pattern) && pageName.equals(home) && prev.f3.before(value.timestamp)) {
                        if (PropertyFileReader.isWebPushLoggerActivated())
                            LOG.info("umid " + value.idsUmid + " match (" + home + ") triggered at: " + value.timestamp);
                        prev.f1 = "";
                        out.collect(value);
                    }
                    // step 2: pattern was matched before and the user is now on full_pattern -> advance the state
                    if ((prev.f1 != null && !prev.f1.equals("")) && prev.f1.equals(pattern) && pageName.equals(full_pattern) && prev.f3.before(value.timestamp)) {
                        LOG.info("umid " + value.idsUmid + " match (" + full_pattern + ") at: " + value.timestamp);
                        prev.f3 = value.timestamp;
                        prev.f1 = pageName;
                        previous.update(prev);
                    }
                }
            }
        } catch (IOException e) {
            CatchHandler.generalCatchHandler(e);
        }
    }

And with parallelism 1 I get the proper order: 1, 2, 3. With more than that, I can receive 1 in one thread and 3 in another, even though they all belong to the same user key, so these 3 states end up partitioned across different threads. My question: is there any way to do this with more parallelism? Kind regards.


2 Answers

0 votes

It sounds like you want to keep all of the analysis for each user together, but perform analysis for different users in parallel. The way to do this is to key the stream by the userId. This does mean that for a single user, their events are being handled by a single (non-parallel) pipeline.
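A minimal sketch of that arrangement, reusing the names from the question (the parallelism value is arbitrary):

    // keyBy(userId) routes all events of one user to the same subtask, in arrival order;
    // different users are spread across the parallel subtasks
    DataStream<EventPush> results = RabbitMQConnector.eventStreamObject(env)
            .flatMap(new RabbitMQPushConsumer())
            .keyBy(k -> k.id)
            .flatMap(new WebPushFlatMapFunction())
            .setParallelism(8);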

If that's too slow, there are probably things you can do to speed it up. The things that typically help the most include: more efficient serialization, doing pre-aggregation or incremental aggregation, removing keyBys or rebalances, and enabling object reuse.
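Object reuse, for instance, is a one-line change on the execution config; a sketch, with the usual caveat that it is only safe if no operator holds onto or mutates records after emitting them:

    // avoid defensive copies between chained operators
    env.getConfig().enableObjectReuse();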

0 votes

I have found the issue: it is in the backend, which is assigning the same userId to different users (I don't know how, but it is happening, and it is not Flink). That is why the pattern never matched: different users with the same userId, sending events in different orders, were treated in the same subtask, so their streams got merged, and event1 from user1 could appear after event2 from user2, for example. Kind regards.
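One way to confirm a problem like this is to log which parallel subtask handles each key; a small sketch inside the RichFlatMapFunction (field names follow the question's code):

    // every event of a given key should consistently land on the same subtask;
    // interleaved orders for a single key point to duplicate/reused keys upstream
    final int subtask = getRuntimeContext().getIndexOfThisSubtask();
    LOG.info("umid " + value.idsUmid + " handled by subtask " + subtask + " at " + value.timestamp);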