使用go作为RabbitMQ消费者的正确姿势

写在前面

在我们的生产环境中搭了两台rabbitmq, 前面架设了一台HAProxy做负载均衡,当我们的客户端连接到HAProxy,然后由HAProxy负责将链接分配给其中一台rabbitmq,客户端需要需要负责断线重连,需要将获取的数据,分配消息给相应的处理方法,然后还需要回复给rabbitmq ACK,这其中客户端需要负责断线重连的逻辑是很重要的,因为有可能客户端和HAProxy的连接是正常的,但是HAProxy和rabbitmq的链接因为网络波动断开了,那么这个时候客户端其实是没有工作的,并且会在rabbitmq中不断积累消息。

下面的内容给出了一个比较完善的处理逻辑,以供参考。

实战

定义接口

从之前的说明来看,这是一个典型的观察者模式,由RabbitMQ对象负责维护连接,获取消息,然后定义若干个接收者注册到RabbitMQ对象中,这时候RabbitMQ对象一旦收到了由RabbitMQ发来的数据,就可以将该消息分发到相应的接收者去处理,当接收者处理完成后告诉RabbitMQ对象消息消费成功,然后由RabbitMQ对象回复RabbitMQ ACK,当然可以在其中加上重试机制,接收者有可能因为某种情况处理失败,那么每隔一定的时间RabbitMQ对象需要重新调用一次接收者重新处理,直至成功,然后再返回ACK。

先来看看基本的接口约定

// Receiver 观察者模式需要的接口
// 观察者用于接收指定的queue到来的数据
type Receiver interface {
    QueueName() string     // 获取接收者需要监听的队列
    RouterKey() string     // 这个队列绑定的路由
    OnError(error)         // 处理遇到的错误,当RabbitMQ对象发生了错误,他需要告诉接收者处理错误
    OnReceive([]byte) bool // 处理收到的消息, 这里需要告知RabbitMQ对象消息是否处理成功
}

这样就将接收者和RabbitMQ对象之间就解耦了,这样后期如果需要添加新的接收者那就很容易了。

下面来看一看RabbitMQ对象的定义:
这里用到的RabbitMQ client是RabbitMQ官方的 Github

// RabbitMQ 用于管理和维护rabbitmq的对象
type RabbitMQ struct {
    wg sync.WaitGroup

    channel      *amqp.Channel
    exchangeName string // exchange的名称
    exchangeType string // exchange的类型
    receivers    []Receiver
}

// New 创建一个新的操作RabbitMQ的对象
func New() *RabbitMQ {
    // 这里可以根据自己的需要去定义
    return &RabbitMQ{
        exchangeName: ExchangeName,
        exchangeType: ExchangeType,
    }
}

RabbitMQ对象的初始化操作

这里RabbitMQ对象需要初始化交换机,注册接收者并初始化接收者监听的Queue,以及断线重连的机制

// prepareExchange 准备rabbitmq的Exchange
func (mq *RabbitMQ) prepareExchange() error {
    // 申明Exchange
    err := mq.channel.ExchangeDeclare(
        mq.exchangeName, // exchange
        mq.exchangeType, // type
        true,            // durable
        false,           // autoDelete
        false,           // internal
        false,           // noWait
        nil,             // args
    )

    if nil != err {
        return err
    }

    return nil
}


// run 开始获取连接并初始化相关操作
func (mq *RabbitMQ) run() {
    if !config.Global.RabbitMQ.Refresh() {
        log.Errorf("rabbit刷新连接失败,将要重连: %s", config.Global.RabbitMQ.URL)
        return
    }

    // 获取新的channel对象
    mq.channel = config.Global.RabbitMQ.Channel()

    // 初始化Exchange
    mq.prepareExchange()

    for _, receiver := range mq.receivers {
        mq.wg.Add(1)
        go mq.listen(receiver) // 每个接收者单独启动一个goroutine用来初始化queue并接收消息
    }

    mq.wg.Wait()

    log.Errorf("所有处理queue的任务都意外退出了")

    // 理论上mq.run()在程序的执行过程中是不会结束的
    // 一旦结束就说明所有的接收者都退出了,那么意味着程序与rabbitmq的连接断开
    // 那么则需要重新连接,这里尝试销毁当前连接
    config.Global.RabbitMQ.Distory()
}

// Start 启动Rabbitmq的客户端
func (mq *RabbitMQ) Start() {
    for {
        mq.run()
        
        // 一旦连接断开,那么需要隔一段时间去重连
        // 这里最好有一个时间间隔
        time.Sleep(3 * time.Second)
    }
}

注册接收者

// RegisterReceiver 注册一个用于接收指定队列指定路由的数据接收者
func (mq *RabbitMQ) RegisterReceiver(receiver Receiver) {
    mq.receivers = append(mq.receivers, receiver)
}

// Listen 监听指定路由发来的消息
// 这里需要针对每一个接收者启动一个goroutine来执行listen
// 该方法负责从每一个接收者监听的队列中获取数据,并负责重试
func (mq *RabbitMQ) listen(receiver Receiver) {
    defer mq.wg.Done()

    // 这里获取每个接收者需要监听的队列和路由
    queueName := receiver.QueueName()
    routerKey := receiver.RouterKey()

    // 申明Queue
    _, err := mq.channel.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when usused
        false,     // exclusive(排他性队列)
        false,     // no-wait
        nil,       // arguments
    )
    if nil != err {
        // 当队列初始化失败的时候,需要告诉这个接收者相应的错误
        receiver.OnError(fmt.Errorf("初始化队列 %s 失败: %s", queueName, err.Error()))
    }

    // 将Queue绑定到Exchange上去
    err = mq.channel.QueueBind(
        queueName,       // queue name
        routerKey,       // routing key
        mq.exchangeName, // exchange
        false,           // no-wait
        nil,
    )
    if nil != err {
        receiver.OnError(fmt.Errorf("绑定队列 [%s - %s] 到交换机失败: %s", queueName, routerKey, err.Error()))
    }

    // 获取消费通道
    mq.channel.Qos(1, 0, true) // 确保rabbitmq会一个一个发消息
    msgs, err := mq.channel.Consume(
        queueName, // queue
        "",        // consumer
        false,     // auto-ack
        false,     // exclusive
        false,     // no-local
        false,     // no-wait
        nil,       // args
    )
    if nil != err {
        receiver.OnError(fmt.Errorf("获取队列 %s 的消费通道失败: %s", queueName, err.Error()))
    }

    // 使用callback消费数据
    for msg := range msgs {
        // 当接收者消息处理失败的时候,
        // 比如网络问题导致的数据库连接失败,redis连接失败等等这种
        // 通过重试可以成功的操作,那么这个时候是需要重试的
        // 直到数据处理成功后再返回,然后才会回复rabbitmq ack
        for !receiver.OnReceive(msg.Body) {
            log.Warnf("receiver 数据处理失败,将要重试")
            time.Sleep(1 * time.Second)
        }

        // 确认收到本条消息, multiple必须为false
        msg.Ack(false)
    }
}

整合到一起

接收者的逻辑这里就不写的,只要根据实际的业务逻辑并实现了接口就可以了,这个比较容易。

获取RabbitMQ的连接

rabbitmqConn, err = amqp.Dial(url)
if err != nil {
    panic("RabbitMQ 初始化失败: " + err.Error())
}
rabbitmqChannel, err = rabbitmqConn.Channel()
if err != nil {
    panic("打开Channel失败: " + err.Error())
}
// 启动并开始处理数据    
func main() {

    // 假设这里有一个AReceiver和BReceiver
    aReceiver := NewAReceiver()
    bReceiver := NewBReceiver()
    
    mq := rabbitmq.New()
    // 将这个接收者注册到
    mq.RegisterReceiver(aReceiver)
    mq.RegisterReceiver(bReceiver)
    mq.Start()
}

应用场景

举一个我们自己用于生产环境的例子:

我们主要是用于接收Mysql的变更,并增量更新Elasticsearch的索引,负责数据库变更监听的服务用的是Canel,它伪装成一个mysql slave,用于接收mysql binlog的变更通知,然后将变更的数据格式化后写入RabbitMQ,然后由go实现的消费者去订阅数据库的变更通知。

由于客户端并不关心表中哪些字段发生了变化,只需要知道数据库指定的表有变更,那么就将此次变更写入Elasticsearch,这个逻辑对于每一张监听的表都是一样的,那么这样我们就可以将需要监听表变更的操作完全配置化,我只要再配置文件中指定一个接收者并指定待消费的队列,然后就可以由程序自动生成若干的接收者并且依次注册进RabbitMQ对象中,这样我们只需要针对一些特殊的操作写相应地代码即可,这样大大简化了我们地工作量,来看一看配置文件:

[[autoReceivers]]
    receiverName  = "article_receiver"
    database     = "blog"
    tableName    = "articles"
    primaryKey   = "articleId"
    queueName    = "articles_queue"
    routerKey    = "blog.articles.*"
    esIndex      = "articles_idx"

[[autoReceivers]]
    receiverName  = "comment_receiver"
    database     = "blog"
    tableName    = "comments"
    primaryKey   = "commentId"
    queueName    = "comments_queue"
    routerKey    = "blog.comments.*"
    esIndex      = "comments_idx"

这个时候就需要调整一下接收者地注册函数了:

// WalkReceivers 使用callback遍历处理所有的接收者
// 这里地callback就是上面提到地 mq.RegisterReceiver
func WalkReceivers(callback func(rabbitmq.Receiver)) {
    successCount := 0

    // 遍历每一个配置项,依次生成需要自动创建接收者
    // 这里的congfig是统一获取配置地对象,大家根据实际情况处理就可以了
    for _, receiverCfg := range config.Global.AutoReceivers {
        // 验证每一个接收者的合法性
        err := receiverCfg.Validate()
        if err != nil {
            log.Criticalf("生成 %s 失败: %s, 使用该配置: %+v", receiverCfg.ReceiverName, err.Error(), receiverCfg)
            continue
        }

        // 将接收者注册到监听rabbitmq的对象中
        callback(NewAutoReceiver(receiverCfg))

        log.Infof("生成 %s 成功使用该配置: %+v", receiverCfg.ReceiverName, receiverCfg)
        successCount++
    }

    if successCount != len(config.Global.AutoReceivers) || successCount == 0 {
        panic("无法启动所有的接收者,请检查配置")
    }

    // 如有必要,这里可以继续添加需要手工创建的接收者
}

启动地流程也需要进行微调一下:


func registeAndStart() {
    mq := rabbitmq.New()

    // 遍历所有的receiver,将他们注册到rabbitmq中去
    WalkReceivers(mq.RegisterReceiver)
    log.Info("初始化所有的Receiver成功")

    mq.Start()
}

这样就定义好了两个receiver,启动程序后,就可以接收到数据库地变更并更新elasticsearch中地索引了,非常地方便。

写在最后

这个是对平时工作地一点总结,希望可以给大家带来帮助,如果文中有纰漏之处,还望指正,这里完整地代码就不贴了,文章里已经搭起了一个完整地框架了,剩下地就是业务逻辑了,如果有必要地化,我会整理成一个完整地项目放到github上。

    原文作者:这里好像没有人
    原文地址: https://segmentfault.com/a/1190000010516906
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞