Mina 2.0.9-详解3-Reactor

前端之家收集整理的这篇文章主要介绍了Mina 2.0.9-详解3-Reactor前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

Reactor

在讲解Mina之前,我们先了解下Reactor模式,从一个简单例子入手,这样做的原因是,通过简单例子,我们就能快准狠地了解Mina的内部原理。

上一遍讲了Reacto相关的概念。其实主要有个三个概念:
1. 事件
2. 事件分发器
3. 事件处理器

事件主要是指各种IO事件,在通讯这一层主要有 accept,read,write 三个IO事件;
事件分发器主要工作是接收事件并派发给事件处理器来处理,起调度中心的作用;
事件处理器主要负责业务的逻辑处理,是我们最关注的部分;

服务端

下面的代码是服务端代码示例,主线程负责accept事件和事件派发工作,收到一个client连接后,会生成一个SocketReadHanler 任务,同时绑定read事件到这个SocketReadHanler上。

public class Reactor implements Runnable {

    //selector
    private final Selector            selector;
    //socket
    private final ServerSocketChannel serverSocketChannel;
    //线程池
    private final ExecutorService     service = Executors
                                                  .newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 10);

    /** * @throws IOException * */
    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress("localhost",port);
        serverSocketChannel.socket().bind(address);

        serverSocketChannel.configureBlocking(false);
        //register the channel 
        SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        System.out.println("--> start serverSocket.register");

        //attache the acceptor
        selectionKey.attach(new Acceptor());
        System.out.println("-->attach new Acceptor");

    }

    /** * 事件到到后触发handler执行 */
    class Acceptor implements Runnable {

        /** * @see java.lang.Runnable#run() */
        @Override
        public void run() {
            try {
                System.out.println("-->ready for accept");
                SocketChannel channel = serverSocketChannel.accept();
                if (channel != null) {
                    new SocketReadHanler(selector,channel);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

    }

    /** * @see java.lang.Runnable#run() */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                //it will execute if read event or accept event occurs
                while (it.hasNext()) {
                    //when one event comes,it will trigger the accept thread in the first time and trigger the SockerReadHandler after the first time
                    SelectionKey selectionKey = it.next();
                    dispatch(selectionKey);

                }
                keys.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** * event dispatcher,* 不同的handler绑定不同IO事件, * 这样事件到达后直接触发handler执行 * * @param key */
    void dispatch(SelectionKey key) {
        Runnable runnable = (Runnable) key.attachment();

        //放到线程池中,执行某个job
        Thread thread = new Thread(runnable,"thread_" + System.currentTimeMillis());
        service.execute(thread);

    }

}

SocketReadHanler 负责业务逻辑处理,它会绑定read监听事件到selector上。

public class SocketReadHanler implements Runnable {

    private final SocketChannel channel;
    private final SelectionKey  key;

    private static final int    READING = 0;
    int                         state   = READING;

    /** * 绑定IO事件到具体handler * @param sel * @param channel * @throws IOException */
    public SocketReadHanler(Selector sel,SocketChannel channel) throws IOException {
        this.channel = channel;
        this.channel.configureBlocking(false);
        key = this.channel.register(sel,0);

        //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法
        //binding
        key.attach(this);
        //read
        key.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    /** * @see java.lang.Runnable#run() */
    @Override
    public void run() {
        try {
            ByteBuffer clientBuffer = ByteBuffer.allocate(4096);
            if (key.isReadable()) { // 读信息 
                System.out.println("read:" + Thread.currentThread().getName());
                int count = channel.read(clientBuffer); // 将数据读入clientBuffer 
                if (count > 0) { // 当有数据读入时 
                    System.out.println("wait 30s");
                    Thread.sleep(30000);
                    clientBuffer.flip(); // 反转此缓冲区
                    // 如果需要,对缓冲区中的字符进行解码 
                    //处理数据
                    System.out.println(getChars(clientBuffer.array()));

                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private char[] getChars(byte[] bytes) {//将字节转为字符(解码)
        Charset cs = Charset.forName("UTF-8");
        ByteBuffer bb = ByteBuffer.allocate(bytes.length);
        bb.put(bytes);
        bb.flip();
        CharBuffer cb = cs.decode(bb);

        return cb.array();
    }

}

main函数

public class ReactorTest {

    public static void main(String[] args) {
        try {
            Reactor reactor = new Reactor(9527);
            reactor.run();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

从上面可以看出,各个io事件都绑定到一个selector上,不同的io事件到达后触发不同的handler处理。handler线程放到一个线程池中,保证线程的数量不会太大。

猜你在找的React相关文章