我想获取几条消息,处理它们并在此之后将它们组合在一起.所以基本上我收到一条消息,把它放在一些队列中并继续从兔子接收消息.不同的线程将使用收到的消息监视此队列,并在金额足够时处理它们.我能找到的关于ack的所有内容仅包含一条在同一线程上处理的消息的示例.像这样(来自官方文档):
channel.basicQos(1);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
文档也说明了这一点:
Channel instances must not be shared between threads. Applications
should prefer using a Channel per thread instead of sharing the same
Channel across multiple threads. While some operations on channels are
safe to invoke concurrently, some are not and will result in incorrect
frame interleaving on the wire.
所以我在这里很困惑.如果我正在寻找一些消息,同时频道正在接收来自兔子的另一条消息,那么当时它被认为是两个操作吗?在我看来,是的.
我试图从不同的线程确认同一频道上的消息,它似乎工作,但文档说我不应该在线程之间共享通道.所以我试图在不同的线程上使用不同的通道进行确认,但是它失败了,因为此通道的传递标签是未知的.
是否可以在收到的同一个帖子上确认消息?
UPD
我想要的一段代码示例.它是scala,但我认为这很简单.
case class AmqpMessage(envelope: Envelope, msgBody: String)
val queue = new ArrayBlockingQueue[AmqpMessage](100)
val consumeChannel = connection.createChannel()
consumeChannel.queueDeclare(queueName, true, false, true, null)
consumeChannel.basicConsume(queueName, false, new DefaultConsumer(consumeChannel) {
override def handleDelivery(consumerTag: String,
envelope: Envelope,
properties: BasicProperties,
body: Array[Byte]): Unit = {
queue.put(new AmqpMessage(envelope, new String(body)))
}
})
Future {
// this is different thread
val channel = connection.createChannel()
while (true) {
try {
val amqpMessage = queue.take()
channel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // doesn't work
consumeChannel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // works, but seems like not thread safe
} catch {
case e: Exception => e.printStackTrace()
}
}
}
最佳答案 尽管文档非常严格,但通道上的某些操作可以安全地同时调用.
您可以在不同的线程中确认消息,只要消费和执行是您在通道上执行的唯一操作即可.
看到这个问题,它涉及同样的事情: