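I have the following scenario, which I think might be quite common:
>There is a task (a UI command handler) which can complete either synchronously or asynchronously.
>Commands may arrive faster than they are processed.
>If there is already a pending task for a command, the new command handler task should be queued and processed sequentially.
>Each new task's result may depend on the result of the previous task.
Cancellation should be observed, but I'd like to leave it outside the scope of this question for simplicity. Also, thread safety (concurrency) is not a requirement, but re-entrancy must be supported.
Here is a basic example of what I'm trying to achieve (as a console app, for simplicity):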
    using System;
    using System.Threading.Tasks;

    namespace ConsoleApp
    {
        class Program
        {
            static void Main(string[] args)
            {
                var asyncOp = new AsyncOp<int>();

                Func<int, Task<int>> handleAsync = async (arg) =>
                {
                    Console.WriteLine("this task arg: " + arg);

                    //await Task.Delay(arg); // make it async

                    return await Task.FromResult(arg); // sync
                };

                Console.WriteLine("Test #1...");
                asyncOp.RunAsync(() => handleAsync(1000));
                asyncOp.RunAsync(() => handleAsync(900));
                asyncOp.RunAsync(() => handleAsync(800));
                asyncOp.CurrentTask.Wait();

                Console.WriteLine("\nPress any key to continue to test #2...");
                Console.ReadLine();

                asyncOp.RunAsync(() =>
                {
                    asyncOp.RunAsync(() => handleAsync(200));
                    return handleAsync(100);
                });

                asyncOp.CurrentTask.Wait();
                Console.WriteLine("\nPress any key to exit...");
                Console.ReadLine();
            }

            // AsyncOp
            class AsyncOp<T>
            {
                Task<T> _pending = Task.FromResult(default(T));

                public Task<T> CurrentTask { get { return _pending; } }

                public Task<T> RunAsync(Func<Task<T>> handler)
                {
                    var pending = _pending;
                    Func<Task<T>> wrapper = async () =>
                    {
                        // await the prev task
                        var prevResult = await pending;
                        Console.WriteLine("\nprev task result: " + prevResult);
                        // start and await the handler
                        return await handler();
                    };

                    _pending = wrapper();
                    return _pending;
                }
            }
        }
    }
Output:
Test #1...
prev task result: 0
this task arg: 1000
prev task result: 1000
this task arg: 900
prev task result: 900
this task arg: 800
Press any key to continue to test #2...
prev task result: 800
prev task result: 800
this task arg: 200
this task arg: 100
Press any key to exit...
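It works as required, until re-entrancy is introduced in test #2:
    asyncOp.RunAsync(() =>
    {
        asyncOp.RunAsync(() => handleAsync(200));
        return handleAsync(100);
    });
The desired output is 100, 200 rather than 200, 100, because there is already a pending outer task for 100. This obviously happens because the inner task executes synchronously, before the outer call has assigned its own task to _pending, which breaks the logic var pending = _pending; /* ... */ _pending = wrapper() for the outer task.
How can I make it work for test #2, too?
One solution would be to enforce asynchrony for every task with Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()). However, I don't want to impose asynchronous execution on command handlers which might be synchronous internally. Also, I don't want to depend on the behavior of any particular synchronization context (i.e. rely on Task.Factory.StartNew returning before the created task has actually started).
In the real-life project, I'm responsible for what AsyncOp is above, but I have no control over the command handlers (i.e. whatever is inside handleAsync).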
Solution
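I almost forgot that it is possible to construct a Task manually, without starting or scheduling it. Then “Task.Factory.StartNew” vs “new Task(…).Start” put me back on track. I think this is one of those few cases where the Task<TResult> constructor may actually be useful, along with nested tasks (Task<Task<T>>) and Task.Unwrap():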
    // AsyncOp
    class AsyncOp<T>
    {
        Task<T> _pending = Task.FromResult(default(T));

        public Task<T> CurrentTask { get { return _pending; } }

        public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false)
        {
            var pending = _pending;
            Func<Task<T>> wrapper = async () =>
            {
                // await the prev task
                var prevResult = await pending;
                Console.WriteLine("\nprev task result: " + prevResult);
                // start and await the handler
                return await handler();
            };

            // create the task without starting it, so that _pending is
            // already updated when the wrapper (and the handler) runs
            var task = new Task<Task<T>>(wrapper);
            var inner = task.Unwrap();
            _pending = inner;

            task.RunSynchronously(useSynchronizationContext ?
                TaskScheduler.FromCurrentSynchronizationContext() :
                TaskScheduler.Current);

            return inner;
        }
    }
Output:
Test #1...
prev task result: 0
this task arg: 1000
prev task result: 1000
this task arg: 900
prev task result: 900
this task arg: 800
Press any key to continue to test #2...
prev task result: 800
this task arg: 100
prev task result: 100
this task arg: 200
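To isolate the step that makes the ordering work, here is a minimal sketch of a "cold" nested task. The names ColdTaskDemo and Run are hypothetical and not part of the answer's code. It only illustrates that the proxy returned by Unwrap() is available before the wrapper body has executed, which is what allows RunAsync to store it in _pending first.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;

// Hypothetical demo type, used only for illustration.
static class ColdTaskDemo
{
    public static string Run()
    {
        var log = new StringBuilder();

        // Construct the outer task without starting it: the lambda has not run yet.
        var outer = new Task<Task<int>>(async () =>
        {
            log.Append("body;");
            return await Task.FromResult(42);
        });

        // The proxy for the eventual inner task exists before the body runs.
        Task<int> proxy = outer.Unwrap();
        log.Append("created;");

        // Now run the body synchronously on the current thread.
        outer.RunSynchronously(TaskScheduler.Current);

        log.Append("result=" + proxy.Result);
        return log.ToString();
    }
}
```

Calling ColdTaskDemo.Run() from a console app should return "created;body;result=42": the proxy is obtained first, the body runs only inside RunSynchronously, and the result is then observed through the proxy.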
If needed, it is now also very easy to make AsyncOp thread-safe by adding a lock to protect _pending.
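As a rough sketch of that idea, the variant below guards _pending with a lock. The class name LockedAsyncOp is hypothetical, the Console output and the useSynchronizationContext parameter are dropped for brevity, and running the task outside the lock is my assumption about a reasonable design rather than something the answer specifies.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical thread-safe variant of AsyncOp<T>, for illustration only.
class LockedAsyncOp<T>
{
    readonly object _lock = new object();
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask
    {
        get { lock (_lock) return _pending; }
    }

    public Task<T> RunAsync(Func<Task<T>> handler)
    {
        Task<Task<T>> task;
        Task<T> inner;

        // Reading the previous task and publishing the new one is atomic.
        lock (_lock)
        {
            var pending = _pending;
            task = new Task<Task<T>>(async () =>
            {
                await pending;          // wait for the previous task
                return await handler(); // then start and await the handler
            });
            inner = task.Unwrap();
            _pending = inner;
        }

        // Run outside the lock so the handler never executes while it is held.
        task.RunSynchronously(TaskScheduler.Current);
        return inner;
    }
}
```

Because each caller links to whatever task was pending at the moment it held the lock, the chain order is fixed under the lock even though the handlers themselves run outside it. A re-entrant call from inside a handler sees the outer task as already pending, so it is queued behind it.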
Update: below is the most recent version of this pattern, which uses TaskCompletionSource and is thread-safe:
    using System;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// AsyncOperation
    /// By Noseratio - https://stackoverflow.com/a/21427264
    /// </summary>
    /// <typeparam name="T">Task result type</typeparam>
    class AsyncOperation<T>
    {
        readonly object _lock = new Object();
        Task<T> _currentTask = null;
        CancellationTokenSource _currentCts = null;

        // a client of this class (e.g. a viewmodel) has an option
        // to handle TaskSucceeded or TaskFailing, if needed
        public event EventHandler<TaskEventArgs> TaskSucceeded = null;
        public event EventHandler<TaskEventArgs> TaskFailing = null;

        public Task<T> CurrentTask
        {
            get
            {
                lock (_lock)
                    return _currentTask;
            }
        }

        public bool IsCurrent(Task task)
        {
            lock (_lock)
                return task == _currentTask;
        }

        public bool IsPending
        {
            get
            {
                lock (_lock)
                    return _currentTask != null && !_currentTask.IsCompleted;
            }
        }

        public bool IsCancellationRequested
        {
            get
            {
                lock (_lock)
                    return _currentCts != null && _currentCts.IsCancellationRequested;
            }
        }

        public void Cancel()
        {
            lock (_lock)
            {
                if (_currentTask != null && !_currentTask.IsCompleted)
                    _currentCts.Cancel();
            }
        }
        /// <summary>
        /// Start the task routine and observe the result of the previous task routine
        /// </summary>
        /// <param name="routine"></param>
        /// <param name="token"></param>
        /// <param name="cancelPrevious"></param>
        /// <param name="throwImmediately"></param>
        public Task<T> StartAsync(
            Func<CancellationToken, Task<T>> routine,
            CancellationToken token,
            bool cancelPrevious = true,
            bool throwImmediately = true)
        {
            Task<T> previousTask = null; // pending instance
            CancellationTokenSource previousCts = null; // pending instance CTS
            CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token);
            TaskCompletionSource<T> thisTcs = new TaskCompletionSource<T>(); // this task
            CancellationToken thisToken; // this task's cancellation Token
            Task<T> routineTask = null; // as returned by routine

            lock (_lock)
            {
                // remember the _currentTask as previousTask
                previousTask = _currentTask;
                previousCts = _currentCts;
                thisToken = thisCts.Token;
                // set the new _currentTask
                _currentTask = thisTcs.Task;
                _currentCts = thisCts;
            }

            Action startAsync = async () =>
            {
                // because startAsync is "async void" method,
                // any exception not handled inside it
                // will be immediately thrown on the current synchronization context,
                // more details: https://stackoverflow.com/a/22395161/1768303

                // run and await this task
                try
                {
                    // await the previous task instance
                    if (previousTask != null)
                    {
                        if (cancelPrevious)
                            previousCts.Cancel();
                        try
                        {
                            await previousTask;
                        }
                        catch (OperationCanceledException)
                        {
                            // ignore previous cancellations
                        }
                    }

                    thisToken.ThrowIfCancellationRequested();
                    routineTask = routine(thisToken);
                    await routineTask;
                }
                catch (Exception ex)
                {
                    // ignore cancellation
                    if (ex is OperationCanceledException)
                    {
                        System.Diagnostics.Debug.Print("Task cancelled, id={0}", thisTcs.Task.Id);
                        thisTcs.SetCanceled();
                        return;
                    }

                    // fire TaskFailing
                    System.Diagnostics.Debug.Print("Task failing, id={0}", thisTcs.Task.Id);
                    if (this.TaskFailing != null)
                    {
                        var args = new TaskEventArgs(thisTcs.Task, ex);
                        this.TaskFailing(this, args);
                        if (args.Handled)
                        {
                            // exception handled
                            // make thisTcs cancelled rather than faulted
                            thisTcs.SetCanceled();
                            return;
                        }
                    }

                    // exception unhandled
                    thisTcs.SetException(ex);
                    if (throwImmediately)
                        throw; // rethrow on the current synchronization context

                    // exception should be observed via CurrentTask.Exception
                    return;
                }

                // success, fire TaskSucceeded
                System.Diagnostics.Debug.Print("Task succeeded, id={0}", thisTcs.Task.Id);
                thisTcs.SetResult(routineTask.Result);
                if (this.TaskSucceeded != null)
                    this.TaskSucceeded(this, new TaskEventArgs(thisTcs.Task));
            };

            startAsync();
            return thisTcs.Task;
        }

        // StartAsync with CancellationToken.None
        public Task<T> StartAsync(
            Func<CancellationToken, Task<T>> routine,
            bool cancelPrevious = true,
            bool throwImmediately = true)
        {
            // pass the caller's flags through instead of hard-coding true
            return StartAsync(routine, CancellationToken.None, cancelPrevious: cancelPrevious, throwImmediately: throwImmediately);
        }
        /// <summary>
        /// TaskEventArgs
        /// </summary>
        public class TaskEventArgs : EventArgs
        {
            public Task<T> Task { get; private set; }
            public Exception Exception { get; private set; }
            public bool Handled { get; set; }

            public TaskEventArgs(Task<T> task, Exception exception = null)
            {
                this.Task = task;
                this.Exception = exception;
            }
        }
    }