本文可看成是对Doug Lea Scalable IO in Java一文的翻译。
当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:
1. Read request
2. Decode request
3. Process service
4. Encode reply
5. Send reply
经典的网络服务的设计如下图,在每个线程中完成对数据的处理:
但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。
Reactor模式类似于AWT中的Event处理:
Reactor模式参与者
1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread
2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener
如图:
Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。
我们来看看Reactor模式代码:
- publicclassReactorimplementsRunnable{
- finalSelectorselector;
- finalServerSocketChannelserverSocket;
- Reactor(intport)throwsIOException{
- selector=Selector.open();
- serverSocket=ServerSocketChannel.open();
- InetSocketAddressaddress=newInetSocketAddress(InetAddress.getLocalHost(),port);
- serverSocket.socket().bind(address);
- serverSocket.configureBlocking(false);
- //向selector注册该channel
- SelectionKeysk=serverSocket.register(selector,SelectionKey.OP_ACCEPT);
- logger.debug("-->StartserverSocket.register!");
- //利用sk的attache功能绑定Acceptor如果有事情,触发Acceptor
- sk.attach(newAcceptor());
- logger.debug("-->attach(newAcceptor()!");
- }
- voidrun(){//normallyinanewThread
- try{
- while(!Thread.interrupted())
- {
- selector.select();
- Setselected=selector.selectedKeys();
- Iteratorit=selected.iterator();
- //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
- while(it.hasNext())
- //来一个事件第一次触发一个accepter线程
- //以后触发SocketReadHandler
- dispatch((SelectionKey)(it.next()));
- selected.clear();
- }
- }catch(IOExceptionex){
- logger.debug("reactorstop!"+ex);
- }
- //运行Acceptor或SocketReadHandler
- voiddispatch(SelectionKeyk){
- Runnabler=(Runnable)(k.attachment());
- if(r!=null){
- //r.run();
- classAcceptorimplementsRunnable{//inner
- voidrun(){
- logger.debug("-->readyforaccept!");
- SocketChannelc=serverSocket.accept();
- if(c!=null)
- //调用Handler来处理channel
- newSocketReadHandler(selector,c);
- logger.debug("acceptstop!"+ex);
- }
以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。
再看看Handler代码:
将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:
进一步,我们可以使用多个Selector分别处理连接和读事件。一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。
原文链接:https://www.f2er.com/react/308284.html