0 votes

I have a custom state calculation that is represented as a Set&lt;Long&gt;, and it keeps getting updated as my DataStream&lt;Set&lt;Long&gt;&gt; sees new events from Kafka. Every time my state is updated, I want to print the updated state to stdout. How do I do that in Flink? I'm a little confused by all the window and trigger operations, and I keep getting the following error:

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

I just want to know how to print my aggregated stream DataStream&lt;Set&lt;Long&gt;&gt; to stdout, or write it back to another Kafka topic.

Below is the snippet of the code that throws the error.

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

DataStream<Set<Long>> stream = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class);
stream
    .aggregate(new MyCustomAggregation(100))
    .process(new ProcessFunction<Set<Long>, Object>() {
        @Override
        public void processElement(Set<Long> value, Context ctx, Collector<Object> out) throws Exception {
            System.out.println(value.toString());
        }
    });
1
Please explain in more detail what you want to accomplish. Outputting the entire Set after every event is going to be expensive, especially if the Set is growing with each event. Is this for debugging, or ??? – David Anderson
Yes, you got it! It is mainly for debugging, so outputting every second is also good for me. I don't need an output after every event. – user1870400
I can't figure out what's going on from the code you've shared. The error about timestamps and watermarks is only thrown from Flink's windowing code, and I don't see any windows. Also, there is no aggregate method on DataStreams -- only on windows. I think the printing would probably work, but the job is failing before getting there. – David Anderson

1 Answer

0 votes

Keeping collections in state with Flink can be very expensive, because in some cases the collection will be frequently serialized and deserialized. When possible, it is preferable to use Flink's built-in ListState and MapState types.

Here's an example illustrating a few things:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1L, 2L, 3L, 4L, 3L, 2L, 1L, 0L)
        .keyBy(x -> 1)
        .process(new KeyedProcessFunction<Integer, Long, List<Long>>() {
            private transient MapState<Long, Boolean> set;

            @Override
            public void open(Configuration parameters) throws Exception {
                set = getRuntimeContext().getMapState(new MapStateDescriptor<>("set", Long.class, Boolean.class));
            }

            @Override
            public void processElement(Long x, Context context, Collector<List<Long>> out) throws Exception {
                if (set.contains(x)) {
                    System.out.println("set contains " + x);
                } else {
                    set.put(x, true);
                    List<Long> list = new ArrayList<>();
                    Iterator<Long> iter = set.keys().iterator();
                    iter.forEachRemaining(list::add);
                    out.collect(list);
                }
            }
        })
        .print();

    env.execute();

}

Note that I wanted to use keyed state, but didn't have anything in the events to use as a key, so I just keyed the stream by a constant. This is normally not a good idea, as it prevents the processing from being done in parallel -- but since you are aggregating everything into a single Set, that's not something you can do in parallel anyway, so no harm done.

I'm representing the set of Longs as the keys of a MapState object. When I want to output the set, I collect its keys into a List. When I just want to print something for debugging, I use System.out.

What I see when I run this job in my IDE is this:

[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
set contains 3
set contains 2
set contains 1
[0, 1, 2, 3, 4]

If you'd rather see what's in the MapState every second, you can use a processing-time timer in the process function.
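As a rough sketch (not tested against your job; the class name, state descriptor names, and the one-second interval are my own choices), the process function above could be reworked so that processElement only updates the state and registers a timer, and the timer callback does the printing:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical class name; same keying scheme (constant Integer key) as the example above.
public class SetPrinter extends KeyedProcessFunction<Integer, Long, List<Long>> {
    private transient MapState<Long, Boolean> set;
    // Tracks whether a timer is already pending, so we register at most one per interval.
    private transient ValueState<Boolean> timerPending;

    @Override
    public void open(Configuration parameters) throws Exception {
        set = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("set", Long.class, Boolean.class));
        timerPending = getRuntimeContext().getState(
            new ValueStateDescriptor<>("timerPending", Boolean.class));
    }

    @Override
    public void processElement(Long x, Context ctx, Collector<List<Long>> out) throws Exception {
        set.put(x, true);
        if (timerPending.value() == null) {
            // Fire one second from now, in processing time.
            ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 1000);
            timerPending.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Long>> out) throws Exception {
        // Emit the current contents of the set once per second (at most).
        List<Long> list = new ArrayList<>();
        set.keys().iterator().forEachRemaining(list::add);
        out.collect(list);
        timerPending.clear();
    }
}
```

With this version you'd get at most one List per second per key, instead of one per event.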