I am using Apache Flink (v1.10.0) to process RabbitMQ messages and then sink the results to MySQL. Currently I compute like this:
consumeRecord.keyBy("gameType")
        .timeWindowAll(Time.seconds(5))
        .reduce((d1, d2) -> {
            d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
            d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
            return d1;
        })
        .addSink(new SinkFunction<ReportPump>() {
            @Override
            public void invoke(ReportPump value, Context context) throws Exception {
                // save to mysql
            }
        });
But the sink's invoke method receives only one row per call, so if one of the rows in a batch fails, I cannot roll back the whole batch operation. I want to get all rows of one window as a batch and write them to the database in a single call; if that fails, I want to roll back both the insert and Apache Flink's checkpoint. This is what I am trying now:
consumeRecord.keyBy("gameType")
        .timeWindowAll(Time.seconds(5)).reduce(new ReduceFunction<ReportPump>() {
            @Override
            public ReportPump reduce(ReportPump d1, ReportPump d2) throws Exception {
                d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
                d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
                return d1;
            }
        })
        .apply(new AllWindowFunction<ReportPump, List<ReportPump>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<ReportPump> values, Collector<List<ReportPump>> out) throws Exception {
                ArrayList<ReportPump> employees = Lists.newArrayList(values);
                if (employees.size() > 0) {
                    out.collect(employees);
                }
            }
        })
        .addSink(new SinkFunction<List<ReportPump>>() {
            @Override
            public void invoke(List<ReportPump> value, Context context) throws Exception {
                PumpRealtimeHandler.invoke(value);
            }
        });
But the IDE flags the apply call with: Cannot resolve method 'apply' in 'SingleOutputStreamOperator'. How can I reduce the data, get the batch of one window as a list, and flush it to the database only once?
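
For reference, the error comes from the fact that reduce(...) on the windowed stream already returns a SingleOutputStreamOperator, while apply(...) is a method of the windowed stream itself (AllWindowedStream here), so it has to be called directly after timeWindowAll(...). Note also that timeWindowAll puts all elements into one non-keyed window regardless of the preceding keyBy, so a plain reduce would merge every gameType into a single row. Below is a minimal sketch, in plain Java without Flink, of the per-window logic that the body of such an AllWindowFunction.apply could delegate to: it sums the amounts per gameType and returns one list for the whole window. The class WindowBatcher, the method aggregateByGameType, the simplified ReportPump stand-in, and its long field types are all assumptions made for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not a Flink class.
class WindowBatcher {

    // Simplified stand-in for the ReportPump class in the question (field types assumed).
    static class ReportPump {
        final String gameType;
        long pumpAmount;
        long realPumpAmount;

        ReportPump(String gameType, long pumpAmount, long realPumpAmount) {
            this.gameType = gameType;
            this.pumpAmount = pumpAmount;
            this.realPumpAmount = realPumpAmount;
        }
    }

    // Sums the amounts of all elements of one window per gameType and
    // returns the per-gameType totals as a single list (one batch).
    static List<ReportPump> aggregateByGameType(Iterable<ReportPump> values) {
        // LinkedHashMap keeps the order in which game types first appear.
        Map<String, ReportPump> totals = new LinkedHashMap<>();
        for (ReportPump v : values) {
            ReportPump acc = totals.get(v.gameType);
            if (acc == null) {
                // Copy the element so the input objects are not mutated.
                totals.put(v.gameType,
                        new ReportPump(v.gameType, v.pumpAmount, v.realPumpAmount));
            } else {
                acc.pumpAmount += v.pumpAmount;
                acc.realPumpAmount += v.realPumpAmount;
            }
        }
        return new ArrayList<>(totals.values());
    }
}
```

Inside apply(TimeWindow window, Iterable<ReportPump> values, Collector<List<ReportPump>> out), the returned list could be passed to out.collect(...) when it is not empty, so the sink receives one List<ReportPump> per window and can write it within a single database transaction.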