
I am a newbie to Perl, so please excuse my ignorance. (I'm using Windows 7.)

I have borrowed echicken's threads example script and wanted to use it as a basis for a script to make a number of system calls, but I have run into an issue which is beyond my understanding. To illustrate the issue I am seeing, I am doing a simple ping command in the example code below.

  • $nb_process is the number of simultaneously running threads allowed.
  • $nb_compute is the number of times we want to run the subroutine (i.e. the total number of times we will issue the ping command).

When I set $nb_compute and $nb_process to the same value, it works perfectly.

However, when I reduce $nb_process (to restrict the number of threads running at any one time), it seems to hang as soon as $nb_process threads have started.

It works fine if I remove the system call (ping command).

I see the same behaviour for other system calls (it's not just ping).

Please could someone help? I have provided the script below.

#!/opt/local/bin/perl -w  
 use threads;  
 use strict;  
 use warnings;  

 my @a = ();  
 my @b = ();  


 sub sleeping_sub ( $ $ $ ); 

 print "Starting main program\n";  

 my $nb_process = 3;  
 my $nb_compute = 6;  
 my $i=0;  
 my @running = ();  
 my @Threads;  
 while (scalar @Threads < $nb_compute) {  

     @running = threads->list(threads::running);  
     print "LOOP $i\n";  
     print "  - BEGIN LOOP >> NB running threads = ".(scalar @running)."\n";  

     if (scalar @running < $nb_process) {  
         my $thread = threads->new( sub { sleeping_sub($i, \@a, \@b) });  
         push (@Threads, $thread);  
         my $tid = $thread->tid;  
         print "  - starting thread $tid\n";  
     }  
     @running = threads->list(threads::running);  
     print "  - AFTER STARTING >> NB running Threads = ".(scalar @running)."\n";  
     foreach my $thr (@Threads) {  
         if ($thr->is_running()) {  
             my $tid = $thr->tid;  
             print "  - Thread $tid running\n";  
         }  
         elsif ($thr->is_joinable()) {  
             my $tid = $thr->tid;  
             $thr->join;  
             print "  - Results for thread $tid:\n";  
             print "  - Thread $tid has been joined\n";  
         }  
     }  

     @running = threads->list(threads::running);  
     print "  - END LOOP >> NB Threads = ".(scalar @running)."\n";  
     $i++;  
 }  

 print "\nJOINING pending threads\n";  
 while (scalar @running != 0) {  
    foreach my $thr (@Threads) {  
         $thr->join if ($thr->is_joinable());  
     }  
     @running = threads->list(threads::running);  
}  
 print "NB started threads = ".(scalar @Threads)."\n";  
 print "End of main program\n";  


 sub sleeping_sub ( $ $ $ ) { 
    my @res2 = `ping 136.13.221.34`; 
    print "\n@res2";
    sleep(3);  
 } 
I have run your script and I am seeing that "LOOP 1" never gets printed for some reason. I have not figured out why atm :( - Sibster

1 Answer


The main problem with your program is that you have a busy loop that keeps testing whether a thread can be joined. This is wasteful. Furthermore, reducing the number of global variables would make your code easier to understand.

Other eyebrow-raisers:

  • Never ever use prototypes, unless you know exactly what they mean.
  • The sleeping_sub does not use any of its arguments.
  • You use threads->list(threads::running) a lot without considering whether it is actually what you want: a thread that has finished but has not been joined yet is no longer in that list.
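To make the first two points concrete, here is a small sketch (the sub names with_proto, without_proto and run_worker are made up for this illustration). A ($$) prototype silently imposes scalar context on its arguments, so an array passed to it collapses to its element count, while a plain sub just receives the flattened list in @_ and can unpack exactly the arguments it needs:

```perl
use strict;
use warnings;

# A ($$) prototype forces scalar context on each of the two arguments.
sub with_proto ($$) { return join ' ', @_ }

# No prototype: @_ simply receives the flattened argument list.
sub without_proto { return join ' ', @_ }

my @args = (10, 20);

# @args is evaluated in scalar context here, i.e. it becomes 2 (its length).
my $proto_result = with_proto(@args, 30);

# @args is flattened into (10, 20) here.
my $plain_result = without_proto(@args, 30);

print "with prototype:    $proto_result\n";
print "without prototype: $plain_result\n";

# Hypothetical worker that unpacks and actually uses its arguments.
sub run_worker {
    my ($id, $results) = @_;    # task number and an array ref for output
    push @$results, "task $id done";
    return scalar @$results;
}

my @collected;
run_worker($_, \@collected) for 1 .. 3;
print "$_\n" for @collected;
```

The prototyped call prints "2 30" rather than "10 20 30", which is exactly the kind of surprise that makes prototypes a poor default.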

It seems you only want to run N workers at once, but want to launch M workers in total. Here is a fairly elegant way to implement this. The main idea is that we have a queue between threads, into which a thread that has just finished enqueues its thread ID; the main thread then joins that thread. To limit the number of threads, we use a semaphore:

use threads; use strict; use warnings;
use feature 'say';  # "say" works like "print", but appends a newline.
use Thread::Queue;
use Thread::Semaphore;

my @pieces_of_work = 1..6;
my $num_threads = 3;
my $finished_threads = Thread::Queue->new;
my $semaphore = Thread::Semaphore->new($num_threads);

for my $task (@pieces_of_work) {
  $semaphore->down;  # wait for permission to launch a thread

  say "Starting a new thread...";

  # create a new thread in scalar context
  threads->new({ scalar => 1 }, sub {
    my $result = worker($task);                # run actual task
    $finished_threads->enqueue(threads->tid);  # report as joinable "in a second"
    $semaphore->up;                            # allow another thread to be launched
    return $result;
  });

  # maybe join some threads
  while (defined( my $thr_id = $finished_threads->dequeue_nb )) {
    join_thread($thr_id);
  }
}

# wait for all threads to be finished, by "down"ing the semaphore:
$semaphore->down for 1..$num_threads;
# end the finished thread ID queue:
$finished_threads->enqueue(undef);

# join any threads that are left:
while (defined( my $thr_id = $finished_threads->dequeue )) {
  join_thread($thr_id);
}

With join_thread and worker defined as

sub worker {
  my ($task) = @_;
  sleep rand 2; # sleep random amount of time
  return $task + rand; # return some number
}

sub join_thread {
  my ($tid) = @_;
  my $thr = threads->object($tid);
  my $result = $thr->join;
  say "Thread #$tid returned $result";
}

we could get the output:

Starting a new thread...
Starting a new thread...
Starting a new thread...
Starting a new thread...
Thread #3 returned 3.05652608754778
Starting a new thread...
Thread #1 returned 1.64777186731541
Thread #2 returned 2.18426146087901
Starting a new thread...
Thread #4 returned 4.59414651998983
Thread #6 returned 6.99852684265667
Thread #5 returned 5.2316971836585

(The order and return values are not deterministic.)

Using a queue makes it easy to tell which thread has finished. Semaphores make it easy to protect resources, or to limit how many things run in parallel.
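To see the "limit how many things run in parallel" part in isolation, here is a small sketch (not from the answer; the variable names and the 0.2 s fake workload are made up) that counts how many workers are inside the semaphore-guarded section at the same time. Unlike the answer's code, it creates all six threads up front and lets the semaphore gate only the work, purely so the counting is easy to observe; $max_seen should never exceed $limit:

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

my $limit     = 2;                             # max workers at once (arbitrary)
my $semaphore = Thread::Semaphore->new($limit);

my $active   :shared = 0;    # workers currently inside the guarded section
my $max_seen :shared = 0;    # highest value $active ever reached

my @threads;
for my $n (1 .. 6) {
    push @threads, threads->create(sub {
        $semaphore->down;                      # block until a slot is free
        {
            lock($active);                     # serialize updates to the counters
            $active++;
            $max_seen = $active if $active > $max_seen;
        }
        select(undef, undef, undef, 0.2);      # pretend to work for 0.2 s
        {
            lock($active);
            $active--;
        }
        $semaphore->up;                        # release the slot
        return $n;
    });
}

my @returned = map { $_->join } @threads;      # join in creation order
print "returned: @returned\n";
print "max simultaneous workers: $max_seen (limit $limit)\n";
```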

The main benefit of this pattern is that it uses far less CPU than your busy loop. This also shortens overall execution time, since the main thread no longer competes with the workers for the CPU.
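A rough way to observe the difference is to compare the CPU time the process burns while waiting for one sleeping thread: first by polling is_joinable in a loop (a stripped-down version of the question's approach), then with a plain blocking join. This is a hypothetical measurement sketch using Perl's built-in times; the exact numbers will vary by machine, but the polling figure should be close to the full second the thread sleeps, while the blocking figure should stay near zero:

```perl
use strict;
use warnings;
use threads;

# User + system CPU time consumed by this process so far.
sub cpu_time { my ($user, $system) = times; return $user + $system }

# Strategy 1: busy loop, polling is_joinable over and over.
my $start = cpu_time();
my $thr = threads->create(sub { sleep 1; return 1 });
1 until $thr->is_joinable;     # spins on the CPU until the thread is done
$thr->join;
my $polling_cpu = cpu_time() - $start;

# Strategy 2: blocking join; the main thread sleeps inside join.
$start = cpu_time();
$thr = threads->create(sub { sleep 1; return 1 });
$thr->join;
my $blocking_cpu = cpu_time() - $start;

printf "polling:  %.2f s of CPU\n", $polling_cpu;
printf "blocking: %.2f s of CPU\n", $blocking_cpu;
```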

While this is a very big improvement, we can do better! Spawning a thread is expensive: it is basically a fork() without the copy-on-write optimizations of Unix systems. The whole interpreter is copied, including all variables and all other state that you have already created.
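A small sketch of what that copying means in practice (the names @big_list, $plain_counter and $shared_counter are made up for this example): everything that exists when threads->create is called is cloned into the new thread, so changes the child makes to ordinary variables are invisible to the parent. Only variables marked :shared via threads::shared are really shared:

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my @big_list = (1 .. 100_000);   # existing data: cloned into every new thread
my $plain_counter = 0;           # ordinary variable: each thread gets its own copy
my $shared_counter :shared = 0;  # shared variable: one copy seen by all threads

my $thr = threads->create(sub {
    $plain_counter++;            # changes only the child's private copy
    { lock($shared_counter); $shared_counter++; }
    return scalar @big_list;     # the child sees its own clone of @big_list
});
my $child_saw = $thr->join;

print "child saw $child_saw elements\n";
print "plain counter in main:  $plain_counter\n";   # still 0
print "shared counter in main: $shared_counter\n";  # now 1
```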

Therefore, threads should be used sparingly and spawned as early as possible. I have already introduced queues, which pass values between threads. We can extend this so that a few worker threads constantly pull work from an input queue and return their results via an output queue. The difficulty now is making the last thread to exit end the output queue.

use threads; use strict; use warnings;
use feature 'say';
use Thread::Queue;
use Thread::Semaphore;

# define I/O queues
my $input_q  = Thread::Queue->new;
my $output_q = Thread::Queue->new;

# spawn the workers
my $num_threads = 3;
my $all_finished_s = Thread::Semaphore->new(1 - $num_threads); # a negative start value!
my @workers;
for (1 .. $num_threads) {
  push @workers, threads->new( { scalar => 1 }, sub {
    while (defined( my $task = $input_q->dequeue )) {
      my $result = worker($task);
      $output_q->enqueue([$task, $result]);
    }
    # we get here when the input queue is exhausted.
    $all_finished_s->up;
    # end the output queue if we are the last thread (the semaphore is > 0).
    if ($all_finished_s->down_nb) {
      $output_q->enqueue(undef);
    }
  });
}

# fill the input queue with tasks
my @pieces_of_work = 1 .. 6;
$input_q->enqueue($_) for @pieces_of_work;

# finish the input queue
$input_q->enqueue(undef) for 1 .. $num_threads;

# do something with the data
while (defined( my $result = $output_q->dequeue )) {
  my ($task, $answer) = @$result;
  say "Task $task produced $answer";
}

# join the workers:
$_->join for @workers;

With worker defined as before, we get:

Task 1 produced 1.15207098293783
Task 4 produced 4.31247785766295
Task 5 produced 5.96967474718984
Task 6 produced 6.2695013168678
Task 2 produced 2.02545636412421
Task 3 produced 3.22281619053999

(The three threads are joined after all output has been printed, so that part of the output would be boring.)

This second solution gets a bit simpler when we detach the threads: there is no @workers array to keep and nothing to join at the end, and the main thread won't exit before the workers have finished their tasks, because it is listening to the output queue, which is only ended by the last thread.
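A sketch of that detached variant, repeating the queues and the negative-start semaphore from the code above so the block is self-contained. The $task * 10 computation is a placeholder standing in for worker($task); because every thread is detached, there is no @workers array and no final join:

```perl
use strict;
use warnings;
use feature 'say';
use threads;
use Thread::Queue;
use Thread::Semaphore;

my $input_q  = Thread::Queue->new;
my $output_q = Thread::Queue->new;

my $num_threads    = 3;
my $all_finished_s = Thread::Semaphore->new(1 - $num_threads);  # negative start

for (1 .. $num_threads) {
    threads->create(sub {
        while (defined(my $task = $input_q->dequeue)) {
            $output_q->enqueue([$task, $task * 10]);   # placeholder for worker($task)
        }
        $all_finished_s->up;
        # only the last thread to get here can take the semaphore; it ends the queue
        $output_q->enqueue(undef) if $all_finished_s->down_nb;
    })->detach;                                        # no join needed later
}

$input_q->enqueue($_) for 1 .. 6;
$input_q->enqueue(undef) for 1 .. $num_threads;        # one end marker per worker

my %results;
while (defined(my $item = $output_q->dequeue)) {
    my ($task, $answer) = @$item;
    $results{$task} = $answer;
    say "Task $task produced $answer";
}
```

Because the workers are detached, Perl may warn that it "exited with active threads" if the last worker has not fully returned by the time the main thread finishes. The results are complete at that point, since the undef marker is only enqueued after every worker has left its loop.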