I want to stop a Java streaming context in Spark after processing 100 records from a file. The problem is that the code in the if statement is not executed once streaming starts. The following code illustrates my idea:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import com.google.common.collect.Lists;

public static void main(String[] args) throws Exception {
    int ff = testSparkStreaming();
    System.out.println("wqwqwq");
    System.out.println(ff);
}

public static int testSparkStreaming() throws IOException, InterruptedException {
    int numberInst = 0;

    // Streaming context (its creation was elided in my original snippet; the 1s batch interval is arbitrary)
    SparkConf conf = new SparkConf().setAppName("streamTest").setMaster("local[2]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    String savePath = "Path to Model";
    final NaiveBayesModel savedModel = NaiveBayesModel.load(jssc.sparkContext().sc(), savePath);

    // Read the whole file into memory and feed it to the stream as one queued RDD
    BufferedReader br = new BufferedReader(new FileReader("C://testStream//copy.csv"));
    Queue<JavaRDD<String>> rddQueue = new LinkedList<JavaRDD<String>>();
    List<String> list = Lists.newArrayList();
    String line = "";
    while ((line = br.readLine()) != null) {
        list.add(line);
    }
    br.close();

    rddQueue.add(jssc.sparkContext().parallelize(list));
    numberInst += list.size();

    JavaDStream<String> dataStream = jssc.queueStream(rddQueue);
    dataStream.print();

    // This check runs once, before jssc.start(), and never again while streaming
    if (numberInst == 100) {
        System.out.println("should stop");
        jssc.wait();
    }

    jssc.start();
    jssc.awaitTermination();
    return numberInst;
}
My question is: how can I stop the streaming when numberInst == 100 and return execution to the main method so the statements after the call are run?
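For reference, here is a minimal sketch of the kind of shutdown I have in mind. It assumes JavaStreamingContext.awaitTerminationOrTimeout(ms) and stop(stopSparkContext, stopGracefully) are the right calls, and that the function passed to foreachRDD runs on the driver once per batch; the threshold check would replace the if block above. I am not sure this is the correct approach:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

// Sketch only: keep a running record count on the driver, then stop the
// context from the main thread instead of checking once before start()
final AtomicLong seen = new AtomicLong(0);
dataStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    @Override
    public void call(JavaRDD<String> rdd) {
        // count() is an action; this callback runs on the driver once per batch
        seen.addAndGet(rdd.count());
    }
});

jssc.start();
// Poll instead of blocking forever, so the main thread can decide to stop
while (!jssc.awaitTerminationOrTimeout(1000)) {
    if (seen.get() >= 100) {
        // Stop streaming and the SparkContext; the second flag requests a graceful stop
        jssc.stop(true, true);
    }
}
return (int) seen.get();

The idea behind polling with awaitTerminationOrTimeout is that stop() would be issued from the main thread rather than from inside a running batch, after which testSparkStreaming() returns and main continues.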
P.S.: in the code above, the if statement is never executed:
if (numberInst == 100) {
    System.out.println("should stop");
    jssc.wait();
}