我正在尝试找到一种方法来处理多个线程中的队列,动态调整消费者的数量.基本上这个任务是众所周知的:多个生成器创建消息并将它们提交到队列中,多个消费者处理来自队列的消息.现在,我想到了使用不同组件的不同方法,如System.Collections.Queue.Synchronized,System.Collections.Concurrent.ConcurrentQueue和System.Collections.Concurrent.BlockingCollection,但我无法决定如何正确使用它最高效率所以我很乐意通过您的输入获得一些明智的想法.
这里有更多细节:
>在某些情况下,消息率预计会非常密集,但处理方式相对简单;
>我不知道我应该有多少消费者;
>我希望流程调整当前消费者的数量,而不是阻止它们,具体取决于排队的消息数量(这意味着我希望为每一百条消息填充额外的消费者fe,并且消费者应停止,如果排队的消息数量比填充消息所需的数量少50,当消息量增加超过300时,将填充第三个消费者,并且当消息量降至250时,它应该停止.
这是个主意.现在,我考虑将ConcurrentQueue包装成一个封装Enqueue方法的类,并在排队后检查消息的数量,并决定启动另一个消费者.并且消费者应该在循环内有一个检查,应该做出关于停止它的决定.我想你会提出一些更有趣的解决方案.
顺便说一下,理论上,当最后一条消息被排队并且同时最后一个消费者已经停止时,我仍然不知道如何处理的情况之一.另一种情况也是关于停止 – 如果他们将在同一时间停止检查,将停止几个消费者.我该如何处理这些情况?
为了证明我的意思,请考虑以下示例:
class MessageController
{
private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();
int amountOfConsumers;
public void Enqueue(IMessage message)
{
messageQueue.Add(message); // point two
if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers) // point three
{
Task.Factory.StartNew(() =>
{
IMessage msg;
while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers)) //point one
{
msg = messageQueue.Take();
//process msg...
}
ConsumerQuit(); // point four
});
Interlocked.Increment(ref amountOfConsumers);
}
}
public void ConsumerQuit()
{
Interlocked.Decrement(ref amountOfConsumers);
}
}
所以现在当我可以指出具体的代码行时,这些是问题:
>当最后一个消费者发现没有消息排队(@point one)并且在调用ConsumerQuit方法之前,最后一条消息到达并排队,然后检查其他消费者,结果是(@point three )仍然有一个消费者在工作,并且由于一个消费者对单个消息的绰绰有余 – 没有任何反应,然后最终调用ConsumerQuit,并且我有一个消息滞留在队列中.
06001
>当其中一个应该停止时,几个消费者同时进行了“一点”检查(fe messageQueue.Count是249),其中几个将停止,因为在ConsumerQuit被调用之前,其他几个人将进行此检查也.
06002
Here, in case when the last message is already enqueued, we have one consumer task left that has to handle 249 messages alone, however the worst case can be if all them will halt, after the last message, potentialy hundreds of messages will stuck.
最佳答案 看来我终于想出了一个解决方案,但不确定性能.请考虑以下代码,任何反馈将不胜感激!我仍然希望看到一些其他解决方案或想法,即使它们将完全不同并且需要对方法进行重大改变.这是目标:“一种在多个线程中处理队列,动态调整消费者数量的方法”
class MessageController
{
private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();
private ManualResetEvent mre = new ManualResetEvent(true);
private int amountOfConsumers;
object o = new object();
public void Enqueue(IMessage message)
{
messageQueue.Add(message);
mre.WaitOne();
if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers)
{
Interlocked.Increment(ref amountOfConsumers);
var task = Task.Factory.StartNew(() =>
{
IMessage msg;
bool repeat = true;
while (repeat)
{
while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers))
{
msg = messageQueue.Take();
//process msg...
}
lock (o)
{
mre.Reset();
if ((messageQueue.Count == 0) || (Math.Ceiling((double)((messageQueue.Count + 51) / 100)) < amountOfConsumers))
{
ConsumerQuit();
repeat = false;
}
mre.Set();
}
}
});
}
}
public void ConsumerQuit()
{
Interlocked.Decrement(ref amountOfConsumers);
}
}