In the following unit test, a number of events given by `numberOfElements` is generated and fed in as a data stream. The test randomly fails at this line:

`assertEquals(numberOfElements, CollectSink.values.size());`

Is there an explanation for why Apache Flink is skipping some of the events?
```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;

public class FlinkTest {

    StreamExecutionEnvironment env;

    @Before
    public void setup() {
        env = StreamExecutionEnvironment.createLocalEnvironment();
    }

    @Test
    public void testStream1() throws Exception {
        testStream();
    }

    @Test
    public void testStream2() throws Exception {
        testStream();
    }

    @Test
    public void testStream3() throws Exception {
        testStream();
    }

    @Test
    public void testStream4() throws Exception {
        testStream();
    }

    @Test
    public void testStream() throws Exception {
        final int numberOfElements = 50;
        DataStream<Tuple2<String, Integer>> tupleStream = env.fromCollection(getCollectionOfBucketImps(numberOfElements));
        CollectSink.values.clear();
        tupleStream.addSink(new CollectSink());
        env.execute();
        sleep(2000);
        assertEquals(numberOfElements, getCollectionOfBucketImps(numberOfElements).size());
        assertEquals(numberOfElements, CollectSink.values.size());
    }

    public static List<Tuple2<String, Integer>> getCollectionOfBucketImps(int numberOfElements) throws InterruptedException {
        List<Tuple2<String, Integer>> records = new ArrayList<>();
        for (int i = 0; i < numberOfElements; i++) {
            records.add(new Tuple2<>(Integer.toString(i % 10), i));
        }
        return records;
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Tuple2<String, Integer>> {

        public static final List<Tuple2<String, Integer>> values = new ArrayList<>();

        @Override
        public synchronized void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
            values.add(value);
        }
    }
}
```
For example, any one of the testStreamX cases fails at random.

Context: the code runs with a parallelism of 8, since the CPU it runs on has 8 cores.
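A likely explanation, sketched under assumptions: with parallelism 8, Flink runs 8 parallel sink subtasks, and each one works on its own copy of the `CollectSink` object. Declaring `invoke` as `synchronized` locks each instance's own monitor, so the 8 instances still call `add` on the one shared static `ArrayList` at the same time. `ArrayList` is not thread-safe, and concurrent adds can silently lose elements. The plain-Java sketch below (no Flink involved; the names `SharedListRace`, `UnsafeSink`, `SafeSink` and `run` are hypothetical) imitates this with one thread and one sink object per "subtask". In the Flink test itself, the corresponding change would be to declare `values` as `Collections.synchronizedList(new ArrayList<>())`, which is also how the testing sink in the Flink documentation is written.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SharedListRace {

    // Mirrors the question's CollectSink: a static list shared by every instance,
    // "protected" only by an instance-level synchronized method.
    static class UnsafeSink {
        static final List<Integer> values = new ArrayList<>();

        synchronized void invoke(int value) {
            // Locks 'this', so two different instances never block each other.
            values.add(value);
        }
    }

    // Same shape, but the shared list itself is thread-safe.
    static class SafeSink {
        static final List<Integer> values = Collections.synchronizedList(new ArrayList<>());

        void invoke(int value) {
            values.add(value);
        }
    }

    // Starts `parallelism` threads, each with its OWN sink instance (roughly like
    // Flink's parallel sink subtasks), and returns the final size of the shared list.
    public static int run(boolean safe, int parallelism, int perThread) throws InterruptedException {
        UnsafeSink.values.clear();
        SafeSink.values.clear();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();

        for (int t = 0; t < parallelism; t++) {
            UnsafeSink unsafeSink = new UnsafeSink();
            SafeSink safeSink = new SafeSink();
            Thread thread = new Thread(() -> {
                try {
                    start.await(); // release all threads at once to maximize contention
                    for (int i = 0; i < perThread; i++) {
                        if (safe) {
                            safeSink.invoke(i);
                        } else {
                            unsafeSink.invoke(i);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    // Concurrent ArrayList.add can also corrupt the list and throw
                    // (e.g. ArrayIndexOutOfBoundsException); this thread just stops.
                }
            });
            threads.add(thread);
            thread.start();
        }

        start.countDown();
        for (Thread thread : threads) {
            thread.join();
        }
        return safe ? SafeSink.values.size() : UnsafeSink.values.size();
    }

    public static void main(String[] args) throws InterruptedException {
        int expected = 8 * 10_000;
        System.out.println("expected: " + expected);
        System.out.println("unsafe:   " + run(false, 8, 10_000)); // usually less than expected
        System.out.println("safe:     " + run(true, 8, 10_000));  // always equal to expected
    }
}
```

The unsafe count will usually come out below 80000 and vary from run to run, which matches the random failures; the safe count is always 80000. Separately, `env.execute()` blocks until the job has finished, so the `sleep(2000)` should not be needed once the list is thread-safe.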