序
本文主要研究下FluxInterval的机制
FluxInterval
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxInterval.java
/** * Periodically emits an ever increasing long value either via a ScheduledExecutorService * or a custom async callback function * @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a> */ final class FluxInterval extends Flux<Long> { final Scheduler timedScheduler; final long initialDelay; final long period; final TimeUnit unit; FluxInterval( long initialDelay,long period,TimeUnit unit,Scheduler timedScheduler) { if (period < 0L) { throw new IllegalArgumentException("period >= 0 required but it was " + period); } this.initialDelay = initialDelay; this.period = period; this.unit = Objects.requireNonNull(unit,"unit"); this.timedScheduler = Objects.requireNonNull(timedScheduler,"timedScheduler"); } @Override public void subscribe(CoreSubscriber<? super Long> actual) { Worker w = timedScheduler.createWorker(); IntervalRunnable r = new IntervalRunnable(actual,w); actual.onSubscribe(r); try { w.schedulePeriodically(r,initialDelay,period,unit); } catch (RejectedExecutionException ree) { if (!r.cancelled) { actual.onError(Operators.onRejectedExecution(ree,r,null,actual.currentContext())); } } } }
可以看到这里利用Scheduler来创建一个定时调度任务IntervalRunnable
IntervalRunnable
static final class IntervalRunnable implements Runnable,Subscription,InnerProducer<Long> { final CoreSubscriber<? super Long> actual; final Worker worker; volatile long requested; static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED = AtomicLongFieldUpdater.newUpdater(IntervalRunnable.class,"requested"); long count; volatile boolean cancelled; IntervalRunnable(CoreSubscriber<? super Long> actual,Worker worker) { this.actual = actual; this.worker = worker; } @Override public CoreSubscriber<? super Long> actual() { return actual; } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.CANCELLED) return cancelled; return InnerProducer.super.scanUnsafe(key); } @Override public void run() { if (!cancelled) { if (requested != 0L) { actual.onNext(count++); if (requested != Long.MAX_VALUE) { REQUESTED.decrementAndGet(this); } } else { cancel(); actual.onError(Exceptions.failWithOverflow("Could not emit tick " + count + " due to lack of requests" + " (interval doesn't support small downstream requests that replenish slower than the ticks)")); } } } @Override public void request(long n) { if (Operators.validate(n)) { Operators.addCap(REQUESTED,this,n); } } @Override public void cancel() { if (!cancelled) { cancelled = true; worker.dispose(); } } }
这里重点看requested变量,run方法每次判断requested,如果requested为0则销毁worker,否则则每次发射一个元素计数就减一
而subscriber如果有继续request的话,则会增加requested的值
实例1
public static void main(String[] args) throws InterruptedException { Flux<Long> flux = Flux.interval(Duration.ofMillis(1)) .doOnNext(e -> { System.out.println(e); }).doOnError(e -> e.printStackTrace()); System.out.println("begin to subscribe"); flux.subscribe(e -> { System.out.println(e); try { TimeUnit.MINUTES.sleep(30); } catch (InterruptedException e1) { e1.printStackTrace(); } }); TimeUnit.MINUTES.sleep(30); }
这个例子requested是Long.MAX_VALUE,但是由于subscribe的线程跟运行interval的线程一样,由于里头执行了sleep操作也导致interval的调度也跟着阻塞住了。
实例2
public static void main(String[] args) throws InterruptedException { Flux<Long> flux = Flux.interval(Duration.ofMillis(1)) .doOnNext(e -> { System.out.println(e); }) //NOTE 这里request prefetch=256个 .publishOn(Schedulers.newElastic("publish-thread")) .doOnError(e -> e.printStackTrace()); System.out.println("begin to subscribe"); AtomicInteger count = new AtomicInteger(0); //NOTE 得有subscribe才能触发request flux.subscribe(e -> { LOGGER.info("receive:{}",e); try { //NOTE 使用publishOn将subscribe与interval的线程分开 if(count.get() == 0){ TimeUnit.MINUTES.sleep(2); } count.incrementAndGet(); } catch (InterruptedException e1) { e1.printStackTrace(); } }); TimeUnit.MINUTES.sleep(30); }
使用publishOn将subscriber线程与interval线程隔离,使其sleep不阻塞interval
这里publishOn隐含了一个prefetch参数,默认是Queues.SMALL_BUFFER_SIZE即Math.max(16,Integer.parseInt(System.getProperty("reactor.bufferSize.small","256")));
public final Flux<T> publishOn(Scheduler scheduler) { return publishOn(scheduler,Queues.SMALL_BUFFER_SIZE); } final Flux<T> publishOn(Scheduler scheduler,boolean delayError,int prefetch,int lowTide) { if (this instanceof Callable) { if (this instanceof Fuseable.ScalarCallable) { @SuppressWarnings("unchecked") Fuseable.ScalarCallable<T> s = (Fuseable.ScalarCallable<T>) this; try { return onAssembly(new FluxSubscribeOnValue<>(s.call(),scheduler)); } catch (Exception e) { //leave FluxSubscribeOnCallable defer exception call } } @SuppressWarnings("unchecked") Callable<T> c = (Callable<T>)this; return onAssembly(new FluxSubscribeOnCallable<>(c,scheduler)); } return onAssembly(new FluxPublishOn<>(this,scheduler,delayError,prefetch,lowTide,Queues.get(prefetch))); }
这里使用Queues.get(prefetch)创建一个间接的队列来盛放元素
这个实例最后输出
//...... 21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:254 21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:255 reactor.core.Exceptions$OverflowException: Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks) at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215) at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:121) at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59) at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
由于第一次request默认是256,之后在发射256个元素之后,subscriber没有跟上,导致interval的worker被cancel掉了,于是后续消费完256个元素之后,紧挨着就是OverflowException这个异常
小结
reactor本身并不依赖线程,只有interval,delayElements等方法才会创建线程。而reactor本身是观察者设计模式的扩展,采用push+backpressure模式,一开始调用subscribe方法就触发request N请求推送数据,之后publisher就onNext推送数据,直到complete或cancel。实例1是因为线程阻塞导致interval的onNext阻塞,实例2是interval被cancel掉导致flux关闭。
原文链接:https://www.f2er.com/react/301936.html