最近用swoole实现了一个功能:异步非阻塞获取酒店OTA最低价。
场景是这样的:
一个酒店对应多个OTA(我们的供应商),我需要获取酒店每一个OTA在某一个入住/离店日期的最低报价。 当数据量很大时,比如目前1天需要获取20多万次酒店报价(这个数字还在增长),每一个酒店又有1-5个OTA(以后可能更多),这样每天需要调用上百万次获取最低价的接口,接口在没有缓存的情况下,使用http协议获取价格可能需要1-2秒,如果使用同步阻塞方式获取,所用时间会很长,甚至无法完成任务。
所以我选择用swoole来实现
程序流程大概是这样的:
客户端:
1.循环一组要获取价格的酒店ID
2.每次循环创建swoole客户端,向服务器send数据,维护一个clients数组,存放当前活动的客户端
3.使用while循环+swoole_client_select()方法监控从服务器端返回的数据,获取到一个客户端的数据就关闭相应客户端并从clients中删除此客户端
4.直到clients中元素个数为0,退出while循环
服务器端:
1.服务器端开启了8个worker进程和50个worker_task进程
2.一个客户端连接会占用一个worker进程,当onReceive事件被触发时,需要创建task进程,有几个OTA就创建几个task进程
3.onTask事件回调方法用来调用接口,获取酒店某个OTA的最低价
4.对于同一个客户端连接,服务器端必须要等待所有相关task进程都跑完以后,再统一返回数据给客户端,比如一个酒店有3个OTA,这3个OTA必须都获取到价格以后,才能返回给客户端,所以我用一个$data变量来存放同一个客户端不同task返回的数据
5.onTaskFinish事件回调中,把当前OTA最低价写入$data[$fd][$task_id]中
4.onTaskFinish事件回调中,判断同一个客户端的所有task都执行完成以后(count($data[$fd])===$ota_count),把数据send到客户端。
实际执行的效果还是不错的,已经稳定运行24小时,目前大概1秒钟可以获取20-30个酒店所有OTA的最低价,这个速度还可以提高,只要服务器内存够用,开启更多进程即可。
用swoole写代码和之前在PHP-fpm环境有一点最大的不同就是必须重视程序的容错性和健壮性,因为进程是常驻内存的,任何一个微小的bug都可能导致服务终止,还必须避免长期运行产生的内存泄露。
客户端伪代码
<?php
$clients = array();
$count = Ko_Tool_Singleton::OInstance('KHotel_Youyu_curHotelInquirySet')->getCount();
for ($i = 0; $i < $count; $i += 10) {
$history_ids = array();
//每次创建10个客户端
for ($j = 0; $j < 10; $j++) {
$history_ids[] = Ko_Tool_Singleton::OInstance('KHotel_Youyu_curHotelInquirySet')->pop();
}
$list = Ko_Tool_Singleton::OInstance('KHotel_Youyu_hotelInquiryHistoryApi')->getByIDs($history_ids);
foreach ($list as $one) {
$client = new KHotel_Youyu_swooleClient();
if (!$client->connect()) {
//error
} else {
$params = array(
'action' => 'getHotelLowestPriceAllOta',
'data' => array(
'iq_id' => $one['id'],
'check_in' => $one['check_in'],
'check_out' => $one['check_out'],
'hotel_id' => $one['hotel_id']
)
);
$client->send($params);
$clients[$client->sock] = $client;
};
}
while (!empty($clients)) {
$write = $error = array();
$read = array_values($clients);
$n = swoole_client_select($read, $write, $error, 2);
if ($n > 0) {
foreach ($read as $index => $c) {
if ($c) {
$response = $c->recv();
$data = $response['data'];
if (is_array($data['price_list'])) {
$unique_key = array();
foreach ($data['price_list'] as $key=>$one) {
$unique_key[] = $data['iq_id'].'-'.$one['ota_id'];
$up_data = array(
'iq_id' => $data['iq_id'],
'ota_id' => $one['ota_id'],
'price' => $one['price'],
'ori_price' => $one['ori_price']
);
Ko_Tool_Singleton::OInstance('KHotel_Youyu_lowestPriceHistoryApi')->iInsert($up_data);
}
}
unset($clients[$c->sock]);
}
}
}
}
}
?>
服务器端伪代码
<?php
class Server {
public $serv;
private $package_eof = "\r\n";
private $data = null;
function onReceive($serv, $fd, $from_id, $params) {
$_params = json_decode($params, true);
$info = array(
'task' => $_params['action'],
'params' => $_params['data'],
'fd' => $fd
);
$this->getHotelLowestPriceAllOta($info);
}
function getHotelLowestPriceAllOta($info) {
$params = $info['params'];
$otas = Ko_Tool_Singleton::OInstance('KHotel_YouYuOtaReferApi')->getHotelOtaList($params['hotel_id']);
$info['ota_count'] = count($otas);
if (count($otas) > 0) {
foreach ($otas as $one) {
$info['params']['ota_id'] = $one['ota_id'];
$this->serv->task($info);
}
} else {
$this->serv->send($info['fd'], json_encode(array('data'=>array())).$this->package_eof);
}
}
function onTask($serv, $task_id, $from_id, $info) {
$params = $info['params'];
$low_price = KHotel_YouYuApi::getLowestPrice($params['hotel_id'], $params['check_in'],
$params['check_out'], false, false, $params['ota_id']);
$serv->finish(array('task' => $info['task'], 'from_id' => $from_id,
'ota_count' => $info['ota_count'], 'fd' => $info['fd'], 'low_price' => $low_price, 'iq_id' => $params['iq_id']));
}
function onTaskFinish($serv, $task_id, $data) {
$key = $data['fd'];
$this->data[$key][$task_id] = $data['low_price'];
if (count($this->data[$key]) == $data['ota_count']) {
$return = array(
'data' => array(
'iq_id' => $data['iq_id'],
'price_list' => array_values($this->data[$key])
)
);
$serv->send($data['fd'], json_encode($return).$this->package_eof);
unset($this->data[$key]);
}
}
function onWorkStart($serv, $worker_id) {
if($worker_id >= $serv->setting['worker_num']) {
swoole_set_process_name("youyu_swoole_server.php:task_worker");
} else {
swoole_set_process_name("youyu_swoole_server.php:worker");
}
}
function run() {
$serv = new swoole_server("0.0.0.0", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
$this->serv = $serv;
$this->serv->set(array(
'worker_num' => 8,
'task_worker_num' => 50,
'open_eof_split' => true,
'package_eof' => $this->package_eof,
));
$serv->on('connect', function ($serv, $fd) {
echo "Client: Connect $fd.\n";
});
$serv->on('start', function() {
echo "server start...\n\n";
});
$this->serv->on('workerStart', array($this, 'onWorkStart'));
$this->serv->on('receive', array($this, 'onReceive'));
//监听连接关闭事件
$serv->on('close', function ($serv, $fd) {
echo "Client: Close $fd.\n\n";
});
//处理异步任务
$this->serv->on('task', array($this, 'onTask'));
$this->serv->on('finish', array($this, 'onTaskFinish'));
//启动服务器
$this->serv->start();
}
}
$server= new Server;
$server->run();
?>