java – Camel:如何在多播后加入单个路径?

这似乎是一个非常简单的问题,但我已经尝试了我能想到的一切.基本上我有一个定时器路由,它将消息发送给一堆不同的bean.那些bean在交换机上设置了一个属性(我也在消息上尝试了一个标题)并且我希望所有这些bean的交换输出都被定向到一个过滤器(它检查属性或头部)然后可选择另一个端点.像这样的东西:

                       ---> Bean A ---
                      /               \
timer --> multicast ------> Bean B ------> end --> filter --> endpoint
                      \               /
                       ---> Bean C ---

目前路线看起来像这样,它适用于豆类的多播:

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
  .multicast()
    .to("bean:beanA", "bean:beanB", "bean:beanC");

以下是我尝试过的一些解决方案:

解决方案1

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
  .multicast()
    .to("bean:beanA", "bean:beanB", "bean:beanC")
  .filter(new myPredicate())
    .to("myOptionalEndpoint");

这使过滤器与bean平行而不是在它们之后.

解决方案2

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
  .multicast()
    .to("bean:beanA", "bean:beanB", "bean:beanC")
  .end()
  .filter(new myPredicate())
    .to("myOptionalEndpoint");

豆是否并行然后进行过滤.但是,未设置属性/标头.似乎交换机是新鲜的计时器,而不是通过豆类的那个…

编辑:我尝试设置正文,实际上到达过滤器的消息没有正文.我无法想象Camel会以某种方式消除消息的有效负载,所以我必须假设这个交换是来自计时器的新交换,而不是通过bean的交换.但是,它发生在bean完成之后.

解决方案3

from("timer://my-timer?fixedRate=true&period=20000&delay=0")
  .multicast()
    .beanRef("beanA").to("direct:temp")
    .beanRef("beanB").to("direct:temp")
    .beanRef("beanC").to("direct:temp")
  .end()

from("direct:temp")
  .filter(new myPredicate())
    .to("myOptionalEndpoint");

消息按预期到达过滤器,但我设置的属性/标题已消失,因此没有消息通过过滤器.

编辑:身体已经离开这里太清楚了,我没有得到来自豆子的同样的交换……

为了澄清,我正在寻找一种解决方案,其中来自计时器的单个交换被多播到每个bean(所以现在我们有3个交换),然后将这3个中的每一个发送到过滤器.

任何人都可以帮我弄清楚如何建立这条路线?

最佳答案 您需要使用聚合策略以将所有结果聚合为一个.

下面是http://javarticles.com/2015/05/apache-camel-multicast-examples.html的一个很好的例子(参见带有自定义聚合策略的多播)部分

public class CamelMulticastAggregationExample {
  public static final void main(String[] args) throws Exception {
    JndiContext jndiContext = new JndiContext();
    jndiContext.bind("myBean", new MyBean());
    CamelContext camelContext = new DefaultCamelContext(jndiContext);
    try {
        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
                from("direct:start")
                .multicast()
                .aggregationStrategy(new JoinReplyAggregationStrategy())
                .to("direct:a", "direct:b", "direct:c")
                .end()
                .to("stream:out");

                from("direct:a")
                .to("bean:myBean?method=addFirst");

                from("direct:b")
                .to("bean:myBean?method=addSecond");

                from("direct:c")
                .to("bean:myBean?method=addThird");
            }
        });
        ProducerTemplate template = camelContext.createProducerTemplate();
        camelContext.start();
        template.sendBody("direct:start", "Multicast");
    } finally {
        camelContext.stop();
    }
  }

}

其中JoinReplyAggregationStrategy类如下所示

public class JoinReplyAggregationStrategy implements AggregationStrategy {

  public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
    if (exchange1 == null) {
        return exchange2;
    } else {
        String body1 = exchange1.getIn().getBody(String.class);
        String body2 = exchange2.getIn().getBody(String.class);
        String merged = (body1 == null) ? body2 : body1 + "," + body2;
        exchange1.getIn().setBody(merged);
        return exchange1;
    }
  }
}

更新在您的情况下,您的聚合策略可能是将所有交换收集在一起,如下所示:

public class ListAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            Message newIn = newExchange.getIn();
            Object newBody = newIn.getBody();
            List list = null;
            if (oldExchange == null) {
                    list = new ArrayList();
                    list.add(newBody);
                    newIn.setBody(list);
                    return newExchange;
            } else {
                    Message in = oldExchange.getIn();
                    list = in.getBody(List.class);
                    list.add(newBody);
                    return oldExchange;
            }
    }

}
点赞