序
本文主要研究一下reactor-netty中TcpClient的newHandler过程
maven
<dependency> <groupId>io.projectreactor.ipc</groupId> <artifactId>reactor-netty</artifactId> <version>0.7.3.RELEASE</version> </dependency>
TcpClient.newHandler
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpClient.java
/** * @param handler * @param address * @param secure * @param onSetup * * @return a new Mono to connect on subscribe */ protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound,? super NettyOutbound,? extends Publisher<Void>> handler,InetSocketAddress address,boolean secure,Consumer<? super Channel> onSetup) { final BiFunction<? super NettyInbound,? extends Publisher<Void>> targetHandler = null == handler ? ChannelOperations.noopHandler() : handler; return Mono.create(sink -> { SocketAddress remote = address != null ? address : options.getAddress(); ChannelPool pool = null; PoolResources poolResources = options.getPoolResources(); if (poolResources != null) { pool = poolResources.selectOrCreate(remote,options,doHandler(null,sink,secure,remote,null,null),options.getLoopResources().onClient(options.preferNative())); } ContextHandler<SocketChannel> contextHandler = doHandler(targetHandler,pool,onSetup); sink.onCancel(contextHandler); if (pool == null) { Bootstrap b = options.get(); b.remoteAddress(remote); b.handler(contextHandler); contextHandler.setFuture(b.connect()); } else { contextHandler.setFuture(pool.acquire()); } }); }
- 这里使用了Mono的sink来创建返回Mono<NettyContext>
- 这里使用poolResources.selectOrCreate来获取一个channelPool
- 然后创建一个contextHandler
- 最后调用contextHandler.setFuture设置channel
- 注意这里调用了两次doHandler方法,第一次调用pool参数为null,第二次调用传入了新创建的pool
TcpResources.selectOrCreate
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.java
public ChannelPool selectOrCreate(SocketAddress address,Supplier<? extends Bootstrap> bootstrap,Consumer<? super Channel> onChannelCreate,EventLoopGroup group) { return defaultPools.selectOrCreate(address,bootstrap,onChannelCreate,group); }
这里委托给DefaultPoolResources
DefaultPoolResources.selectOrCreate
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.java
public ChannelPool selectOrCreate(SocketAddress remote,EventLoopGroup group) { SocketAddress address = remote; for (; ; ) { Pool pool = channelPools.get(remote); if (pool != null) { return pool; } Bootstrap b = bootstrap.get(); if (remote != null) { b = b.remoteAddress(remote); } else { address = b.config() .remoteAddress(); } if (log.isDebugEnabled()) { log.debug("New {} client pool for {}",name,address); } pool = new Pool(b,provider,group); if (channelPools.putIfAbsent(address,pool) == null) { return pool; } pool.close(); } }
可以看到这里先get,get不到则new一个Pool然后放进channelPools中
DefaultPoolResources#Pool
final static class Pool extends AtomicBoolean implements ChannelPoolHandler,ChannelPool,ChannelHealthChecker { final ChannelPool pool; final Consumer<? super Channel> onChannelCreate; final EventLoopGroup defaultGroup; final AtomicInteger activeConnections = new AtomicInteger(); final Future<Boolean> HEALTHY; final Future<Boolean> UNHEALTHY; @SuppressWarnings("unchecked") Pool(Bootstrap bootstrap,PoolFactory provider,EventLoopGroup group) { this.pool = provider.newPool(bootstrap,this,this); this.onChannelCreate = onChannelCreate; this.defaultGroup = group; HEALTHY = group.next() .newSucceededFuture(true); UNHEALTHY = group.next() .newSucceededFuture(false); } @Override public Future<Boolean> isHealthy(Channel channel) { return channel.isActive() ? HEALTHY : UNHEALTHY; } @Override public Future<Channel> acquire() { return pool.acquire(); } @Override public Future<Channel> acquire(Promise<Channel> promise) { return pool.acquire(promise); } @Override public Future<Void> release(Channel channel) { return pool.release(channel); } @Override public Future<Void> release(Channel channel,Promise<Void> promise) { return pool.release(channel,promise); } @Override public void close() { if(compareAndSet(false,true)) { pool.close(); } } @Override public void channelReleased(Channel ch) throws Exception { activeConnections.decrementAndGet(); if (log.isDebugEnabled()) { log.debug("Released {},now {} active connections",ch.toString(),activeConnections); } } @Override public void channelAcquired(Channel ch) throws Exception { activeConnections.incrementAndGet(); if (log.isDebugEnabled()) { log.debug("Acquired {},activeConnections); } } @Override public void channelCreated(Channel ch) throws Exception { activeConnections.incrementAndGet(); if (log.isDebugEnabled()) { log.debug("Created {},activeConnections); } if (onChannelCreate != null) { onChannelCreate.accept(ch); } } @Override public String toString() { return pool.getClass() .getSimpleName() + "{" + "activeConnections=" + activeConnections + '}'; } }
可以看到这里是使用provider.newPool来创建底层的ChannelPool
这里的provider是个Lambda表达式,SimpleChannelPool::new
interface PoolFactory { ChannelPool newPool(Bootstrap b,ChannelPoolHandler handler,ChannelHealthChecker checker); }
使用的是SimpleChannelPool的Bootstrap bootstrap,final ChannelPoolHandler handler,ChannelHealthChecker healthCheck这三个参数的构造器
Pool本身则实现了ChannelPoolHandler以及ChannelHealthChecker接口
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java
/** * Creates a new instance. * * @param bootstrap the{@link Bootstrap} that is used for connections * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is * still healthy when obtain from the {@link ChannelPool} */ public SimpleChannelPool(Bootstrap bootstrap,ChannelHealthChecker healthCheck) { this(bootstrap,handler,healthCheck,true); }
ChannelPoolHandler
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/ChannelPoolHandler.java
/** * Handler which is called for varIoUs actions done by the {@link ChannelPool}. */ public interface ChannelPoolHandler { /** * Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or * {@link ChannelPool#release(Channel,Promise)}. * * This method will be called by the {@link EventLoop} of the {@link Channel}. */ void channelReleased(Channel ch) throws Exception; /** * Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or * {@link ChannelPool#acquire(Promise)}. * * This method will be called by the {@link EventLoop} of the {@link Channel}. */ void channelAcquired(Channel ch) throws Exception; /** * Called once a new {@link Channel} is created in the {@link ChannelPool}. * * This method will be called by the {@link EventLoop} of the {@link Channel}. */ void channelCreated(Channel ch) throws Exception; }
ChannelHealthChecker
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/ChannelHealthChecker.java
/** * Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or * {@link ChannelPool#acquire(Promise)}. */ public interface ChannelHealthChecker { /** * {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}. */ ChannelHealthChecker ACTIVE = new ChannelHealthChecker() { @Override public Future<Boolean> isHealthy(Channel channel) { EventLoop loop = channel.eventLoop(); return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE); } }; /** * Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once * the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise. * * This method will be called by the {@link EventLoop} of the {@link Channel}. */ Future<Boolean> isHealthy(Channel channel); }
SimpleChannelPool
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java
/** * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced. * * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}. * */ public class SimpleChannelPool implements ChannelPool { private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool"); private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( new IllegalStateException("ChannelPool full"),SimpleChannelPool.class,"releaseAndOffer(...)"); private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque(); private final ChannelPoolHandler handler; private final ChannelHealthChecker healthCheck; private final Bootstrap bootstrap; private final boolean releaseHealthCheck; private final boolean lastRecentUsed; //...... /** * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no * {@link Channel} is ready to be reused. * * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that * implementations of these methods needs to be thread-safe! */ protected Channel pollChannel() { return lastRecentUsed ? deque.pollLast() : deque.pollFirst(); } /** * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel} * could be added,{@code false} otherwise. * * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that * implementations of these methods needs to be thread-safe! */ protected boolean offerChannel(Channel channel) { return deque.offer(channel); } }
SimpleChannelPool使用一个LIFO的Deque来维护Channel
SimpleChannelPool.acquire
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java
@Override public final Future<Channel> acquire() { return acquire(bootstrap.config().group().next().<Channel>newPromise()); } @Override public Future<Channel> acquire(final Promise<Channel> promise) { checkNotNull(promise,"promise"); return acquireHealthyFromPoolOrNew(promise); } /** * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. * @param promise the promise to provide acquire result. * @return future for acquiring a channel. */ private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) { try { final Channel ch = pollChannel(); if (ch == null) { // No Channel left in the pool bootstrap a new Channel Bootstrap bs = bootstrap.clone(); bs.attr(POOL_KEY,this); ChannelFuture f = connectChannel(bs); if (f.isDone()) { notifyConnect(f,promise); } else { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { notifyConnect(future,promise); } }); } return promise; } EventLoop loop = ch.eventLoop(); if (loop.inEventLoop()) { doHealthCheck(ch,promise); } else { loop.execute(new Runnable() { @Override public void run() { doHealthCheck(ch,promise); } }); } } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }
注意这里调用了pollChannel从deque中获取并进行healthCheck,如果为null则新建立一个
SimpleChannelPool.release
@Override public final Future<Void> release(Channel channel) { return release(channel,channel.eventLoop().<Void>newPromise()); } @Override public Future<Void> release(final Channel channel,final Promise<Void> promise) { checkNotNull(channel,"channel"); checkNotNull(promise,"promise"); try { EventLoop loop = channel.eventLoop(); if (loop.inEventLoop()) { doReleaseChannel(channel,promise); } else { loop.execute(new Runnable() { @Override public void run() { doReleaseChannel(channel,promise); } }); } } catch (Throwable cause) { closeAndFail(channel,cause,promise); } return promise; } private void doReleaseChannel(Channel channel,Promise<Void> promise) { assert channel.eventLoop().inEventLoop(); // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool,if not fail. if (channel.attr(POOL_KEY).getAndSet(null) != this) { closeAndFail(channel,// Better include a stacktrace here as this is an user error. new IllegalArgumentException( "Channel " + channel + " was not acquired from this ChannelPool"),promise); } else { try { if (releaseHealthCheck) { doHealthCheckOnRelease(channel,promise); } else { releaseAndOffer(channel,promise); } } catch (Throwable cause) { closeAndFail(channel,promise); } } } private void doHealthCheckOnRelease(final Channel channel,final Promise<Void> promise) throws Exception { final Future<Boolean> f = healthCheck.isHealthy(channel); if (f.isDone()) { releaseAndOfferIfHealthy(channel,promise,f); } else { f.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { releaseAndOfferIfHealthy(channel,f); } }); } } /** * Adds the channel back to the pool only if the channel is healthy. * @param channel the channel to put back to the pool * @param promise offer operation promise. * @param future the future that contains information fif channel is healthy or not. * @throws Exception in case when Failed to notify handler about release operation. */ private void releaseAndOfferIfHealthy(Channel channel,Promise<Void> promise,Future<Boolean> future) throws Exception { if (future.getNow()) { //channel turns out to be healthy,offering and releasing it. releaseAndOffer(channel,promise); } else { //channel not healthy,just releasing it. handler.channelReleased(channel); promise.setSuccess(null); } } private void releaseAndOffer(Channel channel,Promise<Void> promise) throws Exception { if (offerChannel(channel)) { handler.channelReleased(channel); promise.setSuccess(null); } else { closeAndFail(channel,FULL_EXCEPTION,promise); } }
在release的时候调用offerChannel将Channel放回deque中
使用三个参数的构造器创建的SimpleChannelPool,其releaseHealthCheck值为true,即释放的时候进行health check
TcpClient.doHandler
/** * Create a {@link ContextHandler} for {@link Bootstrap#handler()} * * @param handler user provided in/out handler * @param sink user provided bind handler * @param secure if operation should be secured * @param pool if channel pool * @param onSetup if operation has local setup callback * * @return a new {@link ContextHandler} */ protected ContextHandler<SocketChannel> doHandler(BiFunction<? super NettyInbound,MonoSink<NettyContext> sink,SocketAddress providedAddress,ChannelPool pool,Consumer<? super Channel> onSetup) { return ContextHandler.newClientContext(sink,loggingHandler,providedAddress,handler == null ? EMPTY : (ch,c,msg) -> ChannelOperations.bind(ch,c)); }
这里调用ContextHandler.newClientContext创建了一个ContextHandler<SocketChannel>
ContextHandler.newClientContext
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/channel/ContextHandler.java
/** * Create a new client context with optional pool support * * @param sink * @param options * @param loggingHandler * @param secure * @param providedAddress * @param channelOpFactory * @param pool * @param <CHANNEL> * * @return a new {@link ContextHandler} for clients */ public static <CHANNEL extends Channel> ContextHandler<CHANNEL> newClientContext( MonoSink<NettyContext> sink,ClientOptions options,LoggingHandler loggingHandler,ChannelOperations.OnNew<CHANNEL> channelOpFactory) { if (pool != null) { return new PooledClientContextHandler<>(channelOpFactory,pool); } return new ClientContextHandler<>(channelOpFactory,providedAddress); }
注意这里将newHandler的Lambda表达式注册为ChannelOperations.OnNew<CHANNEL>的channelOpFactory
第一次调用doHandler的时候pool为null,创建的是ClientContextHandler;等pool创建好了,第二次调用doHandler的时候,pool不为null,创建的是PooledClientContextHandler
PooledClientContextHandler
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/channel/PooledClientContextHandler.java
@Override public void fireContextActive(NettyContext context) { if (!fired) { fired = true; if (context != null) { sink.success(context); } else { sink.success(); } } } @Override @SuppressWarnings("unchecked") public void setFuture(Future<?> future) { Objects.requireNonNull(future,"future"); Future<CHANNEL> f; for (; ; ) { f = this.future; if (f == DISPOSED) { if (log.isDebugEnabled()) { log.debug("Cancelled existing channel from pool: {}",pool.toString()); } sink.success(); return; } if (FUTURE.compareAndSet(this,f,future)) { break; } } if (log.isDebugEnabled()) { log.debug("Acquiring existing channel from pool: {} {}",future,pool.toString()); } ((Future<CHANNEL>) future).addListener(this); } final void connectOrAcquire(CHANNEL c) { if (DISPOSED == this.future) { if (log.isDebugEnabled()) { log.debug("Dropping acquisition {} because of {}","asynchronous user cancellation"); } dispoSEOperationThenRelease(c); sink.success(); return; } if (!c.isActive()) { log.debug("Immediately aborted pooled channel,re-acquiring new " + "channel: {}",c.toString()); release(c); setFuture(pool.acquire()); return; } ChannelOperationsHandler op = c.pipeline() .get(ChannelOperationsHandler.class); if (op == null) { if (log.isDebugEnabled()) { log.debug("Created new pooled channel: " + c.toString()); } c.closeFuture() .addListener(ff -> release(c)); return; } if (log.isDebugEnabled()) { log.debug("Acquired active channel: " + c.toString()); } if (createOperations(c,null) == null) { setFuture(pool.acquire()); } } public void operationComplete(Future<CHANNEL> future) throws Exception { if (future.isCancelled()) { if (log.isDebugEnabled()) { log.debug("Cancelled {}",future.toString()); } return; } if (DISPOSED == this.future) { if (log.isDebugEnabled()) { log.debug("Dropping acquisition {} because of {}","asynchronous user cancellation"); } if (future.isSuccess()) { dispoSEOperationThenRelease(future.get()); } sink.success(); return; } if (!future.isSuccess()) { if (future.cause() != null) { fireContextError(future.cause()); } else { fireContextError(new AbortedException("error while acquiring connection")); } return; } CHANNEL c = future.get(); if (c.eventLoop() .inEventLoop()) { connectOrAcquire(c); } else { c.eventLoop() .execute(() -> connectOrAcquire(c)); } }
fireContextActive,setFuture,connectOrAcquire,operationComplete这几个方法都会调用MonoCreate的success方法来产生数据
Mono.subscribe
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/Mono.java
/** * Subscribe to this {@link Mono} and request unbounded demand. * <p> * This version doesn't specify any consumption behavior for the events from the * chain,especially no error handling,so other variants should usually be preferred. * * <p> * <img width="500" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/unbounded1.png" alt=""> * <p> * * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription} */ public final Disposable subscribe() { if(this instanceof MonoProcessor){ MonoProcessor<T> s = (MonoProcessor<T>)this; s.connect(); return s; } else{ return subscribeWith(new LambdaMonoSubscriber<>(null,null)); } }
这里创建的是LambdaMonoSubscriber,最后调用的是MonoCreate的subscribe(actual)方法
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/MonoCreate.java
public void subscribe(CoreSubscriber<? super T> actual) { DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual); actual.onSubscribe(emitter); try { callback.accept(emitter); } catch (Throwable ex) { emitter.error(Operators.onOperatorError(ex,actual.currentContext())); } }
这里的actual就是LambdaMonoSubscriber
这里的callback.accept就是调用newHandler里头的Mono.create里头的Lambda表达式,也就是mono的sink,触发建立连接发送请求
小结
TcpClient.newHandler返回的是一个Mono,而在subscribe的时候触发执行MonoCreate的Lambda表达式。