I am getting Exception in thread "main" java.lang.IndexOutOfBoundsException in Apache Flink. I tried to follow the example here, but I get the exception mentioned above. A snippet of my main class is below. I would appreciate any pointers on what I have done wrong or how to troubleshoot the error.

DataStream<CSBEvent> inputEventStream = env.addSource(
        new FlinkKafkaConsumer09<CSBEvent>("tester03", new EventDeserializationSchema(), properties));

// Warning pattern: two consecutive FAILED events within 10 seconds
Pattern<CSBEvent, ?> warningPattern = Pattern.<CSBEvent>begin("first")
        .subtype(CSBEvent.class)
        .where(evt -> evt.getEvent().getStatus().equals("FAILED"))
        .next("second")
        .subtype(CSBEvent.class)
        .where(evt -> evt.getEvent().getStatus().equals("FAILED"))
        .within(Time.seconds(10));

// Create a pattern stream from our warning pattern
PatternStream<CSBEvent> tempPatternStream = CEP.pattern(inputEventStream.keyBy("userID"), warningPattern);

// Generate a warning for each matched warning pattern
DataStream<CSBEvent> warnings = tempPatternStream.select(
    (Map<String, CSBEvent> pattern) -> {
        CSBEvent first = (CSBEvent) pattern.get("first");
        CSBEvent second = (CSBEvent) pattern.get("second");

        return new CSBEvent(second.getUserID());
    }
);

// Alert pattern: Two consecutive request failure warnings appearing within a time interval of 20 seconds
Pattern<CSBEvent, ?> alertPattern = Pattern.<CSBEvent>begin("first")
        .next("second")
        .within(Time.seconds(20));

// Create a pattern stream from our alert pattern
PatternStream<CSBEvent> alertPatternStream = CEP.pattern(
        warnings.keyBy("userID"),
        alertPattern);

....

Here is the stack trace:

Exception in thread "main" java.lang.IndexOutOfBoundsException
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.getTypeAt(PojoTypeInfo.java:259)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil.getSelectorForKeys(KeySelectorUtil.java:62)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:276)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:272)
    at de.cep.CorrelatorVersion101.main(CorrelatorVersion101.java:61)

1 Answer

From your code it looks like you are using Flink 1.2, right? Have you considered updating to a newer version? We are currently close to releasing Flink 1.4.

In addition, can you post the whole stack trace so we can see where the exception is thrown?