我试图定期发出事件(每150ms),即使上游observable将更快地发送事件.
但是我收到MissingBackpressureException,即使我已经调用了onBackpressureBlock()
码:
SerializedSubject
跟踪:
05-06 00:38:25.532 28106-28166/com.instano.buyer W/System.err﹕ java.lang.RuntimeException: error response after onBackpressureBlock()
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.lambda$fetchQuotationMarkersForProduct$59(Quotations.java:67)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations.access$lambda$5(Quotations.java)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at com.instano.retailer.instano.application.controller.Quotations$$Lambda$8.call(Unknown Source)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Observable$11.onError(Observable.java:4193)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.complete(OperatorOnBackpressureBlock.java:81)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.drain(BackpressureDrainManager.java:190)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.BackpressureDrainManager.terminateAndDrain(BackpressureDrainManager.java:129)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorOnBackpressureBlock$BlockingSubscriber.onError(OperatorOnBackpressureBlock.java:68)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:324)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:332)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:51)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.Scheduler$Worker$1.call(Scheduler.java:120)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:390)
05-06 00:38:25.572 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.FutureTask.run(FutureTask.java:234)
05-06 00:38:25.582 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:153)
05-06 00:38:25.592 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1080)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:573)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ at java.lang.Thread.run(Thread.java:841)
05-06 00:38:25.602 28106-28166/com.instano.buyer W/System.err﹕ Caused by: rx.exceptions.MissingBackpressureException
05-06 00:38:25.612 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:349)
05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:330)
05-06 00:38:25.642 28106-28166/com.instano.buyer W/System.err﹕ ... 10 more
PS:Log.fatalError(错误)只是围绕Android.util.Log.e(…)的包装器
编辑
经过大量的反复试验,这对我来说已经成为一种习惯. zipWith(Observable.interval …)似乎是罪魁祸首和可能的框架错误.删除这些行(以及我的周期性发射功能)我的代码可以工作.
我正在使用一个可能从不同线程调用onNext的主题,然后我在它上面执行Obeservable运算符.
最佳答案
我认为(但我不确定)问题是你的背压配置是在zip操作符之后.
zip运算符需要缓冲一个Observable的项目以使用另一个Observable对其进行压缩.这是应该抛出异常的缓冲区. (见here)
为了解决您的问题,我认为您应该尝试在zip运算符中使用的一个(或每个)Observable上添加背压配置.
例如:
obs.zipWith(Observable.interval(150,TimeUnit.MILLISECONDS).onBackPressureDrop());
obs.onBackPressureBlock().zipWith(Observable.interval(150,TimeUnit.MILLISECONDS));