0 votes

I have the following problem, and I am not sure how to design parts of the solution:

I have a large text file that I read line by line. I need to process each line and update a HashMap.

AFAIK I need one producer thread to read the lines from the file, and dispatch the lines to a pool of consumer threads. The consumer threads should update the ConcurrentHashMap and then get new lines.

My questions are: How can the consumer threads access the ConcurrentHashMap? If I use a fixed thread pool, does the producer need to add the line to a queue first, or can it simply submit or execute a new consumer?

EDIT: Zim-Zam is correct; I want the consumers to dump their results into the ConcurrentHashMap when they finish.

I create the ConcurrentHashMap in the main thread, and pass references to it to the Consumers in their constructors. The Consumers should either add or increment an AtomicInteger in their run methods. How can I tell in the main thread when all of the lines are read and the consumers are finished?

Thanks again.

Comment (4 upvotes): Why do you want to use a ConcurrentHashMap? Queues are better suited for this type of usage. – Peeyush

4 Answers

1 vote

You can either have all of the consumers share the same queue that the producer adds to, or give each consumer its own queue that the producer fills in round-robin fashion (via a circular linked list or a similar data structure) so that each consumer receives more or less the same amount of data: with three consumers, the producer would add data to queue1, then queue2, then queue3, then queue1 again, and so on.

You can give each consumer a reference to the same ConcurrentHashMap (e.g. in the consumer's constructor), or else you can make the ConcurrentHashMap accessible via a static getter method.
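A minimal sketch of the multi-queue variant combined with a shared map passed in by reference. The sentinel string, queue sizes, and sample lines are my own assumptions for illustration, not from the question:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinProducer {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();
        int nConsumers = 3;
        List<BlockingQueue<String>> queues = new ArrayList<>();
        for (int i = 0; i < nConsumers; i++) {
            queues.add(new ArrayBlockingQueue<>(100));
        }

        // one consumer per queue; each holds a reference to the shared map
        List<Thread> consumers = new ArrayList<>();
        for (BlockingQueue<String> q : queues) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String line = q.take();
                        if (line.equals("__EOF__")) break; // sentinel ends this consumer
                        counts.computeIfAbsent(line, k -> new AtomicInteger())
                              .incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            consumers.add(t);
        }

        // producer: distribute lines round-robin over queue1, queue2, queue3, ...
        String[] lines = {"a", "b", "a", "c", "a", "b"};
        int next = 0;
        for (String line : lines) {
            queues.get(next).put(line);
            next = (next + 1) % nConsumers;
        }
        for (BlockingQueue<String> q : queues) q.put("__EOF__");
        for (Thread t : consumers) t.join();

        System.out.println(counts.get("a").get()); // 3
    }
}
```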

1 vote

I don't think you really need a producer-consumer queue in the way you suggested.

Simply have the main thread read the file, and for each line create a corresponding Runnable object (treat it as a command) and submit it to a thread pool executor. The Runnable simply contains the logic for handling that line and putting the result into the ConcurrentHashMap.

The ThreadPoolExecutor can be created with a bounded or unbounded blocking queue, depending on the behavior you want.

In pseudocode it is something like this:

class LineHandler implements Runnable {
    private final String line;
    private final ConcurrentHashMap<String, AtomicInteger> resultMap;

    public LineHandler(String line, ConcurrentHashMap<String, AtomicInteger> resultMap) {
        this.line = line;
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        // work on line
        // update resultMap
    }
}

// logic in your file-reader thread, inside a loop:

while (moreLinesInFile()) {
    String line = readFromFile();
    threadPoolExecutor.submit(new LineHandler(line, concurrentHashMap));
}

threadPoolExecutor.shutdown();

0 votes
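If you go with a bounded queue, one option (an assumption on my part, not the only policy) is CallerRunsPolicy, which makes the reader thread run a task itself when the queue is full, naturally throttling file reading. Combined with awaitTermination(...), this also answers how the main thread knows everything is done. Class and variable names here are illustrative:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // 4 workers, at most 100 queued tasks; when the queue is full the
        // submitting (reader) thread runs the task itself instead of throwing
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());

        ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();
        String[] lines = {"x", "y", "x"}; // stand-in for lines read from the file
        for (String line : lines) {
            pool.submit(() ->
                map.computeIfAbsent(line, k -> new AtomicInteger()).incrementAndGet());
        }

        pool.shutdown();                             // no new tasks accepted
        pool.awaitTermination(1, TimeUnit.MINUTES);  // wait for queued tasks to finish

        System.out.println(map.get("x").get()); // 2
    }
}
```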

Use a CountDownLatch.

// in main thread
// assume consumers are in some kind of container
List<MyConsumer> consumers = ...;
CountDownLatch latch = new CountDownLatch(consumers.size());

for (MyConsumer c : consumers) {
    c.setLatch(latch);
    c.start(); // starts asynchronously, or submit to an executor, whatever you're doing
}

// block the main thread, optionally with a timeout
latch.await();


// then in each consumer, when it's done its work:
latch.countDown();
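A self-contained sketch of the latch pattern with an executor, assuming each consumer increments a shared ConcurrentHashMap entry (the key "done" and the task bodies are placeholders):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int nConsumers = 3;
        CountDownLatch latch = new CountDownLatch(nConsumers);
        ConcurrentHashMap<String, AtomicInteger> results = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(nConsumers);

        for (int i = 0; i < nConsumers; i++) {
            pool.submit(() -> {
                try {
                    // placeholder for the consumer's real work
                    results.computeIfAbsent("done", k -> new AtomicInteger())
                           .incrementAndGet();
                } finally {
                    latch.countDown(); // always count down, even if the work throws
                }
            });
        }

        latch.await();   // main thread blocks here until all consumers have counted down
        pool.shutdown();
        System.out.println(results.get("done").get()); // 3
    }
}
```

Putting countDown() in a finally block matters: a consumer that throws would otherwise leave the main thread waiting forever.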
0 votes

I would suggest you use a BlockingQueue to store the lines to be processed.

After the main thread has finished reading the file, it puts a poison object as the last object into the queue and waits with awaitTermination(...) for the consumers to finish.

The poison object is handled in a special way by the consumers: the consumer thread that takes the poison object attempts to shutdown() the ExecutorService while the main thread is waiting.

As for the results of the consumers, just add them to some thread-safe container. The producer/consumer problem itself is handled by the queue's put(...) and poll(...) (or take()).
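A single-consumer sketch of the poison-pill idea; with a thread pool you would put one pill per consumer, or have the pill-handling consumer call shutdown() as described above. The sample lines and pill object are my own illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillDemo {
    // a distinct object no real line can be identical to (compared by ==, not equals)
    private static final String POISON = new String("POISON");

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String line = queue.take();
                    if (line == POISON) break; // identity check: only the pill matches
                    counts.computeIfAbsent(line, k -> new AtomicInteger())
                          .incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // "producer": pretend these lines came from the file
        for (String line : new String[] {"a", "b", "a"}) {
            queue.put(line);
        }
        queue.put(POISON);   // last object in the queue ends the consumer
        consumer.join();     // main thread waits for the consumer to finish

        System.out.println(counts.get("a").get()); // 2
    }
}
```

Using reference equality against a dedicated object means even a file that literally contains the line "POISON" cannot terminate the consumer by accident.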

Hope I could help.