0
votes

As described in the Flink documentation, I was able to read text input from a text server by opening a local socket with

amar@admin:~$ nc -l 12345

and then receiving it in a Flink program with

DataStream<String> text = env.socketTextStream("localhost", 12345);

text.print();

env.execute();

However, I am simulating a scenario in which I want to get a stream of data from a VM (and eventually from several VMs) and send it to a CEP program running on the host OS.

So I installed a VM using Vagrant and SSH into it with vagrant ssh.

  1. The hostname of the guest OS is precise64.

  2. The IP address reported by ifconfig is 10.0.2.15.

For now, I want to see whether I can send some data from the VM and receive it in the Flink program the same way I did in the local environment.

I opened a Netcat socket on the guest OS with

vagrant@precise64:~$ nc -l 12345

and tried to receive it in the program on the host with the following, but got an error:

DataStream<String> text = env.socketTextStream("precise64", 12345);

text.print();

env.execute();

I also tried [email protected] above, but I think I am doing it wrong.

Any ideas on how I should approach sending a data stream from the VM to the Flink program on the host?

Suggestions are most welcome, thanks in advance!
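
One way to narrow down an error like this is to test the TCP connection outside Flink first, since socketTextStream connects as a client to whatever host and port it is given. The sketch below uses only the JDK; the class name SocketProbe, the method canConnect, and the 2-second timeout are hypothetical choices for illustration, not part of Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class SocketProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    // An unresolvable host name or a refused connection is reported and yields false.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            System.err.println("Cannot reach " + host + ":" + port + " -> " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the local setup in the question; pass another host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 12345;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

If the probe fails for precise64 or 10.0.2.15 from the host, the problem is probably network reachability rather than Flink. With VirtualBox's default NAT networking, 10.0.2.15 is typically the guest's NAT address and is not directly reachable from the host, so a forwarded port or a private network may be needed; that is an assumption about this particular setup.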

1 Answer

1
votes

You could try this:

1. Program:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect as a client to a text server listening on localhost:9999
    val text = env.socketTextStream("localhost", 9999)

    // Split each line into lowercase words and count them per 5-second window
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

2. Start Netcat in a terminal. The socket source connects to it as a client, so Netcat should already be listening when the program starts:

nc -lk 9999

This will work.
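
To know what output to expect when typing lines into the Netcat session, it may help to see what a single 5-second window computes. The following is a plain-Java sketch of the same tokenize-and-count logic, without Flink; the class name WindowCountSketch and the method countWords are hypothetical, and the sketch treats one window's lines in isolation.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WindowCountSketch {

    // Lowercase each line, split on runs of non-word characters,
    // drop empty tokens, and count the occurrences of each word.
    static Map<String, Integer> countWords(List<String> linesInWindow) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two lines assumed to arrive within the same window.
        List<String> window = Arrays.asList("Hello Flink", "hello, world!");
        System.out.println(countWords(window)); // prints {hello=2, flink=1, world=1}
    }
}
```

The empty-token check corresponds to the nonEmpty filter in the Scala program: a line that starts with punctuation would otherwise produce an empty first token.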