2 votes

In Apache Flink, I run a task on a remote cluster. The task contains the following code. I was wondering: why does the Flink web client show my task as several different jobs in the completed jobs list?

DataSet<Tuple3<Integer, String, String>> personRecords = env
        .readCsvFile("path to input file/dataset1.csv")
        .lineDelimiter("\n").ignoreFirstLine()
        .fieldDelimiter(",")
        .includeFields("111")
        .types(Integer.class, String.class, String.class);

DataSet<Tuple3<Integer, String, String>> pData = personRecords.map(new cleanerMap());
pData = pData.sortPartition(3, Order.ASCENDING).setParallelism(1);
env.setParallelism(4);
List<Tuple3<Integer, String, String>> MultiKey_List = pData.collect();
...
pData = pData.partitionCustom(new MyPartitioner(), 3);
DataSet<Tuple2<Integer, Integer>> dPairs = pData.mapPartition(new Calc());
dPairs = dPairs.flatMap(new TC(dPairs.collect())).setParallelism(1);
env.execute();

1 Answer

3 votes

It is because the collect() call triggers the execution of the job up to the point where you call it. Thus, unlike almost all other API calls, it is eagerly evaluated. The other method that triggers job execution is count(). Each of these eager calls submits a separate job to the cluster, which is why you see multiple entries in the completed jobs list.

All other sinks that you define after the collect() call will only be executed by a subsequent collect() or count() call, or when you call ExecutionEnvironment.execute().
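To make this concrete, here is a minimal sketch (the dataset, file path, and job name are hypothetical, and it assumes the Flink DataSet API is on the classpath) showing how three separate jobs would appear in the web client:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.List;

public class MultipleJobsExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4);

        // collect() is eager: it submits job #1 immediately and
        // ships the result back to the client.
        List<Integer> doubled = numbers.map(x -> x * 2).collect();

        // count() is also eager: it submits job #2.
        long howMany = numbers.count();

        // A lazy sink such as writeAsText() does nothing by itself;
        // it only runs when execute() (or another eager call) is
        // invoked, which submits job #3.
        numbers.filter(x -> x > 2).writeAsText("/tmp/out");
        env.execute("third job");
    }
}
```

In your program, each collect() in the middle of the pipeline therefore accounts for one of the extra completed jobs, and the final env.execute() accounts for another.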