因公司业务需要,最近在设计一个通用队列功能模块,主体要求两大点:
- 用MySql实现事务型消息队列(当然,主流的队列服务可使用redis或者rabbitmq等,此处讨论的是mysql实现)
- php多进程消费队列消息
用MySql实现事务型消息队列
消息队列的作用有:异步化、解耦和消除峰值等。目前异步化对于我来说使用最频繁,在很多业务场景下,我们可以将实时性要求较低的请求转为异步处理,减小系统负载压力,提高系统稳定性。在离线数据异步处理过程中,消息队列要满足以下要求:
- 消息不能丢失,即使在系统失败的情况下。消息一旦被插入就一定会被至少处理一次(只被处理一次是最好的,但是实现起来有难度,所以只要求at-least-once semantic);
- FIFO顺序。(mysql id自增可满足此特性。当然,可以设计特殊参数做特殊处理)
- 支持多生产者(mysql支持并发操作,支持此特点)
- 支持多消费者。每个消息只能被其中一个消费者处理(业务的处理需要考虑幂等性)。
以上是队列实现的说明,具体用MySql实现事务型消息队列可以参考文章
https://spockwangs.github.io/…
此次设计的表结构如下:
CREATE TABLE `comom_queue` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
`type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '队列类型,代码业务备注',
`conn_id` int(11) NOT NULL DEFAULT '0' COMMENT '消费者标识',
`param_content` text COMMENT '队列入参',
`callback` varchar(255) NOT NULL DEFAULT '' COMMENT '队列消费回调函数',
`status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '0新建 1消费中 2成功 3失败 4需重试',
`create_time` int(11) NOT NULL DEFAULT '0' COMMENT '创建时间',
`update_time` int(11) NOT NULL DEFAULT '0' COMMENT '状态变更时间',
`preexec_time` int(11) NOT NULL DEFAULT '0' COMMENT '预消费时间',
`p_key` varchar(100) NOT NULL DEFAULT '' COMMENT '业务唯一标识key,查询用',
`mark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注',
PRIMARY KEY (`id`),
KEY `indx_s` (`p_key`,`type`) USING BTREE,
KEY `indx_exec` (`conn_id`,`status`) USING BTREE,
KEY `indx_ty` (`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
说明下几个字段的设计:
- callback 队列中不同的业务消息有不同的业务处理,利用callback值回调对应的业务方法
- type 队列业务类型,区分不同的业务,可用不同的消费者分开消费。在FIFO的特点外,可单独开消费者对有特殊要求(消息优先级高)的业务消息进行消费
- preexec_time 预消费时间,有的业务消息有消费时间要求,可设置出队列时间
php多进程消费设计
此次php多进程的实现依赖pcntl,posix扩展,读者可自行检查是否安装了此拓展。queue队列服务设计和实现包括以下功能点:
- 主进程和子进程的运行时间可配
- 主进程(master进程)创建和监听子进程行为
- 创建定时器信号,主进程(master进程)定时监听队列信息,可用于消息堆积通知等
- 子进程(worker进程)消费消息
- 针对不同的业务消息可配置不同数量的子进程
- 各个业务子进程数可配置正常拉起数和最大进程数,根据队列积压情况,子进程动态启动进程数(暂未实现,后续添加)
不多说了,直接看代码,抽离出来的queue服务类代码如下:
<?php
/**
* Created by PhpStorm.
* User: Javion
* Date: 2018/12/7
* Time: 15:10
*/
abstract class queue
{
protected $process = []; // 子进程数组 ['type' => 'process_num']
protected $child = []; // 子进程pid数组
protected $result = []; // 计算的结果
protected $overTime = 0; //主进程超时时间
protected $startTime; //主进程运行时间
protected $childOverTime = 3600; //子进程超时时间
protected $alarm_time = 2;
public function __construct($process = [], $overTime = 0, $childOverTime = 3600)
{
if (!function_exists('pcntl_fork')) {
die("pcntl_fork not existing");
}
$this->process = $process;
$this->overTime = $overTime;
$this->childOverTime = $childOverTime;
$this->startTime = time();
}
/**
* 设置子进程
*/
public function setProcess($process)
{
$this->process = $process;
}
/**
* 设置检测时间间隔 单位s
*/
public function setAlarmTime($time){
$this->alarm_time = $time;
}
/**
* fork 子进程
*/
protected function forkProcess()
{
//循环创建每个type 的消费子进程
$process = $this->process;
foreach($process as $key => $num) {
for ($i = 0; $i < $num; $i++){
$this->forkOneProcess($key);
}
}
return $this;
}
/**
* 创建子进程操作
* @param $key
* @return $this
*/
private function forkOneProcess($key)
{
$pid = pcntl_fork();
if ($pid == 0) {
$id = getmypid();
$this->processDo($id, $key);
exit(0);
} else if ($pid > 0) {
//记录子进程信息
$childProcess = array(
'pid' => $pid,
'type' => $key,
'create_time' => time()
);
$this->child[$pid] = $childProcess;
}
return $this;
}
/**
* 子进程做的事情,消费者
*/
abstract protected function processDo($id, $key);
/**
* 队列数量检测
*/
abstract protected function checkQueueNum();
/**
* 等待子进程结束
*/
protected function waiteProcess()
{
while(count($this->child)) {
foreach($this->child as $pid => $item){
$res = pcntl_waitpid($pid,$status,WNOHANG);
pcntl_signal_dispatch();
if ( -1 == $res || $res > 0 ) {
unset($this->child[$pid]);
echo "pid $pid 退出", PHP_EOL;
//判断主进程是否超时 未超时拉起新的子进程
$leftTime = time() - $this->startTime;
if ($this->overTime > $leftTime){
$this->forkOneProcess($item['type']);
echo "创建新进程", PHP_EOL;
}
}//判断子进程是否存在且超时,超过时限20分钟则强制退出
elseif (posix_kill($pid, 0) && (time() - $item['create_time'] - 20*60) > $this->childOverTime){
posix_kill($pid, SIGUSR1);
echo "pid $pid 退出2", PHP_EOL;
}
}
}
return $this;
}
/**
* 队列检测
*/
protected function timeHandler(){
$this->checkQueueNum();
pcntl_alarm($this->alarm_time);
}
/**
* 启动
*/
public function runProcess() {
//注册信号
pcntl_signal(SIGALRM, array($this, 'timeHandler'));
pcntl_alarm($this->alarm_time);
$leftTime = time() - $this->startTime;
while(($this->overTime ==0 || $this->overTime > $leftTime)){
echo "新进程processlist", PHP_EOL;
$this->forkProcess()->waiteProcess();
$leftTime = time() - $this->startTime;
}
}
}
最后一个功能点:各个业务子进程数可配置正常拉起数和最大进程数,根据队列积压情况,子进程动态启动进程数 暂未实现。目前的queue服务设计如上,请各位看官多多指教!