c# – 使用Rx在异步操作上阻止(可能超时)

前端之家收集整理的这篇文章主要介绍了c# – 使用Rx在异步操作上阻止(可能超时)前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在尝试使用.NET的Reactive Extensions重写一些代码,但是我需要一些关于如何实现我的目标的指导.

我有一个类在一个低级库中封装了一些异步行为.想一想读或写网络的东西.当类启动时,它将尝试连接到环境,并且当成功时,它将通过从工作线程调用来发信号.

我想将这个异步行为变成一个同步调用,并且我已经创建了一个非常简化的例子,下面就是如何实现的:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

在工作线程上运行AsyncStart只是模拟库的异步行为的一种方法,不是我的真实代码的一部分,低级库提供线程并在回调中调用我的代码.

请注意,如果启动在超时间隔内尚未完成,则Start方法将抛出TimeoutException异常.

我想重写这个代码来使用Rx.这是我的第一个尝试:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

这是一个体面的尝试,但不幸的是它包含一个种族条件.如果启动快速完成(例如,如果延迟为0),并且如果在A点存在额外的延迟,则OnNext将在First执行之前在readySubject上调用.实质上,IObservable我正在应用超时,首先从来没有看到该启动已经完成,并且将抛出一个TimeoutException.

似乎已经创建了Observable.Defer来处理这样的问题.这是稍微复杂的尝试使用Rx:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

现在异步操作不会立即启动,但只有在使用IObservable时才会启动.不幸的是,仍然存在竞争条件,但这次在点B.如果异步操作在Defer lambda返回之前开始调用OnNext,它仍然丢失,并且TimeoutException将被超时抛出.

我知道我可以使用像Replay这样的操作来缓冲事件,但是我没有使用Rx的初始示例不会使用任何缓冲.有没有办法使用Rx来解决我的问题,没有竞争条件?在本例中,在IObservable已连接到本例的情况下才启动异步操作Timeout和First?

基于Paul Betts的答案是工作解决方案:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject,TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject,TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

有趣的部分是当点C的延迟比AsyncStart完成所需的时间更长时. AsyncSubject保留发送的最后通知,Timeout和First将仍然按预期的方式执行.

解决方法

所以,有一件事要知道Rx我认为很多人首先(包括我自己):如果你使用任何传统的线程功能,如ResetEvents,Thread.Sleeps或任何,你是做错了(tm ) – 这就像在LINQ中将数组投射到数组,因为你知道底层类型恰好是一个数组.

要知道的关键是,异步func由返回IObservable< TResult>的函数表示. – 这是魔术酱,让你在某些事情完成时发出信号.所以这里是如何“Rx-ify”更传统的异步功能,就像您在Silverlight Web服务中看到的那样:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile,buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}

This is a decent attempt but unfortunately it contains a race condition.

这就是AsyncSubject所在的地方 – 这样可以确保即使asyncReaderFunc打到“订阅”,AsyncSubject仍然会“重播”发生了什么.

所以,现在我们有了我们的功能,我们可以做很多有趣的事情:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
    Console.WriteLine("Read {0} bytes in chunk",bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(),(ms,bytes) => ms.Write(bytes))
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total",ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(),bytes) => ms.Write(bytes))
    .First();
原文链接:https://www.f2er.com/csharp/93272.html

猜你在找的C#相关文章