对于它的价值,我的消费者处理排队的项目只不过是通过SOAP将它们上传到一个不是100%可靠的网络应用程序.如果无法建立连接或调用SOAP调用失败,我应该丢弃这些项目并返回队列以获取更多信息.由于SOAP的开销,我试图最大化队列中可以在一次SOAP调用中发送的项目数.
有时,我的制作人可能会比我的消费者能够删除和处理它们更快地添加项目.如果队列已满并且我的生产者需要添加另一个项目,我需要将新项目排队,然后将最旧的项目出列,以便队列的大小保持固定.基本上,我需要始终保留队列中生成的最新项目(即使这意味着某些项目不会被消耗,因为我的消费者当前正在处理以前的项目).
关于生产者如果队列中的项目是固定的那样保留数字,我从这个问题中发现了一个潜在的想法:
Fixed size queue which automatically dequeues old values upon new enques
我目前在ConcurrentQueue周围使用一个包装类(基于该答案)和Enqueue()方法,如下所示:
public class FixedSizeQueue<T> { readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); public int Size { get; private set; } public FixedSizeQueue(int size) { Size = size; } public void Enqueue(T obj) { // add item to the queue queue.Enqueue(obj); lock (this) // lock queue so that queue.Count is reliable { while (queue.Count > Size) // if queue count > max queue size,then dequeue an item { T objOut; queue.TryDequeue(out objOut); } } } }
我创建了这个类的实例,对队列有一个大小限制,如下所示:
FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit
我启动了我的生产者任务,它开始填充队列.我的Enqueue()方法中的代码似乎在添加项目时从队列中删除最旧的项目时正常工作导致队列计数超过最大大小.现在我需要我的消费者任务来将项目出列并处理它们,但这就是我的大脑混淆的地方.为我的消费者实现Dequeue方法的最佳方法是什么,它将在某个时刻获取队列的快照并将所有项目出列以进行处理(生产者可能仍然在此过程中将项目添加到队列中)?
解决方法
这将是你的GetAll()方法:
public T[] GetAll() { lock (syncObj) // so that we don't clear items we didn't get with ToArray() { var result = queue.ToArray(); T trash; while(!queue.IsEmpty) queue.TryDequeue(out trash); } }
由于你必须清除队列,你可以简单地组合这两个操作;创建一个适当大小的数组(使用queue.Count),然后在队列不为空时,在返回之前将一个项目出列并将其放入数组中.
现在,这是具体问题的答案.我现在必须在良心上穿上我的CodeReview.SE帽子并指出一些事情:
>永远不要使用锁(这个).您永远不会知道其他对象可能将您的对象用作锁定焦点,因此当对象从内部锁定自身时会被阻止.最佳做法是锁定一个私有范围的对象实例,通常只创建一个被锁定的对象:private readonly object syncObj = new object();
>因为无论如何你都要锁定包装器的关键部分,我会使用普通的List< T>而不是并发集合.访问速度更快,更容易清理,因此您可以比ConcurrentQueue允许的更简单地完成您所做的工作.要排队,请在索引零之前锁定同步对象Insert(),然后使用RemoveRange()从索引大小中删除任何项目到列表的当前计数.要出列,请锁定同一个同步对象,调用myList.ToArray()(来自Linq命名空间;与ConcurrentQueue完全相同),然后在返回数组之前调用myList.Clear().不能简单:
public class FixedSizeQueue<T> { private readonly List<T> queue = new List<T>(); private readonly object syncObj = new object(); public int Size { get; private set; } public FixedSizeQueue(int size) { Size = size; } public void Enqueue(T obj) { lock (syncObj) { queue.Insert(0,obj) if(queue.Count > Size) queue.RemoveRange(Size,Count-Size); } } public T[] Dequeue() { lock (syncObj) { var result = queue.ToArray(); queue.Clear(); return result; } } }
>您似乎明白使用此模型将您排队的物品扔掉了.这通常不是一件好事,但我愿意给你怀疑的好处.但是,我会说使用BlockingCollection有一种无损的方法来实现这一点. BlockingCollection包装任何IProducerConsumerCollection,包括大多数System.Collections.Concurrent类,并允许您指定队列的最大容量.然后,该集合将阻止任何尝试从空队列或任何尝试添加到完整队列的线程出队的线程,直到已添加或删除项目,以便有可能获得某些内容或插入空间.这是实现具有最大大小的生产者 – 消费者队列的最佳方式,或者是否需要“轮询”以查看是否有供消费者使用的内容的最佳方式.如果你走这条路,只有消费者扔掉的东西才会被扔掉;消费者将看到生产者投入的所有行,并对每个行做出自己的决定.