Good Day

I am receiving streaming data through Flink/Kafka. The port I connect to for reading is the same port I need to write a message back to.

TCP/IP -> Flink/Kafka Consumer -> Process Data -> Send result to Kafka topic and back to the TCP/IP connection

// 1. Connect to TCP Stream (TCP (Socket) -> Kafka Stream INPUT)
val consumer_stream = env.socketTextStream(url, port, '\n')

// 2. Processing Data
.....

// 3. Write result to kafka topic 
consumer_stream.addSink(new FlinkKafkaProducer09[String](broker_url, topic_name, new SimpleStringSchema()))

// 4. Send result back to the connected URL, i.e. the url + port from step 1
(This is where I need assistance)

Connecting to the URL and port works. I receive and process the data, and I write it to the topic. Now I also need to write back to the same URL and port that I am connected to, since that endpoint can send and receive data at the same time.

I got it to write to another port:

// write to Different PORT
val socket_write: DataStreamSink[String] = out_data.writeToSocket(url, diff_port, new SimpleStringSchema())

This works. The problem is writing to the same port: when I use the same port that I am reading from, the Flink job fails.

Any ideas?

Regards

Or should I create a Kafka producer with the same URL? The problem is that the TCP endpoint allows only a limited number of connections, so the idea is to reuse the connection that already receives the data. - Sub Zero
If you want to reuse the input TCP connection from the SocketTextStreamFunction, then it won't be easily possible, because SocketTextStreamFunction does not expose the socket. Opening a new connection and writing the data back should, however, work. What is the exception with which Flink is failing? - Till Rohrmann
On Flink job submission with the same port for reading and writing, these errors appear: 1) org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. 2) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed 3) Caused by: java.net.ConnectException: Connection refused (Connection refused) - Sub Zero
These errors don't appear when using different ports for reading and writing; it works on different ports. - Sub Zero
Thanks for responding :) @TillRohrmann - Sub Zero

1 Answer

You can use a customised SinkFunction to write the data back to the URL.

stream.addSink(new SinkFunction<String>() {
    // initialise the client used to send the data
    @Override
    public void invoke(String value) throws Exception {
        // send the value here
    }
});

Or with SocketClientSink:

env.socketTextStream("localhost", 5555).map(x => { println(x); x }).addSink(new SocketClientSink[String]("localhost", 5555, new SimpleStringSchema))
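Note that both snippets above open a second connection to the endpoint, separate from the one socketTextStream reads from. If the endpoint accepts only one connection, as the "Connection refused" error in the comments may suggest, reading and writing would have to share one socket. The following is a minimal sketch of that idea using only java.net, independent of Flink. The class DuplexSocketSketch, the method names, and the stand-in server are hypothetical, and the upper-casing step is just a placeholder for the real processing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: one TCP connection used for both reading and writing.
class DuplexSocketSketch {

    // Reads one line from the socket, "processes" it, and writes the result
    // back over the SAME socket. Returns the result, or null on end of stream.
    static String readProcessReply(Socket socket) throws IOException {
        BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
        PrintWriter out = new PrintWriter(
                new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
        String line = in.readLine();
        if (line == null) {
            return null;
        }
        String result = line.toUpperCase(); // placeholder for real processing
        out.println(result);                // auto-flushed reply on the same connection
        return result;
    }

    // Starts a stand-in server that accepts a single connection, sends one
    // line, and waits for the reply. Returns what the server got back.
    static String demo() throws Exception {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            String[] received = new String[1];
            Thread serverThread = new Thread(() -> {
                try (Socket peer = server.accept();
                     BufferedReader in = new BufferedReader(
                             new InputStreamReader(peer.getInputStream(), StandardCharsets.UTF_8));
                     PrintWriter out = new PrintWriter(
                             new OutputStreamWriter(peer.getOutputStream(), StandardCharsets.UTF_8), true)) {
                    out.println("hello");        // data flowing to the client
                    received[0] = in.readLine(); // reply flowing back on the same connection
                } catch (IOException e) {
                    received[0] = "error: " + e;
                }
            });
            serverThread.start();
            try (Socket client = new Socket(server.getInetAddress(), server.getLocalPort())) {
                readProcessReply(client);
            }
            serverThread.join();
            return received[0];
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

In a Flink job this would mean the same operator owns the socket for both directions, for example a custom source that also writes replies, since SocketTextStreamFunction does not expose its socket. Whether that fits depends on where the processing happens in the pipeline.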