如果客户端将因网络错误而断开连接,则服务器必须在我的情况下关闭pub / sub连接.我知道ctx.Done()函数,但在我的情况下不知道如何正确使用它.有人可以解释一下吗?
grpc-go:1.7.0
去版本go1.8.4
func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
ctx := stream.Context()
_, ok := user.FromContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "user not found")
}
pubsub := a.redisClient.Subscribe("notifications")
defer pubsub.Close()
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
grpclog.Warningf("Notifications: pubsub error: %v", err)
return grpc.Errorf(codes.Internal, "pubsub error %v", err)
}
notification := &pb.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notification)
if err != nil {
grpclog.Warningf("Notifications: parse error: %v", err)
continue
}
if err := stream.Send(notification); err != nil {
grpclog.Warningf("Notifications: %v", err)
return err
}
grpclog.Infof("Notifications: send msg %v", notification)
}
}
最佳答案 你可以使用select.而不是正常从函数获取数据,使用通道获取数据和go例程来处理它.
有点像这样:
func (a *API) Notifications(in *empty.Empty, stream
pb.Service_NotificationsServer) error {
ctx := stream.Context()
_, ok := user.FromContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "user not found")
}
pubsub := a.redisClient.Subscribe("notifications")
defer pubsub.Close()
// I can not build the code, so I assume the msg in your code Message struct
c := make(chan Message)
go func() {
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
grpclog.Warningf("Notifications: pubsub error: %v", err)
close(c)
return grpc.Errorf(codes.Internal, "pubsub error %v", err)
}
c<- msg
}
}()
for {
select {
case msg, ok := <-c:
if !ok {
// channel is closed handle it
}
notification := &pb.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notification)
if err != nil {
grpclog.Warningf("Notifications: parse error: %v", err)
continue
}
if err := stream.Send(notification); err != nil {
grpclog.Warningf("Notifications: %v", err)
return err
}
grpclog.Infof("Notifications: send msg %v", notification)
case <- ctx.Done():
// do exit logic. some how close the pubsub, so next
// ReceiveMessage() return an error
// if forget to do that the go routine runs for ever
// until the end of main(), which I think its not what you wanted
pubsub.Close() // Its just pseudo code
return
}
}
}
从通道中读取消息(我假设类型为Message),并使用select的电源.
在这种情况下另外两个相关的事情:
>确保完成此功能后,go例程结束.我无法猜测,因为我不知道代码,但我假设有一个Close()方法关闭pubsub所以下一个ReceiveMessage返回错误. (我看到推迟做我希望的工作)
>如果在ctx.Done之前ReceiveMessage中存在错误,您可以关闭通道然后中断循环.