Java NIO —— TCP套接字(ServerSocketChannel & SocketChannel)

    xiaoxiao2021-12-14  20

    package com.demo.test; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SocketChannelDemo { private static final int PORT = 9999; private static final int ACCEPT_INTERVAL = 300; private static final int BUFF_SIZE = 255; private static final int WAIT_CONNECT_INTERVAL = 300; private static void openServerSocketChannel(boolean isBlocking) { try { // The new channel's socket is initially unbound; it must be bound to a specific address via one of its socket's bind methods before connections can be accepted. final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); serverSocketChannel.configureBlocking(isBlocking); while (true) { final SocketChannel socketChannel = serverSocketChannel.accept(); // 在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null if (!isBlocking && socketChannel == null) { System.out.println("serverSocketChannel.accept() = null"); try { Thread.sleep(ACCEPT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } continue; } System.out.println("serverSocketChannel.accept() != null"); new Thread(new Runnable() { @Override public void run() { // do something with socketChannel... ByteBuffer readBuffer = ByteBuffer.allocate(BUFF_SIZE); byte[] buffer = new byte[BUFF_SIZE]; int len = -1; do { try { len = socketChannel.read(readBuffer); } catch (IOException e) { e.printStackTrace(); try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } break; } if (len != -1) { //切换读写模式,socketChannel.read是向ByteBuffer里面写,而get是将数据从ByteBuffer里读出来 readBuffer.flip(); //get也会改变position readBuffer.get(buffer, 0, len); //rewind会将position的位置设置为0 readBuffer.rewind(); try { socketChannel.write(readBuffer); } catch (IOException e) { e.printStackTrace(); } //将当前指针移到数组首位,相当于清空数据 readBuffer.clear(); System.out.println("service recv : len=" + len + ", data=" + new String(buffer, 0, len)); } } while (len != -1); } }).start(); break; } try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } serverSocketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } private static void openSocketChannel(boolean isBlocking) { try { final SocketChannel socketChannel = SocketChannel.open(); // true if a connection was established, false if this channel is in non-blocking mode and the connection operation is in progress boolean isEstablished = socketChannel.connect(new InetSocketAddress("127.0.0.1", PORT)); System.out.println("socketChannel.connect() = " + isEstablished); if (!isBlocking && !isEstablished) { while (!socketChannel.finishConnect()) { // wait, or do something else... System.out.println("socketChannel.finishConnect() = false"); try { Thread.sleep(WAIT_CONNECT_INTERVAL); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("socketChannel.finishConnect() = true"); } new Thread(new Runnable() { @Override public void run() { int len = -1; do { ByteBuffer readBuffer = ByteBuffer.allocate(BUFF_SIZE); byte[] buffer = new byte[BUFF_SIZE]; // The number of bytes read, possibly zero, or -1 if the channel has reached end-of-stream try { len = socketChannel.read(readBuffer); if (len != -1) { readBuffer.flip(); readBuffer.get(buffer, 0, len); readBuffer.compact(); System.out.println("client recv : len=" + len + ", data=" + new String(buffer, 0, len)); } } catch (IOException e) { e.printStackTrace(); break; } } while (len != -1); } }).start(); ByteBuffer writeBuffer = ByteBuffer.allocate(BUFF_SIZE); for (int i = 0; i < 5; i++) { String str = "(" + i + ")" + new Date().toString(); byte[] buffer = str.getBytes(); //将数组写入到ByteBuffer中 writeBuffer.put(buffer); // 这里要交换读写模式 writeBuffer.flip(); //将数据从ByteBuffer中读出,写入到流中 socketChannel.write(writeBuffer); // 压缩数据,即将数据向前移动已使用的长度, 如果没有这句会报 java.nio.BufferOverflowException writeBuffer.compact(); System.out.println("client send : len=" + buffer.length + ", data=" + str); } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { int nThreads = 2; ExecutorService executorService = Executors.newFixedThreadPool(nThreads); final boolean isBlocking = false; executorService.execute(new Runnable() { @Override public void run() { openServerSocketChannel(isBlocking); } }); executorService.execute(new Runnable() { @Override public void run() { openSocketChannel(isBlocking); } }); } }

    “isBlocking = true”的情况

    serverSocketChannel.accept() != null socketChannel.connect() = true client send : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016 client send : len=31, data=(1)Mon Dec 05 09:25:48 CST 2016 client send : len=31, data=(2)Mon Dec 05 09:25:48 CST 2016 client send : len=31, data=(3)Mon Dec 05 09:25:48 CST 2016 client send : len=31, data=(4)Mon Dec 05 09:25:48 CST 2016 service recv : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016 service recv : len=124, data=(1)Mon Dec 05 09:25:48 CST 2016(2)Mon Dec 05 09:25:48 CST 2016(3)Mon Dec 05 09:25:48 CST 2016(4)Mon Dec 05 09:25:48 CST 2016 client recv : len=31, data=(0)Mon Dec 05 09:25:48 CST 2016 client recv : len=124, data=(1)Mon Dec 05 09:25:48 CST 2016(2)Mon Dec 05 09:25:48 CST 2016(3)Mon Dec 05 09:25:48 CST 2016(4)Mon Dec 05 09:25:48 CST 2016 java.nio.channels.AsynchronousCloseException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:407) at com.demo.test.SocketChannelDemo$2.run(SocketChannelDemo.java:118) at java.lang.Thread.run(Thread.java:745)

    “isBlocking = false”的情况

    socketChannel.connect() = true serverSocketChannel.accept() != null service recv : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(0)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016 service recv : len=31, data=(1)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016 service recv : len=31, data=(2)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016 service recv : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(3)Mon Dec 05 09:26:39 CST 2016 service recv : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016 client recv : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016 client send : len=31, data=(4)Mon Dec 05 09:26:39 CST 2016 java.nio.channels.AsynchronousCloseException at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:407) at com.demo.test.SocketChannelDemo$2.run(SocketChannelDemo.java:118) at java.lang.Thread.run(Thread.java:745)

    注意:并没有要求客户端和服务端必须同时阻塞或同时非阻塞,只是还有2种情况的打印这里省略了而已。

    转载请注明原文地址: https://ju.6miu.com/read-963984.html

    最新回复(0)