Good Day
I am receiving data (stream data) through flink/kafka. The port I am connecting to is the same port I need to write a message back to
TCP/IP -> Flink/Kafka Consumer -> Process Data -> Send Result to kafka topic and back to TCP/IP connection
// 1. Connect to TCP Stream (TCP (Socket) -> Kafka Stream INPUT)
val consumer_stream = env.socketTextStream(url, port, '\n')
// 2. Processing Data
.....
// 3. Write result to kafka topic
consumer_stream.addSink(new FlinkKafkaProducer09[String](broker_url, topic_name, new SimpleStringSchema()))
// 4. Send result back to connected url ie.(Ref Step 1 URL) (url+port)
(This is where I need Assistance)
Connecting to URL and port works. I receive and process the data I write to the topic Now I also need to write back to the same url and port that I am connected to.{Since the Url and Port can send and receive data at the same time}
I got it to write to another port
// write to Different PORT
val socket_write: DataStreamSink[String] = out_data.writeToSocket(url, diff_port, new SimpleStringSchema())
This works... The problem is trying to write to the same port. When I use the same port that I am reading in from... The flink job fails
Any Ideas
Regards
SocketTextStreamFunction
, then it won't be easily possible, becauseSocketTextStreamFunction
does not expose the socket. Opening a new connection and writing the data back should, however, work. What is the exception with which Flink is failing? – Till Rohrmann