I am running Flink from within Eclipse, with the necessary JARs fetched by Maven. My machine has an eight-core processor, and the streaming application I have to write reads lines from its input and calculates some statistics.
When running the program on my machine, I expected Flink to use all CPU cores, as well-multithreaded code would. However, when I watch the cores, I see that only one of them is busy. I have tried many things; the code below is my latest attempt, which sets the parallelism of the execution environment. I also tried setting it on the stream alone, among other things.
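```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SemSeMi {

    public static void main(String[] args) throws Exception {
        System.out.println("Starting Main!");
        System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
                .getLocalFileSystem().getWorkingDirectory());

        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();
        // Latest attempt: set the parallelism for the whole job.
        env.setParallelism(8);

        // Read lines from the socket and hand each one to the splitter.
        env.socketTextStream("localhost", 9999).flatMap(new SplitterX());

        env.execute("Something");
    }

    public static class SplitterX implements
            FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence,
                Collector<Tuple2<String, Integer>> out) throws Exception {
            // Do Nothing!
        }
    }
}
```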
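To rule out the JVM seeing fewer cores than the machine physically has, the core count can be checked directly instead of hard-coding 8. This is a minimal JDK-only sketch; the class name `CoreCheck` and the method `availableCores` are hypothetical and not part of Flink.

```java
// Minimal JDK-only check of how many cores the JVM reports.
// CoreCheck is a hypothetical helper class, not part of Flink.
public class CoreCheck {

    // Number of processors the JVM reports as available (always >= 1).
    static int availableCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        int cores = availableCores();
        System.out.println("JVM sees " + cores + " cores");
        // In the Flink job above, this value could replace the hard-coded 8:
        // env.setParallelism(cores);
    }
}
```

If this prints 8, the JVM itself is not the limiting factor and the question is purely about how Flink distributes the work.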
I fed the program with data using netcat:

```
nc -lk 9999 < fileName
```
How can I make the program scale locally and use all available cores?