背景
在web应用中,为了更好的拆分服务,一个较大的业务会拆分为多个自服务,每个子服务独自完成部分功能,众多子服务一起支撑较大的业务。
这就不可避免的遇到一个复杂的业务需要串行地调用多个子服务的场景,特别是一些业务逻辑上没有依赖的部分,这时候,如果可以有批量rpc调用,那就如虎添翼了。
好消息是,鸟哥开发的Yar就是支持并行rpc调用的框架,他是以扩展的形式提供并行RPC功能。据说在微博内部已经广发使用了,然而,对于线上已经在跑的业务,给编译一个扩展,运营成本还是挺高的。
思路
PHP原生的curl_multi*函数簇,也可以支持一次发送多个RPC调用的作用,使用方式为:
实现
<?php
/**
* 基于 curl_multi 的并行处理
*
* 使用示例
* $url1 = "http://baidu.com";
* $url2 = "http://sina.com";
*
* $objMulti = new Multi();
* $objMulti->init();
* $objMulti->register('url1', $url1);
* $objMulti->register('url2', $url2);
* $objMulti->call();
* $res = $objMulti->getResult('url1');
* var_dump($res);
* $res = $objMulti->getResult('url2');
* var_dump($res);
*/
class Multi {
/* default 500k */
const MAX_RESPONSE_SIZE = 512000;
/* max redirs */
const MAX_REDIRS = 1;
/*链接重试*/
const MAX_CONNECT_RETRY = 3;
/* 默认链接超时时间 */
const MAX_CONNECT_TIMEOUT = 1000;
/*默认超时时间*/
const MAX_TIME_OUT = 3000;
/*UTF8 字符编码*/
const ENCODE_UTF8 = 'utf-8';
/* cURL 多个句柄 */
protected $_obj_multi_handler = null;
/* curl句柄列表 */
protected $_arr_curl_handler = array();
/* curl 参数 */
protected $_arr_curl_options = array();
/* 调用返回的结果 */
protected $_arr_result = array();
/**
* @desc 初始化curl_multi句柄
* @author dayeshisir(dayeshisir@weidian.com)
* @date 2016-10-09
* @param
* @return
* @note
*/
public function init($config = array()) {
if (is_null($this->_obj_multi_handler)) {
$this->_obj_multi_handler = curl_multi_init();
}
$str_user_agent = isset($_SERVER['HTTP_USER_AGENT']) ? $_SERVER['HTTP_USER_AGENT'] : '';
$str_referer = isset($_SERVER['HTTP_REFERER']) ? $_SERVER['HTTP_REFERER'] : '';
$this->_arr_curl_options = array(
CURLOPT_NOSIGNAL => 1, //注意,毫秒超时一定要设置这个
CURLOPT_FOLLOWLOCATION => false,
CURLOPT_HEADER => false,
CURLOPT_MAXREDIRS => self::MAX_REDIRS,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_CONNECTTIMEOUT_MS => self::MAX_CONNECT_TIMEOUT,
CURLOPT_TIMEOUT_MS => self::MAX_TIME_OUT,
CURLOPT_USERAGENT => $str_user_agent,
CURLOPT_REFERER => $str_referer,
CURLOPT_ENCODING => '',
);
return true;
}
/**
* @desc 注册待调用的服务
* @param key : string : 用来区分各个自调用
* @param url : string : url地址,可包含问号
* @param date : array : 入参数组,key-value对,post或get的参数
* @param request_type : string : 请求的类型,例如post,get
* @param config : array : 附加的配置,具体数值例如:
* array(
* CURLOPT_COOKIE => string 设定HTTP请求中"Cookie: "部分的内容。多个cookie用分号分隔,分号后带一个空格(例如, "fruit=apple; colour=red")。
* CURLOPT_REFERER => string 请求头中"Referer: "的内容
* CURLOPT_USERAGENT => string "User-Agent: "头的字符串。如:“Mozilla/4.0 (compatible; MSIE 5.01; Windows NT 5.0)”
* CURLOPT_HTTPHEADER => array header头列表,如:array('host:http://abc.com','Content-type: text/plain')
* CURLOPT_TIMEOUT => int 超时时间,秒
* )
*
* @return resource
*/
public function register($key, $url, $data = array(), $config = array(), $request_type = 'post') {
if (null === $this->_obj_multi_handler) {
return false;
}
//初始化curl会话
$obj_ch = curl_init();
//curl参数
extract($this->_arr_curl_options);
$option = $this->_arr_curl_options;
//get 参数处理(url处理)
if ('get' === $request_type && !empty($data)) {
$url .= (strpos($url, '?') ? '&' : '?') . http_build_query($data);
}
$option[CURLOPT_URL] = $url;
//post 参数处理
if ('post' === $request_type && empty($data)) {
$option[CURLOPT_POSTFIELDS] = $data;
}
//其它参数合并
$option += $config;
//批量设置会话参数
curl_setopt_array($obj_ch, $option);
// 添加到curl句柄
if (0 !== curl_multi_add_handle($this->_obj_multi_handler, $obj_ch)) {
return false;
}
// 如果原来有相同的$key,则删除之
if (isset($this->_arr_curl_handler[$key])) {
curl_multi_remove_handle($this->_obj_multi_handler, $this->_arr_curl_handler[$key]); //移除原curl句柄
curl_close($this->_arr_curl_handler[$key]); //关闭原curl会话
unset($this->_arr_curl_handler[$key]); //删除原$k
}
$this->_arr_curl_handler[$key] = $obj_ch;
return true;
}
/**
* @desc 执行批量请求,返回批量数据
* @param null
* @return array
*/
public function call() {
//无批量执行句柄,返回空数组
if (is_null($this->_obj_multi_handler)) {
return $this->_arr_result;
}
$active = null;
//执行
do {
$mrc = curl_multi_exec($this->_obj_multi_handler, $active);
} while ($mrc == CURLM_CALL_MULTI_PERFORM);
while ($active && $mrc == CURLM_OK) {
while(curl_multi_exec($this->_obj_multi_handler, $active) === CURLM_CALL_MULTI_PERFORM);
if (curl_multi_select($this->_obj_multi_handler) != -1) {
do {
$mrc = curl_multi_exec($this->_obj_multi_handler, $active);
} while ($mrc == CURLM_CALL_MULTI_PERFORM);
}
}
//获取返回结果
foreach ($this->_arr_curl_handler as $key => $curl_handler) {
$strResult = curl_multi_getcontent($curl_handler);
$this->_arr_result[$key] = json_decode($strResult, true);
curl_multi_remove_handle($this->_obj_multi_handler, $curl_handler);//移除句柄
curl_close($curl_handler);//关闭会话
}
curl_multi_close($this->_obj_multi_handler);//关闭批量会话
//清理数据,恢复初始状态,为下一次使用做好准备
$this->_obj_multi_handler = null;
$this->_arr_curl_handler = array();
return $this->_arr_result;
}
/**
* @desc 获取结果
* @param key : string : 用来区分子服务的键值
* @return array
*/
public function getResult($key) {
return isset($this->_arr_result[$key]) ? $this->_arr_result[$key] : array();
}
}
$url1 = "http://www.baidu.com";
$url2 = "http://www.sina.com.cn";
$objMulti = new Multi();
$objMulti->init();
$objMulti->register('url1', $url1);
$objMulti->register('url2', $url2);
$objMulti->call();
$res = $objMulti->getResult('url1');
var_dump($res);
$res = $objMulti->getResult('url2');
var_dump($res);