我正在尝试编写一些代码,使得Web服务可以并行调用多个不同的服务器,因此TPL似乎是明显的选择.
只有我的一个网络服务电话将永远不会返回我想要的结果,而其他人都不会.我试图找出一种有效地拥有Task.WaitAny的方法,但只有当匹配条件的第一个任务返回时,才能解除阻塞.
我试图用WaitAny,但是无法找出放置过滤器的位置.我有这么远
public void SearchServers() { var servers = new[] {"server1","server2","server3","server4"}; var tasks = servers .Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server),s)) .ToArray(); Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"? //Omitted: cancel any outstanding tasks since the correct server has been found } private bool CallServer(string server) { //... make the call to the server and return the result ... }
编辑:快速澄清,以防上面有任何混乱.我正在尝试执行以下操作:
>对于每个服务器,启动一个Task来检查它
>或者,等到服务器返回true(只有最多1个服务器将返回true)
>或者,等到所有服务器都返回false,即没有匹配.
解决方法
我可以想到的最好的是为每个任务指定一个ContinueWith,检查结果,如果真的取消其他任务.要取消任务,您可能需要使用
CancellationToken.
var tasks = servers .Select(s => Task.Run(...) .ContinueWith(t => if (t.Result) { // cancel other threads } ) ).ToArray();
更新:一个替代的解决方案将是WaitAny,直到正确的任务完成(但是它有一些缺点,例如从列表中删除已完成的任务,并从其余的任务中创建一个新阵列是相当繁重的操作)
List<Task<bool>> tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server),s)).ToList(); bool result; do { int idx = Task.WaitAny(tasks.ToArray()); result = tasks[idx].Result; tasks.RemoveAt(idx); } while (!result && tasks.Count > 0); // cancel other tasks
更新2:现在我会用Rx:
[Fact] public async Task AwaitFirst() { var servers = new[] { "server1","server4" }; var server = await servers .Select(s => Observable .FromAsync(ct => CallServer(s,ct)) .Where(p => p) .Select(_ => s) ) .Merge() .FirstAsync(); output.WriteLine($"Got result from {server}"); } private async Task<bool> CallServer(string server,CancellationToken ct) { try { if (server == "server1") { await Task.Delay(TimeSpan.FromSeconds(1),ct); output.WriteLine($"{server} finished"); return false; } if (server == "server2") { await Task.Delay(TimeSpan.FromSeconds(2),ct); output.WriteLine($"{server} finished"); return false; } if (server == "server3") { await Task.Delay(TimeSpan.FromSeconds(3),ct); output.WriteLine($"{server} finished"); return true; } if (server == "server4") { await Task.Delay(TimeSpan.FromSeconds(4),ct); output.WriteLine($"{server} finished"); return true; } } catch(OperationCanceledException) { output.WriteLine($"{server} Cancelled"); throw; } throw new ArgumentOutOfRangeException(nameof(server)); }
测试在我的机器上需要3.32秒(这意味着它没有等待第4台服务器),我得到以下输出:
server1 finished server2 finished server3 finished server4 Cancelled Got result from server3