I have a Flink DataStream of type DataStream[(String, somecaseclass)]. I want to group by the first field of the tuple (the String) and collect the values into a ListBuffer[somecaseclass]. Here is what I have tried:
```scala
val emptylistbuffer = new ListBuffer[somecaseclass]
inputstream
  .keyBy(0)
  .fold(emptylistbuffer) { case (outputbuffer, b) => outputbuffer += b._2 }
```
However, this emits an output for every input row. With 10 input rows, the first output contains only the first row, the second contains the first two, and so on, until the tenth output contains all ten. I only want that final (tenth) result. I have checked almost all the DataStream transformations in Flink, but none of them seems to fit this use case.
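The per-row output described above is what a rolling (running) aggregation produces: each incoming element updates its key's accumulator, and the updated accumulator is emitted immediately. The plain-Scala sketch below (no Flink dependency) models that behaviour, with a mutable map standing in for Flink's keyed state. SomeCaseClass and its field names are hypothetical stand-ins for somecaseclass.

```scala
// Hypothetical stand-in for the question's `somecaseclass`; field names are assumptions.
case class SomeCaseClass(a: String, b: String, c: Int)

object RollingFoldDemo {
  val input: Seq[(String, SomeCaseClass)] = Seq(
    ("filename1.dat", SomeCaseClass("abc", "1", 2)),
    ("filename1.dat", SomeCaseClass("dse", "2", 3)),
    ("filename1.dat", SomeCaseClass("daa", "1", 4))
  )

  // Models keyBy + a rolling fold: per-key state is updated for every element,
  // and the updated accumulator is emitted right away (one output per input row).
  val rolling: Seq[(String, List[SomeCaseClass])] = {
    val state = scala.collection.mutable.Map.empty[String, List[SomeCaseClass]]
    input.map { case (key, value) =>
      val updated = state.getOrElse(key, Nil) :+ value
      state(key) = updated
      (key, updated)
    }
  }

  def main(args: Array[String]): Unit =
    rolling.zipWithIndex.foreach { case ((key, acc), i) =>
      println(s"output ${i + 1}: $key -> $acc")
    }
}
```

Only the last emitted value for each key holds the complete group; the earlier ones are intermediate snapshots.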
Input:
(filename1.dat,somecaseclass("abc","1",2))
(filename1.dat,somecaseclass("dse","2",3))
(filename1.dat,somecaseclass("daa","1",4))
Expected output:
(filename1.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4)))
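For a bounded collection, the expected output is simply one grouped result per key. The plain-Scala sketch below, again using a hypothetical SomeCaseClass in place of somecaseclass, computes it with groupBy and copies each group into a ListBuffer. It only illustrates the target shape of the result; it is not Flink code.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the question's `somecaseclass`; field names are assumptions.
case class SomeCaseClass(a: String, b: String, c: Int)

object FinalGroupDemo {
  val input: Seq[(String, SomeCaseClass)] = Seq(
    ("filename1.dat", SomeCaseClass("abc", "1", 2)),
    ("filename1.dat", SomeCaseClass("dse", "2", 3)),
    ("filename1.dat", SomeCaseClass("daa", "1", 4))
  )

  // groupBy keeps the original relative order of elements within each group,
  // so each ListBuffer lists the values in arrival order.
  val grouped: Map[String, ListBuffer[SomeCaseClass]] =
    input.groupBy(_._1).map { case (key, rows) =>
      key -> (ListBuffer.empty[SomeCaseClass] ++= rows.map(_._2))
    }

  def main(args: Array[String]): Unit =
    grouped.foreach { case (key, buf) => println((key, buf)) }
}
```

On an unbounded DataStream there is no final row, so Flink cannot tell when a key's group is complete; that is why a rolling aggregation emits every intermediate result. One common way to get a single result per key is to apply a window to the keyed stream (for example countWindow or a tumbling time window) and collect the elements in the window function. Which window fits depends on how the input signals that a file is finished, which the question does not specify.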