推荐:《PHP视频教程》
前言
模拟supervisor进程管理DEMO(简易实现)
没错,是造轮子!目的在于学习!
截图:
php入门到就业线上直播课:进入学习
Apipost = Postman + Swagger + Mock + Jmeter 超好用的API调试工具:点击使用
在图中自己实现了一个
Copy
子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。
实现
1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程
不足:无法持续监听错误页面。由于socket得到的响应是通过include
函数加载的,所以在加载的页面内不能出现tail -f
命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。
知识点
代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1)
,而是用stream_select($read, $write, $except, 1)
让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open
,是一个很强大的函数。在这之前我曾用pcntl_exec
执行过外部程序,但是需要先pcntl_fork
。而用其他的如exec
,shell_exec
无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()
处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。
代码
由于代码过多,所以如果你对我的方案有更好的建议可以在github这里看。
主进程代码:Process.php
<?phprequire_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{/*** 待启动的消费者数组 */protected $consumers = array();protected $childPids = array();const PPID_FILE = __DIR__ . '/process';protected $serializerConsumer;public function __construct(){$this->consumers = $this->getConsumers();}// 这里是个DEMO,实际可以用读取配置文件的方式。public function getConsumers(){$consumer = new Consumer(['program' => 'test','command' => '/usr/bin/php test.php','directory' => __DIR__,'logfile' => __DIR__ . '/test.log','uniqid' => uniqid(),'auto_restart' => false,]);return [$consumer->uniqid => $consumer,];}public function run(){if (empty($this->consumers)) {// consumer emptyreturn;}if ($this->_notifyMaster()) {// master alivereturn;}$pid = pcntl_fork();if ($pid < 0) {exit;} elseif ($pid > 0) {exit;}if (!posix_setsid()) {exit;}$stream = new StreamConnection('tcp://0.0.0.0:7865');@cli_set_process_title('AMQP Master Process');// 将主进程ID写入文件file_put_contents(self::PPID_FILE, getmypid());// master进程继续while (true) {$this->init();pcntl_signal_dispatch();$this->waitpid();// 如果子进程被全部回收,则主进程退出// if (empty($this->childPids)) {// $stream->close($stream->getSocket());// break;// }$stream->accept(function ($uniqid, $action) {$this->handle($uniqid, $action);return $this->display();});}}protected function init(){foreach ($this->consumers as &$c) {switch ($c->state) {case Consumer::RUNNING:case Consumer::STOP:break;case Consumer::NOMINAL:case Consumer::STARTING:$this->fork($c);break;case Consumer::STOPING:if ($c->pid && posix_kill($c->pid, SIGTERM)) {$this->reset($c, Consumer::STOP);}break;case Consumer::RESTART:if (empty($c->pid)) {$this->fork($c);break;}if (posix_kill($c->pid, SIGTERM)) {$this->reset($c, Consumer::STOP);$this->fork($c);}break;default:break;}}}protected function reset(Consumer $c, $state){$c->pid = '';$c->uptime = '';$c->state = $state;$c->process = null;}protected function waitpid(){foreach ($this->childPids as $uniqid => $pid) {$result = pcntl_waitpid($pid, $status, WNOHANG);if ($result == $pid || $result == -1) {unset($this->childPids[$uniqid]);$c = &$this->consumers[$uniqid];$state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;$this->reset($c, $state);}}}/** * 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程 */private function _notifyMaster(){$ppid = file_get_contents(self::PPID_FILE );$isAlive = $this->checkProcessAlive($ppid);if (!$isAlive) return false;return true;}public function checkProcessAlive($pid){if (empty($pid)) return false;$pidinfo = `ps co pid {$pid} | xargs`;$pidinfo = trim($pidinfo);$pattern = "/.*?PID.*?(\d+).*?/";preg_match($pattern, $pidinfo, $matches);return empty($matches) ? false : ($matches[1] == $pid ? true : false);}/** * fork一个新的子进程 */protected function fork(Consumer $c){$descriptorspec = [2 => ['file', $c->logfile, 'a'],];$process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);if ($process) {$ret = proc_get_status($process);if ($ret['running']) {$c->state = Consumer::RUNNING;$c->pid = $ret['pid'];$c->process = $process;$c->uptime = date('m-d H:i');$this->childPids[$c->uniqid] = $ret['pid'];} else {$c->state = Consumer::EXITED;proc_close($process);}} else {$c->state = Consumer::ERROR;}return $c;}public function display(){$location = 'http://127.0.0.1:7865';$basePath = Http::$basePath;$scriptName = isset($_SERVER['SCRIPT_NAME']) &&!empty($_SERVER['SCRIPT_NAME']) &&$_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php';if ($scriptName == '/index.html') {return Http::status_301($location);}$sourcePath = $basePath . $scriptName;if (!is_file($sourcePath)) {return Http::status_404();}ob_start();include $sourcePath;$response = ob_get_contents();ob_clean();return Http::status_200($response);}public function handle($uniqid, $action){if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {return;}switch ($action) {case 'refresh':break;case 'restartall':$this->killall(true);break;case 'stopall':$this->killall();break;case 'stop':$c = &$this->consumers[$uniqid];if ($c->state != Consumer::RUNNING) break;$c->state = Consumer::STOPING;break;case 'start':$c = &$this->consumers[$uniqid];if ($c->state == Consumer::RUNNING) break;$c->state = Consumer::STARTING;break;case 'restart':$c = &$this->consumers[$uniqid];$c->state = Consumer::RESTART;break;case 'copy':$c = $this->consumers[$uniqid];$newC = clone $c;$newC->uniqid = uniqid('C');$newC->state = Consumer::NOMINAL;$newC->pid = '';$this->consumers[$newC->uniqid] = $newC;break;default:break;}}protected function killall($restart = false){foreach ($this->consumers as &$c) {$c->state = $restart ? Consumer::RESTART : Consumer::STOPING;}}}$cli = new Process();$cli->run();
登录后复制
Consumer消费者对象
<?phprequire_once __DIR__ . '/BaseObject.php';class Consumer extends BaseObject{/** 开启多少个消费者 */public $numprocs = 1;/** 当前配置的唯一标志 */public $program;/** 执行的命令 */public $command;/** 当前工作的目录 */public $directory;/** 通过 $qos $queueName $duplicate 生成的 $queue */public $queue;/** 程序执行日志记录 */public $logfile = '';/** 消费进程的唯一ID */public $uniqid;/** 进程IDpid */public $pid;/** 进程状态 */public $state = self::NOMINAL;/** 自启动 */public $auto_restart = false;public $process;/** 启动时间 */public $uptime;const RUNNING = 'running';const STOP = 'stoped';const NOMINAL = 'nominal';const RESTART = 'restart';const STOPING = 'stoping';const STARTING = 'stating';const ERROR = 'error';const BLOCKED = 'blocked';const EXITED = 'exited';const FATEL = 'fatel';}
登录后复制
stream相关代码:StreamConnection.php
<?phpclass StreamConnection{protected $socket;protected $timeout = 2; //sprotected $client;public function __construct($host){$this->socket = $this->connect($host);}public function connect($host){$socket = stream_socket_server($host, $errno, $errstr);if (!$socket) {exit('stream error');}stream_set_timeout($socket, $this->timeout);stream_set_chunk_size($socket, 1024);stream_set_blocking($socket, false);$this->client = [$socket];return $socket;}public function accept(Closure $callback){$read = $this->client;if (stream_select($read, $write, $except, 1) < 1) return;if (in_array($this->socket, $read)) {$cs = stream_socket_accept($this->socket);$this->client[] = $cs;}foreach ($read as $s) {if ($s == $this->socket) continue;$header = fread($s, 1024);if (empty($header)) {$index = array_search($s, $this->client);if ($index)unset($this->client[$index]);$this->close($s);continue;}Http::parse_http($header);$uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : '';$action = isset($_GET['action']) ? $_GET['action'] : '';$response = $callback($uniqid, $action);$this->write($s, $response);$index = array_search($s, $this->client);if ($index)unset($this->client[$index]);$this->close($s);}}public function write($socket, $response){$ret = fwrite($socket, $response, strlen($response));}public function close($socket){$flag = fclose($socket);}public function getSocket(){return $this->socket;}}
登录后复制
Http响应代码:Http.php
<?phpclass Http{public static $basePath = __DIR__ . '/views';public static $max_age = 120; //秒/**函数: parse_http*描述: 解析http协议*/public static function parse_http($http){// 初始化$_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =array();$GLOBALS['HTTP_RAW_POST_DATA'] = '';// 需要设置的变量名$_SERVER = array('QUERY_STRING' => '','REQUEST_METHOD' => '','REQUEST_URI' => '','SERVER_PROTOCOL' => '','SERVER_SOFTWARE' => '','SERVER_NAME' => '','HTTP_HOST' => '','HTTP_USER_AGENT' => '','HTTP_ACCEPT' => '','HTTP_ACCEPT_LANGUAGE' => '','HTTP_ACCEPT_ENCODING' => '','HTTP_COOKIE' => '','HTTP_CONNECTION' => '','REMOTE_ADDR' => '','REMOTE_PORT' => '0','SCRIPT_NAME' => '','HTTP_REFERER' => '','CONTENT_TYPE' => '','HTTP_IF_NONE_MATCH' => '',);// 将header分割成数组list($http_header, $http_body) = explode("\r\n\r\n", $http, 2);$header_data = explode("\r\n", $http_header);list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);unset($header_data[0]);foreach ($header_data as $content) {// \r\n\r\nif (empty($content)) {continue;}list($key, $value) = explode(':', $content, 2);$key = strtolower($key);$value = trim($value);switch ($key) {case 'host':$_SERVER['HTTP_HOST'] = $value;$tmp = explode(':', $value);$_SERVER['SERVER_NAME'] = $tmp[0];if (isset($tmp[1])) {$_SERVER['SERVER_PORT'] = $tmp[1];}break;case 'cookie':$_SERVER['HTTP_COOKIE'] = $value;parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);break;case 'user-agent':$_SERVER['HTTP_USER_AGENT'] = $value;break;case 'accept':$_SERVER['HTTP_ACCEPT'] = $value;break;case 'accept-language':$_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value;break;case 'accept-encoding':$_SERVER['HTTP_ACCEPT_ENCODING'] = $value;break;case 'connection':$_SERVER['HTTP_CONNECTION'] = $value;break;case 'referer':$_SERVER['HTTP_REFERER'] = $value;break;case 'if-modified-since':$_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value;break;case 'if-none-match':$_SERVER['HTTP_IF_NONE_MATCH'] = $value;break;case 'content-type':if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) {$_SERVER['CONTENT_TYPE'] = $value;} else {$_SERVER['CONTENT_TYPE'] = 'multipart/form-data';$http_post_boundary = '--' . $match[1];}break;}}// script_name$_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);// QUERY_STRING$_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);if ($_SERVER['QUERY_STRING']) {// $GETparse_str($_SERVER['QUERY_STRING'], $_GET);} else {$_SERVER['QUERY_STRING'] = '';}// REQUEST$_REQUEST = array_merge($_GET, $_POST);return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES);}public static function status_404(){return <<<EOFHTTP/1.1 404 OKcontent-type: text/htmlEOF;}public static function status_301($location){return <<<EOFHTTP/1.1 301 Moved PermanentlyContent-Length: 0Content-Type: text/plainLocation: $locationCache-Control: no-cacheEOF;}public static function status_304(){return <<<EOFHTTP/1.1 304 Not ModifiedContent-Length: 0EOF;}public static function status_200($response){$contentType = $_SERVER['CONTENT_TYPE'];$length = strlen($response);$header = '';if ($contentType)$header = 'Cache-Control: max-age=180';return <<<EOFHTTP/1.1 200 OKContent-Type: $contentTypeContent-Length: $length$header$responseEOF;}}
登录后复制
待执行的脚本:test.php
<?phpwhile(true) {file_put_contents(__DIR__.'/test.log', date('Y-m-d H:i:s'));sleep(1);}
登录后复制
在当前目录下的视图页面:
|- Process.php
|- Http.php
|- StreamConnection.php
|- Consumer.php
|- BaseObject.php
|- views/
更多编程相关知识,请访问:编程教学!!
以上就是PHP模拟supervisor的进程管理的详细内容,更多请关注php中文网其它相关文章!