
Below is the pseudocode of my stream processing job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// timeWindowAll(...) returns an AllWindowedStream, not a DataStream
AllWindowedStream stream = env.addSource(...)
    .map(...)      // map each record to a Java object
    .filter(...)   // keep only a specific type of event
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(2)) { /* override extractTimestamp */ })
    .timeWindowAll(Time.seconds(10));

// collect all records of each window
DataStream windowedStream = stream.apply(new AllWindowFunction(...));

DataStream processedStream = windowedStream.keyBy(...).reduce(...);

String outputPath = "";

final StreamingFileSink sink = StreamingFileSink.forRowFormat(...).build();

processedStream.addSink(sink);

This code creates multiple files, and each file seems to contain records from several different windows. For example, the records in a single file have timestamps spanning 30 to 40 seconds, whereas the window size is only 10 seconds. What I expect is for each window's data to be written to a separate file. Any references or input would be a great help.
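To see why one file can hold several windows, here is a minimal sketch of how an event timestamp maps to its tumbling window. It follows the same arithmetic as Flink's `TimeWindow.getWindowStartWithOffset`, assuming epoch-aligned windows (offset 0); the class `TumblingWindowMath`, its methods, and the sample timestamps are hypothetical. With 10-second windows, records whose timestamps span 30 to 40 seconds necessarily come from at least three different windows.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: maps event timestamps (epoch millis) to the start of the
// tumbling event-time window they belong to.
class TumblingWindowMath {

    // Start of the window containing `timestamp`, for windows of `windowSizeMs`
    // shifted by `offsetMs` (0 = aligned to the epoch).
    static long windowStart(long timestamp, long offsetMs, long windowSizeMs) {
        long remainder = (timestamp - offsetMs) % windowSizeMs;
        // Java's % is negative for negative inputs; shift it into [0, windowSizeMs).
        if (remainder < 0) {
            remainder += windowSizeMs;
        }
        return timestamp - remainder;
    }

    // Counts timestamps per window start, in the order the windows are first seen.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps, long windowSizeMs) {
        Map<Long, Integer> counts = new LinkedHashMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, 0L, windowSizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Timestamps from one hypothetical output file, spanning 35 seconds.
        List<Long> fileTimestamps = List.of(1_000L, 9_500L, 12_000L, 25_000L, 36_000L);
        // Prints {0=2, 10000=1, 20000=1, 30000=1}: four 10-second windows in one file.
        System.out.println(countPerWindow(fileTimestamps, 10_000L));
    }
}
```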

Regarding the question above: I understand that because I'm using StreamingFileSink, it writes all records to a single file, and because my machine is multi-core, it creates multiple files. But I still need the output of each window to be written to its own file. Any input on that would be helpful. – user1506729

Answer


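Take a look at the BucketAssigner interface, which StreamingFileSink uses to decide which bucket (subdirectory under the base path) each record is written to. It should be flexible enough to meet your needs. You just need to make sure that your stream events contain enough information to determine the path you want them written to.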
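A minimal sketch of that idea, assuming the Flink 1.x `StreamingFileSink` API (`BucketAssigner`, `SimpleVersionedStringSerializer`, `forRowFormat(...).withBucketAssigner(...)`) and the 10-second epoch-aligned windows from the question. Instead of adding a field to the events, it relies on the record timestamp: Flink stamps each window result with the window's max timestamp (window end minus 1 ms), and the rolling `keyBy(...).reduce(...)` keeps that timestamp, so the sink can recover the window start. The class `EventTimeWindowBucketAssigner`, the `window-<start>` bucket names, and the `no-timestamp` fallback are hypothetical.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Sketch: routes each record into a bucket (subdirectory) named after the
// start of the event-time window it came from.
class EventTimeWindowBucketAssigner<IN> implements BucketAssigner<IN, String> {

    private final long windowSizeMs;

    EventTimeWindowBucketAssigner(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Pure helper so the naming scheme can be checked without a running job.
    static String bucketFor(long timestamp, long windowSizeMs) {
        long remainder = timestamp % windowSizeMs;
        if (remainder < 0) {
            remainder += windowSizeMs;
        }
        return "window-" + (timestamp - remainder);
    }

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        // Window results carry the window's max timestamp (end - 1),
        // which still falls inside that window.
        Long ts = context.timestamp();
        if (ts == null) {
            // Hypothetical fallback for records without a timestamp.
            return "no-timestamp";
        }
        return bucketFor(ts, windowSizeMs);
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    // Example wiring that replaces the question's sink construction;
    // SimpleStringEncoder writes each record's toString() as one line.
    static <T> StreamingFileSink<T> buildSink(String outputPath) {
        return StreamingFileSink
            .forRowFormat(new Path(outputPath), new SimpleStringEncoder<T>("UTF-8"))
            .withBucketAssigner(new EventTimeWindowBucketAssigner<T>(10_000L))
            .build();
    }
}
```

Usage would be `processedStream.addSink(EventTimeWindowBucketAssigner.buildSink(outputPath));` with a non-empty `outputPath`. Each window then lands in its own directory under `outputPath` (for example `window-30000/`). With a sink parallelism above 1, or when the rolling policy starts a new part file, that directory can still contain several part files, so one directory per window is the realistic target rather than one file per window.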