我有一个进程,每隔一段时间发送一次数据包,我需要根据数据包到达的时间等来管理该数据流.在某些时候,我也关闭流和过程.
现在,我正在使用一组计时器来做这件事,但我希望我可以用rxjs来做,因为它似乎非常适合这种事情.到目前为止,我没有取得多大成功.
问题
该流应该定期向我发送数据包,但它通常偏离很多,有时会卡住.
我希望在以下条件下关闭流:
>如果需要超过startDelay发送第一个数据包.
>发送第一个数据包后,如果两个数据包之间存在多于middleDelay的暂停.
>经过一段时间的maxChannelTime.
由于上述任何原因我即将关闭流时,我首先要求它礼貌地关闭以便它可以进行一些清理.有时它还会在清理过程中向我发送最终数据包.但是我想等待清理时间不要超过cleanupTime,并且在关闭流之前到达最后一个数据并忽略更多消息.
精
我将通过使用Observable包装事件来创建“流”.我这样做没有问题.
解决方法
棘手的问题.
我把它分解为两个阶段 – “受监管”(因为我们要定期检查)和“清理”.
向后工作,输出是
const regulated = source.takeUntil(close) const cleanup = source.skipUntil(close).takeUntil(cleanupCloser) const output = regulated.merge(cleanup)
‘闭门器’是在关闭时发出的可观察量(每个超时值更近一个).
const startTimeout = 600 const intervalTimeout = 200 const maxtimeTimeout = 3000 const cleanupTimeout = 300 const startCloser = Observable.timer(startTimeout) // emit once after initial delay .takeUntil(source) // cancel after source emits .mapTo('startTimeoutMarker') const intervalCloser = source.switchMap(x => // reset interval after each source emit Observable.timer(intervalTimeout) // emit once after intervalTimeout .mapTo('intervalTimeoutMarker') ) const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime .takeUntil(startCloser) // cancel if startTimeout .takeUntil(intervalCloser) // cancel if intervalTimeout .mapTo('maxtimeTimeoutMarker') const close = Observable.merge(startCloser,intervalCloser,maxtimeCloser).take(1) const cleanupCloser = close.switchMap(x => // start when close emits Observable.timer(cleanupTimeout) // emit once after cleanup time ) .mapTo('cleanupTimeoutMarker')
这是一个工作样本CodePen(请一次运行一个测试)