sobrique: (Default)
[personal profile] sobrique
This week, I have mostly been playing with Thread::Queue.
One of the downsides of Perl threading is that it's not particularly lightweight. Spawning a new thread for every single task isn't a very efficient way of working - especially if you have libraries imported and large data tables, since each new thread gets its own copy of them.

So the method I've been playing with is queue oriented - spawn a number of threads equal to some arbitrary parallelism target. One per 'resource' consumed is a good bet: for processor-intensive work, one per processor; for remote access to 15 servers, one per server.

And then implement a 'queue' - a thread-safe FIFO (First In, First Out) queue.

This uses the Thread::Queue library, so you include that at the start of your program. Strictly speaking, you don't need to be threading to use it - there are other reasons to use a FIFO.
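As a quick illustration of that non-threaded use - a minimal sketch of Thread::Queue as a plain FIFO, with no threads in sight (the item values here are just made up for the example):

```perl
#!/usr/bin/perl
# Thread::Queue used as an ordinary FIFO - no threads involved.
use strict;
use warnings;

use Thread::Queue;

my $fifo = Thread::Queue->new;

$fifo->enqueue( 'first', 'second', 'third' );

print $fifo->pending(), " items waiting\n";

# dequeue_nb() is the non-blocking variant: it returns undef when the
# queue is empty, rather than waiting around for more items.
while ( defined( my $item = $fifo->dequeue_nb() ) )
{
  print "Got: $item\n";
}
```

The blocking dequeue() would hang forever on an empty queue in a single-threaded program, which is why the non-blocking variant matters here.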

So as a sample:


#!/usr/bin/perl

use strict;
use warnings;

use threads;

use Thread::Queue;

my $worker_queue = Thread::Queue -> new;
my $QUEUE_END = "--::QUEUE_END::--"; # just a text pattern that acts as an end-of-queue marker.

sub worker_thread
{
  # dequeue() blocks until an item is available.
  my $item = $worker_queue -> dequeue();
  until ( $item eq $QUEUE_END )
  {
    print threads -> self -> tid(), ": $item\n";
    sleep 1;
    $item = $worker_queue -> dequeue(); 
  }
}

my $thread1 = threads -> create ( \&worker_thread );
my $thread2 = threads -> create ( \&worker_thread );

for ( my $count = 0; $count < 100; $count++ )
{
  $worker_queue -> enqueue ( $count );
}

# One end marker per worker thread, so each thread gets told to stop.
$worker_queue -> enqueue ( $QUEUE_END );
$worker_queue -> enqueue ( $QUEUE_END );

foreach my $thread ( threads -> list() )
{
  $thread -> join();
}




Fairly simple, but it does allow for daisy-chained processing (e.g. moving from one FIFO queue to the next).
The only slightly complicated part is handling thread exit. I've taken to using an 'exit' marker in the queue: pick an arbitrary pattern, and 'catch' it when it occurs.
The other possibility is some kind of shared 'all done' variable that you set once the queue is fully populated. What you don't want to do is assume that an empty queue means the work is finished: the queue may be empty when a thread first starts, or while it's waiting on a dependency, or simply because the other threads have already dequeued everything added so far.
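For what it's worth, newer versions of Thread::Queue offer a third option: an end() method, which makes a blocking dequeue() return undef once the queue has been drained, so you don't need a sentinel value at all. A sketch of the same worker pattern using that (assuming a Thread::Queue recent enough to have end()):

```perl
#!/usr/bin/perl
# The worker pattern again, but using Thread::Queue's end() method
# (newer versions only) instead of a sentinel marker.
use strict;
use warnings;

use threads;
use Thread::Queue;

my $worker_queue = Thread::Queue->new;

sub worker_thread
{
  # After end() has been called, dequeue() returns undef once the
  # queue is empty instead of blocking - so the loop ends by itself.
  while ( defined( my $item = $worker_queue->dequeue() ) )
  {
    print threads->self->tid(), ": $item\n";
  }
}

threads->create( \&worker_thread ) for 1 .. 2;

$worker_queue->enqueue($_) for 0 .. 99;
$worker_queue->end();    # no more items will be added

$_->join() for threads->list();
```

One nicety: you only call end() once, however many workers there are, rather than enqueueing one marker per thread.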

I've been using this mechanism to create a 'cascade' of tasks: run something on one (group of) server(s), do some processing, then run something based on the result on another server. This is well suited to queue-style processing.
Similarly, because you're queue oriented, it's also well suited to scaling the parallelism up or down. In a multi-processor environment, for example, you may want to use all the processors available, but you'll lose efficiency if you overdo it.
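The cascade idea can be sketched as two queues chained together, using the same sentinel approach as above. The stage names and the doubling "work" here are purely illustrative - in practice each stage would be doing your remote commands or processing:

```perl
#!/usr/bin/perl
# Daisy-chained queues: stage one transforms items and feeds stage two.
use strict;
use warnings;

use threads;
use Thread::Queue;

my $stage1_queue = Thread::Queue->new;
my $stage2_queue = Thread::Queue->new;
my $QUEUE_END    = "--::QUEUE_END::--";

sub stage1_worker
{
  my $item = $stage1_queue->dequeue();
  until ( $item eq $QUEUE_END )
  {
    # Pretend "processing": double the number, pass it downstream.
    $stage2_queue->enqueue( $item * 2 );
    $item = $stage1_queue->dequeue();
  }
  # Propagate the end marker so the next stage shuts down too.
  $stage2_queue->enqueue($QUEUE_END);
}

sub stage2_worker
{
  my $item = $stage2_queue->dequeue();
  until ( $item eq $QUEUE_END )
  {
    print "Result: $item\n";
    $item = $stage2_queue->dequeue();
  }
}

my $t1 = threads->create( \&stage1_worker );
my $t2 = threads->create( \&stage2_worker );

$stage1_queue->enqueue($_) for 1 .. 10;
$stage1_queue->enqueue($QUEUE_END);

$_->join() for ( $t1, $t2 );
```

Note that each stage only needs to know about its input and output queues, which is what makes it easy to add more workers at whichever stage is the bottleneck.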
