You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
wkcrm/extend/com/PseudoQueue.php

294 lines
6.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

<?php
namespace com;
use think\Cache;
/**
* 伪队列,(用于导入导出轮询时排队)
*
* @author Ymob
* @date 2019-09-30
*/
class PseudoQueue
{
/**
* 队列名称
*
* @var string
*/
protected $name = '';
/**
* 队列最大数量默认0不限数量
*
* @var int
*/
protected $max = 0;
/**
* 最大可执行数
*
* @var int
*/
protected $max_exec = 0;
/**
* 队列任务请求最大间隔时间,单位秒 默认600秒
*
* *考虑网络波动等原因不建议小于 10 秒*
*
* @var int
*/
protected $overtime = 600;
/**
* 队列
*
* @var array
*/
protected $queue = [];
/**
* 当前任务ID
*
* @var string
*/
public $task_id = '';
/**
* 错误信息
*
* @var string
*/
public $error = '';
/**
* 构造函数
*
* @param string $name 队列名称
* @param int $max_exec 最大可执行数
*/
public function __construct($name, $max_exec)
{
$this->name = 'QUEUE_' . $name;
$this->max_exec = $max_exec;
$this->queue = Cache::get($this->name) ?: [];
}
/**
* 设置当前任务ID
*
* @param string $task_id 队列索引
* @return bool
*/
public function setTaskId($task_id)
{
$index = $this->getIndex($task_id);
if ($index === false) {
return false;
}
$this->task_id = $task_id;
return true;
}
/**
* 设置队列最大长度
*
* @param int $max
* @return void
*/
public function setMax(int $max)
{
$this->max = $max;
}
/**
* 队列最大长度
*
* @return int
*/
public function getMax()
{
return $this->max;
}
/**
* 设置队列任务请求最大间隔时间 (单位秒s)
*
* @param int $overtime
* @return void
*/
public function setOvertime(int $overtime)
{
$this->overtime = $overtime;
}
/**
* 读取队列任务请求最大间隔时间
*
* @return int
*/
public function getOvertime()
{
return $this->overtime;
}
/**
* 生成任务ID并加入队列
*
* @return string
*/
public function makeTaskId()
{
do {
$task_id = md5(time() . rand(100, 999));
} while (in_array($task_id, array_column($this->queue, 'task_id')));
if (!$this->enqueue($task_id)) {
return false;
}
$this->task_id = $task_id;
return $task_id;
}
/**
* 入队
*
* @param string $$this->queue 任务ID
* @return bool
*/
public function enqueue($task_id)
{
if ($this->max > 0) {
if (count($this->queue) >= $this->max) {
$this->error = '队列长度达到上限';
return false;
}
}
$this->queue[] = [
'task_id' => $task_id,
// 上次处理时间
'last_processing_time' => time(),
];
Cache::set($this->name, $this->queue);
return true;
}
/**
* 出队
*
* @param string $task_id 任务ID默认当前任务ID
* @return bool
*/
public function dequeue($task_id = null)
{
$task_id = $task_id ?: $this->task_id;
$index = $this->getIndex($task_id);
if ($index === false) {
return false;
}
unset($this->queue[$index]);
$this->queue = array_values($this->queue);
Cache::set($this->name, $this->queue);
return true;
}
/**
* 当前任务是否可执行
*
* @param string $task_id 任务ID默认当前任务ID
* @return bool
*/
public function canExec($task_id = null)
{
$task_id = $task_id ?: $this->task_id;
$index = $this->getIndex($task_id);
if ($index === false) {
return false;
}
$res = false;
for ($i = 0; $i < $this->max_exec; $i++) {
if (!isset($this->queue[$i])) {
break;
}
// 判断任务处理是否超时
$temp_time = $this->queue[$i]['last_processing_time'] + $this->overtime;
if (time() > $temp_time) {
$this->dequeue($this->queue[$i]['task_id']);
$i--;
$index--;
continue;
}
if ($index == $i) {
$res = true;
$this->queue[$i]['last_processing_time'] = time();
Cache::set($this->name, $this->queue);
break;
}
}
if (!$res) {
$this->error = '服务器繁忙,排队中...' . ($index - $this->max_exec);
}
return $res;
}
/**
* 获取 [当前] 任务所在队列索引
*
* @param string $task_id 任务ID默认当前任务ID
* @return mixed bool | int
*/
public function getIndex($task_id = null)
{
$task_id = $task_id ?: $this->task_id;
$index = array_search($task_id, array_column($this->queue, 'task_id'));
if ($index === false) {
$this->error = '队列中不存在该任务';
}
return $index;
}
/**
*
*/
/**
* 任务缓存数据
*
* @param string $key 缓存键
* @param mixed $val 缓存值,不传改参数为获取值
* @return mixed|void
* @author Ymob
* @datetime 2019-10-22 13:52:19
*/
public function cache($key, $val = null)
{
if (!$this->task_id) {
$this->error = '任务ID不存在';
return false;
}
$index = $this->getIndex($this->task_id);
if ($val === null) {
return $this->queue[$index]['cache'][$key];
} else {
$this->queue[$index]['cache'][$key] = $val;
Cache::set($this->name, $this->queue);
}
}
}