0
votes

I'm new to Spark and currently battling a problem related to saving the result of a Spark stream to a file after the context's run time. So the issue is: I want a query to run for 60 seconds, save all the input it reads in that time to a file, and also be able to define the file name for future processing.

Initially I thought the code below would be the way to go:

sc.socketTextStream("localhost", 12345)
        .foreachRDD(rdd -> {
            rdd.saveAsTextFile("./test");
        });

However, after running it I realized two problems. First, instead of writing just one file with all the values from the 60-second timeframe, it saved a different file for each batch it read (imagine random numbers arriving at a random pace on that port: if in one second it read one number, the file would contain one number; if it read more, the file would have them all). Second, I wasn't able to name the file, since the argument to saveAsTextFile is the desired output directory.

So I would like to ask if there is any Spark-native solution, so I don't have to resort to "Java tricks" like this:

sc.socketTextStream("localhost", 12345)
        .foreachRDD(rdd -> {
            PrintWriter out = new PrintWriter("./logs/votes["
                    + dtf.format(LocalDateTime.now().minusMinutes(2)) + ","
                    + dtf.format(LocalDateTime.now()) + "].txt");
            List<String> l = rdd.collect();
            for (String voto : l)
                out.println(voto + "    " + dtf.format(LocalDateTime.now()));
            out.close();
        });

I searched the Spark documentation for similar problems but was unable to find a solution :/ Thanks for your time :)

1
collect is never a trick - thebluephantom
What I meant by trick is using Java's default PrintWriter to save a string to a file, instead of using (what I assume must exist) a Spark solution. TBH I'm having some trouble understanding how foreachRDD works, since in the case shown above with saveAsTextFile it only saves one value, but in other scenarios it worked with all the data - Pedro Lima
things are what they are with Spark - thebluephantom

1 Answer

1
votes

Below is a template for consuming socket stream data using the newer Structured Streaming API.

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

object ReadSocket {

  def main(args: Array[String]): Unit = {
    val spark = Constant.getSparkSess // Your SparkSession factory

    // Start reading from the socket
    val dfStream = spark.readStream
      .format("socket")
      .option("host", "127.0.0.1") // Replace with your socket host
      .option("port", "9090")
      .load()

    dfStream.writeStream
      .trigger(Trigger.ProcessingTime("1 minute")) // Triggers every 1 minute
      .outputMode(OutputMode.Append) // Each batch holds the data that arrived in the last minute
      .foreachBatch((ds, id) => {
        ds.foreach(row => { // Iterate over your Dataset
          // Put your file-generation logic here
          println(row.getString(0)) // That's your record
        })
      }).start().awaitTermination()
  }

}

For an explanation of the code, please see the inline comments.

Java Version

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

public class ReadSocketJ {

    public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = Constant.getSparkSess(); // Your SparkSession factory

        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", "127.0.0.1") // Replace with your socket host
                .option("port", "9090")
                .load();

        lines.writeStream()
                .trigger(Trigger.ProcessingTime("5 seconds")) // Triggers every 5 seconds
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) -> {
                    batch.as(Encoders.STRING())
                            .collectAsList().forEach(System.out::println);
                }).start().awaitTermination();
    }
}
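To also cover the original requirement of one custom-named file per trigger interval, the body of foreachBatch can write the collected batch with plain java.nio instead of println. A minimal sketch, assuming the batch's records have already been collected to the driver; BatchFileWriter and its methods are illustrative helpers, not part of Spark's API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

public class BatchFileWriter {

    // Filesystem-safe timestamp format (no ':' characters)
    static final DateTimeFormatter DTF =
            DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");

    // Builds a name like votes[<start>,<end>].txt, mirroring the question's pattern
    static String fileName(LocalDateTime start, LocalDateTime end) {
        return "votes[" + DTF.format(start) + "," + DTF.format(end) + "].txt";
    }

    // Writes all records of one micro-batch into a single named file
    static Path writeBatch(Path dir, List<String> records,
                           LocalDateTime start, LocalDateTime end) throws IOException {
        Files.createDirectories(dir);
        Path target = dir.resolve(fileName(start, end));
        return Files.write(target, records); // one line per record
    }
}
```

Inside the foreachBatch lambda above, batch.as(Encoders.STRING()).collectAsList() yields the records of the last trigger interval, which can be passed to writeBatch along with the window's start and end times. Keep in mind that collecting pulls the whole batch to the driver, which is fine for small streams but won't scale to large ones.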