RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。—百度百科
实际后台开发中,rpc是服务器与服务器交互的方式之一,隐藏底层网络实现,代码程式化,开发效率高,BUG少。
通过一个简单的demo来说明go官方rpc包的应用。
项目结构:
rpc
----Makefile
----src
----client
----main.go
----protocol
----type.go
----server
----main.go
rpc/Makefile
GOPATH := $(shell pwd)
all:
GOPATH=${GOPATH} go install client
GOPATH=${GOPATH} go install server
rpc/protocol/type.go
package protocol
const(
RPC_ADDITION = "Calculator.Addition"
RPC_SUBTRACTION = "Calculator.Subtraction"
RPC_MULTIPLICATION = "Calculator.Multiplication"
RPC_DIVISION = "Calculator.Division"
)
type Param struct {
A int32
B int32
}
RPC客户端实现,包含同步(Call)和异步(Go)调用方式,通常为了效率会使用异步方式。
rpc/src/client/main.go
package main
import "net/rpc"
import (
. "protocol"
"fmt"
"time"
)
var ( _CLIENT *rpc.Client
_RPC_MSG chan *rpc.Call
_CAN_CANCEL chan bool
)
func main() {
DialRpcServer()
//起个协程处理异步rpc调用结果
go loop()
//测试同步的方式调用rpc服务
param := Param{A:int32(10),B:int32(30)}
reply := int32(0)
SyncCallRpcFunc(RPC_ADDITION, ¶m, &reply)
fmt.Printf("Sync Call Addition Result %d \n", reply)
SyncCallRpcFunc(RPC_SUBTRACTION, ¶m, &reply)
fmt.Printf("Sync Call Subtraction Result %d \n", reply)
////测试异步的方式调用rpc服务
ASyncCallRpcFunc(RPC_MULTIPLICATION, ¶m, &reply)
ASyncCallRpcFunc(RPC_DIVISION, ¶m, &reply)
//阻塞等待异步调用完成
<- _CAN_CANCEL
}
func init(){
_RPC_MSG = make(chan *rpc.Call, 1024)
_CAN_CANCEL = make(chan bool)
}
func DialRpcServer(){
c, e := rpc.DialHTTP("tcp", "127.0.0.1:2311")
if e != nil {
fmt.Errorf("Dial RPC Error %s", e.Error())
}
_CLIENT = c
}
//重连RPC服务器
func ReDialRpcServer() bool{
c, e := rpc.DialHTTP("tcp", "127.0.0.1:2311")
if e != nil {
fmt.Printf("ReDial RPC Error %s \n", e.Error())
return false
}
_CLIENT = c
fmt.Println("ReDial Rpc Server Succ")
return true
}
//同步rpc调用
func SyncCallRpcFunc(method string, args interface{}, reply interface{}){
if nil == _CLIENT{
for{//如果断线就等到重连上为止
if ReDialRpcServer(){
break
}
time.Sleep(5000 * time.Millisecond)
}
}
_CLIENT.Call(method, args, reply)
}
//异步rpc调用
func ASyncCallRpcFunc(method string, args interface{}, reply interface{}){
if nil == _CLIENT{
for{//如果断线就等到重连上为止
if ReDialRpcServer(){
break
}
time.Sleep(5000 * time.Millisecond)
}
}
// Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call)的done如果填nil会构建个新的channel用于接受结果
_CLIENT.Go(method, args, reply, _RPC_MSG)
}
//接收异步调用的返回
func loop(){
for{
select {
case rpcMsg, ok := <- _RPC_MSG:
if !ok{
fmt.Errorf("Rpc Call Error")
}
rpcMsgHandler(rpcMsg)
}
}
_CAN_CANCEL <- true
}
// 处理异步rpc的返回值
func rpcMsgHandler(msg * rpc.Call){
switch msg.ServiceMethod {
case RPC_ADDITION:
reply := msg.Reply.(*int32)
fmt.Printf("Addtoion Result [%d] \n", *reply)
case RPC_SUBTRACTION:
reply := msg.Reply.(*int32)
fmt.Printf("Subtraction Result [%d] \n", *reply)
case RPC_MULTIPLICATION:
reply := msg.Reply.(*int32)
fmt.Printf("Multiplication Result [%d] \n", *reply)
case RPC_DIVISION:
reply := msg.Reply.(*int32)
fmt.Printf("Division Result [%d] \n", *reply)
default:
fmt.Errorf("Can Not Handler Reply [%s] \n", msg.ServiceMethod)
}
}
RPC服务器的实现。
rpc/src/server/main.go
package main
import "net/rpc"
import (
. "protocol"
"errors"
"net"
"fmt"
"net/http"
)
type Calculator struct {}
var ( _DATA *Calculator
_CAN_CANCEL chan bool
)
func main() {
runRpcServer()
}
func init(){
_DATA = new(Calculator)
_CAN_CANCEL = make(chan bool)
}
func runRpcServer(){
//rpc包里面定义了个DefaultServer,缺省的Register和HandleHTTP均是对DefaultServer作的操作,如果想定制新的Server,就自己写
rpc.Register(_DATA)
rpc.HandleHTTP()
l,e := net.Listen("tcp","127.0.0.1:2311")
if e != nil{
fmt.Errorf("Create Listener Error %s", e.Error())
}
go http.Serve(l, nil)
//阻塞主进程,等待客户端输入
<-_CAN_CANCEL
}
//输出方法的格式要求:func (t *T) MethodName(argType T1, replyType *T2) error
func (*Calculator) Addition(param *Param, reply *int32) error{
*reply = param.A + param.B
return nil
}
func (*Calculator) Subtraction(param *Param, reply *int32) error{
*reply = param.A - param.B
return nil
}
func (*Calculator) Multiplication(param *Param, reply *int32) error{
*reply = param.A * param.B
return nil
}
func (*Calculator) Division(param *Param, reply *int32) error{
if 0 == param.B{
return errors.New("divide by zero")
}
*reply = param.A/param.B
return nil
}