
I have a NodeJS server responsible for streaming data from an API and pushing the data to a local TCP port, 8080, on which Apache Spark is listening.

```javascript
const net = require('net');
const axios = require('axios');

// Open a TCP connection to port 8080 on localhost
const client = new net.Socket();
client.connect(8080, '127.0.0.1');

client.on('connect', async () => {
  // Request the API response as a stream and forward each chunk to the socket
  const res = await axios.get('https://api.co.za', {
    responseType: 'stream',
  });
  res.data.on('data', chunk => {
    client.write(chunk);
  });
});
```

Then Apache Spark attempts to read data from that port.

```scala
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.{ SparkConf, SparkContext }

object DataStream {
  def main(args: Array[String]): Unit = {
    val sparkConfig = new SparkConf()
      .setAppName("Data Stream")
      .setMaster(sys.env.getOrElse("spark.master", "local[*]"))
    val sparkContext = new SparkContext(sparkConfig)
    sparkContext.setLogLevel("ERROR")

    // Process the stream in 1-second micro-batches
    val streamingContext = new StreamingContext(sparkContext, Seconds(1))

    // Connect to 127.0.0.1:8080 and read the incoming bytes as lines of text
    val data: DStream[String] = streamingContext.socketTextStream("127.0.0.1", 8080)
    data.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```

Then I open port 8080 with netcat: `nc -l 8080`

Here's my problem: if I start my Node process first, it pushes data to the port, but I do not see Spark reacting to the data. If I start Spark first, my Node process says it's writing, but I can't see any data arriving at port 8080.

If I send data directly through netcat after `nc -l 8080`, Spark has no problem reading it.

Is there some sort of client exclusivity happening with these local ports? Is there an alternative way of opening a port to be used this way?

OS: Ubuntu 19.10

Hello, when you do something like this, you need to understand something very important about your programs: they have to collaborate. You are trying to exchange data through an API, so the component that produces the data must be reachable as a server, and clients can then request data from your application service. Spark is one of those clients. netcat is probably not enough to solve this, and I think the answer to your last question is YES, whatever your system is. If I were you, I would look into a socket server implementation in the application service ;) - BendaThierry.com

1 Answer


I see what you are trying to do; however, it seems that both of your applications are acting as clients. If I'm not mistaken, Spark pulls data from the data source, so you need to change your Node.js code to act as a server. I tried this on my end, and it worked with PySpark:

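```javascript
const net = require("net");

// Node.js is now the TCP server; Spark's socketTextStream connects to it as a client
const server = net.createServer(function (socket) {
  // Terminate each record with "\n" so Spark receives it as a separate line
  socket.write("Write your chunk here.\n");
  // end() writes a final string and then closes the connection
  socket.end("You can also send a closing message here.\n");
});

server.on("error", (err) => {
  console.log(`Error in server:\n${err}`);
});

server.listen(8080, "127.0.0.1", () => {
  console.log("Node.js server ready to respond.");
});
```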
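The server above writes one message and then closes the connection. For a continuous feed, the server has to keep the accepted socket open and write one newline-terminated record per item, because `socketTextStream` splits the incoming bytes into lines. A minimal sketch of that pattern in Python, assuming a hypothetical `records` iterable that stands in for the API stream from the question; `make_listener` and `feed_one_client` are illustrative names, not part of any library:

```python
import socket
from typing import Iterable


def make_listener(host: str = "127.0.0.1", port: int = 8080) -> socket.socket:
    """Open a listening TCP socket that Spark's socketTextStream can connect to."""
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((host, port))
    srv.listen()
    return srv


def feed_one_client(srv: socket.socket, records: Iterable[str]) -> None:
    """Accept one client (e.g. the Spark receiver) and stream records to it.

    `records` is a hypothetical stand-in for the API stream. Each record is
    sent as a single UTF-8 line; the connection closes when records run out.
    """
    conn, _addr = srv.accept()  # blocks until the client connects
    with conn:
        for record in records:
            # Replace embedded newlines so one record arrives as one line
            line = record.replace("\n", " ") + "\n"
            conn.sendall(line.encode("utf-8"))
```

Start the listener before the Spark job, for example `feed_one_client(make_listener(), records)`. No `nc -l 8080` is needed any more: this process is itself the server that Spark connects to.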

With this setup, the Spark receiver opens a TCP connection to the Node.js server and reads whatever the server writes to that socket, line by line. In other words, the Spark application pulls the data from the streaming source (in this case simulated by the Node.js server). I did not try to run your Scala code, but since you checked it with netcat, the problem is not in your Scala code. I used this Python code for a simple word count application to verify that it works:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two worker threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Connect to the server on localhost:8080 and receive its output as lines of text
lines = ssc.socketTextStream("localhost", 8080)

# Split lines into words (flatMap: one line to many words)
words = lines.flatMap(lambda line: line.split(" "))

# Pair each word with a count of 1
pairs = words.map(lambda word: (word, 1))

# Sum the counts for each word within a batch
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first elements of each batch's result
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
```
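To check that the server emits what Spark expects without starting Spark at all, the word count above can be approximated in plain Python: connect as a client, read UTF-8 lines, and count words the same way as the `flatMap`/`map`/`reduceByKey` chain. This is a sketch, not Spark's implementation: Spark counts per 1-second batch, whereas this counts everything until the server closes the connection. `lines_from_socket`, `read_lines`, and `word_count` are hypothetical helper names.

```python
import socket
from collections import Counter
from typing import Iterable, Iterator


def lines_from_socket(sock: socket.socket) -> Iterator[str]:
    """Yield decoded lines from a connected socket until the peer closes it."""
    # makefile() wraps the socket in a buffered text reader
    with sock.makefile("r", encoding="utf-8") as reader:
        for line in reader:
            yield line.rstrip("\n")


def read_lines(host: str, port: int) -> Iterator[str]:
    """Connect as a client, as socketTextStream does, and yield its lines."""
    with socket.create_connection((host, port)) as sock:
        yield from lines_from_socket(sock)


def word_count(lines: Iterable[str]) -> Counter:
    """Plain-Python equivalent of flatMap(split(" ")) -> map((w, 1)) -> reduceByKey(+)."""
    return Counter(word for line in lines for word in line.split(" "))
```

For example, `print(word_count(read_lines("localhost", 8080)))` prints the totals once the server closes the connection.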