|
|
<?php
|
|
|
|
|
|
namespace com;
|
|
|
|
|
|
use think\Cache;
|
|
|
|
|
|
/**
|
|
|
* 伪队列,(用于导入导出轮询时排队)
|
|
|
*
|
|
|
* @author Ymob
|
|
|
* @date 2019-09-30
|
|
|
*/
|
|
|
class PseudoQueue
|
|
|
{
|
|
|
|
|
|
/**
|
|
|
* 队列名称
|
|
|
*
|
|
|
* @var string
|
|
|
*/
|
|
|
protected $name = '';
|
|
|
|
|
|
/**
|
|
|
* 队列最大数量,默认0,不限数量
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
protected $max = 0;
|
|
|
|
|
|
/**
|
|
|
* 最大可执行数
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
protected $max_exec = 0;
|
|
|
|
|
|
/**
|
|
|
* 队列任务请求最大间隔时间,单位秒 默认600秒
|
|
|
*
|
|
|
* *考虑网络波动等原因不建议小于 10 秒*
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
protected $overtime = 600;
|
|
|
|
|
|
/**
|
|
|
* 队列
|
|
|
*
|
|
|
* @var array
|
|
|
*/
|
|
|
protected $queue = [];
|
|
|
|
|
|
/**
|
|
|
* 当前任务ID
|
|
|
*
|
|
|
* @var string
|
|
|
*/
|
|
|
public $task_id = '';
|
|
|
|
|
|
/**
|
|
|
* 错误信息
|
|
|
*
|
|
|
* @var string
|
|
|
*/
|
|
|
public $error = '';
|
|
|
|
|
|
/**
|
|
|
* 构造函数
|
|
|
*
|
|
|
* @param string $name 队列名称
|
|
|
* @param int $max_exec 最大可执行数
|
|
|
*/
|
|
|
public function __construct($name, $max_exec)
|
|
|
{
|
|
|
$this->name = 'QUEUE_' . $name;
|
|
|
$this->max_exec = $max_exec;
|
|
|
$this->queue = Cache::get($this->name) ?: [];
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 设置当前任务ID
|
|
|
*
|
|
|
* @param string $task_id 队列索引
|
|
|
* @return bool
|
|
|
*/
|
|
|
public function setTaskId($task_id)
|
|
|
{
|
|
|
$index = $this->getIndex($task_id);
|
|
|
if ($index === false) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
$this->task_id = $task_id;
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 设置队列最大长度
|
|
|
*
|
|
|
* @param int $max
|
|
|
* @return void
|
|
|
*/
|
|
|
public function setMax(int $max)
|
|
|
{
|
|
|
$this->max = $max;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 队列最大长度
|
|
|
*
|
|
|
* @return int
|
|
|
*/
|
|
|
public function getMax()
|
|
|
{
|
|
|
return $this->max;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 设置队列任务请求最大间隔时间 (单位:秒s)
|
|
|
*
|
|
|
* @param int $overtime
|
|
|
* @return void
|
|
|
*/
|
|
|
public function setOvertime(int $overtime)
|
|
|
{
|
|
|
$this->overtime = $overtime;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 读取队列任务请求最大间隔时间
|
|
|
*
|
|
|
* @return int
|
|
|
*/
|
|
|
public function getOvertime()
|
|
|
{
|
|
|
return $this->overtime;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 生成任务ID,并加入队列
|
|
|
*
|
|
|
* @return string
|
|
|
*/
|
|
|
public function makeTaskId()
|
|
|
{
|
|
|
do {
|
|
|
$task_id = md5(time() . rand(100, 999));
|
|
|
} while (in_array($task_id, array_column($this->queue, 'task_id')));
|
|
|
|
|
|
if (!$this->enqueue($task_id)) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
$this->task_id = $task_id;
|
|
|
return $task_id;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 入队
|
|
|
*
|
|
|
* @param string $$this->queue 任务ID
|
|
|
* @return bool
|
|
|
*/
|
|
|
public function enqueue($task_id)
|
|
|
{
|
|
|
if ($this->max > 0) {
|
|
|
if (count($this->queue) >= $this->max) {
|
|
|
$this->error = '队列长度达到上限';
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
$this->queue[] = [
|
|
|
'task_id' => $task_id,
|
|
|
// 上次处理时间
|
|
|
'last_processing_time' => time(),
|
|
|
];
|
|
|
Cache::set($this->name, $this->queue);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 出队
|
|
|
*
|
|
|
* @param string $task_id 任务ID,默认当前任务ID
|
|
|
* @return bool
|
|
|
*/
|
|
|
public function dequeue($task_id = null)
|
|
|
{
|
|
|
$task_id = $task_id ?: $this->task_id;
|
|
|
$index = $this->getIndex($task_id);
|
|
|
if ($index === false) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
unset($this->queue[$index]);
|
|
|
$this->queue = array_values($this->queue);
|
|
|
Cache::set($this->name, $this->queue);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 当前任务是否可执行
|
|
|
*
|
|
|
* @param string $task_id 任务ID,默认当前任务ID
|
|
|
* @return bool
|
|
|
*/
|
|
|
public function canExec($task_id = null)
|
|
|
{
|
|
|
$task_id = $task_id ?: $this->task_id;
|
|
|
|
|
|
$index = $this->getIndex($task_id);
|
|
|
if ($index === false) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
$res = false;
|
|
|
|
|
|
for ($i = 0; $i < $this->max_exec; $i++) {
|
|
|
if (!isset($this->queue[$i])) {
|
|
|
break;
|
|
|
}
|
|
|
// 判断任务处理是否超时
|
|
|
$temp_time = $this->queue[$i]['last_processing_time'] + $this->overtime;
|
|
|
if (time() > $temp_time) {
|
|
|
$this->dequeue($this->queue[$i]['task_id']);
|
|
|
$i--;
|
|
|
$index--;
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
if ($index == $i) {
|
|
|
$res = true;
|
|
|
$this->queue[$i]['last_processing_time'] = time();
|
|
|
Cache::set($this->name, $this->queue);
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
if (!$res) {
|
|
|
$this->error = '服务器繁忙,排队中...' . ($index - $this->max_exec);
|
|
|
}
|
|
|
|
|
|
return $res;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 获取 [当前] 任务所在队列索引
|
|
|
*
|
|
|
* @param string $task_id 任务ID,默认当前任务ID
|
|
|
* @return mixed bool | int
|
|
|
*/
|
|
|
public function getIndex($task_id = null)
|
|
|
{
|
|
|
$task_id = $task_id ?: $this->task_id;
|
|
|
|
|
|
$index = array_search($task_id, array_column($this->queue, 'task_id'));
|
|
|
if ($index === false) {
|
|
|
$this->error = '队列中不存在该任务';
|
|
|
}
|
|
|
|
|
|
return $index;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
*
|
|
|
*/
|
|
|
/**
|
|
|
* 任务缓存数据
|
|
|
*
|
|
|
* @param string $key 缓存键
|
|
|
* @param mixed $val 缓存值,不传改参数为获取值
|
|
|
* @return mixed|void
|
|
|
* @author Ymob
|
|
|
* @datetime 2019-10-22 13:52:19
|
|
|
*/
|
|
|
public function cache($key, $val = null)
|
|
|
{
|
|
|
if (!$this->task_id) {
|
|
|
$this->error = '任务ID不存在';
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
$index = $this->getIndex($this->task_id);
|
|
|
|
|
|
if ($val === null) {
|
|
|
return $this->queue[$index]['cache'][$key];
|
|
|
} else {
|
|
|
$this->queue[$index]['cache'][$key] = $val;
|
|
|
Cache::set($this->name, $this->queue);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
}
|