
I have a script that does two jobs: A. Connect to databases and retrieve data from them. B. Process the retrieved data.

So I create one thread for each of them. I also use Parallel::ForkManager to spawn child processes for job A, so that I can connect to multiple databases at a time.

The problem I have been trying to solve: when I add a blocking call to ForkManager->wait_all_children, job A is not able to run its run_on_finish callback.

The callback does run, though, if I remove my second job (B) from the @threads array. So I think I have a misunderstanding about threads; I thought these two threads were not supposed to block each other. If they aren't, what might be causing my child processes to be blocked from finishing their job?

On the design side, I cannot do job B each time job A finishes, because job B is very expensive and may run for quite some time, which would block me from getting all of job A done. So I would rather have a separate thread that periodically does "bulk" processing over all of the retrieved data.

Here is sample code that reproduces my problem:

use Data::Dumper;
use Parallel::ForkManager;
use threads;
use threads::shared;

my $isRunningJobThread :shared;

my @jobs = ('a', 'b', 'c', 'd', 'e', 'f', 'g');
my @threads = (
    threads->create(\&jobThread),
    threads->create(\&compileCompletedJobThread)
);

$_->join for @threads;

print "All done\n";

sub jobThread{
    my $pm = Parallel::ForkManager->new(5);
    $pm->run_on_finish(sub {
        print "Job done\n";
    });

    $isRunningJobThread = 1;
    foreach my $job (@jobs) {
        $pm->start and next;
        print "Do job for : $job\n";
        $pm->finish;
    }

    $pm->wait_all_children;
}

sub compileCompletedJobThread{
    while($isRunningJobThread) {
        sleep 10;

        print "Compiling completed jobs\n";
    }
}
Why are you trying to use threads and P::FM at the same time? I would only use one or the other. - ThisSuitIsBlackNot
Always fork before creating any objects. Especially before creating threads! Your children aren't exiting. Using POSIX::_exit instead of $pm->finish can force them to exit, but that can bite you in the ass. - ikegami
It seems that pFM isn't operating as usual when in a thread. (What exactly would it even mean to fork a process from within a thread?) From Process-scope Changes in perlthrtut: "Thinking of mixing fork() and threads? Please lie down and wait until the feeling passes." I'd say, choose either of the methods (don't forget queues if using threads). - zdim

1 Answer


OK, woah. Slow down there. You're using threads and fork in the same code. That's a really horrible idea. Each is somewhat implementation dependent - you can be fairly sure either will work on its own, but both at the same time is asking for a world of pain (concurrency issues, race conditions, etc.).

In this particular case, you need to understand that what fork() does is take a complete copy of your process - in exactly the same state - with a single difference: the return value of fork(). That means your threads etc. will be cloned by the fork() too. Parallel::ForkManager hides some of that from you by limiting the scope of the parallelism, but that's what's going on behind the scenes.
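To see the copy semantics in isolation, here is a minimal sketch (mine, not from the question's code): after fork(), the parent and child each have their own copy of every variable, and only the return value of fork() tells them apart.

```perl
#!/usr/bin/env perl
use strict;
use warnings;

my $counter = 41;

# fork() returns the child's PID in the parent, 0 in the child,
# and undef on failure.
my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ( $pid == 0 ) {
    # Child: sees its own copy of $counter; changes don't affect the parent.
    $counter++;
    print "child: counter = $counter\n";    # 42
    exit 0;
}

# Parent: reap the child, then show that its own copy is untouched.
waitpid( $pid, 0 );
print "parent: counter = $counter\n";       # still 41
```

The same copying applies to any threads the process has at fork time, which is why mixing the two models gets messy.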

I'd urge you to step back and make a rewrite - what you appear to be doing would be MUCH better suited to using some worker threads and a Thread::Queue:

#!/usr/bin/env perl
use strict;
use warnings;

use threads;
use Thread::Queue;

#parallelism limit
my $num_threads = 5;

#input and output queues
my $work_q = Thread::Queue -> new();
my $result_q = Thread::Queue -> new;

#jobs as before
my @jobs = ('a', 'b', 'c', 'd', 'e', 'f', 'g');

#worker - reads from the queue one item at a time.
#exits when dequeue returns undef, which happens once the queue has been `end`ed.
sub worker {
   while ( defined( my $item = $work_q -> dequeue ) ) {
      print threads -> self -> tid.": processing work item $item\n";
      #pretend we did some work, queue the result. 
      $result_q -> enqueue ( threads -> self -> tid . ": finished $item" );
   }
}

#spawn threads
threads -> create (\&worker) for 1..$num_threads;
#queue jobs
$work_q -> enqueue ( @jobs ); 
#close queue, so threads will exit when they hit the end of the queue. 
#dequeue will return 'undef' rather than blocking. 
$work_q -> end;  

#join all the threads. 
$_->join for threads -> list;
#all threads are finished, so we close the result queue. 
#again - so dequeue is undef when empty, rather than just blocking. 
$result_q -> end; 

while ( defined( my $result = $result_q -> dequeue ) ) {
    print "Got result of $result\n";
}

print "All done\n";

Since you indicate that you want to process the results in parallel too, you can do much the same thing with the 'result handler': run it as another thread.

This gets slightly ticklish, in that you need to derive your 'exit' condition from which queues are open or closed. But something like this:

#!/usr/bin/env perl
use strict;
use warnings;

use threads;
use Thread::Queue;

#parallelism limit
my $num_threads = 5;

#input and output queues
my $work_q   = Thread::Queue->new;
my $result_q = Thread::Queue->new;

#jobs as before
my @jobs = ( 'a', 'b', 'c', 'd', 'e', 'f', 'g' );

#worker - reads from the queue one item at a time.
#exits when dequeue returns undef, which happens once the queue has been `end`ed.
sub worker {
   while ( defined( my $item = $work_q->dequeue ) ) {
      print threads->self->tid . ": processing work item $item\n";

      #pretend we did some work, queue the result.
      $result_q->enqueue( threads->self->tid . ": finished $item" );
   }
}

#a thread to process the results in parallel
sub collator {
   while ( defined( my $result = $result_q->dequeue ) ) {
      print "Got result of $result\n";
   }
}

#spawn threads
my @workers = map { threads->create( \&worker ) } 1 .. $num_threads;
my $collator = threads -> create ( \&collator );

#queue jobs
$work_q->enqueue(@jobs);

#close queue, so threads will exit when they hit the end of the queue.
#dequeue will return 'undef' rather than blocking.
$work_q->end;

#join all the threads.
$_->join for @workers;

#all threads are finished, so we close the result queue.
#again - so dequeue is undef when empty, rather than just blocking.
$result_q->end;

#reap 'collator' once it's finished.
$collator->join;


print "All done\n";

It's pretty much the same as above, but it keeps a list of 'workers' - that way you can end the $work_q, wait for the workers to exit (and join them), and then you know there will be no more results entering $result_q, so you can end that too (and wait for the collator to exit).
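As a minimal illustration of that shutdown mechanism (my sketch, assuming a Thread::Queue recent enough to provide end(), i.e. 3.01+): once a queue has been end-ed, dequeue drains any remaining items and then returns undef instead of blocking, which is exactly what lets the worker and collator loops terminate.

```perl
#!/usr/bin/env perl
use strict;
use warnings;
use Thread::Queue;

my $q = Thread::Queue->new;
$q->enqueue( 'x', 'y' );
$q->end;    # no more items will ever be added

my @got;
# dequeue returns the remaining items, then undef (rather than blocking)
while ( defined( my $item = $q->dequeue ) ) {
    push @got, $item;
}
print "drained: @got\n";    # drained: x y
```

Without the end() call, the final dequeue would block forever waiting for another item, and the loop would never exit.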