代码示例:
Subject<Story> mStoryEmitter = PublishSubject.create(); private void getStory(int storyID) { HNApi.Factory.getInstance().getStory(storyID).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(getStoryObserver()); } mStoryListEmitter.subscribe(new Observer<List<Integer>>() { @Override public void onSubscribe(Disposable d) {} @Override public void onNext(List<Integer> value) { if(mRecyclerView != null) { mRecyclerView.setAdapter(null); if(mAdapter != null) { mAdapter.clear(); mAdapter = null; } } mAdapter = new SimpleRecyclerViewAdapter(); mRecyclerView.setAdapter(mAdapter); for(Integer storyID : value) { getStory(storyID); } } @Override public void onError(Throwable e) {} @Override public void onComplete() {} }); private DisposableObserver<Story> getStoryObserver() { DisposableObserver<Story> observer = new DisposableObserver<Story>() { @Override public void onNext(Story value) { mStoryEmitter.onNext(value); dispose(); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; return observer; }
错误:
Throwing OutOfMemoryError "Could not allocate JNI Env" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler. at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111) at android.os.Handler.handleCallback(Handler.java:739) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:148) at android.app.ActivityThread.main(ActivityThread.java:5417) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) Caused by: java.lang.OutOfMemoryError: Could not allocate JNI Env at java.lang.Thread.nativeCreate(Native Method) at java.lang.Thread.start(Thread.java:1063) at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:921) at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1556) at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:310) at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:543) at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:642) at io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:120) at io.reactivex.internal.schedulers.IoScheduler$EventLoopWorker.schedule(IoScheduler.java:221) at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:130) at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:109) AppData::create pipe(2) Failed: Too many open files at io.reactivex.internal.operators.observable.ObservableSubscribeOn.subscribeActual(ObservableSubscribeOn.java:36) at io.reactivex.Observable.subscribe(Observable.java:10514) at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:44) at io.reactivex.Observable.subscribe(Observable.java:10514) at com.example.MainActivity.getStory(MainActivity.java:100) at com.example.MainActivity.access$300(MainActivity.java:25) at com.example.MainActivity$2.onNext(MainActivity.java:67) at com.example.MainActivity$2.onNext(MainActivity.java:49) at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:263) at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:182) at com.example.MainActivity$5.onNext(MainActivity.java:147) at com.example.MainActivity$5.onNext(MainActivity.java:138) at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:198) at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:250) at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109) ... 7 more AppData::create pipe(2) Failed: Too many open files FATAL EXCEPTION: main Process: com.example,PID: 15857 java.lang.IllegalStateException: Fatal Exception thrown on Scheduler. at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:111) at android.os.Handler.handleCallback(Handler.java:739) at android.os.Handler.dispatchMessage(Handler.java:95) at android.os.Looper.loop(Looper.java:148) at android.app.ActivityThread.main(ActivityThread.java:5417) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) Caused by: java.lang.OutOfMemoryError: Could not allocate JNI Env at java.lang.Thread.nativeCreate(Native Method) at java.lang.Thread.start(Thread.java:1063) at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:921) at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1556) at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:310) at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:543) at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:642) at io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:120) at io.reactivex.internal.schedulers.IoScheduler$EventLoopWorker.schedule(IoScheduler.java:221) at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:130) at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:109) at io.reactivex.internal.operators.observable.ObservableSubscribeOn.subscribeActual(ObservableSubscribeOn.java:36) at io.reactivex.Observable.subscribe(Observable.java:10514) at io.reactivex.internal.operators.observable.ObservableObserveOn.subscribeActual(ObservableObserveOn.java:44) at io.reactivex.Observable.subscribe(Observable.java:10514) at com.example.MainActivity.getStory(MainActivity.java:100) at com.example.MainActivity.access$300(MainActivity.java:25) at com.example.MainActivity$2.onNext(MainActivity.java:67) at com.example.MainActivity$2.onNext(MainActivity.java:49) at io.reactivex.subjects.PublishSubject$PublishDisposable.onNext(PublishSubject.java:263) at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:182) at com.example.MainActivity$5.onNext(MainActivity.java:147) at com.example.MainActivity$5.onNext(MainActivity.java:138) at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.drainNormal(ObservableObserveOn.java:198) at io.reactivex.internal.operators.observable.ObservableObserveOn$ObserveOnObserver.run(ObservableObserveOn.java:250) at io.reactivex.android.schedulers.HandlerScheduler$ScheduledRunnable.run(HandlerScheduler.java:109) ... 7 more
解决方法
而JakeWharton的回复实际上是在加速
The problem is that Schedulers.io() uses a cached thread pool without a limit and thus is trying to create 1500 threads. You should consider using a Scheduler that has a fixed limit of threads,or using RxJava 2.x’s parallel() operator to parallelize the operation to a fixed number of workers.
If you’re using raw Retrofit by default it uses OkHttp’s dispatcher which limits the threads to something like 64 (with a max of 5 per host). That’s why you aren’t seeing it fail.
If you use createAsync() when creating the RxJava2CallAdapterFactory it will create fully-async Observable instances that don’t require a subscribeOn and which use OkHttp’s Dispatcher just like Retrofit would otherwise. Then you only need observeOn to move back to the main thread,and you avoid all additional thread creation.