这似乎是一个非常简单的问题,但我已经尝试了我能想到的一切.基本上我有一个定时器路由,它将消息发送给一堆不同的bean.那些bean在交换机上设置了一个属性(我也在消息上尝试了一个标题)并且我希望所有这些bean的交换输出都被定向到一个过滤器(它检查属性或头部)然后可选择另一个端点.像这样的东西:
---> Bean A ---
/ \
timer --> multicast ------> Bean B ------> end --> filter --> endpoint
\ /
---> Bean C ---
目前路线看起来像这样,它适用于豆类的多播:
from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC");
以下是我尝试过的一些解决方案:
解决方案1
from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC")
.filter(new myPredicate())
.to("myOptionalEndpoint");
这使过滤器与bean平行而不是在它们之后.
解决方案2
from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.to("bean:beanA", "bean:beanB", "bean:beanC")
.end()
.filter(new myPredicate())
.to("myOptionalEndpoint");
豆是否并行然后进行过滤.但是,未设置属性/标头.似乎交换机是新鲜的计时器,而不是通过豆类的那个…
编辑:我尝试设置正文,实际上到达过滤器的消息没有正文.我无法想象Camel会以某种方式消除消息的有效负载,所以我必须假设这个交换是来自计时器的新交换,而不是通过bean的交换.但是,它发生在bean完成之后.
解决方案3
from("timer://my-timer?fixedRate=true&period=20000&delay=0")
.multicast()
.beanRef("beanA").to("direct:temp")
.beanRef("beanB").to("direct:temp")
.beanRef("beanC").to("direct:temp")
.end()
from("direct:temp")
.filter(new myPredicate())
.to("myOptionalEndpoint");
消息按预期到达过滤器,但我设置的属性/标题已消失,因此没有消息通过过滤器.
编辑:身体已经离开这里太清楚了,我没有得到来自豆子的同样的交换……
为了澄清,我正在寻找一种解决方案,其中来自计时器的单个交换被多播到每个bean(所以现在我们有3个交换),然后将这3个中的每一个发送到过滤器.
任何人都可以帮我弄清楚如何建立这条路线?
最佳答案 您需要使用聚合策略以将所有结果聚合为一个.
下面是http://javarticles.com/2015/05/apache-camel-multicast-examples.html的一个很好的例子(参见带有自定义聚合策略的多播)部分
public class CamelMulticastAggregationExample {
public static final void main(String[] args) throws Exception {
JndiContext jndiContext = new JndiContext();
jndiContext.bind("myBean", new MyBean());
CamelContext camelContext = new DefaultCamelContext(jndiContext);
try {
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
from("direct:start")
.multicast()
.aggregationStrategy(new JoinReplyAggregationStrategy())
.to("direct:a", "direct:b", "direct:c")
.end()
.to("stream:out");
from("direct:a")
.to("bean:myBean?method=addFirst");
from("direct:b")
.to("bean:myBean?method=addSecond");
from("direct:c")
.to("bean:myBean?method=addThird");
}
});
ProducerTemplate template = camelContext.createProducerTemplate();
camelContext.start();
template.sendBody("direct:start", "Multicast");
} finally {
camelContext.stop();
}
}
}
其中JoinReplyAggregationStrategy类如下所示
public class JoinReplyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
if (exchange1 == null) {
return exchange2;
} else {
String body1 = exchange1.getIn().getBody(String.class);
String body2 = exchange2.getIn().getBody(String.class);
String merged = (body1 == null) ? body2 : body1 + "," + body2;
exchange1.getIn().setBody(merged);
return exchange1;
}
}
}
更新在您的情况下,您的聚合策略可能是将所有交换收集在一起,如下所示:
public class ListAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
Object newBody = newIn.getBody();
List list = null;
if (oldExchange == null) {
list = new ArrayList();
list.add(newBody);
newIn.setBody(list);
return newExchange;
} else {
Message in = oldExchange.getIn();
list = in.getBody(List.class);
list.add(newBody);
return oldExchange;
}
}
}