我有以下情况,我认为可能是很常见的:
>有一个任务(一个UI命令处理程序)可以同步或异步地完成.
>命令可能会比处理得更快.
>如果命令已经有待处理的任务,那么新的命令处理程序任务应该被排队并顺序处理.
>每个新任务的结果可能取决于上一个任务的结果.
应该注意取消,但为了简单起见,我想把它放在这个问题的范围之外.此外,线程安全(并发)不是必需的,但是必须支持重新入侵.
以下是我正在尝试实现的一个基本示例(作为控制台应用程序,为简单起见):
using System; using System.Threading.Tasks; namespace ConsoleApp { class Program { static void Main(string[] args) { var asyncOp = new AsyncOp<int>(); Func<int,Task<int>> handleAsync = async (arg) => { Console.WriteLine("this task arg: " + arg); //await Task.Delay(arg); // make it async return await Task.FromResult(arg); // sync }; Console.WriteLine("Test #1..."); asyncOp.RunAsync(() => handleAsync(1000)); asyncOp.RunAsync(() => handleAsync(900)); asyncOp.RunAsync(() => handleAsync(800)); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to continue to test #2..."); Console.ReadLine(); asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); }); asyncOp.CurrentTask.Wait(); Console.WriteLine("\nPress any key to exit..."); Console.ReadLine(); } // AsyncOp class AsyncOp<T> { Task<T> _pending = Task.FromResult(default(T)); public Task<T> CurrentTask { get { return _pending; } } public Task<T> RunAsync(Func<Task<T>> handler) { var pending = _pending; Func<Task<T>> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; _pending = wrapper(); return _pending; } } } }
输出:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 prev task result: 800 this task arg: 200 this task arg: 100 Press any key to exit...
它按照要求工作,直到在第2测试中引入重入:
asyncOp.RunAsync(() => { asyncOp.RunAsync(() => handleAsync(200)); return handleAsync(100); });
期望的输出应该是100,200,而不是200,100,因为已经有一个待处理的外部任务为100.这显然是因为内部任务同步执行,破坏逻辑var pending = _pending; / * … * / _pending = wrapper()作为外部任务.
如何使测试#2工作?
一个解决方案是使用Task.Factory.StartNew(…,TaskScheduler.FromCurrentSynchronizationContext())为每个任务强制执行异步,但是我不想对可能在内部同步的命令处理程序强加异步执行.,我不想依赖任何特定同步上下文的行为(即依赖该Task.Factory.StartNew应该在创建的任务实际启动之前返回).
在现实生活的项目中,我对AsyncOp在上面负责,但是无法控制命令处理程序(即在handleAsync内部的任何内容).
解决方法
我几乎忘记了可以手动构建一个任务,而不需要启动或调度它.然后,“Task.Factory.StartNew” vs “new Task(…).Start”把我放回轨道.我认为这是少数情况之一,
Task<TResult>
构造函数可能实际上是有用的,以及嵌套任务(Task< Task&T)>和
Task.Unwrap()
:
// AsyncOp class AsyncOp<T> { Task<T> _pending = Task.FromResult(default(T)); public Task<T> CurrentTask { get { return _pending; } } public Task<T> RunAsync(Func<Task<T>> handler,bool useSynchronizationContext = false) { var pending = _pending; Func<Task<T>> wrapper = async () => { // await the prev task var prevResult = await pending; Console.WriteLine("\nprev task result: " + prevResult); // start and await the handler return await handler(); }; var task = new Task<Task<T>>(wrapper); var inner = task.Unwrap(); _pending = inner; task.RunSynchronously(useSynchronizationContext ? TaskScheduler.FromCurrentSynchronizationContext() : TaskScheduler.Current); return inner; } }
输出:
Test #1... prev task result: 0 this task arg: 1000 prev task result: 1000 this task arg: 900 prev task result: 900 this task arg: 800 Press any key to continue to test #2... prev task result: 800 this task arg: 100 prev task result: 100 this task arg: 200
如果需要,通过添加一个锁来保护_pending,使AsyncOp线程安全现在变得非常简单.
更新,以下是该模式的最新版本,它使用TaskCompletionSource并且是线程安全的:
/// <summary> /// AsyncOperation /// By Noseratio - https://stackoverflow.com/a/21427264 /// </summary> /// <typeparam name="T">Task result type</typeparam> class AsyncOperation<T> { readonly object _lock = new Object(); Task<T> _currentTask = null; CancellationTokenSource _currentCts = null; // a client of this class (e.g. a viewmodel) has an option // to handle TaskSucceeded or TaskFailed,if needed public event EventHandler<TaskEventArgs> TaskSucceeded = null; public event EventHandler<TaskEventArgs> TaskFailing = null; public Task<T> CurrentTask { get { lock (_lock) return _currentTask; } } public bool IsCurrent(Task task) { lock (_lock) return task == _currentTask; } public bool IsPending { get { lock (_lock) return _currentTask != null && !_currentTask.IsCompleted; } } public bool IsCancellationRequested { get { lock (_lock) return _currentCts != null && _currentCts.IsCancellationRequested; } } public void Cancel() { lock (_lock) { if (_currentTask != null && !_currentTask.IsCompleted) _currentCts.Cancel(); } } /// <summary> /// Start the task routine and observe the result of the prevIoUs task routine /// </summary> /// <param name="routine"></param> /// <param name="token"></param> /// <param name="cancelPrevIoUs"></param> /// <param name="throwImmediately"></param> public Task<T> StartAsync( Func<CancellationToken,Task<T>> routine,CancellationToken token,bool cancelPrevIoUs = true,bool throwImmediately = true) { Task<T> prevIoUsTask = null; // pending instance CancellationTokenSource prevIoUsCts = null; // pending instance CTS CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token); TaskCompletionSource<T> thisTcs = new TaskCompletionSource<T>(); // this task CancellationToken thisToken; // this task's cancellation Token Task<T> routineTask = null; // as returned by routine lock (_lock) { // remember the _currentTask as prevIoUsTask prevIoUsTask = _currentTask; prevIoUsCts = _currentCts; thisToken = thisCts.Token; // set the new _currentTask _currentTask = thisTcs.Task; _currentCts = thisCts; } Action startAsync = async () => { // because startAsync is "async void" method,// any exception not handled inside it // will be immediately thrown on the current synchronization context,// more details: https://stackoverflow.com/a/22395161/1768303 // run and await this task try { // await the prevIoUs task instance if (prevIoUsTask != null) { if (cancelPrevIoUs) prevIoUsCts.Cancel(); try { await prevIoUsTask; } catch (OperationCanceledException) { // ignore prevIoUs cancellations } } thisToken.ThrowIfCancellationRequested(); routineTask = routine(thisToken); await routineTask; } catch (Exception ex) { // ignore cancellation if (ex is OperationCanceledException) { System.Diagnostics.Debug.Print("Task cancelled,id={0}",thisTcs.Task.Id); thisTcs.SetCanceled(); return; } // fire TaskFailing System.Diagnostics.Debug.Print("Task failing,thisTcs.Task.Id); if (this.TaskFailing != null) { var args = new TaskEventArgs(thisTcs.Task,ex); this.TaskFailing(this,args); if (args.Handled) { // exception handled // make thisTcs cancelled rather than faulted thisTcs.SetCanceled(); return; } } // exception unhandled thisTcs.SetException(ex); if (throwImmediately) throw; // rethrow on the current synchronization context // exception should be observed via CurrentTask.Exception return; } // success,fire TaskSucceeded System.Diagnostics.Debug.Print("Task succeded,thisTcs.Task.Id); thisTcs.SetResult(routineTask.Result); if (this.TaskSucceeded != null) this.TaskSucceeded(this,new TaskEventArgs(thisTcs.Task)); }; startAsync(); return thisTcs.Task; } // StartAsync with CancellationToken.None public Task<T> StartAsync( Func<CancellationToken,bool throwImmediately = true) { return StartAsync(routine,CancellationToken.None,cancelPrevIoUs: true,throwImmediately: true); } /// <summary> /// TaskEventArgs /// </summary> public class TaskEventArgs : EventArgs { public Task<T> Task { get; private set; } public Exception Exception { get; private set; } public bool Handled { get; set; } public TaskEventArgs(Task<T> task,Exception exception = null) { this.Task = task; this.Exception = exception; } } }