How reactor-netty's TcpClient submits tasks to the eventLoop


This article examines how reactor-netty's TcpClient submits tasks to the eventLoop.

Example

```java
TcpClient client = TcpClient.create("localhost", 8888);
LOGGER.info("client:{}", client.getClass());
Mono<? extends NettyContext> handler = client.newHandler((inbound, outbound) -> {
    return outbound.sendString(Mono.just("Hello World!"))
                   .then(inbound.receive().asString().next().log().then());
});
LOGGER.info("handler:{}", handler.getClass()); // NOTE reactor.core.publisher.MonoCreate
handler.subscribe();
```
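To run the example end to end, something has to listen on localhost:8888. A minimal echo peer, sketched under the assumption that the reactor-netty 0.7.x TcpServer API mirrors the TcpClient API used above:

```java
import reactor.ipc.netty.tcp.TcpServer;

public class EchoServer {
    public static void main(String[] args) {
        // Echo every inbound buffer back to the client. block() returns the
        // NettyContext once the server is bound; onClose().block() then keeps
        // the JVM alive until the server is disposed.
        TcpServer.create(8888)
                 .newHandler((inbound, outbound) -> outbound.send(inbound.receive().retain()))
                 .block()
                 .onClose()
                 .block();
    }
}
```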

TcpClient.newHandler

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpClient.java

```java
protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
        InetSocketAddress address, boolean secure, Consumer<? super Channel> onSetup) {

    final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>
            targetHandler = null == handler ? ChannelOperations.noopHandler() : handler;

    return Mono.create(sink -> {
        SocketAddress remote = address != null ? address : options.getAddress();

        ChannelPool pool = null;

        PoolResources poolResources = options.getPoolResources();
        if (poolResources != null) {
            pool = poolResources.selectOrCreate(remote, options,
                    doHandler(null, sink, secure, remote, null, null),
                    options.getLoopResources().onClient(options.preferNative()));
        }

        ContextHandler<SocketChannel> contextHandler =
                doHandler(targetHandler, sink, secure, remote, pool, onSetup);
        sink.onCancel(contextHandler);

        if (pool == null) {
            Bootstrap b = options.get();
            b.remoteAddress(remote);
            b.handler(contextHandler);
            contextHandler.setFuture(b.connect());
        } else {
            contextHandler.setFuture(pool.acquire());
        }
    });
}
```
Note the pool.acquire() and b.connect() calls here; both paths end up establishing the connection.

SimpleChannelPool.acquireHealthyFromPoolOrNew

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java

```java
/**
 * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
 * @param promise the promise to provide acquire result.
 * @return future for acquiring a channel.
 */
private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
    try {
        final Channel ch = pollChannel();
        if (ch == null) {
            // No Channel left in the pool bootstrap a new Channel
            Bootstrap bs = bootstrap.clone();
            bs.attr(POOL_KEY, this);
            ChannelFuture f = connectChannel(bs);
            if (f.isDone()) {
                notifyConnect(f, promise);
            } else {
                f.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        notifyConnect(future, promise);
                    }
                });
            }
            return promise;
        }
        EventLoop loop = ch.eventLoop();
        if (loop.inEventLoop()) {
            doHealthCheck(ch, promise);
        } else {
            loop.execute(new Runnable() {
                @Override
                public void run() {
                    doHealthCheck(ch, promise);
                }
            });
        }
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

/**
 * Bootstrap a new {@link Channel}. The default implementation uses {@link Bootstrap#connect()}, sub-classes may
 * override this.
 * <p>
 * The {@link Bootstrap} that is passed in here is cloned via {@link Bootstrap#clone()}, so it is safe to modify.
 */
protected ChannelFuture connectChannel(Bootstrap bs) {
    return bs.connect();
}
```
pool.acquire() ultimately calls SimpleChannelPool.acquireHealthyFromPoolOrNew; when the pool is empty, its connectChannel also boils down to Bootstrap.connect.
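SimpleChannelPool can also be used on its own; a minimal sketch of the acquire/release cycle against netty 4.1's channel-pool API (the address and handler body are illustrative):

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolHandler;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

public class PoolDemo {
    public static void main(String[] args) {
        Bootstrap bootstrap = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .remoteAddress("localhost", 8888);

        SimpleChannelPool pool = new SimpleChannelPool(bootstrap, new AbstractChannelPoolHandler() {
            @Override
            public void channelCreated(Channel ch) {
                // called once per freshly connected channel; add pipeline handlers here
            }
        });

        // Empty pool -> acquireHealthyFromPoolOrNew clones the bootstrap and connects.
        Future<Channel> acquired = pool.acquire();
        acquired.addListener((FutureListener<Channel>) f -> {
            if (f.isSuccess()) {
                Channel ch = f.getNow();
                // ... use the channel ...
                pool.release(ch); // hand it back for the next acquire()
            }
        });
    }
}
```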

Bootstrap.connect

netty-transport-4.1.20.Final-sources.jar!/io/netty/bootstrap/Bootstrap.java

```java
/**
 * Connect a {@link Channel} to the remote peer.
 */
public ChannelFuture connect() {
    validate();
    SocketAddress remoteAddress = this.remoteAddress;
    if (remoteAddress == null) {
        throw new IllegalStateException("remoteAddress not set");
    }

    return doResolveAndConnect(remoteAddress, config.localAddress());
}

/**
 * @see #connect()
 */
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
```
Note that connect() first calls initAndRegister, and then doResolveAndConnect0: directly when the registration future is already done, otherwise from its operationComplete listener.

initAndRegister

netty-transport-4.1.20.Final-sources.jar!/io/netty/bootstrap/AbstractBootstrap.java

```java
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
```
initAndRegister first calls channelFactory.newChannel() to create a channel and then initializes it. Either step may throw, for example SocketException("too many open files"), in which case a DefaultChannelPromise already set to failure is returned.
It then calls config().group().register(channel); in reactor-netty this group is a MultithreadEventLoopGroup.

io.netty.channel.ReflectiveChannelFactory.newChannel()

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/ReflectiveChannelFactory.java

```java
public T newChannel() {
    try {
        return clazz.getConstructor().newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}
```
The class being instantiated here is NioSocketChannel.class.
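For reference, the factory is wired up when the bootstrap is configured; a hedged sketch (group and handler configuration omitted):

```java
// AbstractBootstrap.channel(Class) is sugar for installing a ReflectiveChannelFactory,
// whose newChannel() invokes the no-arg constructor shown above.
Bootstrap b = new Bootstrap();
b.channel(NioSocketChannel.class);
// equivalent to:
b.channelFactory(new ReflectiveChannelFactory<>(NioSocketChannel.class));
```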
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/socket/nio/NioSocketChannel.java
```java
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static SocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         * {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
         *
         * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}

/**
 * Create a new instance using the given {@link SelectorProvider}.
 */
public NioSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}
```
On macOS the provider is sun.nio.ch.KQueueSelectorProvider, and openSocketChannel is implemented in SelectorProviderImpl.

jre/lib/rt.jar!/sun/nio/ch/SelectorProviderImpl.class

```java
public SocketChannel openSocketChannel() throws IOException {
    return new SocketChannelImpl(this);
}
```

jre/lib/rt.jar!/sun/nio/ch/SocketChannelImpl.class

```java
SocketChannelImpl(SelectorProvider var1) throws IOException {
    super(var1);
    this.fd = Net.socket(true);
    this.fdVal = IOUtil.fdVal(this.fd);
    this.state = 0;
}
```
Note the call to Net.socket(true), which allocates the underlying FileDescriptor; this is where SocketException("too many open files") can originate.
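A hedged way to see this failure in isolation (run with a low fd limit, e.g. `ulimit -n 256`):

```java
import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.List;

public class FdExhaustion {
    public static void main(String[] args) {
        // Every openSocketChannel() allocates a FileDescriptor via Net.socket(true);
        // once the process fd limit is hit, the IOException carries "Too many open files".
        List<SocketChannel> channels = new ArrayList<>();
        try {
            while (true) {
                channels.add(SelectorProvider.provider().openSocketChannel());
            }
        } catch (IOException e) {
            System.out.println("failed after " + channels.size() + " sockets: " + e.getMessage());
        }
    }
}
```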

Bootstrap.init(channel)

netty-transport-4.1.20.Final-sources.jar!/io/netty/bootstrap/Bootstrap.java

```java
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}
```
This mainly applies the configured options and attributes to the new channel.
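On a raw Bootstrap those entries come from option(...) and attr(...); a minimal sketch (the key names and values are illustrative):

```java
// Each option(...) entry is copied onto the channel in init() via setChannelOptions;
// each attr(...) entry lands in channel.attr(key).set(value).
AttributeKey<String> TRACE_ID = AttributeKey.valueOf("traceId");
Bootstrap b = new Bootstrap()
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
        .attr(TRACE_ID, "demo-1");
```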

MultithreadEventLoopGroup.register(channel)

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/MultithreadEventLoopGroup.java

```java
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
```
next() returns one of the group's SingleThreadEventLoop instances.
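next() delegates to the group's EventExecutorChooser; a simplified sketch of the power-of-two round robin that DefaultEventExecutorChooserFactory installs when the thread count is a power of two (field and method shapes are condensed, not netty's exact code):

```java
private final AtomicInteger idx = new AtomicInteger();

public EventExecutor next() {
    // bitmask round robin; executors.length must be a power of two
    return executors[idx.getAndIncrement() & executors.length - 1];
}
```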

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/SingleThreadEventLoop.java

```java
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
```
The unsafe here is AbstractChannel$AbstractUnsafe.

AbstractChannel$AbstractUnsafe

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/AbstractChannel.java

```java
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
```
Here you can see the call to eventLoop.execute; the eventLoop is a NioEventLoop, and execute is inherited from SingleThreadEventExecutor.
Note that when the caller is not on the loop thread, the register0(promise) call is wrapped in a Runnable and handed to the loop as a task.
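Distilled, this is netty's standard thread-confinement idiom; anything that touches channel state goes through it (sketch, doWork() is a placeholder):

```java
// Run inline when already on the channel's loop thread; otherwise enqueue,
// so channel state is only ever mutated by that single thread.
if (channel.eventLoop().inEventLoop()) {
    doWork();
} else {
    channel.eventLoop().execute(() -> doWork());
}
```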

SingleThreadEventExecutor.execute

netty-common-4.1.20.Final-sources.jar!/io/netty/util/concurrent/SingleThreadEventExecutor.java

```java
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

/**
 * Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
 * before.
 */
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}
```
So execute delegates to addTask, addTask to offerTask, and the task finally lands in taskQueue via offer.
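A hedged mental model of what the loop thread does with that queue (deliberately simplified, not netty's code; netty interleaves the draining with selector.select() in NioEventLoop.run):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// One thread owns the queue; execute() from any thread only ever offers into it,
// mirroring SingleThreadEventExecutor.addTask -> offerTask -> taskQueue.offer above.
final class MiniEventLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread thread = new Thread(() -> {
        while (true) {
            try {
                taskQueue.take().run();   // netty: runAllTasks() between I/O selections
            } catch (InterruptedException e) {
                return;
            }
        }
    }, "mini-event-loop");

    MiniEventLoop() { thread.start(); }

    void execute(Runnable task) {
        if (!taskQueue.offer(task)) {     // netty: reject(task) on a full queue
            throw new RejectedExecutionException("event loop queue full");
        }
    }

    boolean inEventLoop() { return Thread.currentThread() == thread; }
}
```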

register0

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/AbstractChannel.java

```java
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
```
What the event loop later takes off the taskQueue and runs is this register0 operation: it sets the registered field to true, fires channelRegistered, and, if the channel is already active, channelActive.

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/socket/nio/NioSocketChannel.java

```java
public boolean isActive() {
    SocketChannel ch = javaChannel();
    return ch.isOpen() && ch.isConnected();
}
```
Whether a channel is active comes down to whether the underlying SocketChannel is both open and connected.

Bootstrap.doResolveAndConnect0

netty-transport-4.1.20.Final-sources.jar!/io/netty/bootstrap/Bootstrap.java

```java
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
        final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            // Resolver has no idea about what to do with the specified remote address or it's resolved already.
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {
                // Failed to resolve immediately
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        // Wait until the name resolution is finished.
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
```
doResolveAndConnect0 runs only after initAndRegister has succeeded, and this is where the actual connect happens: doConnect submits channel.connect(...) to the channel's event loop as yet another task.

The main steps of Bootstrap.connect

For a connect to go through, it has to clear these gates:
  • Create and initialize the channel: in AbstractBootstrap.initAndRegister, newChannel and init() must not produce a failed ChannelFuture; channel creation can fail with SocketException("too many open files") when no FileDescriptor can be allocated.
  • Submit the channel-registration task to the eventLoop: the taskQueue has to accept the register task. Its default capacity is Integer.MAX_VALUE, so normally this succeeds; if the task is rejected, a RejectedExecutionException is thrown, the channel is force-closed (logged as "Force-closing a channel whose registration task was not accepted by an event loop"), and the promise is set to failure. When initAndRegister fails, the channel is closed directly. The register0(promise) task taken from the taskQueue moves the channel to REGISTERED, after which the connect task runs; once the channel is open and connected, it becomes ACTIVE.
  • Register the operationComplete callback on the registration ChannelFuture: it invokes doResolveAndConnect0, which performs the channel's connect.
The channel's state transitions are Created -> REGISTERED -> CONNECT -> ACTIVE:
```
21:53:50.934 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.resources.DefaultPoolResources - Created [id: 0x1ebe331c], now 1 active connections
21:53:50.941 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ContextHandler - After pipeline DefaultChannelPipeline{(reactor.left.loggingHandler = io.netty.handler.logging.LoggingHandler), (SimpleChannelPool$1#0 = io.netty.channel.pool.SimpleChannelPool$1), (reactor.right.reactiveBridge = reactor.ipc.netty.channel.ChannelOperationsHandler)}
21:53:50.942 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c] REGISTERED
21:54:49.561 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c] CONNECT: localhost/127.0.0.1:8888
21:54:49.571 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] ACTIVE
```

Sending and receiving data, then closing the channel

Once the channel becomes active, the lambda passed to newHandler fires and writes data to the channel:
```
22:13:12.174 [main] DEBUG reactor.ipc.netty.channel.PooledClientContextHandler - Acquiring existing channel from pool: DefaultPromise@97e93f1(success: [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888]) SimpleChannelPool{activeConnections=1}
22:13:19.773 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.PooledClientContextHandler - Acquired active channel: [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888]
22:13:25.291 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperations - [Channel] [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] handler is being applied: com.example.demo.TcpTest$$Lambda$7/1541049864@41d1fa89
22:15:17.748 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperationsHandler - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] Writing object FluxMapFuseable
22:15:21.719 [reactor-tcp-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 32768
22:15:21.719 [reactor-tcp-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
22:15:21.719 [reactor-tcp-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
22:15:21.719 [reactor-tcp-nio-4] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
22:15:21.742 [reactor-tcp-nio-4] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.bytebuf.checkAccessible: true
22:15:21.756 [reactor-tcp-nio-4] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@5c2a00d6
22:15:23.010 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] WRITE: 12B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21             |Hello World!    |
+--------+-------------------------------------------------+----------------+
22:15:25.042 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] FLUSH
22:15:27.861 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.FluxReceive - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
22:15:27.864 [reactor-tcp-nio-4] INFO reactor.Mono.Next.1 - onSubscribe(MonoNext.NextSubscriber)
22:15:27.869 [reactor-tcp-nio-4] INFO reactor.Mono.Next.1 - request(unbounded)
22:15:32.557 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] READ: 12B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 57 6f 72 6c 64 21             |Hello World!    |
+--------+-------------------------------------------------+----------------+
22:15:34.292 [reactor-tcp-nio-4] INFO reactor.Mono.Next.1 - onNext(Hello World!)
22:15:34.292 [reactor-tcp-nio-4] INFO reactor.Mono.Next.1 - onComplete()
22:15:34.293 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperations - [Channel] [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] User Handler requesting close connection
22:15:34.296 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] USER_EVENT: [Handler Terminated]
22:15:34.296 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperationsHandler - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@28add41a
22:15:34.296 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.channel.PooledClientContextHandler - Releasing channel: [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888]
22:15:34.297 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 - R:localhost/127.0.0.1:8888] CLOSE
22:15:35.967 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.resources.DefaultPoolResources - Released [id: 0x1ebe331c, L:/127.0.0.1:55386 ! R:localhost/127.0.0.1:8888], now 0 active connections
22:15:35.968 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 ! R:localhost/127.0.0.1:8888] READ COMPLETE
22:15:35.969 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 ! R:localhost/127.0.0.1:8888] INACTIVE
22:15:35.969 [reactor-tcp-nio-4] DEBUG reactor.ipc.netty.tcp.TcpClient - [id: 0x1ebe331c, L:/127.0.0.1:55386 ! R:localhost/127.0.0.1:8888] UNREGISTERED
```
Note the channel's state/operation sequence here: ACTIVE -> WRITE -> FLUSH -> READ -> CLOSE -> Released -> READ COMPLETE -> INACTIVE -> UNREGISTERED.

Summary

  • TcpClient.newHandler establishes the connection when subscribed: subscribe() triggers the lambda and ultimately Bootstrap.connect.
  • Bootstrap.connect calls MultithreadEventLoopGroup.register(channel), which wraps the channel in a DefaultChannelPromise and registers it through AbstractChannel$AbstractUnsafe.
  • AbstractChannel$AbstractUnsafe in turn calls taskQueue.offer(task) to put the register0(promise) task into the eventLoop's taskQueue.
The taskQueue is a LinkedBlockingQueue whose capacity is given by DEFAULT_MAX_PENDING_TASKS: Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)). The default is Integer.MAX_VALUE, which is effectively unbounded.
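Since that bound is read once through SystemPropertyUtil at class-initialization time, capping it is done on the JVM command line; a hedged sketch around the line from SingleThreadEventLoop:

```java
import io.netty.util.internal.SystemPropertyUtil;

// Start the JVM with e.g. -Dio.netty.eventLoop.maxPendingTasks=1024 to bound the queue.
// With a bounded queue, a rejected registration task surfaces as the
// RejectedExecutionException / force-close path shown in AbstractUnsafe.register.
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
        SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
```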
