
I want to stop the Java streaming context in Spark after processing 100 records from a file. The problem is that the code in the if statement is not executed once streaming starts. The following code illustrates my idea:

    public static void main(String[] args) throws Exception {

        int ff = testSparkStreaming();

        System.out.println("wqwqwq");
        System.out.println(ff);
    }

    public static int testSparkStreaming() throws IOException, InterruptedException {

        int numberInst = 0;
        String savePath = "Path to Model";

        // jssc was not defined in the original snippet; it has to be created before use
        SparkConf conf = new SparkConf().setAppName("StreamTest").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        final NaiveBayesModel savedModel = NaiveBayesModel.load(jssc.sparkContext().sc(), savePath);

        // read the whole file into memory and feed it to the stream as one queued RDD
        BufferedReader br = new BufferedReader(new FileReader("C://testStream//copy.csv"));
        Queue<JavaRDD<String>> rddQueue = new LinkedList<JavaRDD<String>>();
        List<String> list = Lists.newArrayList();
        String line = "";
        while ((line = br.readLine()) != null) {
            list.add(line);
        }
        br.close();

        rddQueue.add(jssc.sparkContext().parallelize(list));
        numberInst += list.size();
        JavaDStream<String> dataStream = jssc.queueStream(rddQueue);
        dataStream.print();

        if (numberInst == 100) {
            System.out.println("should stop");
            jssc.wait();
        }
        jssc.start();
        jssc.awaitTermination();

        return numberInst;
    }

My question is: how can I stop the streaming when numberInst == 100 and return execution to the main method so the remaining statements run?

P.S.: in the code above, the if statement is never executed:

        if (numberInst == 100) {
            System.out.println("should stop");
            jssc.wait();
        }

2 Answers

2 votes

You can try this:

    jssc.start();

    // Note: numberInst must be updated by the streaming job itself
    // (e.g. a field incremented in foreachRDD, or a Spark accumulator);
    // a plain local set before start() will never change here.
    while (numberInst < 100) {
        jssc.awaitTerminationOrTimeout(1000); // poll every second; tune for your use case
    }

    jssc.stop();
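The control flow above (a worker updates a shared counter; the main thread polls it and shuts things down at 100 records) can be sketched without any Spark dependency using plain threads and an `AtomicInteger`. All names here (`StreamStopSketch`, `processUntil`, the batch size of 10) are illustrative, not part of the Spark API:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamStopSketch {

    static int processUntil(int limit) throws InterruptedException {
        AtomicInteger numberInst = new AtomicInteger(0); // shared count, like an accumulator
        AtomicBoolean stopped = new AtomicBoolean(false);

        // The worker plays the role of the streaming job: it keeps
        // "processing" batches of 10 records until told to stop.
        Thread worker = new Thread(() -> {
            while (!stopped.get()) {
                numberInst.addAndGet(10);
                try {
                    Thread.sleep(5); // simulate the batch interval
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        worker.start();

        // The main thread polls, mirroring awaitTerminationOrTimeout in a loop.
        while (numberInst.get() < limit) {
            Thread.sleep(5); // polling interval
        }
        stopped.set(true); // corresponds to jssc.stop()
        worker.join();
        return numberInst.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped after " + processUntil(100) + " records");
    }
}
```

Because the worker adds records in batches, the final count can slightly overshoot the limit; the same is true of the Spark version, where the check only runs between timeout polls.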
0 votes

Did you try stopping it like a Thread, i.e., with an interrupt?