Java AIO-异步通信

    xiaoxiao2021-03-25  166

    Java AIO-异步通信

    标签: NIO2.0AIO异步非阻塞通信 395人阅读 评论(0) 收藏 举报 分类: Network Programming(6)

    AIO(Asynchronous IO)即异步IO。特别需要说明的是,Java AIO(即NIO.2)需要JDK 1.7及以上版本的支持。

    客户端

    package com.test.client;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.StandardSocketOptions;
    import java.nio.channels.AsynchronousChannelGroup;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.log4j.Logger;

    import com.test.handler.client.ConnectCompleteHandler;

    /**
     * Demo AIO client: opens an asynchronous socket channel and starts a
     * non-blocking connect to {@code host:port}. Everything after the connect is
     * driven by completion handlers, starting with {@link ConnectCompleteHandler}.
     */
    public class Client {
        private static final Logger logger = Logger.getLogger(Client.class);

        private final String host = "127.0.0.1";
        private final int port = 9999;
        private final int poolSize = 10;

        // Never counted down anywhere in this class; main() awaits it to stay alive.
        private static final CountDownLatch serverStatus = new CountDownLatch(1);

        /**
         * Creates the channel group and the socket channel, then initiates the connect.
         * The constructor returns as soon as the connect has been started.
         *
         * @throws Exception if the channel cannot be opened or configured
         */
        public Client() throws Exception {
            // Threads in this pool wait for I/O events; when an I/O operation
            // completes, a pool thread is used to run the CompletionHandler.
            ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
            try {
                AsynchronousChannelGroup asyncChannelGroup =
                        AsynchronousChannelGroup.withThreadPool(threadPool);

                AsynchronousSocketChannel asyncSocketChannel =
                        AsynchronousSocketChannel.open(asyncChannelGroup);
                asyncSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
                // No attachment is needed: the handler receives the channel via its constructor.
                asyncSocketChannel.connect(new InetSocketAddress(host, port),
                        null, new ConnectCompleteHandler(asyncSocketChannel));
            } catch (IOException e) {
                // Log the cause as well, and release the pool so a failed start
                // does not leave the executor behind.
                logger.error("Client socket establish failed!", e);
                threadPool.shutdownNow();
                throw e;
            }
        }

        public static void main(String[] args) throws Exception {
            new Client();

            // All AIO calls return immediately, so block the main thread here.
            serverStatus.await();
        }
    }

    客户端处理器

    package com.test.handler.client;

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.nio.charset.StandardCharsets;

    import org.apache.log4j.Logger;

    /**
     * Client-side handler invoked when the asynchronous connect finishes.
     *
     * <p>{@code CompletionHandler<V, A>}:
     * V - result of the I/O operation; a connect produces no value, so it is {@code Void}.
     * A - attachment of the I/O operation; unused here, so it is {@code Object}.
     */
    public class ConnectCompleteHandler
            implements CompletionHandler<Void, Object> {
        private static final Logger logger = Logger.getLogger(ConnectCompleteHandler.class);

        AsynchronousSocketChannel asyncSocketChannel;

        /**
         * @param asyncSocketChannel the channel being connected; the request is written to it
         */
        public ConnectCompleteHandler(
                AsynchronousSocketChannel asyncSocketChannel) {
            this.asyncSocketChannel = asyncSocketChannel;
        }

        /** Sends the greeting request once the connection is established. */
        @Override
        public void completed(Void result, Object attachment) {
            // Runs on a thread from the pool held by the channel group.
            logger.info("Deal thread of [ConnectCompleteHandler] : "
                    + Thread.currentThread().getName());

            String request = "Hi, this is client!";
            logger.info("The request sent by client is : " + request);

            // StandardCharsets.UTF_8 cannot fail, unlike a charset looked up by name.
            byte[] reqBytes = request.getBytes(StandardCharsets.UTF_8);
            // wrap() yields a buffer that is already positioned for reading.
            ByteBuffer writeByteBuffer = ByteBuffer.wrap(reqBytes);
            // The channel is passed as the attachment so the write handler can read the reply.
            asyncSocketChannel.write(writeByteBuffer, asyncSocketChannel,
                    new WriteCompleteHandler());
        }

        /** Logs the connect failure together with its cause. */
        @Override
        public void failed(Throwable exc, Object attachment) {
            logger.error("Connection error!", exc);
        }
    }

    package com.test.handler.client;

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;

    import org.apache.log4j.Logger;

    /**
     * Client-side handler invoked when the request write finishes.
     *
     * <p>{@code CompletionHandler<V, A>}:
     * V - result of the I/O operation; here the number of bytes written.
     * A - attachment of the I/O operation; here the AsynchronousSocketChannel,
     *     so that the server response can be read from it.
     */
    public class WriteCompleteHandler
            implements CompletionHandler<Integer, AsynchronousSocketChannel> {
        private static final Logger logger = Logger.getLogger(WriteCompleteHandler.class);

        /** Starts reading the server response after the request has been written. */
        @Override
        public void completed(Integer result,
                AsynchronousSocketChannel asyncSocketChannel) {
            logger.info("Deal thread of [WriteCompleteHandler] : "
                    + Thread.currentThread().getName());
            logger.info("Write bytes : " + result.intValue());
            if (result.intValue() == -1) {
                logger.error("Send request to server error!");
                return;
            }
            // NOTE(review): the write buffer is not available here, so a partial
            // write cannot be detected or resumed - confirm this is acceptable.
            ByteBuffer readByteBuffer = ByteBuffer.allocate(100);
            // Read the response sent by the server; the buffer doubles as the attachment.
            asyncSocketChannel.read(readByteBuffer, readByteBuffer,
                    new ReadCompleteHandler());
        }

        /** Logs the write failure together with its cause. */
        @Override
        public void failed(Throwable exc,
                AsynchronousSocketChannel asyncSocketChannel) {
            logger.error("Write message error!", exc);
        }
    }

    package com.test.handler.client;

    import java.nio.ByteBuffer;
    import java.nio.channels.CompletionHandler;
    import java.nio.charset.StandardCharsets;

    import org.apache.log4j.Logger;

    /**
     * Client-side handler invoked when the response read finishes.
     *
     * <p>{@code CompletionHandler<V, A>}:
     * V - result of the I/O operation; here the number of bytes read.
     * A - attachment of the I/O operation; WriteCompleteHandler passes the
     *     ByteBuffer to asyncSocketChannel.read, so it is a ByteBuffer.
     */
    public class ReadCompleteHandler
            implements CompletionHandler<Integer, ByteBuffer> {
        private static final Logger logger = Logger.getLogger(ReadCompleteHandler.class);

        /** Decodes the bytes that were read as UTF-8 and logs the response. */
        @Override
        public void completed(Integer result, ByteBuffer respByteBuffer) {
            logger.info("Deal thread of [ReadCompleteHandler] : "
                    + Thread.currentThread().getName());
            logger.info("Read bytes : " + result.intValue());
            if (result.intValue() == -1) {
                logger.error("Get response from server error!");
                return;
            }
            // Switch the buffer from filling to draining before copying it out.
            respByteBuffer.flip();
            byte[] respBytes = new byte[respByteBuffer.remaining()];
            respByteBuffer.get(respBytes);

            String response = new String(respBytes, StandardCharsets.UTF_8);
            logger.info("The response sent by server is : " + response);
        }

        /** Logs the read failure together with its cause. */
        @Override
        public void failed(Throwable exc, ByteBuffer readByteBuffer) {
            logger.error("Read message error!", exc);
        }
    }

    服务端

    package com.test.server;

    import java.net.InetSocketAddress;
    import java.net.StandardSocketOptions;
    import java.nio.channels.AsynchronousChannelGroup;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.log4j.Logger;

    import com.test.handler.server.ConnectCompleteHandler;

    /**
     * Demo AIO server: binds an asynchronous server socket channel to
     * {@code host:port} and accepts connections through completion handlers.
     */
    public class Server {
        private static final Logger logger = Logger.getLogger(Server.class);

        private final String host = "127.0.0.1";
        private final int port = 9999;
        private final int poolSize = 10;

        // Never counted down anywhere in this class; main() awaits it to stay alive.
        private static final CountDownLatch serverStatus = new CountDownLatch(1);
        private AsynchronousServerSocketChannel asyncServerSocketChannel;

        /**
         * Opens the server channel inside a thread-pool-backed channel group and binds it.
         *
         * @throws Exception if the channel cannot be opened, configured or bound
         */
        public Server() throws Exception {
            // Threads in this pool wait for I/O events; when an I/O operation
            // completes, a pool thread is used to run the callback (CompletionHandler).
            ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);
            try {
                AsynchronousChannelGroup asyncChannelGroup =
                        AsynchronousChannelGroup.withThreadPool(threadPool);

                asyncServerSocketChannel = AsynchronousServerSocketChannel.open(asyncChannelGroup);
                asyncServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                asyncServerSocketChannel.bind(new InetSocketAddress(host, port));
                logger.info("Server start up!");
            } catch (Exception e) {
                // Log the cause as well, and release the pool so a failed start
                // does not leave the executor behind.
                logger.error("Server establish error!", e);
                threadPool.shutdownNow();
                throw e;
            }
        }

        /** Starts accepting connections; each accept is completed by a ConnectCompleteHandler. */
        public void service() {
            // Calling accept() in a loop like this throws
            // java.nio.channels.AcceptPendingException: a new accept can only be
            // started after the previous connection has been established.
            //     while (true) {
            //         asyncServerSocketChannel.accept();
            //     }
            //
            // AIO can also return a Future directly, but the call has not actually
            // completed at the moment it returns:
            //     while (true) {
            //         try {
            //             Future<AsynchronousSocketChannel> acceptFuture = asyncServerSocketChannel.accept();
            //             // wait until the call completes
            //             AsynchronousSocketChannel asyncSocketChannel = acceptFuture.get();
            //             logger.info("Connection complete!");
            //         } catch (Exception e) {
            //             e.printStackTrace();
            //         }
            //     }
            //
            // Because the channel group exists, a callback is the better implementation.
            asyncServerSocketChannel.accept(asyncServerSocketChannel, new ConnectCompleteHandler());
        }

        public static void main(String[] args) throws Exception {
            Server server = new Server();
            server.service();

            // All AIO methods return immediately, so a latch is used here to keep
            // the main thread from exiting and the service from stopping.
            // With AIO the calling thread is free to do other work after issuing a
            // request and is notified when the request completes.
            // As a dedicated server, the main thread has nothing else to do; AIO is
            // perhaps a better fit for clients.
            serverStatus.await();
        }
    }

    服务端处理器

    package com.test.handler.server;

    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;

    import org.apache.log4j.Logger;

    /**
     * Server-side handler invoked when an accept finishes.
     *
     * <p>{@code CompletionHandler<V, A>}:
     * V - result of the I/O operation; here the newly established AsynchronousSocketChannel.
     * A - attachment of the I/O operation; here the AsynchronousServerSocketChannel,
     *     so that the next connection can be accepted.
     */
    public class ConnectCompleteHandler
            implements CompletionHandler<AsynchronousSocketChannel,
                    AsynchronousServerSocketChannel> {

        private static final Logger logger = Logger.getLogger(ConnectCompleteHandler.class);

        /** Re-arms the accept, then starts reading the request from the new connection. */
        @Override
        public void completed(AsynchronousSocketChannel asyncSocketChannel,
                AsynchronousServerSocketChannel asyncServerSocketChannel) {
            // Runs on a thread from the pool held by the channel group.
            logger.info("Deal thread of [ConnectCompleteHandler] : "
                    + Thread.currentThread().getName());
            // The current connection is established; accept the next one.
            // This handler keeps no per-connection state, so it is reused.
            asyncServerSocketChannel.accept(asyncServerSocketChannel, this);

            // ByteBuffer is not thread-safe; sharing one across threads would need
            // extra care, so each connection gets its own buffer.
            ByteBuffer readByteBuffer = ByteBuffer.allocate(100);
            // Read the request sent by the client; the buffer doubles as the attachment.
            asyncSocketChannel.read(readByteBuffer, readByteBuffer,
                    new ReadCompleteHandler(asyncSocketChannel));
        }

        /** Logs the accept failure together with its cause. */
        @Override
        public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
            logger.error("Connection error!", exc);
        }
    }

    package com.test.handler.server;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.nio.charset.StandardCharsets;

    import org.apache.log4j.Logger;

    /**
     * Server-side handler invoked when the request read finishes.
     *
     * <p>{@code CompletionHandler<V, A>}:
     * V - result of the I/O operation; here the number of bytes read.
     * A - attachment of the I/O operation; ConnectCompleteHandler passes the
     *     ByteBuffer to asyncSocketChannel.read, so it is a ByteBuffer.
     */
    public class ReadCompleteHandler
            implements CompletionHandler<Integer, ByteBuffer> {

        private static final Logger logger = Logger.getLogger(ReadCompleteHandler.class);
        private AsynchronousSocketChannel asyncSocketChannel;

        /**
         * @param asyncSocketChannel the accepted connection; the response is written to it
         */
        public ReadCompleteHandler(AsynchronousSocketChannel asyncSocketChannel) {
            this.asyncSocketChannel = asyncSocketChannel;
        }

        /** Logs the request that was read and writes the fixed greeting back. */
        @Override
        public void completed(Integer result, ByteBuffer readByteBuffer) {
            logger.info("Deal thread of [ReadCompleteHandler] : "
                    + Thread.currentThread().getName());
            logger.info("Read bytes : " + result.intValue());
            if (result.intValue() == -1) {
                logger.error("Get request from client error!");
                // Nothing more can be read; release the connection instead of leaking it.
                closeChannel();
                return;
            }
            // Switch the buffer from filling to draining before copying it out.
            readByteBuffer.flip();
            byte[] reqBytes = new byte[readByteBuffer.remaining()];
            readByteBuffer.get(reqBytes);

            // StandardCharsets.UTF_8 cannot fail, unlike a charset looked up by name.
            String request = new String(reqBytes, StandardCharsets.UTF_8);
            logger.info("The request sent by client is : " + request);

            String response = "Hi, this is server!";
            logger.info("The response has been sent back to client is : "
                    + response);
            ByteBuffer writeByteBuffer =
                    ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
            // asyncSocketChannel.write(writeByteBuffer) would return a Future instead;
            // the callback form is used here and needs no attachment.
            asyncSocketChannel.write(writeByteBuffer, null, new WriteCompleteHandler());
        }

        /** Logs the read failure together with its cause and releases the connection. */
        @Override
        public void failed(Throwable exc, ByteBuffer readByteBuffer) {
            logger.error("Read message error!", exc);
            closeChannel();
        }

        /** Closes the client connection, logging instead of propagating a close failure. */
        private void closeChannel() {
            try {
                asyncSocketChannel.close();
            } catch (IOException e) {
                logger.error("Close client channel error!", e);
            }
        }
    }

    package com.test.handler.server;

    import java.nio.channels.CompletionHandler;

    import org.apache.log4j.Logger;

    /**
     * Server-side handler invoked when the response write finishes.
     *
     * <p>{@code CompletionHandler<V, A>}:
     * V - result of the I/O operation; here the number of bytes written.
     * A - attachment of the I/O operation; unused here, so it is {@code Object}.
     */
    public class WriteCompleteHandler
            implements CompletionHandler<Integer, Object> {

        private static final Logger logger = Logger.getLogger(WriteCompleteHandler.class);

        /** Logs how many bytes were written and whether the response was sent. */
        @Override
        public void completed(Integer result, Object attachment) {
            logger.info("Deal thread of [WriteCompleteHandler] : "
                    + Thread.currentThread().getName());
            logger.info("Write bytes : " + result.intValue());
            if (result.intValue() == -1) {
                // Logged at error level, like the other handlers' failure branches.
                logger.error("Send response to client error!");
            } else {
                logger.info("The response has been sent back to client successfully!");
            }
        }

        /** Logs the write failure together with its cause. */
        @Override
        public void failed(Throwable exc, Object attachment) {
            logger.error("Write message error!", exc);
        }
    }

    log4j日志配置文件

    # Root logger: level info, sent to the logOutput appender.
    log4j.rootLogger=info,logOutput

    # Console appender; the pattern prints level, timestamp, logger name and message.
    log4j.appender.logOutput=org.apache.log4j.ConsoleAppender
    log4j.appender.logOutput.layout=org.apache.log4j.PatternLayout
    log4j.appender.logOutput.layout.ConversionPattern=%p%d{[yy-MM-dd HH:mm:ss]}[%c] -> %m%n

    Server端运行结果

    INFO[16-08-04 01:20:22][com.test.server.Server] -> Server start up!
    INFO[16-08-04 01:20:26][com.test.handler.server.ConnectCompleteHandler] -> Deal thread of [ConnectCompleteHandler] : pool-1-thread-1
    INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> Deal thread of [ReadCompleteHandler] : pool-1-thread-2
    INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> Read bytes : 19
    INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> The request sent by client is : Hi, this is client!
    INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> The response has been sent back to client is : Hi, this is server!
    INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> Deal thread of [WriteCompleteHandler] : pool-1-thread-3
    INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> Write bytes : 19
    INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> The response has been sent back to client successfully!

    Client端运行结果

    INFO[16-08-04 01:20:26][com.test.handler.client.ConnectCompleteHandler] -> Deal thread of [ConnectCompleteHandler] : pool-1-thread-1
    INFO[16-08-04 01:20:26][com.test.handler.client.ConnectCompleteHandler] -> The request sent by client is : Hi, this is client!
    INFO[16-08-04 01:20:26][com.test.handler.client.WriteCompleteHandler] -> Deal thread of [WriteCompleteHandler] : pool-1-thread-2
    INFO[16-08-04 01:20:26][com.test.handler.client.WriteCompleteHandler] -> Write bytes : 19
    INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> Deal thread of [ReadCompleteHandler] : pool-1-thread-3
    INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> Read bytes : 19
    INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> The response sent by server is : Hi, this is server!
    转载请注明原文地址: https://ju.6miu.com/read-8412.html

    最新回复(0)