//composer.json
{
"require": {
"php-amqplib/php-amqplib": "^2.11"
}
}
//publisher.php
<?php
ini_set('display_errors', 1);
ini_set('display_startup_errors', 1);
error_reporting(E_ALL);
require_once(__DIR__ . '/vendor/autoload.php');
define("RABBITMQ_HOST", "localhost");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
RABBITMQ_HOST,
RABBITMQ_PORT,
RABBITMQ_USERNAME,
RABBITMQ_PASSWORD
);
$channel = $connection->channel();
# Create the queue if it does not already exist.
$channel->queue_declare(
$queue = RABBITMQ_QUEUE_NAME,
$passive = false,
$durable = true,
$exclusive = false,
$auto_delete = false,
$nowait = false,
$arguments = null,
$ticket = null
);
$jobId = 0;
while (true) {
$jobArray = [
'id' => $jobId++,
'task' => 'sleep',
'sleep_period' => rand(0, 3)
];
$msg = new \PhpAmqpLib\Message\AMQPMessage(
json_encode($jobArray, JSON_UNESCAPED_SLASHES),
['delivery_mode' => 2] # Non-persistent (1) or persistent (2).
);
$channel->basic_publish($msg, '', RABBITMQ_QUEUE_NAME);
print 'Job created' . PHP_EOL;
sleep(1);
}
//worker.php
<?php
ini_set('display_errors', 1);
ini_set('display_startup_errors', 1);
error_reporting(E_ALL);
require_once __DIR__ . '/vendor/autoload.php';
define("RABBITMQ_HOST", "localhost");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
RABBITMQ_HOST,
RABBITMQ_PORT,
RABBITMQ_USERNAME,
RABBITMQ_PASSWORD
);
$channel = $connection->channel();
# Create the queue if it doesnt already exist.
$channel->queue_declare(
$queue = RABBITMQ_QUEUE_NAME,
$passive = false,
$durable = true,
$exclusive = false,
$auto_delete = false,
$nowait = false,
$arguments = null,
$ticket = null
);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
$job = json_decode($msg->body, $assocForm = true);
sleep($job['sleep_period']);
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume(
$queue = RABBITMQ_QUEUE_NAME,
$consumer_tag = '',
$no_local = false,
$no_ack = false,
$exclusive = false,
$nowait = false,
$callback
);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();