我有一个简单的生产者/消费者amqp设置如下:
producer -> e1:jobs_queue -> consumer -> e2:results_queue -> result_handler
制片人发送了一些工作.消费者一次下拉一个作业,并处理它们,将结果推送到另一个队列.然后由result_handler将它们拉出,将结果发布到数据库.
有时消费者会失败 – 它可能会被操作系统杀死或抛出异常.如果在处理消息时发生这种情况,则此消息将丢失,不会产生相应的结果,我很难过.如果失败的工作重新排队,我会很高兴.
我正在寻找的是一种设计模式,用于确保消费者处理作业完成并将相应的结果放入* results_queue *,或者如果失败则将作业放回* jobs_queue *.由于消费者是失败的,消费者不应负责管理与其自身监督相关的任何消息.
我们知道,如果出现以下情况,消费者无法处理工作:
>它从* job_queue *中获取了一个作业,并且在一些超时后没有产生任何结果
>它从* job_queue *找到了一份工作,然后就死了
对于我的应用程序,我们可以通过简单地等待处理作业超时来捕获第二种情况.在生产中,将有许多工人要监督,所有工作都从一个共同的工作列表中拉出来,并将结果放入一个结果交换/队列中.
最佳答案 实现您想要的最简单方法是手动处理收到的消息的确认.在node-amqp中,它就像在queue.subscribe调用中添加选项{ack:true}一样简单.然后,您可以通过调用队列中的某个函数来确认消息.在node-amqp的情况下,它是queue.shift().
您还可以使用prefetchCount设置允许使用者的未确认消息数.
如果消费者断开连接,现在将重新传送任何未确认的消息(对任何连接的消费者).
通过将队列设置为durable和autoDelete:false,您还可以确保在重新启动MQ服务器或断开最后一个使用者时不会删除队列(及其上的消息).