我在fork / join配置中有一个saga设置.
在传奇上定义的事件
> FileMetadataMsg
> FileReadyMsg
> SomeOtherMsg
当文件进入单独的侦听器时,进程开始.
>发布SagaStart(correlationId)
>发布FileSavedToMsg(correlationId,fileLoc)
>发布FileMetadataMsg(correlationId,metadata)
>发布FileReadyMsg(correlationId,fileLoc)
下游端点对文件做了一些工作
消费者LT; FileSavedToMsg>
>发布SomeOtherMsg(GotTheFileMsg.correlationId,data)
我在saga_skipped队列中获取了FileSavedToMsg.我只能假设它是由于在FileSavedToMsg上有一个correlationId,因为saga本身没有在其状态机中使用FileSavedToMsg而且没有Event< FileSavedToMsg>.
如果这就是为什么……我应该在CorrelationId以外的字段中传递correlationId,那么传奇看不到它?我需要它,所以我可以用它标记SomeOtherMsg.
以下是saga端点的定义方式
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, "study_saga", epCfg =>
{
epCfg.StateMachineSaga(machine, repository);
});
});
以下是工作者端点的定义方式
return Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
x.ReceiveEndpoint(host, "study_3d_volume_worker", c =>
{
c.PrefetchCount = 1;
c.Instance(_studyCreatedMsgConsumer);
});
});
它们在同一台机器上运行,但在单独的Console / Topshelf应用程序中运行.
最佳答案 如果您在队列上收到的消息未被该接收端点上的使用者消耗,则可能是您之前正在使用该消息类型并将其从使用者(或您的情况下为saga)中删除,或者您正在使用来自其他目的的队列,它消耗了该消息类型.
无论哪种方式,如果您进入RabbitMQ管理控制台并查找队列,您可以展开Bindings V形,单击以转到相同名称的交换(这是标准的MassTransit约定),然后展开交换的绑定查看哪些消息类型(名为.NET类型名称的交换)绑定到该交换.
如果您看到端点未使用的那个,那就是罪魁祸首.您可以使用UI解除绑定,之后发布的消息将不再发送到队列.