01 August 2016

适用场景

此服务适用于以下场景:

  1. 客户端交给服务器端一项工作(work)来执行
  2. 这项工作可以被拆分成若干个子任务(task),子任务之间互不干扰,可以并行处理
  3. 服务器端处理完所有的子任务后,把结果汇总,返回给客户端,完成本次工作

比如:同时从多个OTA获取酒店报价,同时抓取多个网页等。

此服务目前在140和199两台服务器上运行,用HAProxy实现的负载均衡双机热备(感谢兴国提供支持),目前每台服务器上跑8个work进程和100个task_worker进程。

本服务当前主要用于酒店询价(正在逐步向此服务迁移),目前每天处理40万连接请求,执行80万个子任务,预计迁移工作全部完成后,每天处理100万次以上请求,服务上线运行1个月,表现稳定。

酒店列表页最低价接口迁移到此服务上后,请求时间小于2秒的请求由80%提高到89%,请求时间大于5秒的请求由7%降低到2.5%,还有进一步优化空间。

实现接口

如果想使用此服务,只需实现一个interface的三个方法,这个interface是:hotel\youyu\swooleTaskInterface

接口定义:

<?php
namespace hotel\youyu;
interface swooleTaskInterface {
    //获取所有子任务的参数
    public static function getAllTaskParams($params);

    //执行子任务,返回子任务执行结果
    public static function task($task_id, $from_id, $params);

    //所有子任务执行完毕后的回调方法,数据返回给客户端
    public static function workFinish($params, $work_data);
}
?>

举个例子,下面这个类可以异步获取一个酒店所有OTA的最低价(为了便于理解简化了代码)

<?php
namespace hotel\youyu\swooleTask;
class hotelLowestPriceAllOta implements \hotel\youyu\swooleTaskInterface {

	public static function getAllTaskParams($params) {
		$otas = \Ko_Tool_Singleton::OInstance('KHotel_YouYuOtaReferApi')->getHotelOtaList($params['hotel_id']);
        $info = array();
        foreach ($otas as $ota_id) {
            $info[] = array_merge($params, array('ota_id' => $ota_id));
        }
        return $info;
    }

    public static function task($task_id, $from_id, $info) {
        $params = $info['params'];
        $low_price = \KHotel_YouYuApi::getLowestPrice($params['hotel_id'], $params['check_in'],
        $params['check_out'], $params['ota_id']);
        //echo "onTask  \t fd:{$info['fd']}\t from_id:$from_id\t task_id:$task_id\t task_count:{$info['task_count']}\n";
        return $low_price;
    }

    public static function workFinish($params, $data) {
                usort($data, function($a, $b) {
                if ($a['price'] === false && $b > 0) {
                   return 1;
                }
                if ($b['price'] === false && $a > 0) {
                 return -1;
                }
                if ($a['price'] === $b['price']) {
                    return 0;
                }
                return $a['price'] < $b['price'] ? -1 : 1;
            });
        return array_values($data);
    }

}
?>

服务器端

服务器端代码在 /www_mfw/app/youyu/youyu_swoole_server.php

服务器端代码很简单:

<?php
KCmd_lock::lock();
class Server
{
	public $serv;
	private $package_eof = "\r\n";
	private $data = null;
	function onReceive($serv, $fd, $from_id, $params)	{
		echo "onReceive $fd\n";
		$_params = json_decode($params, true);
		echo($params);
		echo "\n";
		$class_name = $_params['task_class'];
		if (class_exists($class_name)) {
			$all_task_params = $class_name::getAllTaskParams($_params['data']);
			if (is_array($all_task_params) && count($all_task_params) > 0) {
                foreach ($all_task_params as $task_params) {
                    $info = array(
                        'class_name' => $class_name,
                        'params' => $task_params,
                        'task_count' => count($all_task_params),
                        'fd' => $fd
                    );
                    $this->serv->task($info);
                }
            } else {
                $this->serv->send($fd, json_encode(array('data'=>array())).$this->package_eof);
            }
        } else {
            echo "class $class_name no exists\n";
            $this->serv->send($fd, json_encode(array('data'=>array())).$this->package_eof);
        }
    }


    function onTask($serv, $task_id, $from_id, $info) {
        $data = $info['class_name']::task($task_id, $from_id, $info);
        $serv->finish(array_merge($info, array('from_id' => $from_id, 'data'=>$data)));
    }

    function onTaskFinish($serv, $task_id, $info) {
        $key = $info['fd'] . '_' . $info['from_id'];
        $this->data[$key][$task_id] = $info['data'];
        //echo "onFinish\tfd:{$info['fd']}\tfrom_id:{$info['from_id']}\t"
        //. "task_id:$task_id\ttask_count:" . count($this->data[$key]) . "-{$info['task_count']}\n";
        if (count($this->data[$key]) == $info['task_count']) {
            $return = $info['class_name']::workFinish($info['params'], $this->data[$key]);
            $serv->send($info['fd'], json_encode($return) . $this->package_eof);
            unset($this->data[$key]);
            echo "clear memory\n\n";
        }
    }

    function onWorkStart($serv, $worker_id) {
        if($worker_id >= $serv->setting['worker_num']) {
            swoole_set_process_name("youyu_swoole_server.php:task_worker");
        } else {
            swoole_set_process_name("youyu_swoole_server.php:worker");
        }
    }

    function run() {
        $serv = new swoole_server("0.0.0.0", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
        $this->serv  = $serv;
        $this->serv->set(array(
            'worker_num' => 8,
            'task_worker_num' => 100,
            'open_eof_split' => true,
            'package_eof' => $this->package_eof,
        ));
        $serv->on('connect', function ($serv, $fd) {
            echo "Client: Connect $fd.\n";
        });

        $serv->on('start', function() {
            echo "server start...\n\n";
        });
        $this->serv->on('workerStart', array($this, 'onWorkStart'));
        $this->serv->on('receive', array($this, 'onReceive'));
        $serv->on('close', function ($serv, $fd) {
            echo "Client: Close $fd.\n\n";
        });
        $this->serv->on('task', array($this, 'onTask'));
        $this->serv->on('finish', array($this, 'onTaskFinish'));
        $this->serv->start();
    }
}

$server= new Server;
$server->run();
?>

客户端

swoole客户端虽然支持异步连接,但只能用于cli模式,如果想在fast-cgi模式也实现并行处理,可以使用while + swoole_client_select函数实现, 举例(为了便于理解简化了代码):

<?php
    //同时获取一组酒店所有OTA的最低价
    $clients = array();
    $list = Ko_Tool_Singleton::OInstance('KHotel_Youyu_hotelInquiryHistoryApi')->getByIDs($ids);
    foreach ($list as $one) {
        $client = new KHotel_Youyu_swooleClient();
        if ($client->connect()) {
            $params = array(
                //此处task_class必须填写实现自swooleTaskInterface接口的那个类的类名(包括namespace)
                'task_class' => 'hotel\youyu\swooleTask\hotelLowestPriceAllOta',
                //以下是传递的参数
                'data' => array(
                    'check_in' => $one['check_in'],
                    'check_out' => $one['check_out'],
                    'hotel_id' => $one['hotel_id']
                )
            );
            $client->send($params);
            $clients[$client->sock] = $client;
        };
    }

    while (!empty($clients)) {
        $write = $error = array();
        $read = array_values($clients);
        $n = swoole_client_select($read, $write, $error, 1);
        if ($n > 0)	{
            foreach ($read as $index => $c)	{
                if ($c) {
                    $data = $c->recv();
                    var_dump($data);
                    unset($clients[$c->sock]);
                }
            }
        }
    }
?>

其中KHotel_Youyu_swooleClient是我在swoole_client基础上简单封装的一个类,代码如下:

<?php
class KHotel_Youyu_swooleClient extends swoole_client {
    //宏定义,本地IP和一个端口,由HAProxy代理到真正的服务器IP和端口
    private $ip = LOCAL_IP;
    private $port = YOUYU_TASK_PORT;
    private $package_eof = "\r\n";

    public function __construct() {
        parent::__construct(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
    }

    //如果不使用swoole_client_select,则需要把$timeout设置的长一些,不然会造成服务器端未来得及返回数据客户端就执行recv()了
    //$test参数用于在同一台服务器上测试
    public function connect($timeout = 0.1, $test = 0) {
        parent::set(array(
            'open_eof_split' => true,
            'package_eof' => $this->package_eof,
        ));
        $ip = $test ? '127.0.0.1' : $this->ip;
        $port = $test ? 9501 : $this->port;
        return parent::connect($ip, $port , $timeout);
    }

    public function errorInfo() {
        return  "Error: {" . socket_strerror(parent::errCode) . "[" . parent::errCode . "]\n";
    }

    //数据传输使用json格式,自动解析
    public function send($data) {
        parent::send(json_encode($data)."\r\n");
    }

    public function recv() {
        return json_decode(parent::recv(), true);
    }
}
?>

目前的不足

  1. 每一个task任务获取的数据都是保存在内存中,如果数据量很大的话,可能造成内存溢出,应该考虑把存储这块抽象出来,可以保存在临时表中或其他什么地方
  2. 缺少进程管理模块监控服务健康状况