IObservable<T>
实现.
从高级别发送HTTP请求并保持连接打开.以长度为前缀的项目被发送到消费.
基本上,它是使用await
keyword循环调用到Stream.ReadAsync
. IObserver<T>
interface实现(从Observable.Create
或Dataflow library的块,无关紧要,它是一个实现细节)传递给此循环,然后调用上的metods IObserver< T>实现,产生可观察的.
但是,在此循环开始处理之前必须完成许多事情,这需要调用Task<T>
返回方法,所有这些方法都可以使用await关键字在C#5.0中更容易调用.像这样的东西:
public async Task<IObservable<string>> Create(string parameter,CancellationToken cancellationToken) { // Make some call that requires await. var response = await _httpClient.SendAsync(parameter,cancellationToken). ConfigureAwait(false); // Create a BufferBlock which will be the observable. var block = new BufferBlock<T>(); // Start some background task which will use the block and publish to it // on a separate task. This is not something that is awaited for. ConsumeResponse(response,block,cancellationToken); // Create the observable. return block.AsObservable(); }
也就是说,我正在返回一个任务< IObservable< T>>从我的方法,但我觉得我在Reactive Extensions中缺少一些东西,这将允许我使用await来促进我需要进行的调用,但也返回一个IObservable< T>而不是任务< IObservable< T>>.
Reactive Extensions中的哪个方法允许我创建一个observable,它需要在从创建方法返回之前等待方法?
我发现最接近的是Observable.DeferAsync.假设对我的方法的调用和observable的使用是这样的:
public async Task Observe() { // NOT the real name of the interface,but explains it's role here. IObservableFactory factory; // Create is really named something else. IObservable<string> observable = factory.Create("parameter"); // Subscribe. observable.Subscribe(o => Console.WriteLine("Observed: {0}",o)); // Wait. await observable; }
使用DeferAsync在这里不起作用,因为对Subscribe的调用将发送第一个请求,然后读取它,然后对observable的await调用将创建第二个订阅,但是创建一个不同的observable.
或者,最终返回任务< IObservable< T>>在Reactive Framework中执行此操作的适当方法?
随后,由于该方法返回任务< T>,因此最好通过CancellationToken
取消操作.也就是说,我可以理解CancellationToken用于取消observable的创建,但它是否也应该用于取消实际的observable(因为它可以被传递下去以读取流等).
我的直觉说不,因为这里存在违反问题分离以及取消的DRY原则:
>取消创建和取消观察是两个不同的事情.
>调用Subscribe
将返回IDisposable
实施,这将取消订阅.
解决方法
这并不意味着你不能混合幕后的概念.以下是使用Observable.Using,Task.ToObservable和CancellationDisposable执行所需操作的方法
首先,修改您的方法以返回任务< ISourceBlock< string>>:
public async Task<ISourceBlock<string>> CreateBlock(string parameter,cancellationToken).ConfigureAwait(false); // Create a BufferBlock which will be the observable. var block = new BufferBlock<T>(); // Start some background task which will use the block and publish to it // on a separate task. This is not something that is awaited for. ConsumeResponse(response,cancellationToken); return block; }
public IObservable<string> Create(string parameter) { // Create a cancellation token that will be canceled when the observable is unsubscribed,use this token in your call to CreateBlock. // Use ToObservable() to convert the Task to an observable so we can then // use SelectMany to subscribe to the block itself once it is available return Observable.Using(() => new CancellationDisposable(),cd => CreateBlock(parameter,cd.Token) .ToObservable() .SelectMany(block => block.AsObservable())); }
编辑:我发现Rx已经使用FromAsync实现了这种模式:
public IObservable<string> Create(string parameter) { return Observable.FromAsync(token => CreateBlock(parameter,token)) .SelectMany(block => block.AsObservable()); }
还有,DeferAsync,这更合适,因为你的Task实际上是在创建你真正想要观察的Observable(例如你的块):
public IObservable<string> Create(string parameter) { return Observable.DeferAsync(async token => (await CreateBlock(parameter,token)).AsObservable()); }