java – Storm中的延迟队列实现 – Kafka,Cassandra,Redis或Beanstalk?

我有一个风暴拓扑来处理来自Kafka的消息,并根据手头的任务在Cassandra中进行HTTP调用/保存.我会在消息到来时立即处理.由于来自HTTP等外部源的响应,很少有消息未被完全处理.我想实现重试的指数退避机制,以便在一段时间后HTTP服务器不响应/返回错误消息重试.我可以想到几个我可以实现它们的想法.我想知道哪些是更好的解决方案,如果有任何其他我可以使用的解决方案是容错的.由于这用于实现指数退避,因此每条消息将具有不同的延迟时间.

>在Kafka中发送另一个主题,以后再使用.我首选的解决方案我知道我们可以使用Kafka偏移量,因此在后期使用消息.我怎么也找不到文档/示例代码来做同样的事情.如果有人可以帮助我,这将是非常有帮助的.
>编写消息Cassandra / Redis并编写调度程序以获取未处理且准备好使用的消息并将其发送到Kafka,以便我的风暴拓扑可以使用它. (其他遗留项目中的现有解决方案(非风暴))
>延迟发送到Beanstalk(其他遗留项目中的现有解决方案(非风暴).我怎么想避免使用此解决方案并仅在我不能使用的情况下使用它).

虽然这几乎是我想做的事情.我无法找到实现delayProcessingUntil的文档,如Kafka – Delayed Queue implementation using high level consumer中所述

我已经从Data-store完成了预定的工作,并且在过去使用Beanstalk延迟了,但我更喜欢使用Kafka.

最佳答案 Kafka spout内置了指数退避消息重试.您可以通过喷口配置配置初始延迟,延迟乘数和最大延迟.如果螺栓中有错误,您可以调用collector.fail(输入).在那之后你只需要让它喷出来进行重试.

https://github.com/apache/storm/blob/v0.10.0/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java

点赞