I'm new to Spark and am struggling with saving the result of a Spark stream to a file once the streaming context's time is up. The issue is this: I want a query to run for 60 seconds, save all the input it reads in that time to a single file, and let me choose the file name for later processing.
Initially I thought the code below would be the way to go:
sc.socketTextStream("localhost", 12345)
    .foreachRDD(rdd -> {
        rdd.saveAsTextFile("./test");
    });
However, after running it I found two problems. First, it saved a separate file for each batch of input read, instead of writing just one file with all the values from that 60-second timeframe. (Imagine random numbers being generated at a random pace on that port: if it read one number in a given second, the file would contain one number, and if it read more, the file would contain those.) Second, I wasn't able to name the file, since the argument to saveAsTextFile is the output directory, not a file name.
So I would like to ask whether there is a Spark-native solution, so that I don't have to solve it with "Java tricks" like this:
sc.socketTextStream("localhost", 12345)
    .foreachRDD(rdd -> {
        // Name the file after the time window, then write the collected records on the driver.
        String name = "./logs/votes[" + dtf.format(LocalDateTime.now().minusMinutes(2)) + "," + dtf.format(LocalDateTime.now()) + "].txt";
        try (PrintWriter out = new PrintWriter(name)) {
            for (String voto : rdd.collect()) {
                out.println(voto + " " + dtf.format(LocalDateTime.now()));
            }
        }
    });
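To make the workaround above easier to reproduce without a cluster, here is a minimal, Spark-free sketch of just the file-naming and single-file-writing part. The class name WindowFileSketch, the helpers fileName and writeWindow, and the pattern "yyyyMMdd-HHmmss" are assumptions for illustration only; the real dtf pattern is not shown above and may differ.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

public class WindowFileSketch {
    // Assumed pattern: the dtf used in the snippet above is not shown.
    static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");

    // Hypothetical helper: builds "votes[start,end].txt" for a window that ends at `end`.
    static String fileName(LocalDateTime end, long windowSeconds) {
        LocalDateTime start = end.minusSeconds(windowSeconds);
        return "votes[" + DTF.format(start) + "," + DTF.format(end) + "].txt";
    }

    // Hypothetical helper: writes all records of one window into a single named file under `dir`.
    static Path writeWindow(Path dir, List<String> records, LocalDateTime end, long windowSeconds)
            throws IOException {
        Files.createDirectories(dir);
        Path target = dir.resolve(fileName(end, windowSeconds));
        return Files.write(target, records, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // A fixed end time keeps the example deterministic; real code would use LocalDateTime.now().
        LocalDateTime end = LocalDateTime.of(2020, 1, 1, 12, 1, 0);
        Path written = writeWindow(Paths.get("logs"), Arrays.asList("1", "2", "3"), end, 60);
        System.out.println(written);
    }
}
```

This only covers the naming and writing on the driver side. It says nothing about how the 60 seconds of input would be gathered into one collection in Spark, which is the part I am asking about.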
I searched the Spark documentation for similar problems but was unable to find a solution :/ Thanks for your time :)