Java NIO是New IO的简称,它是一种可以替代Java IO的一套新的IO机制。它提供了一套不同于Java标准IO的操作机制。
Java NIO中涉及的基础内容有通道(Channel)和缓冲区(Buffer)、文件IO和网络IO。有关通道、缓冲区以及文件IO在这里不打算进行详细的介绍。这里参考《实战Java高并发程序设计》利用NIO实现一个Echo服务器的服务端与客户端。
在看完Echo服务器实现之后,发现使用NIO进行网络编程跟Linux中的epoll模型是非常类似的。同样是将Channel注册到Selector上,并且说明感兴趣的事件。注册之后调用selector.select()进行阻塞等待。而对于epoll来说,需要使用
epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
进行事件的注册,使用
epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
来进行事件阻塞等待。epoll在Linux中是一种IO复用技术,感觉NIO对于Java的作用是类似的,一个Selector可以监听多个Channel上的事件,有一个事件(可能有多个)触发时,则进行相应。
对于NIO中的socket来说,感兴趣的事件有以下几类,在SelectionKey中定义:
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE= 1 << 2;
public static final int OP_CONNECT= 1 << 3;
public static final int OP_ACCEPT =1 << 4;
在Java中使用Socket进行编程的过程跟在Linux上类似,这里大体总结一下:
服务端:
1、使用静态工厂产生一个Selector实例,
private Selectorselector =null;
selector = SelectorProvider.provider().openSelector();
2、使用静态工厂产生一个SerSocketChannel,也就是一个服务端的通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
3、将上面的通道(Channel)绑定(bind)到服务器的地址。
InetSocketAddressinet SocketAddress = new InetSocketAddress("127.0.0.1",8001);
serverSocketChannel.socket().bind(inetSocketAddress);
4、将服务端Channel注册到Selector,并说明感兴趣的事件。这里的SelectionKey就关联了对应的Channel和Selector。
SelectionKey acceptKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
5、服务端等待事件发生(这个过程在一个无限while循环中),这个过程是阻塞的。
int readyEventNum =selector.select();
6、有事件发生了,获得所有的通道的SelectionKey,进行遍历检查,检查主要是检查这个SelectionKey是对什么事件感兴趣,不同事件有不同的处理。
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while(iterator.hasNext()){
SelectionKeyreadyKey=iterator.next();
//将正在处理的实例移除,否则就会重复处理相同的SelectionKey
iterator.remove();
//TODO 检查并处理
……
} //endwhile客户端编程类似,大家直接看代码吧。
服务端代码:
import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NioEchoServer { //selector用于处理每一个网络连接,一个服务端程序使用一个selector实例就够了 //这个实例使用静态工厂生成 private Selector selector = null; //用于对每一个客户端进行相应的处理,每一个请求都会委托给线程池中的线程进行实际处理 private ExecutorService pool = Executors.newCachedThreadPool(); //统计服务器线程在一个客户端花费了多少时间 public static Map<Socket,Long> time_stat = new HashMap<Socket,Long>(10240); private void startServer() throws IOException{ //服务套接字通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); //一个socket地址。 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1",8001); //UnknownHostException serverSocketChannel.socket().bind(inetSocketAddress); //通过工厂方法获得一个Selector对象的实例 selector = SelectorProvider.provider().openSelector(); /** * 注意每向selector注册一个channel就返回一个SelectionKey实例 * 一个SelectionKey表示一个SelectableChannel到一个Selector的注册 * 一个Selector可以管理多个SelectableChannel, SocketChannel是SelectableChannel的一个子类 */ //这里注册感兴趣的事件为 Accept SelectionKey acceptKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); for(;;){ //这个select方法是一个阻塞方法,如果没有任何数据准备好,他就会等待,一旦有数据可读 //他就会返回,它的返回值是已经准备就绪的SelectionKey数量。 int readyEventNum = selector.select(); //获取那些准备好的SelectionKey。因为可能多个Chennel已经准备好 Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while(iterator.hasNext()){ SelectionKey readyKey = iterator.next(); //将正在处理的实例移除,否则就会重复处理相同的SelectionKey iterator.remove(); if(readyKey.isAcceptable()){ doAccept(readyKey); }else if(readyKey.isValid() && readyKey.isReadable()){ if(!time_stat.containsKey(((SocketChannel)readyKey.channel()).socket())){ time_stat.put(((SocketChannel)readyKey.channel()).socket(), System.currentTimeMillis()); doRead(readyKey); } }else if(readyKey.isValid() && readyKey.isWritable()){ doWrite(readyKey); long end = System.currentTimeMillis(); long begin = time_stat.remove(((SocketChannel)readyKey.channel()).socket()); System.out.println("spend: "+(end-begin)+" ms"); } }//end while }//end for } private void doAccept(SelectionKey selectionKey){ ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel(); SocketChannel clientChannel = null; try{ //生成的clientChannel表示和新的客户端通信的通道 clientChannel = server.accept(); clientChannel.configureBlocking(false); //Register this channel for reading,将新生成的Channel注册到selector选择器上,并告诉Selector, //我现在对读OP_READ操作很感兴趣。这样当Selector发现这个Channel已经准备好读时,就能给线程一个通知 SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ); //Allocate an EchoClient instance and attach it to this selection key. //将一个客户端实例作为附件,附加到这个连接的SelectionKey上,这样在连接的处理过程中 //我们都可以共享这个EchoClient实例 EchoClient echoClient = new EchoClient(); clientKey.attach(echoClient); InetAddress clientAddress = clientChannel.socket().getInetAddress(); System.out.println("accepted connection from "+ clientAddress.getHostAddress()); }catch(Exception e){ System.out.println("failed to accept new client."); e.printStackTrace(); } } //读完了要 private void doRead(SelectionKey selectionKey){ SocketChannel channel = (SocketChannel)selectionKey.channel(); ByteBuffer bb = ByteBuffer.allocate(8192); int len; try{ len = channel.read(bb); if(len<0){ disconnect(selectionKey); return; } }catch(Exception e){ System.out.println("Failed to read from client."); e.printStackTrace(); disconnect(selectionKey); return; } bb.flip(); pool.execute(new HandleMsg(selectionKey,bb)); } private void doWrite(SelectionKey sk){ SocketChannel channel = (SocketChannel) sk.channel(); EchoClient echoClient = (EchoClient) sk.attachment(); LinkedList<ByteBuffer> outq = echoClient.getOutputQueue(); ByteBuffer byteBuffer = outq.getLast(); try{ int len = channel.write(byteBuffer); if(len==-1){ disconnect(sk); return; } if(byteBuffer.remaining()==0){ outq.removeLast(); } }catch(Exception e){ System.out.println("Failed to write to client"); e.printStackTrace(); disconnect(sk); } if(outq.size()==0){ sk.interestOps(SelectionKey.OP_READ); } } private void disconnect(SelectionKey sk){ try { sk.channel().close(); } catch (IOException e) { e.printStackTrace(); } } //启动一个新的线程任务来处理消息 class HandleMsg implements Runnable{ SelectionKey selectionKey; ByteBuffer byteBuffer; public HandleMsg(SelectionKey sk,ByteBuffer bb){ this.byteBuffer = bb; this.selectionKey = sk; } @Override public void run(){ EchoClient echoClient = (EchoClient)selectionKey.attachment(); echoClient.enqueue(byteBuffer); //从此对写也很感兴趣,那这个通道空了就可以写了? selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); //强迫selector立即返回 selector.wakeup(); } } public static void main(String[] args) throws IOException { NioEchoServer server = new NioEchoServer(); server.startServer(); } /** * 一个EchoClient代表一个客户端,保存所有收到的ByteBuffer缓冲 * @version 创建时间:2016年8月14日 下午7:27:51 */ class EchoClient { private LinkedList<ByteBuffer> outq = null; EchoClient(){ outq = new LinkedList<ByteBuffer>(); } public LinkedList<ByteBuffer> getOutputQueue(){ return outq; } public void enqueue(ByteBuffer bb){ outq.addFirst(bb); } } } 客户端代码:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; import java.util.Iterator; /** * 用Nio实现EchoServer的客户端 * @version 创建时间:2016年8月14日 下午5:17:26 */ public class ClientNio { //同样是一个客户端使用一个selector实例,这个实例也是使用静态工厂生成的 private Selector selector = null; public void init(String ip,int port) throws IOException{ //使用静态工厂产生一个channel SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); this.selector = SelectorProvider.provider().openSelector(); //将这个通道连接到服务端 channel.connect(new InetSocketAddress(ip,port)); //这个通道暂时只对 "读" 感兴趣 channel.register(selector, SelectionKey.OP_CONNECT); } public void working()throws IOException{ while(true){ if(!selector.isOpen()) break; //阻塞,直到有事件发生 selector.select(); //获取所有发生的时间,并进行遍历 Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if(key.isConnectable()){ //连接事件 connect(key); }else if(key.isReadable()){ //读事件 read(key); } } } } public void connect(SelectionKey selectionKey) throws IOException{ SocketChannel channel = (SocketChannel)selectionKey.channel(); //如果正在连接,则完成连接 if(channel.isConnectionPending()){ channel.finishConnect(); } channel.configureBlocking(false); //这里进行了一系列转换:String -> byte[] -> ByteBuffer //调用的方法依次是 String.getBytes() ByteBuffer.wrap() channel.write(ByteBuffer.wrap(new String("hello server.\r\n").getBytes())); //从此这个channel对读感兴趣 channel.register(this.selector, SelectionKey.OP_READ); } public void read(SelectionKey key) throws IOException{ SocketChannel channel = (SocketChannel)key.channel(); ByteBuffer buffer = ByteBuffer.allocate(100); channel.read(buffer); byte[] data = buffer.array(); String msg = new String(data).trim(); System.out.println("client receive: "+msg); channel.close(); key.selector().close(); } public static void main(String[] args) { ClientNio client = new ClientNio(); try { client.init("127.0.0.1", 8001); client.working(); } catch (IOException e) { e.printStackTrace(); } } }
参考《实战Java高并发程序设计》