workerman/redis-queue

基于Redis的消息队列,支持消息延迟处理。

项目地址:

https://github.com/walkor/redis-queue

安装:

composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/
composer require workerman/redis-queue

示例

<?php
require __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;
use Workerman\Lib\Timer;
use Workerman\RedisQueue\Client;

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
   // 订阅
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
   // 订阅
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // 定时向队列发送消息
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

创建实例

  • $address 类似 redis://ip:6379,必须以redis开头.

  • $options 包括以下选项:

    • auth: 鉴权信息,默认 ''
    • db: db,默认 0
    • max_attempts: 消费失败后重试次数,默认5
    • retry_seconds: 重试时间间隔,单位秒。默认5

消费失败是指业务抛出异常Exception或者Error。消费失败后消息会放到延迟队列等待重试,重试次数由 max_attempts控制,重试间隔由retry_secondsmax_attempts共同控制。比如max_attempts为5,retry_seconds为10,第1次重试间隔为1*10秒,第2次重试时间间隔为2*10秒,第3次重试时间间隔为3*10秒,以此类推直到重试5次。如果超过了max_attempts设置测重试次数,则消息放入失败队列(key为redis-queue-failed)


send(String $queue, Mixed $data, [int $dely=0])

向队列发送一条消息

  • $queue 队列名, String 类型
  • $data 发布的具体消息,可以是数组或者字符串,Mixed 类型
  • $dely 延迟消费时间,单位秒,默认0, Int 类型

subscribe(mixed $queue, callable $callback)

订阅一个队列或者多个队列

  • $queue 队列名,可以是字符串或者包含多个队列名的数组
  • $callback 回调函数,格式为 function (Mixed $data),其中$data就是send($queue, $data)中的$data.

unsubscribe(mixed $queue)

取消订阅

  • $queue 队列名或者包含多个队列名的数组

在非workerman环境向队列发送消息

有时候一些项目运行在apache或者php-fpm环境,无法使用workerman/redis-queue项目,可以参考如下函数实现发送

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = 'redis-queue-waiting';
    $queue_delay = 'redis-queue-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => 0,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

其中,参数$redis为redis实例。例如redis扩展用法类似如下:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
`

results matching ""

    No results matching ""