此服务适用于以下场景:
比如:同时从多个OTA获取酒店报价,同时抓取多个网页等。
此服务目前在140和199两台服务器上运行,用HAProxy实现的负载均衡双机热备(感谢兴国提供支持),目前每台服务器上跑8个work进程和100个task_worker进程。
本服务当前主要用于酒店询价(正在逐步向此服务迁移),目前每天处理40万连接请求,执行80万个子任务,预计迁移工作全部完成后,每天处理100万次以上请求,服务上线运行1个月,表现稳定。
酒店列表页最低价接口迁移到此服务上后,请求时间小于2秒的请求由80%提高到89%,请求时间大于5秒的请求由7%降低到2.5%,还有进一步优化空间。
如果想使用此服务,只需实现一个interface的三个方法,这个interface是:hotel\youyu\swooleTaskInterface
接口定义:
<?php namespace hotel\youyu; interface swooleTaskInterface { //获取所有子任务的参数 public static function getAllTaskParams($params); //执行子任务,返回子任务执行结果 public static function task($task_id, $from_id, $params); //所有子任务执行完毕后的回调方法,数据返回给客户端 public static function workFinish($params, $work_data); } ?>
举个例子,下面这个类可以异步获取一个酒店所有OTA的最低价(为了便于理解简化了代码)
<?php namespace hotel\youyu\swooleTask; class hotelLowestPriceAllOta implements \hotel\youyu\swooleTaskInterface { public static function getAllTaskParams($params) { $otas = \Ko_Tool_Singleton::OInstance('KHotel_YouYuOtaReferApi')->getHotelOtaList($params['hotel_id']); $info = array(); foreach ($otas as $ota_id) { $info[] = array_merge($params, array('ota_id' => $ota_id)); } return $info; } public static function task($task_id, $from_id, $info) { $params = $info['params']; $low_price = \KHotel_YouYuApi::getLowestPrice($params['hotel_id'], $params['check_in'], $params['check_out'], $params['ota_id']); //echo "onTask \t fd:{$info['fd']}\t from_id:$from_id\t task_id:$task_id\t task_count:{$info['task_count']}\n"; return $low_price; } public static function workFinish($params, $data) { usort($data, function($a, $b) { if ($a['price'] === false && $b > 0) { return 1; } if ($b['price'] === false && $a > 0) { return -1; } if ($a['price'] === $b['price']) { return 0; } return $a['price'] < $b['price'] ? -1 : 1; }); return array_values($data); } } ?>
服务器端代码在 /www_mfw/app/youyu/youyu_swoole_server.php
服务器端代码很简单:
<?php KCmd_lock::lock(); class Server { public $serv; private $package_eof = "\r\n"; private $data = null; function onReceive($serv, $fd, $from_id, $params) { echo "onReceive $fd\n"; $_params = json_decode($params, true); echo($params); echo "\n"; $class_name = $_params['task_class']; if (class_exists($class_name)) { $all_task_params = $class_name::getAllTaskParams($_params['data']); if (is_array($all_task_params) && count($all_task_params) > 0) { foreach ($all_task_params as $task_params) { $info = array( 'class_name' => $class_name, 'params' => $task_params, 'task_count' => count($all_task_params), 'fd' => $fd ); $this->serv->task($info); } } else { $this->serv->send($fd, json_encode(array('data'=>array())).$this->package_eof); } } else { echo "class $class_name no exists\n"; $this->serv->send($fd, json_encode(array('data'=>array())).$this->package_eof); } } function onTask($serv, $task_id, $from_id, $info) { $data = $info['class_name']::task($task_id, $from_id, $info); $serv->finish(array_merge($info, array('from_id' => $from_id, 'data'=>$data))); } function onTaskFinish($serv, $task_id, $info) { $key = $info['fd'] . '_' . $info['from_id']; $this->data[$key][$task_id] = $info['data']; //echo "onFinish\tfd:{$info['fd']}\tfrom_id:{$info['from_id']}\t" //. "task_id:$task_id\ttask_count:" . count($this->data[$key]) . "-{$info['task_count']}\n"; if (count($this->data[$key]) == $info['task_count']) { $return = $info['class_name']::workFinish($info['params'], $this->data[$key]); $serv->send($info['fd'], json_encode($return) . $this->package_eof); unset($this->data[$key]); echo "clear memory\n\n"; } } function onWorkStart($serv, $worker_id) { if($worker_id >= $serv->setting['worker_num']) { swoole_set_process_name("youyu_swoole_server.php:task_worker"); } else { swoole_set_process_name("youyu_swoole_server.php:worker"); } } function run() { $serv = new swoole_server("0.0.0.0", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP); $this->serv = $serv; $this->serv->set(array( 'worker_num' => 8, 'task_worker_num' => 100, 'open_eof_split' => true, 'package_eof' => $this->package_eof, )); $serv->on('connect', function ($serv, $fd) { echo "Client: Connect $fd.\n"; }); $serv->on('start', function() { echo "server start...\n\n"; }); $this->serv->on('workerStart', array($this, 'onWorkStart')); $this->serv->on('receive', array($this, 'onReceive')); $serv->on('close', function ($serv, $fd) { echo "Client: Close $fd.\n\n"; }); $this->serv->on('task', array($this, 'onTask')); $this->serv->on('finish', array($this, 'onTaskFinish')); $this->serv->start(); } } $server= new Server; $server->run(); ?>
swoole客户端虽然支持异步连接,但只能用于cli模式,如果想在fast-cgi模式也实现并行处理,可以使用while + swoole_client_select函数实现, 举例(为了便于理解简化了代码):
<?php //同时获取一组酒店所有OTA的最低价 $clients = array(); $list = Ko_Tool_Singleton::OInstance('KHotel_Youyu_hotelInquiryHistoryApi')->getByIDs($ids); foreach ($list as $one) { $client = new KHotel_Youyu_swooleClient(); if ($client->connect()) { $params = array( //此处task_class必须填写实现自swooleTaskInterface接口的那个类的类名(包括namespace) 'task_class' => 'hotel\youyu\swooleTask\hotelLowestPriceAllOta', //以下是传递的参数 'data' => array( 'check_in' => $one['check_in'], 'check_out' => $one['check_out'], 'hotel_id' => $one['hotel_id'] ) ); $client->send($params); $clients[$client->sock] = $client; }; } while (!empty($clients)) { $write = $error = array(); $read = array_values($clients); $n = swoole_client_select($read, $write, $error, 1); if ($n > 0) { foreach ($read as $index => $c) { if ($c) { $data = $c->recv(); var_dump($data); unset($clients[$c->sock]); } } } } ?>
其中KHotel_Youyu_swooleClient是我在swoole_client基础上简单封装的一个类,代码如下:
<?php class KHotel_Youyu_swooleClient extends swoole_client { //宏定义,本地IP和一个端口,由HAProxy代理到真正的服务器IP和端口 private $ip = LOCAL_IP; private $port = YOUYU_TASK_PORT; private $package_eof = "\r\n"; public function __construct() { parent::__construct(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC); } //如果不使用swoole_client_select,则需要把$timeout设置的长一些,不然会造成服务器端未来得及返回数据客户端就执行recv()了 //$test参数用于在同一台服务器上测试 public function connect($timeout = 0.1, $test = 0) { parent::set(array( 'open_eof_split' => true, 'package_eof' => $this->package_eof, )); $ip = $test ? '127.0.0.1' : $this->ip; $port = $test ? 9501 : $this->port; return parent::connect($ip, $port , $timeout); } public function errorInfo() { return "Error: {" . socket_strerror(parent::errCode) . "[" . parent::errCode . "]\n"; } //数据传输使用json格式,自动解析 public function send($data) { parent::send(json_encode($data)."\r\n"); } public function recv() { return json_decode(parent::recv(), true); } } ?>