netty学习笔记(4)

    xiaoxiao2021-04-18  75

    我们知道,在网上大多数NIO的入门例子中,是用一个线程来处理selector轮询出的Key的。这样效率肯定是不高。 但是简单的对key加多线程的话,肯定会抛空指针异常。原因是多线程还没来得及处理Key,selector线程就开始了新一轮的select()。又得到了Key,这个key跟刚才的key是一样的。都是绑定了accept事件,那么还会去执行accept方法

    SocketChannel socketChannel = serverSocketChannel.accept();

    因为之前这个方法已经执行过了,所以再次执行的话,就报空指针。

    ============================================

    思考这么一个问题,一个NIO程序是不是只能有一个selector?好比就是问一个饭店是不是只能有一个服务员?答案当然是否定的。

    selector是不是只能注册一个ServerSocketChannel?好比就是问一个饭店是不是只能有一个大门?答案也是否定的?

    多请服务生就是引入多线程。每个服务生负责n个顾客。

    netty工作的线程模型就是这样的。一个线程池看大门,一个线程池负责招呼客人。具体说是,一个线程组负责监听端口,建立连接。一个线程吃负责读写操作。

    下面举一个例子,虽然跟netty的源码是不一样的,不过思想是差不多的。

    /** * main函数 * Created by peter on 2017/4/13. */ public class Start { public static void main(String[] args) { Executor bossExecutor = Executors.newCachedThreadPool(); Executor workExecutor = Executors.newCachedThreadPool(); int bossNum = 2; int workerNum = 3; int[] ports = {8080,8081}; Manager manager = new Manager(bossExecutor,workExecutor,bossNum,workerNum,ports); manager.doWork(); System.out.println("已经启动"); } } /** * Created by peter on 2017/4/13. * 大堂经理 */ public class Manager { /** * 门卫专用线程池 */ private Executor bossExecutors; /** * 服务员专用线程池 */ private Executor workerExecutors; //门卫组 private BossNIO[] bossNIOs; //服务员组 private WorkerNIO[] workerNIOs; //端口 private int[] ports; //分配任务使用 AtomicInteger atomicInteger = new AtomicInteger(-1); public Manager(Executor bossExecutors, Executor workerExecutors, int bossNum, int workerNum, int[] ports) { this.bossExecutors = bossExecutors; this.workerExecutors = workerExecutors; this.ports = ports; //初始化门卫组 this.bossNIOs = new BossNIO[bossNum]; for (int i = 0; i < bossNum; i++) { bossNIOs[i] = new BossNIO(this, "boss" + i); } //初始化服务员组 this.workerNIOs = new WorkerNIO[workerNum]; for (int i = 0; i < workerNum; i++) { workerNIOs[i] = new WorkerNIO(this, "worker" + i); } } public void doWork() { for (int i = 0; i < ports.length; i++) { bindAndRegister(ports[i]); } for (int i = 0; i < bossNIOs.length; i++) { bossExecutors.execute(bossNIOs[i]); } for (int i = 0; i < workerNIOs.length; i++) { workerExecutors.execute(workerNIOs[i]); } } public void bindAndRegister(int port) { try { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //绑定 ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(new InetSocketAddress(port)); //注册 serverSocketChannel.configureBlocking(false);//这句话不能丢,因为这句话,调试了好久 BossNIO bossNIO = this.nextBoss(); bossNIO.putServerSocketChannel2Queue(serverSocketChannel); } catch (IOException e) { e.printStackTrace(); } } /** * 选择一个服务员 * * @return */ public WorkerNIO nextWorker() { return workerNIOs[Math.abs(atomicInteger.incrementAndGet()) % workerNIOs.length]; } /** * 选择一个门卫 * * @return */ public BossNIO nextBoss() { return bossNIOs[Math.abs(atomicInteger.incrementAndGet()) % bossNIOs.length]; } } /** * 员工抽象类,门卫跟服务员都是员工类的实现 * Created by peter on 2017/4/13. */ public abstract class AbstractNIO implements Runnable { /** * 注册任务队列 */ protected Queue<SelectableChannel> registerTaskQueue = new ConcurrentLinkedQueue<>(); protected Selector selector; @Override public void run() { while (true) { try { //处理待注册任务队列 processRegisterTasks(); //处理事件 //必须加参数,每隔0.5秒放行一次,否则可能出现永远阻塞的情况 //那么线程就永远不会执行了,上面的注册队列也就不能执行了。 selector.select(500); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for(Iterator<SelectionKey> it = selectionKeys.iterator();it.hasNext();){ SelectionKey key = it.next(); it.remove(); process(key); } } catch (IOException e) { e.printStackTrace(); } } } public abstract void process(SelectionKey key); /** * 处理任务队列中的任务 */ public abstract void processRegisterTasks(); } /** * 门卫 * Created by peter on 2017/4/13. */ public class BossNIO extends AbstractNIO{ private Manager manager; private String bossName; public BossNIO(Manager manager,String bossName){ this.manager = manager; this.bossName = bossName; try { this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void process(SelectionKey key) { if(key.isValid()){ if(key.isAcceptable()){ ServerSocketChannel channel = (ServerSocketChannel)key.channel(); try { //创建tcp连接 SocketChannel socketChannel = channel.accept(); System.out.println(bossName +" 新建了一个连接"); //注册读事件 WorkerNIO workerNIO = manager.nextWorker(); socketChannel.configureBlocking(false); workerNIO.putChannel2Queue(socketChannel); } catch (IOException e) { e.printStackTrace(); } } } } /** * 将ServerSocketChannel放入待处理任务队列 * @param channel */ public void putServerSocketChannel2Queue(ServerSocketChannel channel){ this.registerTaskQueue.offer(channel); } @Override public void processRegisterTasks() { for(;;){ ServerSocketChannel channel = (ServerSocketChannel) this.registerTaskQueue.poll(); if(channel == null){ break; } try { channel.register(this.selector,SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } } } /** *服务员类 * Created by peter on 2017/4/13. */ public class WorkerNIO extends AbstractNIO { private Manager manager; private String workerName; public WorkerNIO(Manager manager, String workerName) { this.manager = manager; this.workerName = workerName; try { this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void process(SelectionKey key) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buf = ByteBuffer.allocate(64); try { int read = channel.read(buf); if (read > 0) { System.out.println(workerName + " 读到数据: " + new String(buf.array())); }else { System.out.println(workerName +" 说,客户端已经断开连接"); key.cancel(); } } catch (IOException e) { e.printStackTrace(); } } @Override public void processRegisterTasks() { for(;;){ SocketChannel channel = (SocketChannel) this.registerTaskQueue.poll(); if(channel == null){ break; } try { channel.register(this.selector,SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } } /** * 将SocketChannel放入待注册任务队列 * @param channel */ public void putChannel2Queue(SocketChannel channel){ this.registerTaskQueue.offer(channel); } }
    转载请注明原文地址: https://ju.6miu.com/read-675297.html

    最新回复(0)