In a recently deleted post, I asked the following questions:
I am trying to write a multithreaded program that implements the Producer/Consumer model. Specifically, I want one Producer that reads lines from a file and puts them in a BlockingQueue, and multiple Consumers that retrieve the lines from the BlockingQueue, do some processing, and store the results in a new file.
I am hoping you can give me some feedback on what I should consider to achieve high performance. I've spent weeks reading about concurrency and synchronization because I don't want to miss anything, but I am looking for some external feedback. Below are the points I need information about.
- Which BlockingQueue implementation should I use for the best performance? I don't think I can use a fixed-size BlockingQueue because we don't know how many lines the file has. Or should I use one anyway, even though the Producer will block when the queue is full?
- If 'f()' is the method that the Consumers use to process the file lines, and given that I am using a BlockingQueue, should I synchronize f()? If so, won't that hurt my application, because the other Consumers will have to wait for the lock to be released?
I hope I didn't say anything wrong.
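To make the first point concrete, here is a minimal sketch of how a bounded queue behaves when the input is much larger than its capacity. It is only an illustration under stated assumptions: the class name `BoundedQueueSketch`, the method `transfer`, and the capacity of 4 are hypothetical, and the consumer knows the number of lines only because this is a demo. As far as I understand the documented behavior, `put()` simply blocks while the queue is full and `take()` blocks while it is empty, so the capacity does not have to match the file size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BoundedQueueSketch {

    /**
     * Pushes every line through a queue of the given capacity and returns
     * the lines in the order the (single) consumer received them.
     */
    static List<String> transfer(List<String> lines, int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (String line : lines) {
                    queue.put(line); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // Demo-only shortcut: the consumer knows how many lines to expect.
            for (int i = 0; i < lines.size(); i++) {
                received.add(queue.take()); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            lines.add("line-" + i);
        }
        // The capacity (4) is far smaller than the input (1000 lines).
        System.out.println(transfer(lines, 4).size());
    }
}
```

If this is right, a blocked Producer is not an error but a form of back-pressure: it just keeps the reader from running far ahead of the Consumers and filling memory.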
You advised me to implement something before asking questions, so I deleted the post and tried to implement the model. Here is my code.
The Producer, where a single thread reads lines from a file and puts them in a BlockingQueue:
class Producer implements Runnable {
    private String location;
    private BlockingQueue<String> blockingQueue;
    private float numline = 0;
    protected transient BufferedReader bufferedReader;
    protected transient BufferedWriter bufferedWriter;

    public Producer(String location, BlockingQueue<String> blockingQueue) {
        this.location = location;
        this.blockingQueue = blockingQueue;
        try {
            bufferedReader = new BufferedReader(new FileReader(location));
            // Create the file where the processed lines will be stored
            createCluster();
        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        }
    }

    @Override
    public void run() {
        String line = null;
        try {
            while ((line = bufferedReader.readLine()) != null) {
                // Count the read lines
                numline++;
                blockingQueue.put(line);
            }
        } catch (IOException e) {
            System.out.println("Problem reading the log file!");
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void createCluster() {
        try {
            String clusterName = location + ".csv";
            bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
            bufferedWriter.write("\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The Consumer, where multiple threads take lines from the BlockingQueue, do some processing 'f()', and store the results in a new file:
class Consumer implements Runnable {
    private String location;
    private BlockingQueue<String> blockingQueue;
    protected transient BufferedWriter bufferedWriter;
    private String clusterName;

    public Consumer(String location, BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        this.location = location;
        clusterName = location + ".csv";
    }

    @Override
    public void run() {
        while (true) {
            try {
                // Retrieve the lines
                String line = blockingQueue.take();
                String result = doNormalize(line);
                // TODO
                //
                // bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                // bufferedWriter.write(result + "\n");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // Pattern pattern, Matcher matcher
    private String doNormalize(String line) {
        String[] rules = getRules(); // returns an array of regexes
        String tmp = "";
        for (String rule : rules) {
            Pattern pattern = Pattern.compile(rule);
            Matcher matcher = pattern.matcher(line);
            if (matcher.find()) {
                Set<String> namedGroups = getNamedGroupCandidates(rule);
                Iterator<String> itr = namedGroups.iterator();
                while (itr.hasNext()) {
                    String value = itr.next();
                    tmp = tmp + matcher.group(value) + ", ";
                }
                tmp = tmp + "\t";
                break;
            }
        }
        return tmp;
    }

    private Set<String> getNamedGroupCandidates(String regex) {
        Set<String> namedGroups = new TreeSet<String>();
        Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
        while (m.find()) {
            namedGroups.add(m.group(1));
        }
        return namedGroups;
    }
}
And the code in my main class, which uses 1 Producer and 3 Consumers:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
Producer readingThread = new Producer(location, queue);
new Thread(readingThread).start();
Consumer normalizers = new Consumer(location,queue);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
executor.submit(normalizers);
}
System.out.println("Stopped");
executor.shutdown();
I know my code is incomplete, since I still need to flush and close the reader and writers, etc. But can you tell me what mistakes I have made so far in implementing the Producer/Consumer model? Also, regarding the method f(): it processes a line and produces a result, so I don't think I should synchronize it, because I want all the Consumers to be able to use it at the same time.
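One thing the code above does not handle is how the Consumers stop: each one loops on `take()` forever, so the pool never terminates after `shutdown()`. A common approach is a "poison pill" sentinel, sketched below under stated assumptions. The names `PoisonPillSketch`, `process`, and `POISON` are hypothetical, the counter is only a stand-in for `doNormalize(line)`, and the producing loop runs on the calling thread to keep the example short. This is one possible way to shut down, not necessarily the best one.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoisonPillSketch {

    // Hypothetical sentinel. It is compared by identity (==), so a real
    // line that happens to contain the same text is not mistaken for it.
    private static final String POISON = new String("<<EOF>>");

    /** Feeds the lines to the given number of consumers; returns how many lines were processed. */
    static int process(List<String> lines, int consumers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);

        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String line = queue.take();
                        if (line == POISON) {
                            return; // this consumer is done
                        }
                        processed.incrementAndGet(); // stand-in for doNormalize(line)
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                }
            });
        }

        try {
            for (String line : lines) {
                queue.put(line);
            }
            // One pill per consumer, so every consumer sees exactly one.
            for (int i = 0; i < consumers; i++) {
                queue.put(POISON);
            }
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // no-op if the pool has already terminated
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("a", "b", "c", "d", "e"), 3));
    }
}
```

Because the pills are queued after the last real line and the queue is FIFO, no Consumer can see a pill before all lines have been taken. Printing "Stopped" would then belong after `awaitTermination` returns, not before `shutdown()`.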
EDIT
Finally, this post really confused me: it suggests that if the Consumers store their results in the file themselves, it will slow down the process. This might be a problem, because I want performance and speed.
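One arrangement I have seen suggested for this concern is to keep the Consumers away from the file entirely: they put their results on a second queue, and a single writer thread drains that queue into one BufferedWriter. The sketch below only illustrates that idea and makes several assumptions: the names `SingleWriterSketch`, `drain`, `writeAll`, and `DONE` are hypothetical, a `StringWriter` stands in for the real `FileWriter`, and the results are fed from the calling thread instead of from real Consumers. I have not measured whether it is actually faster.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class SingleWriterSketch {

    // Hypothetical end-of-results marker, compared by identity (==).
    private static final String DONE = new String("<<DONE>>");

    /** Writes one result per line until the DONE marker is taken from the queue. */
    static void drain(BlockingQueue<String> results, Writer sink) {
        try (BufferedWriter out = new BufferedWriter(sink)) {
            while (true) {
                String result = results.take();
                if (result == DONE) {
                    break;
                }
                out.write(result);
                out.write('\n');
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sends the results through a queue to a single writer thread and returns what was written. */
    static String writeAll(List<String> results) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        StringWriter sink = new StringWriter(); // stand-in for a FileWriter
        Thread writer = new Thread(() -> drain(queue, sink));
        writer.start();
        try {
            // In the real program each Consumer would call queue.put(result).
            for (String result : results) {
                queue.put(result);
            }
            queue.put(DONE); // sent only after all Consumers have finished
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink.toString();
    }

    public static void main(String[] args) {
        System.out.print(writeAll(List.of("first result", "second result")));
    }
}
```

With this layout only one thread ever touches the writer, so the writer itself needs no synchronization, and the file is opened once rather than once per line.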
Bests,