我正在尝试在RabbitMQ上使用标头交换,使用混合的 java和python组件,我需要确认交付.
我似乎从python(pika)和java客户端获得了不同的行为.
在python中:
channel.exchange_declare(exchange='headers_test',
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers',
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True)
channel.confirm_delivery()
result = channel.basic_publish(exchange='headers_test',
¦ ¦ ¦ ¦ ¦ ¦ routing_key='',
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True,
¦ ¦ ¦ ¦ ¦ ¦ body=message,
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2,
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers))
如果标头与任何绑定的使用者不匹配且无法路由消息,则结果为false
但在java / scala中:
channel.exchangeDeclare("headers_test", "headers", true, false, null)
channel.confirmSelect
val props = MessageProperties.PERSISTENT_BASIC.builder
¦ ¦ ¦ ¦ .headers(messageHeaders).build
channel.basicPublish("headers_test",
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory
¦ ¦ ¦ ¦ ¦ ¦props,
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes)
channel.waitForConfirmsOrDie()
这里,当messageHeaders找不到匹配项时,该消息似乎只是被删除而没有错误.
我错过了什么或两个客户的行为真的不一样吗?如何使用java中的头文件交换获得确认传递?
注意:我已经对队列路由设置进行了“复杂”交换,我宁愿避免在游戏中添加死信路由,也只是发送失败.
最佳答案 即使没有与您的标头匹配的队列,也会认为邮件已被确认.从文档( https://www.rabbitmq.com/confirms.html):
For unroutable messages, the broker will issue a confirm once the
exchange verifies a message won’t route to any queue (returns an empty
list of queues). If the message is also published as mandatory, the
basic.return is sent to the client before basic.ack. The same is true
for negative acknowledgements (basic.nack).
相反,您应该检查basic.return消息以检测消息是否已被路由.
我已经检查过wireshark,事实上我可以看到,如果没有路由消息,则会出现AMQP basic.return消息.
我认为你应该从这开始
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
事实上,如果没有路由消息,我会得到:
replyCode = [312], replyText = [NO_ROUTE], exchange = [headers_logs],
routingKey = [], pro….
此外,如果您想模仿Pika在Java中的同步行为,您似乎可以通过在发布消息和注册确认侦听器之前获取当前发布序列号而不是依赖.waitForConfirmsOrDie()来实现.
所以完整的代码示例将是:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("App.handleReturn");
System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
}
});
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleAck");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("App.handleNack");
System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
}
});
long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);
channel.basicPublish("headers_logs",
"",
true,
props,
"data".getBytes());
在返回/确认回调中,您需要查找在发布消息之前获得的频道的发布序列号.
如果您查看线路上发生的情况,如果消息尚未路由到任何队列,RabbitMq将发回一条basic.return消息,该消息还包含确认(传递标记).如果消息已被路由,RabbitMq将发回一条bacic.ack消息,该消息还包含确认消息.
似乎RabbitMq Java客户端总是在basicConfirm()之前调用basicReturn()回调,因此确定消息是否已被路由的逻辑可以是:
在频道上注册返回并确认听众;
记住频道的下一个发布序列号;
等待返回或确认回调.如果是返回回调 – 尚未路由消息,则应忽略对同一传递标记的进一步确认.如果在收到handleReturn()之前收到handleAck()回调,则表示消息已路由到队列.
虽然我不确定在哪种情况下可以调用.handleNack().