我正在
Java的套接字上实现一个面向事件的层,我想知道是否有办法确定是否有待读取的数据.
我的常规方法是从套接字读入缓冲区,并在缓冲区填充给定量的字节时调用提供的回调(如果每次到达时都需要触发回调,则可以为0),但是我怀疑Java已经在为我做缓冲了.
InputStream的available()方法是否可靠?我应该只读()并在Socket上做自己的缓冲吗?或者还有另一种方式吗?
解决方法
不久,没有. available()不可靠(至少不适合我).我建议使用与Selector和SelectionKey连接的java.nio.channels.SocketChannel.这个解决方案有点基于事件,但比普通的套接字更复杂.
对于客户:
>构造套接字通道(套接字),打开一个选择器(selector = Selector.open();).
>使用非阻塞socket.configureBlocking(false);
>为socket连接注册选择器socket.register(selector,SelectionKey.OP_CONNECT);
>连接socket.connect(新的InetSocketAddress(主机,端口));
>看看是否有新的selector.select();
>如果“new”表示连接成功,则注册选择器OP_READ;如果“new”指的是可用的数据,只需从套接字读取即可.
但是,为了使它具有异步性,您需要设置一个单独的线程(尽管套接字被创建为非阻塞,但线程仍会阻塞),它会检查是否已经到达某些东西.
对于服务器,有ServerSocketChannel,您可以使用OP_ACCEPT.
private Thread readingThread = new ListeningThread(); /** * Listening thread - reads messages in a separate thread so the application does not get blocked. */ private class ListeningThread extends Thread { public void run() { running = true; try { while(!close) listen(); messenger.close(); } catch(ConnectException ce) { doNotifyConnectionFailed(ce); } catch(Exception e) { // e.printStackTrace(); messenger.close(); } running = false; } } /** * Connects to host and port. * @param host Host to connect to. * @param port Port of the host machine to connect to. */ public void connect(String host,int port) { try { SocketChannel socket = SocketChannel.open(); socket.configureBlocking(false); socket.register(this.selector,SelectionKey.OP_CONNECT); socket.connect(new InetSocketAddress(host,port)); } catch(IOException e) { this.doNotifyConnectionFailed(e); } } /** * Waits for an event to happen,processes it and then returns. * @throws IOException when something goes wrong. */ protected void listen() throws IOException { // see if there are any new things going on this.selector.select(); // process events Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while(iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); // check validity if(key.isValid()) { // if connectable... if(key.isConnectable()) { // ...establish connection,make messenger,and notify everyone SocketChannel client = (SocketChannel)key.channel(); // now this is tricky,registering for OP_READ earlier causes the selector not to wait for incoming bytes,which results in 100% cpu usage very,very fast if(client!=null && client.finishConnect()) { client.register(this.selector,SelectionKey.OP_READ); } } // if readable,tell messenger to read bytes else if(key.isReadable() && (SocketChannel)key.channel()==this.messenger.getSocket()) { // read message here } } } } /** * Starts the client. */ public void start() { // start a reading thread if(!this.running) { this.readingThread = new ListeningThread(); this.readingThread.start(); } } /** * Tells the client to close at nearest possible moment. */ public void close() { this.close = true; }
对于服务器:
/** * Constructs a server. * @param port Port to listen to. * @param protocol Protocol of messages. * @throws IOException when something goes wrong. */ public ChannelMessageServer(int port) throws IOException { this.server = ServerSocketChannel.open(); this.server.configureBlocking(false); this.server.socket().bind(new InetSocketAddress(port)); this.server.register(this.selector,SelectionKey.OP_ACCEPT); } /** * Waits for event,then exits. * @throws IOException when something goes wrong. */ protected void listen() throws IOException { // see if there are any new things going on this.selector.select(); // process events Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while(iter.hasNext()) { SelectionKey key = iter.next(); // do something with the connected socket iter.remove(); if(key.isValid()) this.process(key); } } /** * Processes a selection key. * @param key SelectionKey. * @throws IOException when something is wrong. */ protected void process(SelectionKey key) throws IOException { // if incoming connection if(key.isAcceptable()) { // get client SocketChannel client = (((ServerSocketChannel)key.channel()).accept()); try { client.configureBlocking(false); client.register(this.selector,SelectionKey.OP_READ); } catch(Exception e) { // catch } } // if readable,tell messenger to read else if(key.isReadable()) { // read } }
希望这可以帮助.