c# – 使用TPL数据流的网络命令处理

前端之家收集整理的这篇文章主要介绍了c# – 使用TPL数据流的网络命令处理前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在研究一个系统,它涉及通过TCP网络连接接受命令,然后在执行这些命令时发送响应.相当基本的东西,但我希望支持一些要求:

>多个客户端可以同时连接并建立单独的会话.如果需要,会话可以根据需要持续长或短,同一客户端IP能够建立多个并行会话.
>每个会话可以同时处理多个命令,因为一些请求的操作可以并行执行.

我想使用async / await干净地实现它,并且基于我所读到的,TPL Dataflow听起来像是一种干净地将处理分解为可以在线程池上运行而不是绑定线程的漂亮块的好方法对于不同的会话/命令,阻塞等待句柄.

这就是我的开始(一些部分被剥离以简化,例如异常处理的细节;我还省略了一个为网络I / O提供有效等待的包装器):

private readonly Task _serviceTask;
    private readonly Task _commandsTask;
    private readonly CancellationTokenSource _cancellation;
    private readonly BufferBlock<Command> _pendingCommands;

    public NetworkService(ICommandProcessor commandProcessor)
    {
        _commandProcessor = commandProcessor;
        IsRunning = true;
        _cancellation = new CancellationTokenSource();
        _pendingCommands = new BufferBlock<Command>();
        _serviceTask = Task.Run((Func<Task>)RunService);
        _commandsTask = Task.Run((Func<Task>)RunCommands);
    }

    public bool IsRunning { get; private set; }

    private async Task RunService()
    {
        _listener = new TcpListener(IPAddress.Any,ServicePort);
        _listener.Start();

        while (IsRunning)
        {
            Socket client = null;
            try
            {
                client = await _listener.AcceptSocketAsync();
                client.Blocking = false;

                var session = RunSession(client);
                lock (_sessions)
                {
                    _sessions.Add(session);
                }
            }
            catch (Exception ex)
            { //Handling here...
            }
        }
    }

    private async Task RunCommands()
    {
        while (IsRunning)
        {
            var command = await _pendingCommands.ReceiveAsync(_cancellation.Token);
            var task = Task.Run(() => RunCommand(command));
        }
    }

    private async Task RunCommand(Command command)
    {
        try
        {
            var response = await _commandProcessor.RunCommand(command.Content);
            Send(command.Client,response);
        }
        catch (Exception ex)
        {
            //Deal with general command exceptions here...
        }
    }

    private async Task RunSession(Socket client)
    {
        while (client.Connected)
        {
            var reader = new DelimitedCommandReader(client);

            try
            {
                var content = await reader.ReceiveCommand();
                _pendingCommands.Post(new Command(client,content));
            }
            catch (Exception ex)
            {
                //Exception handling here...
            }
        }
    }

基础知识看起来很​​简单,但有一部分让我失望:我如何确保在关闭应用程序时,我等待所有待处理的命令任务完成?当我使用Task.Run执行命令时,我得到Task对象,但是如何跟踪挂起的命令,以便在允许服务关闭之前确保所有命令都完整?

我已经考虑过使用一个简单的List,在List完成后删除List中的命令,但我想知道我是否缺少TPL Dataflow中的一些基本工具,这些工具可以让我更干净地完成它.

编辑:

阅读更多关于TPL Dataflow的信息,我想知道我应该使用的是一个具有增加的MaxDegreeOfParallelism的TransformBlock,以允许处理并行命令吗?这设置了可并行运行的命令数量的上限,但我认为这对我的系统来说是一个合理的限制.我很想听听那些有TPL Dataflow经验的人知道我是否走在正确的轨道上.

解决方法

是的,所以…你有一半在这里使用TPL的力量.事实上,如果您正在订阅TPL DataFlow样式,那么您仍然在后台后台循环中手动接收来自BufferBlock中的项目的事实不是您想要的“方式”.

你要做的是将ActionBlock链接到BufferBlock并从中执行命令处理/发送.这也是您设置MaxDegreeOfParallelism以控制要处理的并发命令数的块.所以设置可能看起来像这样:

// Initialization logic to build up the TPL flow
_pendingCommands = new BufferBlock<Command>();
_commandProcessor = new ActionBlock<Command>(this.ProcessCommand);

_pendingCommands.LinkTo(_commandProcessor);

private Task ProcessCommand(Command command)
{
   var response = await _commandProcessor.RunCommand(command.Content);
   this.Send(command.Client,response);
}

然后,在您的关闭代码中,您需要通过在_pipelineCommands BufferBlock上调用Complete来表示您已完成将项目添加到管道中,然后等待_commandProcessor ActionBlock完成以确保所有项目都已通过管道.您可以通过获取块的Completion属性返回的Task并调用Wait来执行此操作:

_pendingCommands.Complete();
_commandProcessor.Completion.Wait();

如果你想获得奖励积分,你甚至可以将命令处理与命令发送分开.这将允许您将这些步骤彼此分开配置.例如,您可能需要限制处理命令的线程数,但希望有更多的发送响应.您可以通过简单地将TransformBlock引入流的中间来实现:

_pendingCommands = new BufferBlock<Command>();
_commandProcessor = new TransformBlock<Command,Tuple<Client,Response>>(this.ProcessCommand);
_commandSender = new ActionBlock<Tuple<Client,Response>(this.SendResponseToClient));

_pendingCommands.LinkTo(_commandProcessor);
_commandProcessor.LinkTo(_commandSender);

private Task ProcessCommand(Command command)
{
   var response = await _commandProcessor.RunCommand(command.Content);

   return Tuple.Create(command,response);
}

private Task SendResponseToClient(Tuple<Client,Response> clientAndResponse)
{
   this.Send(clientAndResponse.Item1,clientAndResponse.Item2);
}

您可能希望使用自己的数据结构而不是Tuple,它仅用于说明目的,但关键是这正是您想要用来分解管道的结构类型,以便您可以控制它的各个方面确切地说你可能需要.

猜你在找的C#相关文章