现在来问题了.我们有一个ID列表,我们定期(每2秒)为每个ID调用StoredProcedure.
每个项目需要单独检查2秒,因为它们是在运行时添加和删除的.
此外,我们希望配置最大并行度,因为DB不应同时充满300个线程.
正在处理的项目在完成上一次执行之前不应重新安排进行处理.原因是我们想要防止排队很多项目,以防数据库出现延迟.
现在我们正在使用一个自行开发的组件,它有一个主线程,定期检查需要安排处理的项目.一旦它有了列表,它就会删除那些基于自定义IOCP的线程池,然后使用等待句柄等待正在处理的项目.然后下一次迭代开始. IOCP因为它提供的工作窃取.
我想用TPL / .NET 4版本替换这个自定义实现,我想知道你将如何解决它(理想的简单和易读/可维护).
我知道这篇文章:http://msdn.microsoft.com/en-us/library/ee789351.aspx,但它只是限制了使用的线程数量.叶子偷工作,定期执行物品….
理想情况下,它将成为一个通用组件,可用于需要定期为项目列表完成的所有任务.
欢迎任何投入,
TIA
马丁
解决方法
Tasks
.对于初学者,我会在
ConcurrentQueue
(默认值)附近设置
BlockingCollection
,而在BlockingCollection上没有设置
BoundedCapacity
来存储需要处理的ID.
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service) BlockingCollection<string> idsToProcess = new BlockingCollection<string>();
从那里我将使用BlockingCollection::GetConsumingEnumerable
从BlockingCollection::GetConsumingEnumerable
返回的枚举.在ForEach调用中,您将设置ParallelOptions::MaxDegreeOfParallelism
在ForEach的主体内,您将执行存储过程.
现在,一旦存储过程执行完成,您就说您不想重新安排执行至少两秒钟.没问题,安排一个带有回调的System.Threading.Timer
,它只会将ID添加回提供的回调中的BlockingCollection.
Parallel.ForEach( idsToProcess.GetConsumingEnumerable(),new ParallelOptions { MaxDegreeOfParallelism = 4 // read this from config },(id) => { // ... execute sproc ... // Need to declare/assign this before the delegate so that we can dispose of it inside Timer timer = null; timer = new Timer( _ => { // Add the id back to the collection so it will be processed again idsToProcess.Add(id); // Cleanup the timer timer.Dispose(); },null,// no state,id wee need is "captured" in the anonymous delegate 2000,// probably should read this from config Timeout.Infinite); }
最后,当进程关闭时,您将调用BlockingCollection::CompleteAdding
,以便使用停止阻塞和完成处理可枚举,并且Parallel :: ForEach将退出.例如,如果这是Windows服务,您将在OnStop
执行此操作.
// When ready to shutdown you just signal you're done adding idsToProcess.CompleteAdding();
更新
您在评论中提出了一个有效的问题,即您可能在任何给定点处理大量ID,并担心每个ID的计时器会产生过多的开销.我绝对同意这一点.因此,在您同时处理大量ID的情况下,我将从使用每个ID的定时器更改为使用另一个队列来保存由单个短间隔计时器监视的“休眠”ID.首先,你需要一个ConcurrentQueue来放置睡着的ID:
ConcurrentQueue<Tuple<string,DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string,DateTime>>();
现在,我在这里使用两部分Tuple
进行说明,但您可能希望为它创建一个更强类型的结构(或者至少使用using语句将其替换)以提高可读性.元组具有id和DateTime,表示它何时被放入队列.
现在,您还需要设置将监视此队列的计时器:
Timer wakeSleepingIdsTimer = new Timer( _ => { DateTime utcNow = DateTime.UtcNow; // Pull all items from the sleeping queue that have been there for at least 2 seconds foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2)) { // Add this id back to the processing queue idsToProcess.Enqueue(id); } },// no state Timeout.Infinite,// no due time 100 // wake up every 100ms,probably should read this from config );
然后你只需要改变Parallel :: ForEach来执行以下操作,而不是为每个设置一个计时器:
(id) => { // ... execute sproc ... sleepingIds.Enqueue(Tuple.Create(id,DateTime.UtcNow)); }