I am developing a Spark Streaming application in which I need to consume the input streams from two servers (written in Python), each sending one JSON message per second to the Spark context.
My problem is: if I perform operations on just one stream, everything works well. But if I use two streams from different servers, Spark freezes just before it prints anything, and only starts working again once both servers have sent all the JSON messages they had to send (that is, when it detects that socketTextStream is no longer receiving data).
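For reference, the snippet below does not show how ssc is created; here is a minimal sketch of a setup that matches it, assuming local mode (the app name and thread count are my own illustration, not from my actual code):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// In local mode, each socketTextStream receiver permanently occupies one
// worker thread, so two receivers plus batch processing need at least local[3].
SparkConf conf = new SparkConf()
        .setAppName("TwoStreamApp")   // hypothetical app name
        .setMaster("local[3]");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));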
Here is my code:
// Receiver 1: plain-text socket stream on port 996
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream(
        "localhost", 996, StorageLevels.MEMORY_AND_DISK_SER);

// Receiver 2: plain-text socket stream on port 9995
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(
        "localhost", 9995, StorageLevels.MEMORY_AND_DISK_SER);

// Tag each message with the id of the stream it came from
JavaPairDStream<Integer, String> dataStream1 = streamData1.mapToPair(
        new PairFunction<String, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(String stream) throws Exception {
                return new Tuple2<Integer, String>(1, stream);
            }
        });

JavaPairDStream<Integer, String> dataStream2 = streamData2.mapToPair(
        new PairFunction<String, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(String stream) throws Exception {
                return new Tuple2<Integer, String>(2, stream);
            }
        });

dataStream2.print(); // for example
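For example, a merge step that preserves the tags, plus the start calls that presumably follow, would look like this (a sketch; none of this appears in the excerpt above, and the merged variable name is just for illustration):

// Hypothetical next step: merge the two tagged streams and print messages
// from both sources, keeping the integer tag to tell them apart.
JavaPairDStream<Integer, String> merged = dataStream1.union(dataStream2);
merged.print();

ssc.start();
ssc.awaitTermination();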
Notice that there are no ERROR messages; Spark simply freezes after starting the context, and even though I can see JSON messages arriving on the ports, it doesn't print anything.
Thank you very much.