I am running Flink from within Eclipse, with the necessary JARs fetched by Maven. My machine has an eight-core processor, and the streaming application I have to write reads lines from its input and calculates some statistics.

When I run the program on my machine, I expect Flink to use all CPU cores, as well-threaded code would. However, when I watch the cores, I see that only one of them is being used. I tried many things; the code below shows my last attempt, which sets the parallelism on the environment. I also tried setting it on the stream alone, among other things.

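import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SemSeMi {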


    public static void main(String[] args) throws Exception {
        System.out.println("Starting Main!");

        System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
                .getLocalFileSystem().getWorkingDirectory());

        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        env.setParallelism(8);

        env.socketTextStream("localhost", 9999).flatMap(new SplitterX());

        env.execute("Something");       
    }

    public static class SplitterX implements
            FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // Do Nothing!

        }
    }
} 

I fed the program with data using netcat:

 nc -lk 9999 < fileName

How can I make the program scale locally and use all available cores?

1 Answer

You don't have to specify the degree of parallelism explicitly. When a job is run locally with the default settings, the parallelism is automatically set to the number of available cores.

In your case, the source will run with a parallelism of 1, since reading from a socket cannot be distributed. For the flatMap operation, however, the system will create 8 parallel instances. If you turn on logging, you can see this in the log output. The input data is then distributed to the flatMap tasks in a round-robin fashion, and each flatMap task is executed by its own thread.
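To make the round-robin hand-off concrete, here is a plain-Java sketch with no Flink dependency. It is not how Flink implements this internally. It only mimics the idea of a single reader dealing lines out to several workers, each on its own thread. The class RoundRobinSketch and the methods targetWorker and countCharsPerWorker are names I made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; this is not Flink code.
public class RoundRobinSketch {

    /** Index of the worker that receives the record at position recordIndex. */
    public static int targetWorker(long recordIndex, int parallelism) {
        return (int) (recordIndex % parallelism);
    }

    /**
     * Deals the lines out to the workers in round-robin order, then lets each
     * worker sum the lengths of its own lines on a separate thread.
     * Returns one character total per worker.
     */
    public static long[] countCharsPerWorker(List<String> lines, int parallelism)
            throws Exception {
        // One partition per worker, filled by the single "source" loop below.
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < lines.size(); i++) {
            partitions.get(targetWorker(i, parallelism)).add(lines.get(i));
        }

        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (List<String> partition : partitions) {
                Callable<Long> task = () -> {
                    long total = 0;
                    for (String line : partition) {
                        total += line.length();
                    }
                    return total;
                };
                futures.add(pool.submit(task));
            }
            long[] totals = new long[parallelism];
            for (int i = 0; i < parallelism; i++) {
                totals[i] = futures.get(i).get(); // wait for worker i
            }
            return totals;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int parallelism = Runtime.getRuntime().availableProcessors();
        List<String> lines = Arrays.asList("a", "bb", "ccc", "dddd", "eeeee");
        long[] totals = countCharsPerWorker(lines, parallelism);
        for (int i = 0; i < totals.length; i++) {
            System.out.println("worker " + i + " counted " + totals[i] + " characters");
        }
    }
}
```

Note that the single loop filling the partitions plays the role of the non-parallel socket source: however many workers there are, they can only be as busy as that one reader keeps them.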

I suspect you only see load on a single core because SplitterX does not do any work. Try the following code, which computes the length of each String and prints the result to the console:

public static void main(String[] args) throws Exception {
    System.out.println("Starting Main!");

    System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
        .getLocalFileSystem().getWorkingDirectory());

    StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();

    env.socketTextStream("localhost", 9999).flatMap(new SplitterX()).print();

    env.execute("Something");
}

public static class SplitterX implements
    FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence,
                        Collector<Tuple2<String, Integer>> out) throws Exception {
        out.collect(Tuple2.of(sentence, sentence.length()));

    }
}

The numbers at the start of each line tell you which task printed the result.
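If you want to check how evenly the records were spread, you can tally the console output by that leading number. The sketch below assumes each printed line starts with the task number followed by "> ", which is what I would expect from print() when the parallelism is greater than 1. The class PrintPrefixSketch and its methods are hypothetical helpers, and the sample lines in main are invented for illustration, not real output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; assumes lines look like "<task number>> <record>".
public class PrintPrefixSketch {

    /** Returns the number before "> " at the start of the line, or -1 if there is none. */
    public static int taskNumber(String line) {
        int sep = line.indexOf("> ");
        if (sep <= 0) {
            return -1;
        }
        try {
            return Integer.parseInt(line.substring(0, sep));
        } catch (NumberFormatException e) {
            return -1; // the text before "> " was not a number
        }
    }

    /** Counts how many lines each task printed; lines without a prefix are skipped. */
    public static Map<Integer, Integer> linesPerTask(Iterable<String> lines) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            int task = taskNumber(line);
            if (task >= 0) {
                counts.merge(task, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Invented sample lines in the assumed format.
        List<String> sample = Arrays.asList("1> (a,1)", "2> (bb,2)", "1> (ccc,3)");
        System.out.println(linesPerTask(sample));
    }
}
```

If nearly all lines carry the same number, the records are not being spread across the tasks, which would be worth investigating before looking at CPU usage.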