I have a working PHP script that parses messages out of a tab-delimited text file and publishes them to RabbitMQ. I copied the working code from that file into another file to build a consumer that retrieves the messages published to the exchange, runs json_decode on them, and inserts them into a database.

Every attempt, including sample code copied from the PHP.net site and examples on SO, fails with a blank white screen and no error messages, and then kills the php-fpm process.

Any idea why the queue will not bind and what is going wrong here?

  • Nginx -> php-fpm
  • PHP 5.3.x
  • MacBook Pro (OS X Lion)
  • RabbitMQ (librabbitmq and pecl amqp installed)

Here is one example of what I tried. I have also tried the examples in the PHP.net AMQP docs and on SO, and none of them work. I can publish fine, but when I try to bind a queue it fails and eventually php-fpm locks up.

<?php
// Report all PHP errors
error_reporting(E_ALL);

/*****************************************
 * MQ settings
 ****************************************/
$mq = array(
           'host' => 'localhost',
           'port' => 5672,
           'login' => 'guest',
           'password' => 'guest',
           'exchange' => 'gbus.user',
           'routing_key' => 'gbus.test.mike',
           );

/*****************************************
 * Connect to queue
 ****************************************/

$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();

$ch = new AMQPChannel($conn);

// Create a new queue
$q = new AMQPQueue($ch);
$q->declare('test-queue');
$q->bind($mq['exchange'],$mq['routing_key']);

?>
<br>
<font color="blue" face="arial" size="4">File Contents</font>
<hr>
<?php 
while(true){
    $msg=$q->get();
    if ($msg['count']>-1){
        echo "\n--------\n";
        print_r($msg['msg']);
        echo "\n--------\n";
    }
    sleep(1);   
}
if (!$conn->disconnect()) {
    throw new Exception('Could not disconnect');
}
?>

Here is the code I use to publish to the exchange. Every time I run it, I see the 20 new messages in the RabbitMQ control panel. I limit it to 20 for testing, but the file has tens of thousands of rows.

Working publish code:

<?php
/*****************************************
 * MQ settings
 ****************************************/
$mq = array(
           'host' => 'localhost',
           'port' => 5672,
           'login' => 'guest',
           'password' => 'guest',
           'exchange' => 'gbus.user',
           'routing_key' => 'gbus.test.mike',
           );

/*****************************************
 * Connect to queue
 ****************************************/

$conn_args = array('host' => $mq['host'], 'port' => $mq['port'], 'login' => $mq['login'], 'password' => $mq['password']);
$conn = new AMQPConnection($conn_args);
$conn->connect();

$ch = new AMQPChannel($conn);

$ex = new AMQPExchange($ch);
$ex->setName($mq['exchange']);


/*****************************************
 * Parse the file
 ****************************************/
$filename = "/tmp/Users.txt";
$board = "test";

$fd = fopen ($filename, "r");
$contents = fread ($fd,filesize ($filename));

fclose ($fd);
$delimiter = "\r\n";
$rows = explode($delimiter, $contents);
$counter = 0;
?>
<br>
<font color="blue" face="arial" size="4">File Rows (first 20)</font>
<hr>
<?php 
foreach ( $rows as $row )
{
    $counter++;
    echo "<b>Row $counter: </b> $row<br>";

    // build list columns
    list($login_name, $pwd, $account_type, $access_level, $status, $first_name, $last_name, $agent_code) = explode("\t", $row);

    // build assoc array for json
    $user = array("domain"=>$board, "username"=>$login_name, "user_id"=>$agent_code, "password"=>$pwd, "first_name"=>$first_name, "last_name"=>$last_name);

    // Publish a message to the exchange with a routing key
    $ex->publish(json_encode($user), $mq['routing_key'], AMQP_NOPARAM, array("content_type"=>"application/data"));

    if($counter == 20) {
        break;
    }
}

$ch->close();
$conn->close();
?>
I have tried all examples here and publish works, but consume fails: us3.php.net/manual/en/amqp.examples.php - Mike S.
I have also unbound the original queue from the exchange to be sure it wasn't "stealing" the messages (not sure if it ack'ed them or not, going by the control panel). - Mike S.
In the bin folder of the RabbitMQ folder you have rabbitmqctl. Run rabbitmqctl list_queues to list your queues; list_exchanges will list exchanges, and list_bindings will give you debug information that will help. Check out rabbitmq.com/man/rabbitmqctl.1.man.html for more details. Posting the dumps here would help in finding the problem. - MaX
With the queue bind call removed, I can get it to declare, but then the server crashes again. Any time I include bind, it fails. Not sure if it's just a Mac OS X Lion issue; I haven't tried on a Linux box yet. Odd one. - Mike S.
The AMQP documentation on php.net is outdated (basically not updated); for example, $queue->declare is deprecated but still present in the docs there. So be careful when learning from it. If you are using RabbitMQ, you can use "videlalvaro/php-amqplib". - voila

1 Answer

Have you tried the examples found here: https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/php?

The two you want to look at are emit_log.php and receive_logs.php.
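
For reference, here is a minimal sketch of what a consumer along the lines of receive_logs.php could look like for the exchange in the question. It assumes php-amqplib is installed with Composer next to the script (a recent release that provides AMQPStreamConnection) and that the exchange already exists on the broker. decodeUserMessage, consumeUsers and the queue name passed in are hypothetical names used for illustration, not part of the library.

```php
<?php
// Decode one message body as published by the question's script.
// Returns the user array, or null when the body is not a JSON object
// carrying a "username" field.
function decodeUserMessage($body)
{
    $user = json_decode($body, true);
    if (!is_array($user) || !isset($user['username'])) {
        return null;
    }
    return $user;
}

// Bind a queue to an existing exchange and consume from it.
// Assumption: php-amqplib was installed with Composer beside this file.
function consumeUsers(array $mq, $queueName, $onUser)
{
    require_once __DIR__ . '/vendor/autoload.php';

    $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
        $mq['host'], $mq['port'], $mq['login'], $mq['password']
    );
    $channel = $connection->channel();

    // queue_declare(name, passive, durable, exclusive, auto_delete)
    $channel->queue_declare($queueName, false, true, false, false);
    $channel->queue_bind($queueName, $mq['exchange'], $mq['routing_key']);

    $callback = function ($msg) use ($onUser) {
        $user = decodeUserMessage($msg->body);
        if ($user !== null) {
            call_user_func($onUser, $user);
        }
    };

    // basic_consume(queue, tag, no_local, no_ack, exclusive, nowait, callback)
    $channel->basic_consume($queueName, '', false, true, false, false, $callback);

    // Block until the consumer is cancelled or the connection drops.
    while ($channel->is_consuming()) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
```

Like the tutorial scripts, this is meant to be run from the command line rather than through nginx, for example as consumeUsers($mq, 'users-consumer', function ($user) { print_r($user); }); with the $mq array from the question. The broker rejects a declare for an existing queue name with different flags, so a fresh queue name is the safer choice.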

The library used there is different from the PECL extension; I believe the PECL one does not support consume.
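
Separately from the choice of library, the while(true) loop in the question has no exit condition, so the disconnect check after it is never reached and a request served through php-fpm never finishes. That may or may not be related to the crash, but it is worth ruling out. Below is a minimal sketch of a bounded poll. drainQueue and its parameters are hypothetical names, and the fetch callable is a stand-in for whatever call actually reads one message from the broker.

```php
<?php
// Poll until $maxMessages were handled or $maxIdlePolls consecutive
// polls returned nothing, then return the number of handled messages.
// $fetch is a hypothetical callable returning a message body or null.
function drainQueue($fetch, $handle, $maxMessages = 20, $maxIdlePolls = 3, $idleSleepSeconds = 1)
{
    $handled = 0;
    $idle = 0;
    while ($handled < $maxMessages && $idle < $maxIdlePolls) {
        $body = call_user_func($fetch);
        if ($body === null) {
            // Nothing waiting: count the empty poll and back off briefly.
            $idle++;
            if ($idleSleepSeconds > 0) {
                sleep($idleSleepSeconds);
            }
            continue;
        }
        $idle = 0;
        call_user_func($handle, $body);
        $handled++;
    }
    return $handled;
}

// Stand-in for a broker: an in-memory list instead of a real queue.
$pending = array('{"username":"a"}', '{"username":"b"}');
$fetch = function () use (&$pending) {
    return count($pending) > 0 ? array_shift($pending) : null;
};

$seen = array();
$handle = function ($body) use (&$seen) {
    $seen[] = json_decode($body, true);
};

// No sleep between idle polls here so the demo finishes immediately.
$count = drainQueue($fetch, $handle, 20, 3, 0);
echo "handled $count messages\n";
```

Because the loop always terminates, any cleanup placed after it (such as disconnecting) actually runs.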