本文主要是教大家实现一个具有基本功能的rpc框架。
技术选型
很多RPC框架的实现都是基于protobuf的,包括我们公司服务端引擎实现的rpc框架也是基于protobuf。从前做过一些简单的测试,对相同数据进行序列化,protobuf对数据的压缩率明显高于对比的msgpack、bson和json;相同的对于序列化时间上的比较,msgpack则最快。但是protobuf最让我受不了的就是需要定义固定的协议,并且使用对应的工具生成协议代码,然后才能使用,用起来需要一定的学习成本。
因此我在想,是否可以基于msgpack实现一个不需要预先定义通信协议的rpc框架呢?答案当然是肯定的。
设计原型
class rpc_service {
void handle_data();
void register_rpc();
void remote_call();
};
目前rpc_service这个类只是个原型,我们提供三个接口:
- handle_data: 用于处理原生数据,将原生数据序列化为msgpack
- register_rpc: 用于Server端注册rpc函数
- remote_call: Stub端调用远程rpc函数
msgpack都提供了什么
这里我使用了msgpack c++(https://github.com/msgpack/msgpack-c)。我们可以利用msgpack将参数进行打包和解包。
// 将参数打包成字符串
std::tuple<int, bool, std::string> t(1, false, "hi");
std::stringstream buffer;
msgpack::pack(buffer, t);
buffer.seekg(0);
std::string s(buffer.str());
// 对字符串进行解包
std::tuple<int, bool, std::string> dest;
msgpack::object_handle oh = msgpack::unpack(str.data(), str.size());
oh.get().convert(t);
实现一个简单rpc,我们只需要msgpack上面介绍的功能就可以了。
实现remote_call函数
先从最简单的函数开始实现。
template<typename ... Args>
void remote_call(const std::string& method, Args&& ... args);
思路很简单,我们将method字符串和args打包后的msgpack字符串都存入一个字符串流中。这里的字符串流我们定义为rpc_stream(后续将详细讲解rpc_stream)的实现,这里先简单介绍一下:
class rpc_stream : private asio::noncopyable {
public:
rpc_stream();
public:
std::istream& read_stream() { return stream_; }
std::ostream& write_stream() { return stream_; }
streambuf_ex& buf() { return buf_; }
void reset();
char* c_str();
private:
streambuf_ex buf_;
std::iostream stream_;
};
typedef std::shared_ptr<rpc_stream> rpc_stream_ptr;
rpc_stream是我写的服务端引擎的一个副产品,基于asio::streambuf的字节流处理类。接口也比较简单易懂:
- read/write_stream: 读出/输入流
- buf: 数据的缓冲区,提供size接口获得缓冲区大小
- c_str: 将字节流中的数据转化为字符串
到目前为止,就可以动手开始写remote_call函数了。
class rpc_service {
public:
rpc_service(): stream_ptr_(new hex_engine::rpc::rpc_stream()) {}
template<typename ... Args>
void remote_call(const std::string& method, Args&& ... args) {
// 1.构造tuple
auto t = std::make_tuple(std::forward<Args>(args)...);
// 2.总长度
uint32_t total_sz = 0;
stream_ptr_->write_stream().write((char*)&total_sz, sizeof(total_sz));
// 3.函数名称
uint8_t method_sz = method.size();
stream_ptr_->write_stream().write((char*)&method_sz, sizeof(method_sz));
stream_ptr_->write_stream().write(method.c_str(), method_sz);
// 4.写入参数
msgpack::pack(stream_ptr_->write_stream(), t);
total_sz = stream_ptr_->buf().size() - sizeof(total_sz);
// 5.重新写入总长度
stream_ptr_->write_stream().seekp(0);
stream_ptr_->write_stream().write((char*)&total_sz, sizeof(total_sz));
}
hex_engine::rpc::rpc_stream_ptr stream_ptr() { return stream_ptr_; }
private:
hex_engine::rpc::rpc_stream_ptr stream_ptr_;
};
这样数据便存在rpc_stream的字节流中了,之后可以通过c_str接口将字节流的数据导出,通过网络传输到远端。
接下来
后面一章将讲解Server端Rpc的实现