适用场景
此服务适用于以下场景:
- 客户端交给服务器端一项工作(work)来执行
- 这项工作可以被拆分成若干个子任务(task),子任务之间互不干扰,可以并行处理
- 服务器端处理完所有的子任务后,把结果汇总,返回给客户端,完成本次工作
比如:同时从多个OTA获取酒店报价,同时抓取多个网页等。
此服务目前在140和199两台服务器上运行,用HAProxy实现的负载均衡双机热备(感谢兴国提供支持),目前每台服务器上跑8个work进程和100个task_worker进程。
本服务当前主要用于酒店询价(正在逐步向此服务迁移),目前每天处理40万连接请求,执行80万个子任务,预计迁移工作全部完成后,每天处理100万次以上请求,服务上线运行1个月,表现稳定。
酒店列表页最低价接口迁移到此服务上后,请求时间小于2秒的请求由80%提高到89%,请求时间大于5秒的请求由7%降低到2.5%,还有进一步优化空间。
实现接口
如果想使用此服务,只需实现一个interface的三个方法,这个interface是:hotel\youyu\swooleTaskInterface
接口定义:
<?php
namespace hotel\youyu;
interface swooleTaskInterface {
//获取所有子任务的参数
public static function getAllTaskParams($params);
//执行子任务,返回子任务执行结果
public static function task($task_id, $from_id, $params);
//所有子任务执行完毕后的回调方法,数据返回给客户端
public static function workFinish($params, $work_data);
}
?>
举个例子,下面这个类可以异步获取一个酒店所有OTA的最低价(为了便于理解简化了代码)
<?php
namespace hotel\youyu\swooleTask;
class hotelLowestPriceAllOta implements \hotel\youyu\swooleTaskInterface {
public static function getAllTaskParams($params) {
$otas = \Ko_Tool_Singleton::OInstance('KHotel_YouYuOtaReferApi')->getHotelOtaList($params['hotel_id']);
$info = array();
foreach ($otas as $ota_id) {
$info[] = array_merge($params, array('ota_id' => $ota_id));
}
return $info;
}
public static function task($task_id, $from_id, $info) {
$params = $info['params'];
$low_price = \KHotel_YouYuApi::getLowestPrice($params['hotel_id'], $params['check_in'],
$params['check_out'], $params['ota_id']);
//echo "onTask \t fd:{$info['fd']}\t from_id:$from_id\t task_id:$task_id\t task_count:{$info['task_count']}\n";
return $low_price;
}
public static function workFinish($params, $data) {
usort($data, function($a, $b) {
if ($a['price'] === false && $b > 0) {
return 1;
}
if ($b['price'] === false && $a > 0) {
return -1;
}
if ($a['price'] === $b['price']) {
return 0;
}
return $a['price'] < $b['price'] ? -1 : 1;
});
return array_values($data);
}
}
?>
服务器端
服务器端代码在 /www_mfw/app/youyu/youyu_swoole_server.php
服务器端代码很简单:
<?php
KCmd_lock::lock();
class Server
{
public $serv;
private $package_eof = "\r\n";
private $data = null;
function onReceive($serv, $fd, $from_id, $params) {
echo "onReceive $fd\n";
$_params = json_decode($params, true);
echo($params);
echo "\n";
$class_name = $_params['task_class'];
if (class_exists($class_name)) {
$all_task_params = $class_name::getAllTaskParams($_params['data']);
if (is_array($all_task_params) && count($all_task_params) > 0) {
foreach ($all_task_params as $task_params) {
$info = array(
'class_name' => $class_name,
'params' => $task_params,
'task_count' => count($all_task_params),
'fd' => $fd
);
$this->serv->task($info);
}
} else {
$this->serv->send($fd, json_encode(array('data'=>array())).$this->package_eof);
}
} else {
echo "class $class_name no exists\n";
$this->serv->send($fd, json_encode(array('data'=>array())).$this->package_eof);
}
}
function onTask($serv, $task_id, $from_id, $info) {
$data = $info['class_name']::task($task_id, $from_id, $info);
$serv->finish(array_merge($info, array('from_id' => $from_id, 'data'=>$data)));
}
function onTaskFinish($serv, $task_id, $info) {
$key = $info['fd'] . '_' . $info['from_id'];
$this->data[$key][$task_id] = $info['data'];
//echo "onFinish\tfd:{$info['fd']}\tfrom_id:{$info['from_id']}\t"
//. "task_id:$task_id\ttask_count:" . count($this->data[$key]) . "-{$info['task_count']}\n";
if (count($this->data[$key]) == $info['task_count']) {
$return = $info['class_name']::workFinish($info['params'], $this->data[$key]);
$serv->send($info['fd'], json_encode($return) . $this->package_eof);
unset($this->data[$key]);
echo "clear memory\n\n";
}
}
function onWorkStart($serv, $worker_id) {
if($worker_id >= $serv->setting['worker_num']) {
swoole_set_process_name("youyu_swoole_server.php:task_worker");
} else {
swoole_set_process_name("youyu_swoole_server.php:worker");
}
}
function run() {
$serv = new swoole_server("0.0.0.0", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
$this->serv = $serv;
$this->serv->set(array(
'worker_num' => 8,
'task_worker_num' => 100,
'open_eof_split' => true,
'package_eof' => $this->package_eof,
));
$serv->on('connect', function ($serv, $fd) {
echo "Client: Connect $fd.\n";
});
$serv->on('start', function() {
echo "server start...\n\n";
});
$this->serv->on('workerStart', array($this, 'onWorkStart'));
$this->serv->on('receive', array($this, 'onReceive'));
$serv->on('close', function ($serv, $fd) {
echo "Client: Close $fd.\n\n";
});
$this->serv->on('task', array($this, 'onTask'));
$this->serv->on('finish', array($this, 'onTaskFinish'));
$this->serv->start();
}
}
$server= new Server;
$server->run();
?>
客户端
swoole客户端虽然支持异步连接,但只能用于cli模式,如果想在fast-cgi模式也实现并行处理,可以使用while + swoole_client_select函数实现, 举例(为了便于理解简化了代码):
<?php
//同时获取一组酒店所有OTA的最低价
$clients = array();
$list = Ko_Tool_Singleton::OInstance('KHotel_Youyu_hotelInquiryHistoryApi')->getByIDs($ids);
foreach ($list as $one) {
$client = new KHotel_Youyu_swooleClient();
if ($client->connect()) {
$params = array(
//此处task_class必须填写实现自swooleTaskInterface接口的那个类的类名(包括namespace)
'task_class' => 'hotel\youyu\swooleTask\hotelLowestPriceAllOta',
//以下是传递的参数
'data' => array(
'check_in' => $one['check_in'],
'check_out' => $one['check_out'],
'hotel_id' => $one['hotel_id']
)
);
$client->send($params);
$clients[$client->sock] = $client;
};
}
while (!empty($clients)) {
$write = $error = array();
$read = array_values($clients);
$n = swoole_client_select($read, $write, $error, 1);
if ($n > 0) {
foreach ($read as $index => $c) {
if ($c) {
$data = $c->recv();
var_dump($data);
unset($clients[$c->sock]);
}
}
}
}
?>
其中KHotel_Youyu_swooleClient是我在swoole_client基础上简单封装的一个类,代码如下:
<?php
class KHotel_Youyu_swooleClient extends swoole_client {
//宏定义,本地IP和一个端口,由HAProxy代理到真正的服务器IP和端口
private $ip = LOCAL_IP;
private $port = YOUYU_TASK_PORT;
private $package_eof = "\r\n";
public function __construct() {
parent::__construct(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
}
//如果不使用swoole_client_select,则需要把$timeout设置的长一些,不然会造成服务器端未来得及返回数据客户端就执行recv()了
//$test参数用于在同一台服务器上测试
public function connect($timeout = 0.1, $test = 0) {
parent::set(array(
'open_eof_split' => true,
'package_eof' => $this->package_eof,
));
$ip = $test ? '127.0.0.1' : $this->ip;
$port = $test ? 9501 : $this->port;
return parent::connect($ip, $port , $timeout);
}
public function errorInfo() {
return "Error: {" . socket_strerror(parent::errCode) . "[" . parent::errCode . "]\n";
}
//数据传输使用json格式,自动解析
public function send($data) {
parent::send(json_encode($data)."\r\n");
}
public function recv() {
return json_decode(parent::recv(), true);
}
}
?>
目前的不足
- 每一个task任务获取的数据都是保存在内存中,如果数据量很大的话,可能造成内存溢出,应该考虑把存储这块抽象出来,可以保存在临时表中或其他什么地方
- 缺少进程管理模块监控服务健康状况