14 July 2016

最近用swoole实现了一个功能:异步非阻塞获取酒店OTA最低价。

场景是这样的:

一个酒店对应多个OTA(我们的供应商),我需要获取酒店每一个OTA在某一个入住/离店日期的最低报价。 当数据量很大时,比如目前1天需要获取20多万次酒店报价(这个数字还在增长),每一个酒店又有1-5个OTA(以后可能更多),这样每天需要调用上百万次获取最低价的接口,接口在没有缓存的情况下,使用http协议获取价格可能需要1-2秒,如果使用同步阻塞方式获取,所用时间会很长,甚至无法完成任务。

所以我选择用swoole来实现

程序流程大概是这样的:

客户端:

1.循环一组要获取价格的酒店ID

2.每次循环创建swoole客户端,向服务器send数据,维护一个clients数组,存放当前活动的客户端

3.使用while循环+swoole_client_select()方法监控从服务器端返回的数据,获取到一个客户端的数据就关闭相应客户端并从clients中删除此客户端

4.直到clients中元素个数为0,退出while循环

服务器端:

1.服务器端开启了8个worker进程和50个worker_task进程

2.一个客户端连接会占用一个worker进程,当onReceive事件被触发时,需要创建task进程,有几个OTA就创建几个task进程

3.onTask事件回调方法用来调用接口,获取酒店某个OTA的最低价

4.对于同一个客户端连接,服务器端必须要等待所有相关task进程都跑完以后,再统一返回数据给客户端,比如一个酒店有3个OTA,这3个OTA必须都获取到价格以后,才能返回给客户端,所以我用一个$data变量来存放同一个客户端不同task返回的数据

5.onTaskFinish事件回调中,把当前OTA最低价写入$data[$fd][$task_id]中

4.onTaskFinish事件回调中,判断同一个客户端的所有task都执行完成以后(count($data[$fd])===$ota_count),把数据send到客户端。

实际执行的效果还是不错的,已经稳定运行24小时,目前大概1秒钟可以获取20-30个酒店所有OTA的最低价,这个速度还可以提高,只要服务器内存够用,开启更多进程即可。

用swoole写代码和之前在PHP-fpm环境有一点最大的不同就是必须重视程序的容错性和健壮性,因为进程是常驻内存的,任何一个微小的bug都可能导致服务终止,还必须避免长期运行产生的内存泄露。

客户端伪代码

<?php
$clients = array();
$count = Ko_Tool_Singleton::OInstance('KHotel_Youyu_curHotelInquirySet')->getCount();
for ($i = 0; $i < $count; $i += 10) {
	$history_ids = array();
	//每次创建10个客户端
	for ($j = 0; $j < 10; $j++) {
		$history_ids[] = Ko_Tool_Singleton::OInstance('KHotel_Youyu_curHotelInquirySet')->pop();
	}
	$list = Ko_Tool_Singleton::OInstance('KHotel_Youyu_hotelInquiryHistoryApi')->getByIDs($history_ids);
	foreach ($list as $one) {
		$client = new KHotel_Youyu_swooleClient();
		if (!$client->connect()) {
			//error
		} else {
			$params = array(
				'action' => 'getHotelLowestPriceAllOta',
				'data' => array(
					'iq_id' => $one['id'],
					'check_in' => $one['check_in'],
					'check_out' => $one['check_out'],
					'hotel_id' => $one['hotel_id']
				)
			);
			$client->send($params);
			$clients[$client->sock] = $client;
		};
	}
	while (!empty($clients)) {
		$write = $error = array();
		$read = array_values($clients);
		$n = swoole_client_select($read, $write, $error, 2);
		if ($n > 0)	{
			foreach ($read as $index => $c)	{
				if ($c) {
					$response = $c->recv();
					$data = $response['data'];
					if (is_array($data['price_list'])) {
						$unique_key = array();
						foreach ($data['price_list'] as $key=>$one) {
							$unique_key[] = $data['iq_id'].'-'.$one['ota_id'];
							$up_data = array(
								'iq_id' => $data['iq_id'],
								'ota_id' => $one['ota_id'],
								'price' => $one['price'],
								'ori_price' => $one['ori_price']
							);
							Ko_Tool_Singleton::OInstance('KHotel_Youyu_lowestPriceHistoryApi')->iInsert($up_data);
						}
					}
					unset($clients[$c->sock]);
				}
			}
		}
	}
}
?>

服务器端伪代码

<?php
class Server {
	public $serv;
	private $package_eof = "\r\n";
	private $data = null;
	function onReceive($serv, $fd, $from_id, $params)	{
		$_params = json_decode($params, true);
		$info = array(
			'task' => $_params['action'],
			'params' => $_params['data'],
			'fd' => $fd
		);
		$this->getHotelLowestPriceAllOta($info);
	}
	function getHotelLowestPriceAllOta($info) {
		$params = $info['params'];
		$otas = Ko_Tool_Singleton::OInstance('KHotel_YouYuOtaReferApi')->getHotelOtaList($params['hotel_id']);
		$info['ota_count'] = count($otas);
		if (count($otas) > 0) {
			foreach ($otas as $one) {
				$info['params']['ota_id'] = $one['ota_id'];
				$this->serv->task($info);
			}
		} else {
			$this->serv->send($info['fd'], json_encode(array('data'=>array())).$this->package_eof);
		}
	}
	function onTask($serv, $task_id, $from_id, $info) {
			$params = $info['params'];
			$low_price = KHotel_YouYuApi::getLowestPrice($params['hotel_id'], $params['check_in'],
				$params['check_out'], false, false,	$params['ota_id']);
			$serv->finish(array('task' => $info['task'], 'from_id' => $from_id,
				'ota_count' => $info['ota_count'], 'fd' => $info['fd'], 'low_price' => $low_price, 'iq_id' => $params['iq_id']));
	}
	function onTaskFinish($serv, $task_id, $data) {
			$key = $data['fd'];
			$this->data[$key][$task_id] = $data['low_price'];
			if (count($this->data[$key]) == $data['ota_count']) {
				$return = array(
					'data' => array(
						'iq_id' => $data['iq_id'],
						'price_list' => array_values($this->data[$key])
					)
				);
				$serv->send($data['fd'], json_encode($return).$this->package_eof);
				unset($this->data[$key]);
			}
	}
	function onWorkStart($serv, $worker_id) {
		if($worker_id >= $serv->setting['worker_num']) {
			swoole_set_process_name("youyu_swoole_server.php:task_worker");
		} else {
			swoole_set_process_name("youyu_swoole_server.php:worker");
		}
	}
	function run() {
		$serv = new swoole_server("0.0.0.0", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
		$this->serv  = $serv;
		$this->serv->set(array(
			'worker_num' => 8,
			'task_worker_num' => 50,
			'open_eof_split' => true,
			'package_eof' => $this->package_eof,
		));
		$serv->on('connect', function ($serv, $fd) {
			echo "Client: Connect $fd.\n";
		});
		$serv->on('start', function() {
			echo "server start...\n\n";
		});
		$this->serv->on('workerStart', array($this, 'onWorkStart'));
		$this->serv->on('receive', array($this, 'onReceive'));
		//监听连接关闭事件
		$serv->on('close', function ($serv, $fd) {
			echo "Client: Close $fd.\n\n";
		});
		//处理异步任务
		$this->serv->on('task', array($this, 'onTask'));
		$this->serv->on('finish', array($this, 'onTaskFinish'));
		//启动服务器
		$this->serv->start();
	}
}
$server= new Server;
$server->run();
?>