我写了下面这个扩展方法来测试并行解压:
/// <summary>
/// Extracts the entries of the zip archive <paramref name="file"/> into
/// <paramref name="folder"/>, pushing each entry through a dataflow block
/// configured to run at most two extractions at a time.
/// </summary>
/// <param name="file">The zip archive to read.</param>
/// <param name="folder">Root directory beneath which the entries are written.</param>
/// <returns>A task that completes when the block has processed every posted entry.</returns>
/// <remarks>
/// NOTE(review): every worker reads from the single archive opened here. With
/// MaxDegreeOfParallelism above 1 this has been observed to fail with
/// InvalidDataException; confirm whether ZipArchive tolerates concurrent reads.
/// </remarks>
public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
{
    var extractor = new ActionBlock<ZipArchiveEntry>(
        zipEntry =>
        {
            string target = Path.Combine(folder.FullName, zipEntry.FullName);

            // The parent directory has to exist before the file can be written.
            Directory.CreateDirectory(Path.GetDirectoryName(target));
            zipEntry.ExtractToFile(target);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

    using (ZipArchive zip = ZipFile.OpenRead(file.FullName))
    {
        foreach (ZipArchiveEntry zipEntry in zip.Entries)
        {
            // Entries with an empty Name are skipped (presumably directory entries).
            if (zipEntry.Name == string.Empty)
            {
                continue;
            }

            extractor.Post(zipEntry);
        }

        // No more input; wait for the queued extractions while the archive is still open.
        extractor.Complete();
        await extractor.Completion;
    }
}
并用下面的单元测试来验证:
/// <summary>
/// Removes any output left by a previous run, then extracts the test archive
/// into the local extract folder using the dataflow-based extension method.
/// </summary>
[TestMethod]
public async Task ExtractTestAsync()
{
    if (Resources.LocalExtractFolder.Exists)
    {
        Resources.LocalExtractFolder.Delete(true);
    }

    // Resources.LocalExtractFolder.Create();
    // The disabled call above must stay on its own line: when it shares a line
    // with the await, the comment swallows the extraction call.
    await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
}
当 MaxDegreeOfParallelism = 1 时一切正常,但设为 2 时就会失败:
Test Name: ExtractTestAsync Test FullName: Composite.Azure.Tests.ZipFileTests.ExtractTestAsync Test Source: c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21 Test Outcome: Failed Test Duration: 0:00:02.4138753 Result Message: Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception: System.IO.InvalidDataException: Unknown block type. Stream might be corrupted. Result StackTrace: at System.IO.Compression.Inflater.Decode() at System.IO.Compression.Inflater.Inflate(Byte[] bytes,Int32 offset,Int32 length) at System.IO.Compression.DeflateStream.Read(Byte[] array,Int32 count) at System.IO.Stream.InternalCopyTo(Stream destination,Int32 bufferSize) at System.IO.Stream.CopyTo(Stream destination) at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source,String destinationFileName,Boolean overwrite) at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source,String destinationFileName) at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37 at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action,KeyValuePair`2 messageWithId) at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId) at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore() --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult() at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in 
c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult() at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25 --- End of stack trace from previous location where exception was thrown --- at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
更新2
这是我自己实现的并行版本,它同样不工作 :) 注意:还需要在 ContinueWith 中处理异常。
/// <summary>
/// Extracts the entries of the zip archive <paramref name="file"/> into
/// <paramref name="folder"/>, running one task per entry and using a semaphore
/// to cap the number of tasks in flight at two. Blocks until every task has
/// released its permit.
/// </summary>
/// <param name="file">The zip archive to read.</param>
/// <param name="folder">Root directory beneath which the entries are written.</param>
/// <remarks>
/// The continuation never inspects <c>handle.Exception</c>, so a failed
/// extraction is not reported to the caller.
/// NOTE(review): all tasks read from the single archive opened here; concurrent
/// extraction has been observed to fail - confirm whether ZipArchive tolerates
/// concurrent reads.
/// </remarks>
public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)
{
    const int maxDegreeOfParallelism = 2;

    using (var archive = ZipFile.OpenRead(file.FullName))
    // Semaphore owns a wait handle, so it is disposed together with the archive.
    using (var semaphore = new Semaphore(maxDegreeOfParallelism, maxDegreeOfParallelism))
    {
        foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
        {
            // Blocks while the maximum number of extractions is already running.
            semaphore.WaitOne();

            var task = Task.Run(() =>
            {
                var path = Path.Combine(folder.FullName, entry.FullName);
                Directory.CreateDirectory(Path.GetDirectoryName(path));
                entry.ExtractToFile(path);
            });

            task.ContinueWith(handle =>
            {
                try
                {
                    // do any cleanup/post processing
                }
                finally
                {
                    // Release the semaphore so the next thing can be processed
                    semaphore.Release();
                }
            });
        }

        // Wait here until the last task completes: once every permit has been
        // re-acquired, no extraction can still be running.
        for (int permit = 0; permit < maxDegreeOfParallelism; permit++)
        {
            semaphore.WaitOne();
        }
    }
}
这里是异步版本:
/// <summary>
/// Task-returning variant of the semaphore-based extraction: extracts the
/// entries of the zip archive <paramref name="file"/> into
/// <paramref name="folder"/> with at most 50 extraction tasks in flight.
/// </summary>
/// <param name="file">The zip archive to read.</param>
/// <param name="folder">Root directory beneath which the entries are written.</param>
/// <returns>A task that completes after all extraction continuations have run.</returns>
/// <remarks>
/// The continuation never inspects <c>handle.Exception</c>, so a failed
/// extraction does not fault the returned task.
/// NOTE(review): all tasks read from the single archive opened here; concurrent
/// extraction has been observed to fail - confirm whether ZipArchive tolerates
/// concurrent reads.
/// </remarks>
public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
{
    return Task.Factory.StartNew(() =>
    {
        const int maxDegreeOfParallelism = 50;

        using (var archive = ZipFile.OpenRead(file.FullName))
        // Semaphore owns a wait handle, so it is disposed together with the archive.
        using (var semaphore = new Semaphore(maxDegreeOfParallelism, maxDegreeOfParallelism))
        {
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            {
                // Blocks while the maximum number of extractions is already running.
                semaphore.WaitOne();

                var task = Task.Run(() =>
                {
                    var path = Path.Combine(folder.FullName, entry.FullName);
                    Directory.CreateDirectory(Path.GetDirectoryName(path));
                    entry.ExtractToFile(path);
                });

                task.ContinueWith(handle =>
                {
                    try
                    {
                        // do any cleanup/post processing
                    }
                    finally
                    {
                        // Release the semaphore so the next thing can be processed
                        semaphore.Release();
                    }
                }, TaskContinuationOptions.AttachedToParent); // the outer task will wait for all.
            }

            // Re-acquire every permit before leaving the using scope. Without
            // this, the archive is disposed as soon as the loop ends, while
            // extraction tasks may still be reading from it.
            for (int permit = 0; permit < maxDegreeOfParallelism; permit++)
            {
                semaphore.WaitOne();
            }
        }
    });
}
更新3
在 handle.Exception 中可以看到以下异常:
{"Block length does not match with its complement."} [0] = {"A local file header is corrupt."}
需要弄清楚 ZipFile 是否是线程安全的。
解决方法
免责声明:这只是一个概念验证。
将上面示例代码中的 ZipFile.OpenRead 替换为 ParallelZipFile.OpenRead 后,全部 4 个单元测试都通过了。
/// <summary>
/// Factory mirroring <c>ZipFile.OpenRead</c> that returns an archive wrapper
/// whose entries can be extracted from several threads.
/// </summary>
public class ParallelZipFile
{
    /// <summary>Opens the zip file at <paramref name="path"/> for parallel reading.</summary>
    /// <param name="path">Path of the zip file.</param>
    /// <returns>A wrapper that owns the opened archive.</returns>
    public static ParallelZipArchive OpenRead(string path)
    {
        return new ParallelZipArchive(ZipFile.OpenRead(path), path);
    }
}

/// <summary>
/// Wraps a zip file and keeps a pool of <see cref="ZipArchive"/> readers for it.
/// An extraction takes a free reader from the pool, or opens an additional one
/// on the same file when the pool is empty.
/// </summary>
public class ParallelZipArchive : IDisposable
{
    internal ZipArchive _archive;
    internal string _path;

    // Readers that are currently idle; a reader is removed while an extraction uses it.
    internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();

    /// <summary>Creates the wrapper and seeds the reader pool with <paramref name="zip"/>.</summary>
    /// <param name="zip">An archive already opened on <paramref name="path"/>.</param>
    /// <param name="path">Path of the zip file, used to open further readers.</param>
    public ParallelZipArchive(ZipArchive zip, string path)
    {
        _path = path;
        _archive = zip;
        FreeReaders.Enqueue(zip);
    }

    /// <summary>
    /// Gets one wrapper per entry of the archive, in archive order. The list is
    /// rebuilt on every access.
    /// </summary>
    public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
    {
        get
        {
            var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
            int i = 0;
            foreach (var entry in _archive.Entries)
            {
                // The position is recorded so any pooled reader can locate the same entry.
                list.Add(new ParallelZipArchiveEntry(i++, entry, this));
            }

            return new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
        }
    }

    /// <summary>Disposes every reader that is in the pool at the time of the call.</summary>
    public void Dispose()
    {
        foreach (var archive in FreeReaders)
        {
            archive.Dispose();
        }
    }
}

/// <summary>
/// An entry of a <see cref="ParallelZipArchive"/>, identified by its position
/// in the archive's entry list.
/// </summary>
public class ParallelZipArchiveEntry
{
    private ParallelZipArchive _parent;
    private int _entry;

    /// <summary>File name of the entry, copied from the underlying entry.</summary>
    public string Name { get; set; }

    /// <summary>Relative path of the entry, copied from the underlying entry.</summary>
    public string FullName { get; set; }

    /// <summary>Creates the wrapper for the entry at position <paramref name="entryNr"/>.</summary>
    /// <param name="entryNr">Zero-based position of the entry in the archive.</param>
    /// <param name="entry">The underlying entry, used only to copy the names.</param>
    /// <param name="parent">The archive that owns the reader pool.</param>
    public ParallelZipArchiveEntry(int entryNr, ZipArchiveEntry entry, ParallelZipArchive parent)
    {
        _entry = entryNr;
        _parent = parent;
        Name = entry.Name;
        FullName = entry.FullName;
    }

    /// <summary>
    /// Extracts this entry to <paramref name="path"/> using a reader taken from
    /// the parent's pool, opening a new reader when none is free.
    /// </summary>
    /// <param name="path">Destination file path.</param>
    public void ExtractToFile(string path)
    {
        Trace.TraceInformation("Number of readers: {0}", _parent.FreeReaders.Count);

        ZipArchive reader;
        if (!_parent.FreeReaders.TryDequeue(out reader))
        {
            reader = ZipFile.OpenRead(_parent._path);
        }

        try
        {
            // Entries is indexable, so the entry is looked up directly by position.
            reader.Entries[_entry].ExtractToFile(path);
        }
        finally
        {
            // Return the reader even when extraction throws; otherwise it never
            // re-enters the pool and Dispose never closes it.
            _parent.FreeReaders.Enqueue(reader);
        }
    }
}
单元测试
/// <summary>
/// Runs each extraction variant against the test archive and compares its
/// output with a reference extraction produced by ZipFile.ExtractToDirectory.
/// </summary>
[TestClass]
public class ZipFileTests
{
    /// <summary>Builds the reference ("truth") folder once for the whole class.</summary>
    [ClassInitialize]
    public static void PreInitialize(TestContext context)
    {
        var truthFolder = Resources.LocalExtractFolderTruth;
        if (truthFolder.Exists)
        {
            truthFolder.Delete(true);
        }

        ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName, Resources.LocalExtractFolderTruth.FullName);
    }

    /// <summary>Removes the output of the previous test before each test runs.</summary>
    [TestInitialize]
    public void InitializeTests()
    {
        var outputFolder = Resources.LocalExtractFolder;
        if (outputFolder.Exists)
        {
            outputFolder.Delete(true);
        }
    }

    /// <summary>Checks the ExtractToDirectory extension method.</summary>
    [TestMethod]
    public void ExtractTest()
    {
        Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);

        AssertOutputMatchesTruth();
    }

    /// <summary>Checks the dataflow-based asynchronous extraction.</summary>
    [TestMethod]
    public async Task ExtractAsyncTest()
    {
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);

        AssertOutputMatchesTruth();
    }

    /// <summary>Checks the blocking semaphore-based extraction.</summary>
    [TestMethod]
    public void ExtractSemaphoreTest()
    {
        Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);

        AssertOutputMatchesTruth();
    }

    /// <summary>Checks the task-returning semaphore-based extraction.</summary>
    [TestMethod]
    public async Task ExtractSemaphoreAsyncTest()
    {
        await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);

        AssertOutputMatchesTruth();
    }

    // Asserts that the extracted folder compares equal to the reference folder.
    private static void AssertOutputMatchesTruth()
    {
        bool same = Helpers.DirectoryTools.CompareDirectories(
            Resources.LocalExtractFolderTruth,
            Resources.LocalExtractFolder);

        Assert.IsTrue(same);
    }
}