
My question might cause some confusion, so please see the Description first; it should help identify my problem. I will add my code at the end of the question (any suggestions regarding my code structure/implementation are also welcome). Thank you in advance for any help!

My question:

  1. How do I define multiple sinks in Flink batch processing without having it read the data from one source repeatedly?

  2. What is the difference between createCollectionsEnvironment() and getExecutionEnvironment()? Which one should I use in a local environment?

  3. What is the use of env.execute()? My code outputs the result without this statement, but if I add it, an exception is thrown:


Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'. 
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940) 
    at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922) 
    at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:34) 
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:816) 
    at MainClass.main(MainClass.java:114)
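
If I strip things down, the exception seems to come from calling print() and then execute(); a minimal sketch of what I believe triggers it (using a made-up in-memory source instead of my database):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class NoNewSinksRepro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        DataSet<Integer> data = env.fromElements(1, 2, 3);

        data.print();   // print() already triggers an execution of the plan

        env.execute();  // no new sink has been defined after print(), so this throws
                        // "No new data sinks have been defined since the last execution."
    }
}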

Description: I am new to programming. Recently I needed to process some data (grouping data, calculating the standard deviation, etc.) using Flink batch processing. However, I came to a point where I need to output two DataSets. The structure was something like this:

From source (database) -> DataSet 1 (add index using zipWithIndex()) -> DataSet 2 (do some calculation while keeping the index) -> DataSet 3

First I output DataSet 2; the index runs, e.g., from 1 to 10000. Then I output DataSet 3 and the index becomes 10001 to 20000, although I did not change the values in any function. My guess is that when outputting DataSet 3, instead of reusing the previously calculated DataSet 2, the job fetches the data from the database again and then repeats the calculation. With zipWithIndex() this not only gives the wrong index numbers but also increases the number of connections to the database.
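
In other words, I suspect each of print()/collect() runs the whole plan from the source again. A minimal sketch of what I think is happening (the in-memory source and names are hypothetical stand-ins for my JDBC input):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class DoubleExecutionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the JDBC source in my real code
        DataSet<String> source = env.fromElements("a", "b", "c");
        DataSet<Tuple2<Long, String>> indexed = DataSetUtils.zipWithIndex(source);

        indexed.print();   // execution #1: the source is read and indices are assigned
        indexed.collect(); // execution #2: the whole plan runs again from the source
                           // (with a JDBC source this means a second database query)
    }
}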

I guess this is related to the execution environment, as using

ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

will give the "wrong" index number (10001-20000) and

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

gives the correct index numbers (1-10000). The time taken and the number of database connections also differ, and the print order is reversed.

OS, DB, other environment details and versions: IntelliJ IDEA 2017.3.5 (Community Edition), Build #IC-173.4674.33, built on March 6, 2018; JRE: 1.8.0_152-release-1024-b15 amd64; JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o.; Windows 10 10.0

My test code (Java):

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

    //The Table API is used to calculate the standard deviation, as I found no such aggregation in the DataSet API.
    BatchTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);

    //Get Data from a mySql database
    DataSet<Row> dbData =
            env.createInput(
                    JDBCInputFormat.buildJDBCInputFormat()
                            .setDrivername("com.mysql.cj.jdbc.Driver")
                            .setDBUrl($database_url)
                            .setQuery("select value from $table_name where id =33")
                            .setUsername("username")
                            .setPassword("password")
                            .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.DOUBLE_TYPE_INFO))
                            .finish()
            );

    // Add index for assigning group (group capacity is 5)
    DataSet<Tuple2<Long, Row>> indexedData = DataSetUtils.zipWithIndex(dbData);

    // Replace the index (long) with a group number (int), and convert Row to double at the same time
    DataSet<Tuple2<Integer, Double>> rawData = indexedData.flatMap(new GroupAssigner());

    //Using groupBy() to combine individual data of each group into a list, while calculating the mean and range in each group
    //put them into a POJO named GroupDataClass
    DataSet<GroupDataClass> groupDS = rawData.groupBy("f0").combineGroup(new GroupCombineFunction<Tuple2<Integer, Double>, GroupDataClass>() {
        @Override
        public void combine(Iterable<Tuple2<Integer, Double>> iterable, Collector<GroupDataClass> collector) {
            Iterator<Tuple2<Integer, Double>> it = iterable.iterator();
            Tuple2<Integer, Double> var1 = it.next();
            int groupNum = var1.f0;

            // Using max and min to calculate range, using i and sum to calculate mean
            double max = var1.f1;
            double min = max;
            double sum = max; // include the first value in the sum, otherwise the mean is wrong
            int i = 1;

            // The list is to store individual value
            List<Double> list = new ArrayList<>();
            list.add(max);

            while (it.hasNext())
            {
                double next = it.next().f1;
                sum += next;
                i++;
                max = next > max ? next : max;
                min = next < min ? next : min;
                list.add(next);
            }

            //Store group number, mean, range, and 5 individual values within the group
            collector.collect(new GroupDataClass(groupNum, sum / i, max - min, list));
        }
    });

    //print(), because if no sink is created, Flink will not even perform the calculation.
    groupDS.print();


    // Get the max group number and range in each group to calculate average range
    // If group numbers start at 1, then the maximum group number equals the number of groups.
    // However, because this is the second sink, data flows from the source again, which doubles the group numbers.
    DataSet<Tuple2<Integer, Double>> rangeDS = groupDS.map(new MapFunction<GroupDataClass, Tuple2<Integer, Double>>() {
        @Override
        public Tuple2<Integer, Double> map(GroupDataClass in) {
            return new Tuple2<>(in.groupNum, in.range);
        }
    }).max(0).andSum(1);

    // collect and print, because if no sink is created, Flink will not even perform the calculation.
    Tuple2<Integer, Double> rangeTuple = rangeDS.collect().get(0);
    double range = rangeTuple.f1/ rangeTuple.f0;
    System.out.println("range = " + range);
}

public static class GroupAssigner implements FlatMapFunction<Tuple2<Long, Row>, Tuple2<Integer, Double>> {
    @Override
    public void flatMap(Tuple2<Long, Row> input, Collector<Tuple2<Integer, Double>> out) {

        // zipWithIndex starts at 0: indices 0-4 are assigned to group 1, indices 5-9 to group 2, etc.
        int n = (int) (input.f0 / 5) + 1;
        out.collect(new Tuple2<>(n, (Double) input.f1.getField(0)));
    }
}

1 Answer

  1. It's fine to connect a source to multiple sinks: the source is executed only once and the records are broadcast to the multiple sinks. See this question: Can Flink write results into multiple files (like Hadoop's MultipleOutputFormat)?

  2. getExecutionEnvironment() is the right way to get the environment when you want to run your job. createCollectionsEnvironment() is a good way to play around and test. See the documentation.

  3. The exception message is quite clear: if you call print or collect, your data flow gets executed. So you have two choices:

  • Either you call print()/collect() at the end of your data flow: it gets executed and printed. That's good for testing. Bear in mind you can only call collect()/print() once per data flow, otherwise it gets executed several times while it is not completely defined.
  • Or you add a sink at the end of your data flow and call env.execute(). That's what you want to do once your flow is in a more mature shape; see the sketch below.
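
For illustration, here is a minimal sketch of that second option (the in-memory source, output paths, and class name are placeholders, not taken from the code above): two sinks hang off the same flow, and a single env.execute() runs the whole plan, so the source is read only once.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class TwoSinksOneExecution {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the JDBC source in the question
        DataSet<Integer> source = env.fromElements(1, 2, 3, 4, 5);

        DataSet<Integer> doubled = source.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer x) { return x * 2; }
        });
        DataSet<Integer> squared = source.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer x) { return x * x; }
        });

        // Two sinks on the same data flow; the output paths are placeholders
        doubled.writeAsText("file:///tmp/doubled", FileSystem.WriteMode.OVERWRITE);
        squared.writeAsText("file:///tmp/squared", FileSystem.WriteMode.OVERWRITE);

        // A single call executes the whole plan; the source is read only once
        env.execute("two sinks, one execution");
    }
}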