public static IPropagatorBlock<TInput,TOutput> Encapsulate<TInput,TOutput>( ITargetBlock<TInput> target,ISourceBlock<TOutput> source)
使您能够将多个块封装到单个转换块中.它返回一个
IPropagatorBlock<TInput,TOutput>
它代表管道的起始和结束块.
但是,如果我的管道中的最后一个块是ActionBlock,我不能使用它,因为ActionBlock不是SourceBlock,并且函数的返回类型将是ITargetBlock,而不是IPropagatorBlock.
基本上,我正在寻找的是这样的功能:
public static ITargetBlock<TStart> Encapsulate<TStart,TEnd>( ITargetBlock<TStart> startBlock,ActionBlock<TEnd> endBlock)
这是一个明智的事情,或者我错过了一些简单的东西?我不太清楚如何写它 – 特别是完成连线.我需要创建自己的自定义块类型吗?
编辑:
好的,所以看过@Panagiotis Kanavos的回复,做了一些修补,我想出了这个.这基于EncapsulatingPropagator类,它是现有DataflowBlock.Encapsulate方法使用的类:
internal sealed class EncapsulatingTarget<TStart,TEnd> : ITargetBlock<TStart> { private readonly ITargetBlock<TStart> startBlock; private readonly ActionBlock<TEnd> endBlock; public EncapsulatingTarget(ITargetBlock<TStart> startBlock,ActionBlock<TEnd> endBlock) { this.startBlock = startBlock; this.endBlock = endBlock; } public Task Completion { get { return this.endBlock.Completion; } } public void Complete() { this.startBlock.Complete(); } void IDataflowBlock.Fault(Exception exception) { if (exception == null) { throw new ArgumentNullException("exception"); } this.startBlock.Fault(exception); } public DataflowMessageStatus OfferMessage( DataflowMessageHeader messageHeader,TStart messageValue,ISourceBlock<TStart> source,bool consumeToAccept) { return this.startBlock.OfferMessage(messageHeader,messageValue,source,consumeToAccept); } }
解决方法
例如,Sliding Window样本缓冲发布到其输入块的所有传入消息,并在滑动窗口到达其输出块时输出一批所有检索到的消息.
该方法的名称造成了很多混乱,但是当你理解它们的目的时它们才有意义:
> target参数是前面的块将连接到的目标(输入)端点以发送消息.在这种情况下,处理传入消息并决定是否发布到输出(源)块的ActionBlock是有意义的.
> source参数是后续步骤将连接到哪个源(输出)端点以接收消息.将ActionBlock用作源是没有意义的,因为它没有任何输出.
接受ActionBlock方法作为源的Encapsulate变体没有用,因为您可以简单地从任何前一步骤链接到动作块.
编辑
如果您想模块化管道,即将其分解为可重用,更易于管理,您可以创建一个构造的类,您可以使用普通的旧类.在该类中,您正常构建管道片段,链接块(确保完成传播),然后将最后一步的第一步和完成任务公开为公共属性,例如:
class MyFragment { public TransformationBlock<SomeMessage,SomeOther> Input {get;} public Task Completion {get;} ActionBlock<SomeOther> _finalBlock; public MyFragment() { Input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction); _finalBlock=new ActionBlock<SomeOther>(MyMethod); var linkOptions = new DataflowLinkOptions {PropagateCompletion = true} Input.LinkTo(_finalBlock,linkOptions); } private SomeOther MyFunction(SomeMessage msg) { ... } private void MyMethod(SomeOther msg) { ... } }
要将片段连接到管道,您只需要从管道块链接到公开的输入块.等待完成,只需等待暴露的完成任务.
如果需要,您可以在这里停止,或者您可以实现ITargetBlock以使片段看起来像Target块.您只需将所有方法委托给Input块,将Completion属性委托给最终块.
例如:
class MyFragment:ITargetBlock<SomeMessage> { .... public Task Completion {get;} public void Complete() { Input.Complete() }; public void Fault(Exception exc) { Input.Fault(exc); } DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,TInput messageValue,ISourceBlock<TInput> source,bool consumeToAccept) { return Input.OfferMessage(messageHeader,consumeToAccept); } }
编辑2
使用@bornfromanegg的类可以将构建片段的行为与暴露输入和完成的样板分开:
public ITargetBlock<SomeMessage> BuildMyFragment() { var input=new TransformationBlock<SomeMessage,SomeOther>(MyFunction); var step2=new TransformationBlock<SomeOther,SomeFinal>(MyFunction2); var finalBlock=new ActionBlock<SomeFinal>(MyMethod); var linkOptions = new DataflowLinkOptions {PropagateCompletion = true} input.LinkTo(step2,linkOptions); step2.LinkTo(finalBlock,linkOptions); return new EncapsulatingTarget(input,finalBlock); }