2
votes

Does anyone know about a robust failover mechanism for Laravel queues?

At one point my beanstalkd server had some kind of error (still figuring out what went wrong) triggering Pheanstalk_Exception_ConnectionException in the Laravel (4) queue driver. As a result, new jobs couldn't be pushed to the queue.

What I'm trying to accomplish is to have some sort of failover driver for QueueInterface which can take multiple instances of drivers, so I can define for example the 'sync' or 'redis' driver as a failover queue. Then as soon as beanstalkd fails, jobs will be executed by this driver and no work will be lost.

1

1 Answers

4
votes

I'm just going to take a really small stab at this for you and hope that it gives an idea... i think everyone has different needs for queues, so this is just what I have done.

To be honest I kind of just stripped out a bunch of my code to try and simplify what I did. Try to make some sense out of it I guess.

In your queue config :

'connections' => array(
    'beanstalkd' => array(
        'driver' => 'beanstalk_extended',
        'host'   => 'my.ip.address',
        'queue'  => 'default',
        'ttr'    => 60,
    ),
    'my_fallback' => array(
        'driver' => 'sync',
    ),
);

In a ServiceProvider@boot :

/**
 * Boot the Beanstalkd queue.
 */
protected function bootQueue()
{
    $this->app['queue']->extend('beanstalk_extended', function () {
        return new BeanstalkConnector;
    });

    $this->app->bindShared('queue.failer', function($app) {
        $config = $app['config']['queue.failed'];
        return new DatabaseFailedJobProvider($app['db'], $config['database'], $config['table']);
    });

    $this->app->bindShared('queue.worker', function($app) {
        return new Worker($app['queue'], $app['queue.failer'], $app['events']);
    });
}

The connector :

<?php namespace App\Framework\Queue\Connectors;

use Illuminate\Queue\Connectors\ConnectorInterface;
use Pheanstalk_Pheanstalk as Pheanstalk;
use App\Framework\Queue\Beanstalk;

class BeanstalkConnector implements ConnectorInterface
{

/**
 * Establish a queue connection.
 *
 * @param  array $config
 * @return \Illuminate\Queue\QueueInterface
 */
    public function connect(array $config)
    {
        $pheanstalk = new Pheanstalk($config['host']);
        $bean = new Beanstalk($pheanstalk, $config['queue'], array_get($config, 'ttr', Pheanstalk::DEFAULT_TTR));
       return $bean;
    }
}

Then inside Beanstalkd extension :

/**
 * Push a new job onto the queue.
 *
 * @param  string $job
 * @param  mixed $data
 * @param  string $queue
 * @return int
 */
public function push($job, $data = '', $queue = null)
{
    try {
        $queue = $this->getQueue($queue);
        $id = parent::push($job, $data, $queue);
        return $id;
    } catch (\Exception $e) {
        return \Queue::connection('my_fallback')->push($job, $data, $queue);
    }
}