I have a custom state calculation represented as a `Set<Long>`. It keeps getting updated as my `DataStream<Set<Long>>` sees new events from Kafka. Every time the state is updated, I want to print the updated state to stdout. How do I do that in Flink? I'm a little confused by all the window and trigger operations, and I keep getting the following error:
```
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
```
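For context, this message is typically raised by an event-time window assigner. The sketch below is a simplified, hypothetical plain-Java model (not Flink source; `EventTimeWindowModel` and `windowStart` are invented names) of how a tumbling event-time window picks a window from each record's timestamp, and why a record without a timestamp (which Flink marks with `Long.MIN_VALUE`) cannot be placed in one. The two fixes the message itself points to are using processing time (for example a processing-time window assigner) or calling `assignTimestampsAndWatermarks(...)` on the stream before windowing.

```java
import java.util.List;

/**
 * Simplified, hypothetical model of how an event-time tumbling window assigner
 * maps a record timestamp to a window. This is NOT Flink source code; it only
 * illustrates why event-time windows need a real timestamp on every record.
 */
public class EventTimeWindowModel {

    // Flink uses Long.MIN_VALUE to mean "this record carries no timestamp".
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Returns the start of the tumbling window of size windowSizeMs that contains timestamp. */
    static long windowStart(long timestamp, long windowSizeMs) {
        if (timestamp == NO_TIMESTAMP) {
            // Without a timestamp there is no way to choose an event-time window.
            throw new IllegalStateException(
                "Record has no timestamp; assign timestamps and watermarks or use processing time");
        }
        long remainder = timestamp % windowSizeMs;
        // Java's % keeps the sign of the dividend, so handle negative timestamps
        // to make sure the returned start is always <= timestamp.
        return remainder < 0 ? timestamp - (remainder + windowSizeMs) : timestamp - remainder;
    }

    public static void main(String[] args) {
        for (long ts : List.of(1_000L, 1_999L, 2_500L)) {
            System.out.println(ts + " -> window starting at " + windowStart(ts, 1_000L));
        }
        try {
            windowStart(NO_TIMESTAMP, 1_000L);
        } catch (IllegalStateException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```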
I just want to know how to print my aggregated `DataStream<Set<Long>>` to stdout, or write it back to another Kafka topic.
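For reference, `DataStream.print()` writes each element's `toString()` to stdout (on a cluster, that is the TaskManagers' stdout, not the client's). Writing to Kafka instead needs a Kafka sink (`FlinkKafkaProducer`, or `KafkaSink` in newer Flink versions) plus a way to turn each `Set<Long>` into bytes, which Flink expresses as a `SerializationSchema`. The sketch below is a plain-Java stand-in with the same `serialize` method shape so it runs without Flink; the class name `LongSetSerializer` and the comma-separated format are assumptions for illustration, not a required format.

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
 * Hypothetical encoder with the same method shape as Flink's
 * SerializationSchema<Set<Long>>#serialize. The comma-separated format is an
 * assumption; any format the downstream consumer understands would work.
 */
public class LongSetSerializer {

    /** Encodes the set as sorted, comma-separated ids in UTF-8, e.g. "1,2,3". */
    public byte[] serialize(Set<Long> element) {
        String payload = new TreeSet<>(element).stream() // sort for a stable message body
                .map(String::valueOf)
                .collect(Collectors.joining(","));
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = new LongSetSerializer().serialize(Set.of(42L, 7L, 19L));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

Sorting before joining keeps the message body identical for equal sets, which makes the topic easier to inspect and deduplicate.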
Below is the code snippet that throws the error:
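```java
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
// toAppendStream(table, Row.class) returns a DataStream<Row>, not a DataStream<Set<Long>>
DataStream<Row> stream = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class);

stream
    // NOTE: aggregate(...) is defined on windowed streams (WindowedStream / AllWindowedStream),
    // not on DataStream itself; the window assignment preceding this call is not shown here.
    .aggregate(new MyCustomAggregation(100))   // emits the aggregated Set<Long> values
    .process(new ProcessFunction<Set<Long>, Object>() {
        @Override
        public void processElement(Set<Long> value, Context ctx, Collector<Object> out) throws Exception {
            // Print every aggregated Set<Long> to stdout
            System.out.println(value.toString());
        }
    });
```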
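`MyCustomAggregation` isn't shown, so the following is only a guess at its shape: a plain-Java model of the four methods of Flink's `AggregateFunction<IN, ACC, OUT>` (`createAccumulator`, `add`, `getResult`, `merge`) for a hypothetical aggregation that collects `Long` ids into a `Set<Long>`. The class name `SetAggregationModel` is invented, the constructor argument `100` is ignored because its meaning isn't shown, and the code does not depend on Flink. The `main` method prints the result after every `add`, which models the "print on every update" behavior asked about; inside a window, Flink calls `add` per element but calls `getResult` only when the window's trigger fires.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Plain-Java model of the four methods of Flink's AggregateFunction<IN, ACC, OUT>
 * for a hypothetical aggregation that collects Long ids into a Set<Long>.
 * It mirrors the method names only; it does not implement the Flink interface.
 */
public class SetAggregationModel {

    // Creates an empty accumulator for a new aggregation.
    public Set<Long> createAccumulator() {
        return new HashSet<>();
    }

    // Called once per incoming element: fold the id into the accumulator.
    public Set<Long> add(Long value, Set<Long> accumulator) {
        accumulator.add(value);
        return accumulator;
    }

    // Produces the output; returns a sorted copy so callers cannot mutate the state.
    public Set<Long> getResult(Set<Long> accumulator) {
        return new TreeSet<>(accumulator);
    }

    // Combines two partial accumulators (Flink uses this when windows merge).
    public Set<Long> merge(Set<Long> a, Set<Long> b) {
        a.addAll(b);
        return a;
    }

    public static void main(String[] args) {
        SetAggregationModel agg = new SetAggregationModel();
        Set<Long> acc = agg.createAccumulator();
        for (long id : List.of(5L, 3L, 5L, 8L)) {
            acc = agg.add(id, acc);
            // Emitting after every add models "print on every state update";
            // a windowed aggregate would only emit when the window fires.
            System.out.println(agg.getResult(acc));
        }
    }
}
```

Outside a window, one common way to get the same per-update output is keyed state (for example a `KeyedProcessFunction` holding a `ValueState<Set<Long>>`) that emits the updated set on every event; that approach does not need event-time timestamps at all.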