
In a recently deleted post I asked the following questions:


I am trying to write a multi-threaded program that implements the Producer/Consumer model. Specifically, I want to use one Producer that reads lines from a file and puts them in a BlockingQueue, and multiple Consumers that take the lines from the BlockingQueue, do some processing, and store the results in a new file.

I am hoping you can give me some feedback on what I should consider to achieve high performance. I've spent weeks reading about concurrency and synchronization because I don't want to miss anything, but I am looking for some external feedback. Below are the points I need information about.

  • What type of BlockingQueue implementation should I use for better performance? I can't use a fixed-size BlockingQueue because I don't know how many lines are in the file. Or should I use one anyway, even though the Producer will block whenever the queue is full?
  • If 'f()' is the method that the Consumers use to process the file lines, and knowing that I am using a BlockingQueue, should I synchronize f()? If yes, won't that hurt my application's performance, since the other Consumers will have to wait for the lock to be released?

I hope I didn't say anything wrong.
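On the first point: a fixed-capacity queue does not require knowing the number of lines in advance. put() simply waits while the queue is full and resumes as soon as a consumer has taken an element, which also keeps memory bounded when the producer reads faster than the consumers can process. A minimal sketch with a deliberately tiny capacity of 2 (the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {
    // Moves `count` items through a queue of capacity 2. The producer's put()
    // blocks whenever the queue is full and continues once the consumer takes
    // an item, so the capacity does not have to match the input size.
    static List<String> transfer(int count) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put("line " + i); // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        for (int i = 0; i < count; i++) {
            received.add(queue.take()); // waits while the queue is empty
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(10).size()); // prints 10
    }
}
```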


You advised me to implement something before asking questions, so I deleted the post and tried to implement the model. Here is my code.

The Producer, where a single thread reads lines from a file and puts them in a BlockingQueue:

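import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private final String location;
    private final BlockingQueue<String> blockingQueue;

    // Number of lines read (a long: a float loses precision on large files)
    private long numline = 0;

    protected transient BufferedReader bufferedReader;
    protected transient BufferedWriter bufferedWriter;

    public Producer(String location, BlockingQueue<String> blockingQueue) {
        this.location = location;
        this.blockingQueue = blockingQueue;

        try {
            bufferedReader = new BufferedReader(new FileReader(location));

            // Create the file where the processed lines will be stored
            createCluster();

        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        }
    }

    @Override
    public void run() {
        if (bufferedReader == null) {
            return; // the input file could not be opened
        }
        String line;
        try {
            while ((line = bufferedReader.readLine()) != null) {
                // Count the read lines
                numline++;
                // put() waits while a bounded queue is full
                blockingQueue.put(line);
            }
        } catch (IOException e) {
            System.out.println("Problem reading the log file!");
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        } finally {
            try {
                bufferedReader.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void createCluster() {
        try {
            String clusterName = location + ".csv";
            bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
            bufferedWriter.write("\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}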

The Consumer, where multiple threads take lines from the BlockingQueue, do some processing 'f()' (here doNormalize()), and store the results in a new file:

import java.io.BufferedWriter;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class Consumer implements Runnable {
    private final String location;
    private final BlockingQueue<String> blockingQueue;

    protected transient BufferedWriter bufferedWriter;

    private final String clusterName;

    public Consumer(String location, BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        this.location = location;

        clusterName = location + ".csv";
    }

    @Override
    public void run() {
        while (true) {
            try {
                // Retrieve the lines
                String line = blockingQueue.take();
                String result = doNormalize(line);
                // TO DO
                //
                //bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                //bufferedWriter.write(result + "\n");

            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop; otherwise the loop can never be cancelled
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Joins the named groups of the first matching rule
    private String doNormalize(String line) {
        String[] rules = getRules(); // returns an array of regexes (not shown)
        String tmp = "";

        for (String rule : rules) {
            Pattern pattern = Pattern.compile(rule);
            Matcher matcher = pattern.matcher(line);

            if (matcher.find()) {
                Set<String> namedGroups = getNamedGroupCandidates(rule);
                Iterator<String> itr = namedGroups.iterator();
                while (itr.hasNext()) {
                    String value = itr.next();
                    tmp = tmp + matcher.group(value) + ", ";
                }
                tmp = tmp + "\t";
                break;
            }
        }
        return tmp;
    }

    private Set<String> getNamedGroupCandidates(String regex) {
        Set<String> namedGroups = new TreeSet<String>();
        Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
        while (m.find()) {
            namedGroups.add(m.group(1));
        }
        return namedGroups;
    }
}

And the code in my main class, which uses 1 Producer and 3 Consumers:

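import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// ... inside main():
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

Producer readingThread = new Producer(location, queue);
new Thread(readingThread).start();

ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
    // One Consumer per task, so the threads do not share its fields
    executor.submit(new Consumer(location, queue));
}
// shutdown() only stops the executor from accepting new tasks; the consumers
// keep running until their loops end, so nothing has stopped at this point
executor.shutdown();
System.out.println("All consumers submitted");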

I know my code is incomplete, since I still need to flush and close the reader and writers, etc. But can you tell me what mistakes I have made so far in implementing the Producer/Consumer model? Also, about the method f(): it processes a line and produces a result. I don't think I should synchronize it, because I want all the consumers to use it at the same time.

EDIT

Finally, this post really confused me: it suggests that if the Consumers store the results in a file, it will slow down the process. This might be a problem, because I care about performance and speed.

Best,

Synchronization is needed when reading or writing shared mutable state. You haven't posted anything of your f() method, so we can't possibly know whether it must be synchronized. If it's a pure function that takes a String, computes something and returns it, then it doesn't access any shared state, so it doesn't need any synchronization. What does the f() method do? What makes you think that you need multithreading in the first place? Multi-threading a program doesn't automatically make it faster. Many times, it actually makes it slower. – JB Nizet
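To illustrate the comment: a method that only reads its argument and local variables, like the hypothetical f() below (a stand-in that trims and upper-cases the line), can be called from several threads at once without any lock. Synchronization would only become necessary if f() read or wrote shared mutable fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PureFunctionDemo {
    // Pure function: depends only on its argument and writes no fields,
    // so concurrent calls cannot interfere with each other
    static String f(String line) {
        return line.trim().toUpperCase();
    }

    // Calls f() concurrently from a pool; results are collected through
    // the Futures in submission order
    static List<String> processAll(List<String> lines, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String line : lines) {
                futures.add(pool.submit(() -> f(line)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(Arrays.asList(" a ", "b", " c"), 3)); // prints [A, B, C]
    }
}
```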
Thanks for your answer. The f() method parses the content of the file using regular expressions (see Edit). I don't think it has any shared state, since it just applies regular expressions to the queue element. But because my code applies regular expressions to constantly generated elements, this task is CPU-intensive, which is why researchers and developers use multi-threading to 'speed up' the processing. And I agree, multi-threading often makes it a lot slower. So is that the case for my code? – user8751092
The first two obvious optimizations you should make, way before multithreading, are to avoid compiling the same regexps again and again and again. Make getRules() return a list of Pattern, compiled once and only once. Do the same for the regex used in getNamedGroupCandidates: compile it once and for all. And use a StringBuilder to produce your strings, instead of String concatenation. – JB Nizet
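A sketch of those two optimizations applied to the question's doNormalize() logic, assuming the rules are available as regex strings when the normalizer is created (the Normalizer and Rule names are hypothetical). Each Pattern and its set of named groups is computed once; because Pattern is immutable and safe for use by multiple threads, one Normalizer can be shared by all consumers, while a fresh Matcher (which is not thread-safe) is created for every line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class Normalizer {
    // Compiled once and shared; Pattern instances are immutable
    private static final Pattern NAMED_GROUP =
            Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>");

    // A rule's compiled pattern together with its named groups, both computed once
    private static final class Rule {
        final Pattern pattern;
        final Set<String> groupNames;

        Rule(String regex) {
            this.pattern = Pattern.compile(regex);
            this.groupNames = namedGroups(regex);
        }
    }

    private final List<Rule> rules = new ArrayList<>();

    // `regexes` plays the role of the array returned by getRules() in the question
    Normalizer(String... regexes) {
        for (String regex : regexes) {
            rules.add(new Rule(regex));
        }
    }

    private static Set<String> namedGroups(String regex) {
        Set<String> names = new TreeSet<>();
        Matcher m = NAMED_GROUP.matcher(regex);
        while (m.find()) {
            names.add(m.group(1));
        }
        return names;
    }

    // Same output format as doNormalize(): the named groups of the first
    // matching rule, each followed by ", ", then a tab
    String normalize(String line) {
        StringBuilder sb = new StringBuilder();
        for (Rule rule : rules) {
            Matcher matcher = rule.pattern.matcher(line); // new Matcher per call
            if (matcher.find()) {
                for (String name : rule.groupNames) {
                    sb.append(matcher.group(name)).append(", ");
                }
                sb.append('\t');
                break;
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Normalizer n = new Normalizer("(?<ip>\\d+\\.\\d+\\.\\d+\\.\\d+) (?<user>\\w+)");
        System.out.println(n.normalize("10.0.0.1 bob GET /index.html")); // "10.0.0.1, bob, " then a tab
    }
}
```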
"So is it the case for my code?": you are the only one who can answer that question. Make a good single-threaded version and measure it, then make a good multi-threaded version and measure it, then compare the results. – JB Nizet
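A rough harness for that comparison, under stated assumptions: the workload f is a hypothetical regex replacement, the input is generated in memory, and timings come from System.nanoTime(). Single runs like this are sensitive to JIT warm-up, so repeat them (or use a benchmarking harness such as JMH) before drawing conclusions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.UnaryOperator;

class ThroughputCheck {
    // Baseline: apply f to every line on the calling thread
    static List<String> runSingle(List<String> lines, UnaryOperator<String> f) {
        List<String> out = new ArrayList<>(lines.size());
        for (String line : lines) {
            out.add(f.apply(line));
        }
        return out;
    }

    // Split the lines into `threads` contiguous chunks, process them on a pool,
    // and concatenate the results in the original order
    static List<String> runParallel(List<String> lines, UnaryOperator<String> f, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            int chunk = (lines.size() + threads - 1) / threads;
            List<Future<List<String>>> parts = new ArrayList<>();
            for (int start = 0; start < lines.size(); start += chunk) {
                List<String> slice = lines.subList(start, Math.min(start + chunk, lines.size()));
                parts.add(pool.submit(() -> runSingle(slice, f)));
            }
            List<String> out = new ArrayList<>(lines.size());
            for (Future<List<String>> part : parts) {
                out.addAll(part.get());
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 200_000; i++) {
            lines.add("user" + i + " 10.0.0." + (i % 256));
        }
        UnaryOperator<String> f = s -> s.replaceAll("\\d", "#"); // hypothetical workload

        for (int threads : new int[] {1, 2, 4, 8}) {
            long start = System.nanoTime();
            List<String> result = threads == 1 ? runSingle(lines, f) : runParallel(lines, f, threads);
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.println(threads + " thread(s): " + ms + " ms for " + result.size() + " lines");
        }
    }
}
```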
Thanks very much. I am working on the optimizations you suggested; I hadn't paid attention to them, sorry. Once I finish, I will edit my code to get your feedback, if that's okay. "Measure, then compare the results": actually, that's what I am trying to do, measuring the performance with an increasing number of threads (from 1 upwards). I suspect that if I add a lot of threads my code will slow down, because a lot of CPU time will be spent blocking, but to do the comparison I need to write a good program. What about the post I linked: will the Consumers slow down the process if they write to the same file? – user8751092

1 Answer


For my second problem ("how can the SingleConsumer 'know' that the multiple consumers have finished consuming/processing all the lines?"), I was inspired by this post combined with this comment: "Each consumer should send an 'I terminated' message to queue 2, and if the single output consumer receives all of these, it can also terminate."

So, for the Consumers, here is what I wrote in the run() method:

@Override
public void run() {
    // A Consumer keeps taking elements from queue 1 as long as the Producer is
    // producing and queue 1 is not empty.
    while (true) {
        try {
            // Retrieve the lines
            String line = firstBlockingQueue.take();
            // If the special terminating value is found (compared by reference:
            // only the sentinel object itself matches, never a line read from the file)
            if (line == POISON_PILL) {
                // The consumer notifies the other consumers and the SingleConsumer
                // that operates on queue 2, and then terminates.
                firstBlockingQueue.put(POISON_PILL);
                secondBlockingQueue.put(SINGLE_POISIN_PILL);
                return;
            }
            // Put the normalized events on the new queue
            String result = doNormalize(line);
            if (result != null) {
                secondBlockingQueue.put(result);
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop instead of looping forever
            Thread.currentThread().interrupt();
            return;
        }
    }
}
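These consumers only stop once something puts POISON_PILL on queue 1, which is the Producer's job when it reaches the end of the file. A minimal sketch of that side, assuming POISON_PILL is a dedicated String object compared with == (the class name and constructor are hypothetical). A single pill is enough, because each consumer puts it back before exiting.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillProducer implements Runnable {
    // A distinct String object used as the end-of-input marker. Consumers compare
    // with ==, so even a line whose text is "POISON_PILL" is not mistaken for it.
    static final String POISON_PILL = new String("POISON_PILL");

    private final BufferedReader reader;
    private final BlockingQueue<String> queue;

    PoisonPillProducer(BufferedReader reader, BlockingQueue<String> queue) {
        this.reader = reader;
        this.queue = queue;
    }

    @Override
    public void run() {
        try (BufferedReader r = reader) {
            String line;
            while ((line = r.readLine()) != null) {
                queue.put(line); // waits while a bounded queue is full
            }
        } catch (IOException e) {
            e.printStackTrace(); // still send the pill below so the consumers do not hang
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // cancelled: stop without sending a pill
            return;
        }
        try {
            // One pill is enough: every consumer puts it back before exiting
            queue.put(POISON_PILL);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        new PoisonPillProducer(new BufferedReader(new StringReader("a\nb\n")), queue).run();
        System.out.println(queue.size()); // prints 3: "a", "b" and the pill
    }
}
```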

As for the SingleConsumer, it counts the "I finished processing" messages sent by the Consumers (these are the SINGLE_POISIN_PILL values) and terminates when the counter reaches the number of consumers reading from queue 1.

while (true) {
    try {
        // Retrieve the lines
        String line = secondBlockingQueue.take();
        if (line == SINGLE_POISIN_PILL) {
            // One more Consumer has finished
            setCounter(getCounter() + 1);
            if (getCounter() == threadNumber) {
                System.out.println("All " + getCounter() + " threads have finished.  \n Stopping..");
                return;
            }
            continue;
        }

        try {
            // Printing every line to the console is slow; remove it when measuring
            System.out.println(line);
            bufferedWriter.write(line + "\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag and stop instead of looping forever
        Thread.currentThread().interrupt();
        return;
    }
}
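Putting the pieces together, here is a self-contained sketch of the whole two-queue pipeline: one producer, several normalizing consumers, and a single writer that counts the "I finished" pills. Assumptions: the normalizing function is passed in as a UnaryOperator, the input and output are a BufferedReader and a Writer (wrapping a FileReader/FileWriter in practice), and results are written in no particular order. The pool has workers + 2 threads because the producer, the workers and the writer all block on queues and must run at the same time.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

class Pipeline {
    // Distinct sentinel objects, always compared with ==
    private static final String POISON_PILL = new String("POISON_PILL");
    private static final String SINGLE_POISON_PILL = new String("SINGLE_POISON_PILL");

    // Reads every line from `in`, applies `f` on `workers` threads and writes the results
    // to `out` from one thread. The caller owns `out` and closes it; this only flushes it.
    static void run(BufferedReader in, Writer out, UnaryOperator<String> f, int workers)
            throws InterruptedException {
        BlockingQueue<String> firstQueue = new ArrayBlockingQueue<>(100);
        BlockingQueue<String> secondQueue = new ArrayBlockingQueue<>(100);
        ExecutorService pool = Executors.newFixedThreadPool(workers + 2);

        pool.submit(() -> { // producer: every line, then one pill
            try {
                try (BufferedReader reader = in) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        firstQueue.put(line);
                    }
                } catch (IOException e) {
                    e.printStackTrace(); // still send the pill so nothing hangs
                }
                firstQueue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        for (int i = 0; i < workers; i++) {
            pool.submit(() -> { // worker: normalize until the pill, pass it on, report
                try {
                    while (true) {
                        String line = firstQueue.take();
                        if (line == POISON_PILL) {
                            firstQueue.put(POISON_PILL);
                            secondQueue.put(SINGLE_POISON_PILL);
                            return;
                        }
                        secondQueue.put(f.apply(line));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.submit(() -> { // single writer: stops after one pill per worker
            int finished = 0;
            try {
                while (finished < workers) {
                    String result = secondQueue.take();
                    if (result == SINGLE_POISON_PILL) {
                        finished++;
                    } else {
                        out.write(result);
                        out.write('\n');
                    }
                }
                out.flush();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        pool.shutdown(); // no new tasks; the submitted ones end via the pills
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            pool.shutdownNow(); // interrupt anything still blocked, e.g. after a write error
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StringWriter out = new StringWriter();
        run(new BufferedReader(new StringReader("a\nb\nc\n")), out, String::toUpperCase, 3);
        System.out.print(out); // A, B and C, one per line, in some order
    }
}
```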

For my second problem, it turned out that all I had to do was add the following:

        if (line == SINGLE_POISIN_PILL) {
            setCounter(getCounter() + 1);
            if (getCounter() == threadNumber) {
                System.out.println("All " + getCounter() + " threads have finished.  \n Stopping..");
                try {
                    if (bufferedWriter != null) {
                        bufferedWriter.flush();
                        bufferedWriter.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return;
            }
        }

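Once I flushed and closed the writer, the output actually appeared in the file. A BufferedWriter keeps text in memory until its buffer fills up or flush() or close() is called.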
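To avoid depending on an explicit flush()/close() at exactly the right moment, the writer can be opened in a try-with-resources block, which closes (and therefore flushes) it even if an exception is thrown. A small sketch (class and method names hypothetical) that shows the data reaching the file only when the writer is closed:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

class FlushDemo {
    // Writes `lines` through a BufferedWriter and returns the file size measured
    // just before the writer is closed and again just after
    static long[] sizesBeforeAndAfterClose(Path file, List<String> lines) throws IOException {
        long before;
        try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            for (String line : lines) {
                writer.write(line);
                writer.newLine();
            }
            // A few short lines still sit in the writer's in-memory buffer here
            before = Files.size(file);
        } // close() runs here and flushes the buffer, even if an exception was thrown
        long after = Files.size(file);
        return new long[] {before, after};
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flush-demo", ".csv");
        long[] sizes = sizesBeforeAndAfterClose(file, Arrays.asList("a", "b"));
        System.out.println("before close: " + sizes[0] + " bytes, after close: " + sizes[1] + " bytes");
        Files.delete(file);
    }
}
```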

Hoping for your feedback.