Java NIO-非阻塞通信

    xiaoxiao2021-03-25  131

    Java NIO-非阻塞通信

    标签: 非阻塞通信JavaSocketNIOJava网络编程 3318人阅读 评论(0) 收藏 举报 分类: Network Programming(6)

    NIO(Non-block IO)指非阻塞通信,相对于其编程的复杂性,通常客户端并不需要使用非阻塞通信以提高性能,故这里只有服务端使用非阻塞通信方式实现

    客户端:

    [java] view plain copy package com.test.client;    import java.io.DataInputStream;  import java.io.DataOutputStream;  import java.io.IOException;  import java.net.InetAddress;  import java.net.InetSocketAddress;  import java.nio.channels.SocketChannel;    import org.apache.log4j.Logger;    import com.test.util.SocketIO;    public class Client {      static Logger logger = Logger.getLogger(Client.class);      private int port = 10000;      private SocketChannel socketChannel;            public Client(){          try {              socketChannel = SocketChannel.open();              InetAddress host = InetAddress.getLocalHost();              InetSocketAddress addr = new InetSocketAddress(host, port);                            socketChannel.connect(addr);                            logger.debug("***");              logger.debug("client ip:"+socketChannel.socket().getLocalAddress());              logger.debug("client port:"+socketChannel.socket().getLocalPort());              logger.debug("server ip:"+socketChannel.socket().getInetAddress());              logger.debug("server port:"+socketChannel.socket().getPort());              logger.debug("***");          } catch (IOException e) {              e.printStackTrace();              logger.error("Cilent socket establish failed!");          }          logger.info("Client socket establish success!");      }            public void request(String request){          try{              DataInputStream input = SocketIO.getInput(socketChannel.socket());              DataOutputStream output = SocketIO.getOutput(socketChannel.socket());                            if(null != request && !request.equals("")){                  byte[] bytes = request.getBytes("utf-8");                  output.write(bytes);                            bytes = new byte[64];                  int num = input.read(bytes);                  byte[] answer = new byte[num];                  System.arraycopy(bytes, 0, answer, 0, num);                  if(num > 0){                      logger.info("server answer:"+new String(answer,"utf-8"));                  }else{                      logger.info("No server answer.");                  }              }          }catch(Exception e){              e.printStackTrace();              logger.error("client request error");          }finally{              if(null != socketChannel){                  try{                      socketChannel.close();                  }catch(Exception e){                      e.printStackTrace();                      logger.error("socket close error");                  }              }          }      }            public static void main(String[] args){          Client client1 = new Client();          //Client client2 = new Client();          client1.request("your name?");          //client2.request("your name?");      }  }  

    服务端:

    [java] view plain copy package com.test.server;    import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.ServerSocketChannel;  import java.nio.channels.SocketChannel;  import java.nio.charset.Charset;  import java.util.Iterator;  import java.util.Set;    import org.apache.log4j.Logger;    public class Server {      static Logger logger = Logger.getLogger(Server.class);      private Selector selector;      private ServerSocketChannel serverSocketChannel;      private int queueNum = 10;      private int bindPort = 10000;      private int step = 0;      private Charset charset = Charset.forName("utf-8");      private ByteBuffer buffer = ByteBuffer.allocate(64);            public Server(){          try{              //为ServerSocketChannel监控接收连接就绪事件              //为SocketChannel监控连接就绪事件、读就绪事件以及写就绪事件              selector = Selector.open();              //作用相当于传统通信中的ServerSocket              //支持阻塞模式和非阻塞模式              serverSocketChannel = ServerSocketChannel.open();              serverSocketChannel.socket().setReuseAddress(true);              //非阻塞模式              serverSocketChannel.configureBlocking(false);              //serverSocketChannel.socket()会获得一个和当前信道相关联的socket              serverSocketChannel.socket().bind(new InetSocketAddress(bindPort),queueNum);                            //注册接收连接就绪事件              //注册事件后会返回一个SelectionKey对象用以跟踪注册事件句柄              //该SelectionKey将会放入Selector的all-keys集合中,如果相应的事件触发              //该SelectionKey将会放入Selector的selected-keys集合中              serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);          }catch(Exception e){              e.printStackTrace();              logger.error("Server establish error!");          }          logger.info("Server start up!");      }        public void service() throws Exception{          //判断是否有触发事件          while(selector.select() > 0){              Set<SelectionKey> selectedKeys = selector.selectedKeys();              Iterator<SelectionKey> iterator = selectedKeys.iterator();                            while(iterator.hasNext()){                  SelectionKey selectionKey = iterator.next();                  //处理事件后将事件从Selector的selected-keys集合中删除                  iterator.remove();                  try{                      if(selectionKey.isAcceptable()){                          this.Acceptable(selectionKey);                      }else if(selectionKey.isReadable()){                          this.Readable(selectionKey);                      }else if(selectionKey.isWritable()){                          this.Writable(selectionKey);                      }                  }catch(Exception e){                      e.printStackTrace();                      logger.error("event deal exception!");                  }              }          }      }            private void Acceptable(SelectionKey selectionKey) throws Exception{          logger.info("accept:"+(++step));                    ServerSocketChannel ssc = (ServerSocketChannel)selectionKey.channel();          SocketChannel sc = (SocketChannel)ssc.accept();                    sc.configureBlocking(false);          sc.register(selector, SelectionKey.OP_READ);                    logger.info(selectionKey.hashCode());      }            private void Readable(SelectionKey selectionKey) throws Exception{          logger.info("read:"+(++step));                    SocketChannel sc = (SocketChannel)selectionKey.channel();                    buffer.clear();          int num = sc.read(buffer);          String request = "";          if(num > 0){              buffer.flip();                            request = charset.decode(buffer).toString();              sc.register(selector, SelectionKey.OP_WRITE,request);          }else{              sc.close();          }                    logger.info(selectionKey.hashCode()+":"+request);      }            private void Writable(SelectionKey selectionKey) throws Exception{          logger.info("write:"+(++step));                    String request = (String)selectionKey.attachment();          SocketChannel sc = (SocketChannel)selectionKey.channel();                    String answer = "not supported";          if(request.equals("your name?")){              answer = "server";          }                    logger.info(selectionKey.hashCode()+":"+answer);                    buffer.clear();          buffer.put(charset.encode(answer));          buffer.flip();          while(buffer.hasRemaining())              sc.write(buffer);                    sc.close();      }            public static void main(String[] args) {          Server server = new Server();          try{              server.service();          }catch(Exception e){              e.printStackTrace();              logger.error("Server run exception!");          }      }  }  

    IO工具类:

    [java] view plain copy package com.test.util;    import java.io.DataInputStream;  import java.io.DataOutputStream;  import java.io.IOException;  import java.io.InputStream;  import java.io.OutputStream;  import java.net.Socket;    public class SocketIO{      public static DataInputStream getInput(Socket socket) throws IOException{          //接收缓存区大小,socket获取输入流之前设置          socket.setReceiveBufferSize(10);          InputStream input = socket.getInputStream();          return new DataInputStream(input);      }            public static DataOutputStream getOutput(Socket socket) throws IOException{          //发送缓存区大小,socket获取输出流之前设置          socket.setSendBufferSize(10);          OutputStream output = socket.getOutputStream();          return new DataOutputStream(output);      }  }  

    log4j日志配置文件:

    [plain] view plain copy log4j.rootLogger=debug,logOutput    log console out put   log4j.appender.logOutput=org.apache.log4j.ConsoleAppender  log4j.appender.logOutput.layout=org.apache.log4j.PatternLayout  log4j.appender.logOutput.layout.ConversionPattern=%p%d{[yy-MM-dd HH:mm:ss]}[%c] -> %m%n  

    server端的运行结果:

    [plain] view plain copy INFO[13-10-16 11:40:41][com.test.server.Server] -> Server start up!  INFO[13-10-16 11:40:53][com.test.server.Server] -> accept:1  INFO[13-10-16 11:41:14][com.test.server.Server] -> 20469344  INFO[13-10-16 11:41:21][com.test.server.Server] -> read:2  INFO[13-10-16 11:41:37][com.test.server.Server] -> 11688861:your name?  INFO[13-10-16 11:43:00][com.test.server.Server] -> write:3  INFO[13-10-16 11:43:00][com.test.server.Server] -> 11688861:server  

    可以看到readable方法中的SelectionKey和writable方法中的SelectionKey的哈希码是完全相同的,是同一个SelectionKey

    SelectionKey是在SocketChannel类或ServerSocketChannel类注册要监控的事件时产生的,这两个类本身并没有register方法,需要查看它们共同父类AbstractSelectableChannel(只有关键代码):

    [java] view plain copy public abstract class AbstractSelectableChannel          extends SelectableChannel{      ......      // Keys that have been created by registering this channel with selectors.      // They are saved because if this channel is closed the keys must be      // deregistered.  Protected by keyLock.      private SelectionKey[] keys = null;        public final SelectionKey register(Selector sel, int ops, Object att)              throws ClosedChannelException{          if (!isOpen())              throw new ClosedChannelException();          if ((ops & ~validOps()) != 0)              throw new IllegalArgumentException();          synchronized (regLock) {              if (blocking)                  throw new IllegalBlockingModeException();              SelectionKey k = findKey(sel);              if (k != null) {                  k.interestOps(ops);                  k.attach(att);              }              if (k == null) {                  // New registration                  k = ((AbstractSelector)sel).register(this, ops, att);                  addKey(k);              }              return k;          }      }        private SelectionKey findKey(Selector sel) {          synchronized (keyLock) {              if (keys == null)                  return null;              for (int i = 0; i < keys.length; i++)                  if ((keys[i] != null) && (keys[i].selector() == sel))                      return keys[i];              return null;          }      }        void removeKey(SelectionKey k) {            // package-private          synchronized (keyLock) {              for (int i = 0; i < keys.length; i++)              if (keys[i] == k) {                  keys[i] = null;                  keyCount--;              }              ((AbstractSelectionKey)k).invalidate();          }      }      ......  }  

    ServerSocketChannel和Socketchannel向Selector中注册了特定事件,Selector就会监控这些事件是否发生。ServerSocketChannel和Socketchannel都为AbstractSelectableChannel类的子类,AbstractSelectableChannel类的register方法负责注册事件,该方法会返回一个SelectionKey对象,该对象用于跟踪被注册事件

    [java] view plain copy public abstract class SelectionKey {      protected SelectionKey() { }        public abstract SelectableChannel channel();        public abstract Selector selector();      ......  }  

    一个Selector对象中包含了3种类型的键集(即SelectionKey集合,SelectionKey在以下部分被称为“键”)

    1,all-keys:所有注册至该Selector的事件键集(selector.keys())

    2,selected-keys:相关事件已经被Selector捕获的键集(selector.selectedKeys())

    3,cancel-keys:已被取消的键集(无法访问该集合)

    selected-keys和cancel-keys都为all-keys的子集,对于一个新建的Selector,这3个键集都为空

    注册事件时会将相应的SelectionKey加入Selector的all-keys键集中

    取消SelectionKey或者关闭了SelectionKey相关联的Channel,则会将相应的SelectionKey加入cancel-keys键集中

    当执行选择器的选择操作时(selector.select(),对于选择器来说,这个方法应该是相当重要的):

    1,将cancel-keys中的每个SelectionKey从3个键集中移除(如果存在的话),并注销SelectionKey所关联的Channel,cancel-keys键集变为空集。

    2,如果监控的事件发生,Selector会将相关的SelectionKey加入selected-keys键集中

     

    以下为对源代码的分析、推测:

    Selector作为选择器,保存了所有的Selectionkey(注册的,取消的,触发的),通过上面的AbstractSelectableChannel类的源代码,发现Channel本身也保存了一个自身关联的SelectionKey数组,这看起来是完全没有必要的,但是仔细看一下register方法,能看出些许端倪:

    Selector本身维护了3个集合,all-keys,selected-keys和cancel-keys,频繁的注册操作、取消注册将会导致这3个集合频繁的变化,伴随频繁变化的是频繁的加锁,这会严重的降低Selector的性能,毕竟一个Selector会被多个Channel作为选择器使用,本身非阻塞的实现方式就是提高性能的一种解决方式

    当注册新的事件时,如果存在该通道相关的SelectionKey,则更新该SelectionKey所关注的事件以及其携带的附加信息,如果不存在,则添加新的SelectionKey

    这样做的好处是,比起删除以前的SelectionKey,添加新的SelectionKey,修改SelectionKey所关注的事件以及其携带的附加信息显然是更好的选择,毕竟不需要修改Selector所维护的键集,当然也不需要频繁加锁(通过查看Selector类的api,SelectionKey并不是thread-safe的,显然并没有加锁,但是并没有什么问题),能够提供更好的性能

     

    总之,SelectionKey的哈希码会重复是很正常的,毕竟不是单纯的注册时新建、触发后删除方式,Java实现时进行了优化

    转载请注明原文地址: https://ju.6miu.com/read-8283.html

    最新回复(0)