>文件处理可能非常耗时 – 必须在自己的任务线程上处理事件
>保持事件处理程序任务的句柄以等待OnStop()事件中的完成.
>跟踪上传文件的哈希值;如果没有不同,不要重新处理
>保留文件哈希值以允许OnStart()处理服务关闭时上载的文件.
>永远不要多次处理文件.
(关于#3,我们确实在没有变化的情况下获取事件…最值得注意的是因为FileWatchers的重复事件问题)
要做这些事情,我有两个字典 – 一个用于上传文件,另一个用于任务本身.这两个对象都是静态的,我需要在添加/删除/更新文件和任务时锁定它们.简化代码:
public sealed class TrackingFileSystemWatcher : FileSystemWatcher { private static readonly object fileWatcherDictionaryLock = new object(); private static readonly object runningTaskDictionaryLock = new object(); private readonly Dictionary<int,Task> runningTaskDictionary = new Dictionary<int,Task>(15); private readonly Dictionary<string,FileSystemWatcherProperties> fileWatcherDictionary = new Dictionary<string,FileSystemWatcherProperties>(); // Wired up elsewhere private void OnChanged(object sender,FileSystemEventArgs eventArgs) { this.ProcessModifiedDataFeed(eventArgs); } private void ProcessModifiedDataFeed(FileSystemEventArgs eventArgs) { lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) { // Read the file and generate hash here // Properties if the file has been processed before // ContainsNonNullKey is an extension method if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath)) { try { fileProperties = this.fileWatcherDictionary[eventArgs.FullPath]; } catch (KeyNotFoundException keyNotFoundException) {} catch (ArgumentNullException argumentNullException) {} } else { // Create a new properties object } fileProperties.ChangeType = eventArgs.ChangeType; fileProperties.FileContentsHash = md5Hash; fileProperties.LastEventTimestamp = DateTime.Now; Task task; try { task = new Task(() => new DataFeedUploadHandler().UploadDataFeed(this.legalOrg,dataFeedFileData),TaskCreationOptions.LongRunning); } catch { .. } // Only lock long enough to add the task to the dictionary lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) { try { this.runningTaskDictionary.Add(task.Id,task); } catch { .. } } try { task.ContinueWith(t => { try { lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) { this.runningTaskDictionary.Remove(t.Id); } // Will this lock burn me? lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) { // Persist the file watcher properties to // disk for recovery at OnStart() } } catch { .. } }); task.Start(); } catch { .. } } } }
在同一对象的锁定中定义委托时,在ContinueWith()委托中请求锁定FileSystemWatcher集合的效果是什么?我希望它没问题,即使任务在ProcessModifiedDataFeed()释放锁之前启动,完成并进入ContinueWith(),任务线程也会被暂停,直到创建线程释放锁.但我想确保我没有踩到任何延迟执行的地雷.
看一下代码,我可以尽快释放锁,避免问题,但我还不确定……需要检查完整的代码才能确定.
UPDATE
为了遏制不断上升的“这段代码很糟糕”的评论,我有很好的理由抓住我做的例外情况,并抓住了很多这样的例子.这是一个具有多线程处理程序的Windows服务,它可能不会崩溃.永远.如果其中任何一个线程有未处理的异常,它会做什么.
此外,这些例外写入未来的防弹.我在下面的评论中给出的示例是为处理程序添加工厂……因为今天编写的代码,永远不会有空任务,但如果工厂没有正确实现,代码可能抛出异常.是的,那应该在测试中被捕获.但是,我的团队中有初级开发人员……“May.不.崩溃.” (另外,如果存在未处理的异常,它必须正常关闭,允许当前正在运行的线程完成 – 我们使用main()中设置的未处理异常处理程序).我们将企业级监视器配置为在事件日志中出现应用程序错误时发送警报 – 这些异常将记录并标记我们.这种方法是经过深思熟虑和讨论的决定.
每个可能的异常都经过仔细考虑和选择,分为两类 – 适用于单个数据馈送但不会关闭服务(大多数)的类别,以及那些表明清晰编程或其他根本导致代码对所有数据馈送都没用.例如,如果我们无法写入事件日志,我们选择关闭服务,因为这是我们指示数据馈送未得到处理的主要机制.异常是在本地捕获的,因为本地环境是唯一可以继续做出决定的地方.此外,允许异常冒泡到更高级别(1)违反了抽象概念,并且(2)在工作线程中没有意义.
我对反对处理异常的人数感到惊讶.如果我每次尝试都有一角硬币.那么(例外){什么都不做}我知道,你会在永恒的剩余时间内获得镍币的变化.我认为死亡1如果对.NET框架的调用或您自己的代码抛出异常,您需要考虑会导致该异常发生的场景并明确决定如何处理它.我的代码在IO操作中捕获UnauthorizedExceptions,因为当我考虑如何发生这种情况时,我意识到添加新的dataFeed目录需要授予服务帐户的权限(默认情况下它不会拥有它们).
我很欣赏这些建设性的意见……请不要批评简化的示例代码,并使用广泛的“这个糟透了”的画笔.代码不吸 – 它是防弹的,必然如此.
1如果Jon Skeet不同意,我只会说很长时间
解决方法
lock语句在内部使用Monitor类,它是可重入的.例如如果一个线程已经拥有/拥有锁,它可以多次获取一个锁. Multithreading and Locking (Thread-Safe operations)
而task.ContinueWith在ProcessModifiedDataFeed完成之前启动的另一种情况就像你说的那样.运行ContinueWith的线程只需要等待获取锁定.
我真的会考虑做任务.ContinueWith和在锁之外的task.Start(),如果你审查它.并且可以根据您发布的代码进行操作.
您还应该查看System.Collections.Concurrent命名空间中的ConcurrentDictionary.这将使代码更容易,你不必自己管理锁定.你在这里进行某种比较交换/更新if(this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath)).例如只有在词典中没有添加.这是一个原子操作.使用ConcurrentDictionary没有任何功能,但有一个AddOrUpdate方法.也许你可以使用这种方法重写它.根据您的代码,您可以安全地使用ConcurrentDictionary至少用于runningTaskDictionary
哦和TaskCreationOptions.LongRunning实际上是为每个任务创建一个新线程,这是一种昂贵的操作. Windows内部线程池在新的Windows版本中是智能的,并且正在动态调整.它会“看到”你正在做很多IO的东西,并会根据需要和实际产生新的线程.
问候