masstransit – 为什么我在跳过的队列中收到消息

我在fork / join配置中有一个saga设置.

在传奇上定义的事件

> FileMetadataMsg
> FileReadyMsg
> SomeOtherMsg

当文件进入单独的侦听器时,进程开始.

>发布SagaStart(correlationId)
>发布FileSavedToMsg(correlationId,fileLoc)
>发布FileMetadataMsg(correlationId,metadata)
>发布FileReadyMsg(correlationId,fileLoc)

下游端点对文件做了一些工作

消费者LT; FileSavedToMsg>

>发布SomeOtherMsg(GotTheFileMsg.correlationId,data)

我在saga_skipped队列中获取了FileSavedToMsg.我只能假设它是由于在FileSavedToMsg上有一个correlationId,因为saga本身没有在其状态机中使用FileSavedToMsg而且没有Event< FileSavedToMsg>.

如果这就是为什么……我应该在CorrelationId以外的字段中传递correlationId,那么传奇看不到它?我需要它,所以我可以用它标记SomeOtherMsg.

以下是saga端点的定义方式

return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });              

            cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
            {
                epCfg.StateMachineSaga(machine, repository);
            });
});

以下是工作者端点的定义方式

return Bus.Factory.CreateUsingRabbitMq(x =>
{
    var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
        {
            c.PrefetchCount = 1;
            c.Instance(_studyCreatedMsgConsumer);
        });
 });

它们在同一台机器上运行,但在单独的Console / Topshelf应用程序中运行.

最佳答案 如果您在队列上收到的消息未被该接收端点上的使用者消耗,则可能是您之前正在使用该消息类型并将其从使用者(或您的情况下为saga)中删除,或者您正在使用来自其他目的的队列,它消耗了该消息类型.

无论哪种方式,如果您进入RabbitMQ管理控制台并查找队列,您可以展开Bindings V形,单击以转到相同名称的交换(这是标准的MassTransit约定),然后展开交换的绑定查看哪些消息类型(名为.NET类型名称的交换)绑定到该交换.

如果您看到端点未使用的那个,那就是罪魁祸首.您可以使用UI解除绑定,之后发布的消息将不再发送到队列.

点赞