前端之家收集整理的这篇文章主要介绍了
Reactor && Netty,
前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
http://ifeve.com/netty-reactor-4/
1、Reactor的由来
Reactor是一种广泛应用在服务器端开发的设计模式。Reactor中文大多译为“反应堆”,从Reactor的兄弟“Proactor”(多译为前摄器)就能看得出来,这两个词的中文翻译其实都不是太好,不够形象。实际上,Reactor模式又有别名“Dispatcher”或者“Notifier”,我觉得这两个都更加能表明它的本质。
那么,Reactor模式究竟是个什么东西呢?这要从事件驱动的开发方式说起。我们知道,对于应用服务器,一个主要规律就是,cpu的处理速度是要远远快于IO速度的,如果cpu为了IO操作(例如从Socket读取一段数据)而阻塞显然是不划算的。好一点的方法是分为多进程或者线程去进行处理,但是这样会带来一些进程切换的开销,试想一个进程一个数据读了500ms,期间进程切换到它3次,但是cpu却什么都不能干,就这么切换走了,是不是也不划算?
这时先驱们找到了事件驱动,或者叫回调的方式,来完成这件事情。这种方式就是,应用业务向一个中间人注册一个回调(event handler),当IO就绪后,就这个中间人产生一个事件,并通知此handler进行处理。这种回调的方式,也体现了“好莱坞原则”(Hollywood principle)-“Don’t call us,we’ll call you”,在我们熟悉的IoC中也有用到。看来软件开发真是互通的!
好了,我们现在来看Reactor模式。在前面事件驱动的例子里有个问题:我们如何知道IO就绪这个事件,谁来充当这个中间人?Reactor模式的答案是:由一个不断等待和循环的单独进程(线程)来做这件事,它接受所有handler的注册,并负责先操作系统查询IO是否就绪,在就绪后就调用指定handler进行处理,这个角色的名字就叫做Reactor。
2、Reactor与NIO
Java中的NIO可以很好的和Reactor模式结合。关于NIO中的Reactor模式,我想没有什么资料能比Doug Lea大神(不知道Doug Lea?看看JDK集合包和并发包的作者吧)在《Scalable IO in Java》解释的更简洁和全面了。NIO中Reactor的核心是Selector
,我写了一个简单的Reactor示例,这里我贴一个核心的Reactor的循环(这种循环结构又叫做EventLoop
),剩余代码在这里。
01 |
public void run() { |
@H_301_76@
02 |
try { |
@H_301_76@
03 |
while(!Thread.interrupted()) { |
@H_301_76@
04 |
selector.select(); |
@H_301_76@
05 |
Set selected = selector.selectedKeys(); |
@H_301_76@
06 |
Iterator it = selected.iterator(); |
@H_301_76@
07 |
(it.hasNext()) |
@H_301_76@
08 |
dispatch((SelectionKey) (it.next())); |
@H_301_76@
09 |
selected.clear(); |
@H_301_76@
10 |
} |
@H_301_76@
11 |
}catch (IOException ex) { |
@H_301_76@
12 |
13 |
} |
@H_301_76@
3、与Reactor相关的其他概念
前面提到了Proactor模式,这又是什么呢?简单来说,Reactor模式里,操作系统只负责通知IO就绪,具体的IO操作(例如读写)仍然是要在业务进程里阻塞的去做的,而Proactor模式则更进一步,由操作系统将IO操作执行好(例如读取,会将数据直接读到内存buffer中),而handler只负责处理自己的逻辑,真正做到了IO与程序处理异步执行。所以我们一般又说Reactor是同步IO,Proactor是异步IO。
关于阻塞和非阻塞、异步和非异步,以及UNIX底层的机制,大家可以看看这篇文章IO – 同步,异步,阻塞,非阻塞 (亡羊补牢篇),以及陶辉(《深入理解Nginx》的作者)《高性能网络编程》的系列。
1、多线程下的Reactor
讲了一堆Reactor,我们回到Netty。在《Scalable IO in Java》中讲到了一种多线程下的Reactor模式。在这个模式里,mainReactor只有一个,负责响应client的连接请求,并建立连接,它使用一个NIO Selector;subReactor可以有一个或者多个,每个subReactor都会在一个独立线程中执行,并且维护一个独立的NIO Selector。
这样的好处很明显,因为subReactor也会执行一些比较耗时的IO操作,例如消息的读写,使用多个线程去执行,则更加有利于发挥cpu的运算能力,减少IO等待时间。
2、Netty中的Reactor与NIO
好了,了解了多线程下的Reactor模式,我们来看看Netty吧(以下部分主要针对NIO,OIO部分更加简单一点,不重复介绍了)。Netty里对应mainReactor的角色叫做“Boss”,而对应subReactor的角色叫做”Worker”。Boss负责分配请求,Worker负责执行,好像也很贴切!以TCP的Server端为例,这两个对应的实现类分别为NioServerBoss
和NioWorker
(Server和Client的Worker没有区别,因为建立连接之后,双方就是对等的进行传输了)。
Netty 3.7中Reactor的EventLoop在AbstractNioSelector.run()
中,它实现了Runnable
接口。这个类是Netty NIO部分的核心。它的逻辑非常复杂,其中还包括一些对JDK Bug的处理(例如rebuildSelector
),刚开始读的时候不需要深入那么细节。我精简了大部分代码,保留主干如下:
abstract
class
AbstractNioSelector
implements
NioSelector {
@H_
301_76@
|
@H_301_76@
|
@H_301_76@
protectedvolatile Selector selector; |
@H_301_76@
|
@H_301_76@
//内部任务队列 |
@H_301_76@
privatefinal Queue taskQueue = new ConcurrentLinkedQueue(); |
@H_301_76@
//selector循环 |
@H_301_76@
run() { |
@H_301_76@
for(;;) { |
@H_301_76@
//处理内部任务队列 |
@H_301_76@
14 |
processTaskQueue(); |
@H_301_76@
15 |
//处理selector事件对应逻辑 |
@H_301_76@
16 |
process(selector); |
@H_301_76@
17 |
(Throwable t) { |
@H_301_76@
18 |
19 |
Thread.sleep(1000 ); |
@H_301_76@
20 |
(InterruptedException e) { |
@H_301_76@
21 |
// Ignore. |
@H_301_76@
22 |
23 |
} |
@H_301_76@
24 |
25 |
26 |
27 |
processTaskQueue() { |
@H_301_76@
28 |
(;;) { |
@H_301_76@
29 |
Runnable task = taskQueue.poll(); |
@H_301_76@
30 |
if(task == null ) { |
@H_301_76@
31 |
break; |
@H_301_76@
32 |
33 |
task.run(); |
@H_301_76@
34 |
35 |
36 |
37 |
process(Selector selector)throws IOException; |
@H_301_76@
其中process是主要的处理事件的逻辑,例如在AbstractNioWorker
中,处理逻辑如下:
Set selectedKeys = selector.selectedKeys(); |
@H_301_76@
(selectedKeys.isEmpty()) { |
@H_301_76@
return; |
@H_301_76@
(Iterator i = selectedKeys.iterator(); i.hasNext();) { |
@H_301_76@
SelectionKey k = i.next(); |
@H_301_76@
i.remove(); |
@H_301_76@
{ |
@H_301_76@
intreadyOps = k.readyOps(); |
@H_301_76@
((readyOps & SelectionKey.OP_READ) !=0 || readyOps == ) { |
@H_301_76@
(!read(k)) { |
@H_301_76@
// Connection already closed - no need to handle write. |
@H_301_76@
continue((readyOps & SelectionKey.OP_WRITE) != writeFromSelectorLoop(k); |
@H_301_76@
(CancelledKeyException e) { |
@H_301_76@
close(k); |
@H_301_76@
(cleanUpCancelledKeys()) { |
@H_301_76@
; |
@H_301_76@
} |
@H_301_76@
这不就是第二部分提到的selector经典用法了么?
在4.0之后,作者觉得NioSelector
这个叫法,以及区分NioBoss
和NioWorker
的做法稍微繁琐了点,干脆就将这些合并成了NioEventLoop
,从此这两个角色就不做区分了。我倒是觉得新版本的会更优雅一点。
3、Netty中的多线程
下面我们来看Netty的多线程部分。一旦对应的Boss或者Worker启动,就会分配给它们一个线程去一直执行。对应的概念为BossPool
和WorkerPool
。对于每个NioServerSocketChannel
,Boss的Reactor有一个线程,而Worker的线程数由Worker线程池大小决定,但是默认最大不会超过cpu核数*2,当然,这个参数可以通过NioServerSocketChannelFactory
构造函数的参数来设置。
1
publicNioServerSocketChannelFactory( |
@H_
301_76@
2 |
Executor bossExecutor,Executor workerExecutor, |
@H_301_76@
3 |
workerCount) { |
@H_301_76@
4 |
this(bossExecutor, 1 ,workerExecutor,workerCount); |
@H_301_76@
5 |
最后我们比较关心一个问题,我们之前ChannlePipeline 中的ChannleHandler是在哪个线程执行的呢?答案是在Worker线程里执行的,并且会阻塞Worker的EventLoop。例如,在NioWorker 中,读取消息完毕之后,会触发MessageReceived 事件,这会使得Pipeline中的handler都得到执行。
booleanread(SelectionKey k) { |
@H_301_76@
.... |
@H_301_76@
(readBytes >// Fire the event. |
@H_301_76@
fireMessageReceived(channel,buffer); |
@H_301_76@
|