You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
339 lines
9.8 KiB
339 lines
9.8 KiB
<?php
|
|
/**
|
|
* This file is part of workerman.
|
|
*
|
|
* Licensed under The MIT License
|
|
* For full copyright and license information, please see the MIT-LICENSE.txt
|
|
* Redistributions of files must retain the above copyright notice.
|
|
*
|
|
* @author walkor<walkor@workerman.net>
|
|
* @copyright walkor<walkor@workerman.net>
|
|
* @link http://www.workerman.net/
|
|
* @license http://www.opensource.org/licenses/mit-license.php MIT License
|
|
*/
|
|
namespace Workerman\Events;
|
|
|
|
/**
|
|
* select eventloop
|
|
*/
|
|
class Select implements EventInterface
|
|
{
|
|
/**
|
|
* All listeners for read/write event.
|
|
*
|
|
* @var array
|
|
*/
|
|
public $_allEvents = array();
|
|
|
|
/**
|
|
* Event listeners of signal.
|
|
*
|
|
* @var array
|
|
*/
|
|
public $_signalEvents = array();
|
|
|
|
/**
|
|
* Fds waiting for read event.
|
|
*
|
|
* @var array
|
|
*/
|
|
protected $_readFds = array();
|
|
|
|
/**
|
|
* Fds waiting for write event.
|
|
*
|
|
* @var array
|
|
*/
|
|
protected $_writeFds = array();
|
|
|
|
/**
|
|
* Fds waiting for except event.
|
|
*
|
|
* @var array
|
|
*/
|
|
protected $_exceptFds = array();
|
|
|
|
/**
|
|
* Timer scheduler.
|
|
* {['data':timer_id, 'priority':run_timestamp], ..}
|
|
*
|
|
* @var \SplPriorityQueue
|
|
*/
|
|
protected $_scheduler = null;
|
|
|
|
/**
|
|
* All timer event listeners.
|
|
* [[func, args, flag, timer_interval], ..]
|
|
*
|
|
* @var array
|
|
*/
|
|
protected $_eventTimer = array();
|
|
|
|
/**
|
|
* Timer id.
|
|
*
|
|
* @var int
|
|
*/
|
|
protected $_timerId = 1;
|
|
|
|
/**
|
|
* Select timeout.
|
|
*
|
|
* @var int
|
|
*/
|
|
protected $_selectTimeout = 100000000;
|
|
|
|
/**
|
|
* Paired socket channels
|
|
*
|
|
* @var array
|
|
*/
|
|
protected $channel = array();
|
|
|
|
/**
|
|
* Construct.
|
|
*/
|
|
public function __construct()
|
|
{
|
|
// Init SplPriorityQueue.
|
|
$this->_scheduler = new \SplPriorityQueue();
|
|
$this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function add($fd, $flag, $func, $args = array())
|
|
{
|
|
switch ($flag) {
|
|
case self::EV_READ:
|
|
case self::EV_WRITE:
|
|
$count = $flag === self::EV_READ ? \count($this->_readFds) : \count($this->_writeFds);
|
|
if ($count >= 1024) {
|
|
echo "Warning: system call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.\n";
|
|
} else if (\DIRECTORY_SEPARATOR !== '/' && $count >= 256) {
|
|
echo "Warning: system call select exceeded the maximum number of connections 256.\n";
|
|
}
|
|
$fd_key = (int)$fd;
|
|
$this->_allEvents[$fd_key][$flag] = array($func, $fd);
|
|
if ($flag === self::EV_READ) {
|
|
$this->_readFds[$fd_key] = $fd;
|
|
} else {
|
|
$this->_writeFds[$fd_key] = $fd;
|
|
}
|
|
break;
|
|
case self::EV_EXCEPT:
|
|
$fd_key = (int)$fd;
|
|
$this->_allEvents[$fd_key][$flag] = array($func, $fd);
|
|
$this->_exceptFds[$fd_key] = $fd;
|
|
break;
|
|
case self::EV_SIGNAL:
|
|
// Windows not support signal.
|
|
if(\DIRECTORY_SEPARATOR !== '/') {
|
|
return false;
|
|
}
|
|
$fd_key = (int)$fd;
|
|
$this->_signalEvents[$fd_key][$flag] = array($func, $fd);
|
|
\pcntl_signal($fd, array($this, 'signalHandler'));
|
|
break;
|
|
case self::EV_TIMER:
|
|
case self::EV_TIMER_ONCE:
|
|
$timer_id = $this->_timerId++;
|
|
$run_time = \microtime(true) + $fd;
|
|
$this->_scheduler->insert($timer_id, -$run_time);
|
|
$this->_eventTimer[$timer_id] = array($func, (array)$args, $flag, $fd);
|
|
$select_timeout = ($run_time - \microtime(true)) * 1000000;
|
|
if( $this->_selectTimeout > $select_timeout ){
|
|
$this->_selectTimeout = $select_timeout;
|
|
}
|
|
return $timer_id;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Signal handler.
|
|
*
|
|
* @param int $signal
|
|
*/
|
|
public function signalHandler($signal)
|
|
{
|
|
\call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal));
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function del($fd, $flag)
|
|
{
|
|
$fd_key = (int)$fd;
|
|
switch ($flag) {
|
|
case self::EV_READ:
|
|
unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]);
|
|
if (empty($this->_allEvents[$fd_key])) {
|
|
unset($this->_allEvents[$fd_key]);
|
|
}
|
|
return true;
|
|
case self::EV_WRITE:
|
|
unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
|
|
if (empty($this->_allEvents[$fd_key])) {
|
|
unset($this->_allEvents[$fd_key]);
|
|
}
|
|
return true;
|
|
case self::EV_EXCEPT:
|
|
unset($this->_allEvents[$fd_key][$flag], $this->_exceptFds[$fd_key]);
|
|
if(empty($this->_allEvents[$fd_key]))
|
|
{
|
|
unset($this->_allEvents[$fd_key]);
|
|
}
|
|
return true;
|
|
case self::EV_SIGNAL:
|
|
if(\DIRECTORY_SEPARATOR !== '/') {
|
|
return false;
|
|
}
|
|
unset($this->_signalEvents[$fd_key]);
|
|
\pcntl_signal($fd, SIG_IGN);
|
|
break;
|
|
case self::EV_TIMER:
|
|
case self::EV_TIMER_ONCE;
|
|
unset($this->_eventTimer[$fd_key]);
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/**
|
|
* Tick for timer.
|
|
*
|
|
* @return void
|
|
*/
|
|
protected function tick()
|
|
{
|
|
while (!$this->_scheduler->isEmpty()) {
|
|
$scheduler_data = $this->_scheduler->top();
|
|
$timer_id = $scheduler_data['data'];
|
|
$next_run_time = -$scheduler_data['priority'];
|
|
$time_now = \microtime(true);
|
|
$this->_selectTimeout = ($next_run_time - $time_now) * 1000000;
|
|
if ($this->_selectTimeout <= 0) {
|
|
$this->_scheduler->extract();
|
|
|
|
if (!isset($this->_eventTimer[$timer_id])) {
|
|
continue;
|
|
}
|
|
|
|
// [func, args, flag, timer_interval]
|
|
$task_data = $this->_eventTimer[$timer_id];
|
|
if ($task_data[2] === self::EV_TIMER) {
|
|
$next_run_time = $time_now + $task_data[3];
|
|
$this->_scheduler->insert($timer_id, -$next_run_time);
|
|
}
|
|
\call_user_func_array($task_data[0], $task_data[1]);
|
|
if (isset($this->_eventTimer[$timer_id]) && $task_data[2] === self::EV_TIMER_ONCE) {
|
|
$this->del($timer_id, self::EV_TIMER_ONCE);
|
|
}
|
|
continue;
|
|
}
|
|
return;
|
|
}
|
|
$this->_selectTimeout = 100000000;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function clearAllTimer()
|
|
{
|
|
$this->_scheduler = new \SplPriorityQueue();
|
|
$this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
|
|
$this->_eventTimer = array();
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function loop()
|
|
{
|
|
while (1) {
|
|
if(\DIRECTORY_SEPARATOR === '/') {
|
|
// Calls signal handlers for pending signals
|
|
\pcntl_signal_dispatch();
|
|
}
|
|
|
|
$read = $this->_readFds;
|
|
$write = $this->_writeFds;
|
|
$except = $this->_exceptFds;
|
|
|
|
if ($read || $write || $except) {
|
|
// Waiting read/write/signal/timeout events.
|
|
set_error_handler(function(){});
|
|
$ret = stream_select($read, $write, $except, 0, $this->_selectTimeout);
|
|
restore_error_handler();
|
|
} else {
|
|
usleep($this->_selectTimeout);
|
|
$ret = false;
|
|
}
|
|
|
|
|
|
if (!$this->_scheduler->isEmpty()) {
|
|
$this->tick();
|
|
}
|
|
|
|
if (!$ret) {
|
|
continue;
|
|
}
|
|
|
|
if ($read) {
|
|
foreach ($read as $fd) {
|
|
$fd_key = (int)$fd;
|
|
if (isset($this->_allEvents[$fd_key][self::EV_READ])) {
|
|
\call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0],
|
|
array($this->_allEvents[$fd_key][self::EV_READ][1]));
|
|
}
|
|
}
|
|
}
|
|
|
|
if ($write) {
|
|
foreach ($write as $fd) {
|
|
$fd_key = (int)$fd;
|
|
if (isset($this->_allEvents[$fd_key][self::EV_WRITE])) {
|
|
\call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0],
|
|
array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
|
|
}
|
|
}
|
|
}
|
|
|
|
if($except) {
|
|
foreach($except as $fd) {
|
|
$fd_key = (int) $fd;
|
|
if(isset($this->_allEvents[$fd_key][self::EV_EXCEPT])) {
|
|
\call_user_func_array($this->_allEvents[$fd_key][self::EV_EXCEPT][0],
|
|
array($this->_allEvents[$fd_key][self::EV_EXCEPT][1]));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Destroy loop.
|
|
*
|
|
* @return void
|
|
*/
|
|
public function destroy()
|
|
{
|
|
|
|
}
|
|
|
|
/**
|
|
* Get timer count.
|
|
*
|
|
* @return integer
|
|
*/
|
|
public function getTimerCount()
|
|
{
|
|
return \count($this->_eventTimer);
|
|
}
|
|
}
|