Netty 3.x 简例

    xiaoxiao2021-03-25  107

    Netty是一个异步的、事件驱动的网络应用框架,可以用来快速开发高性的客户端、服务端程序。示例使用Netty 3.10.5

    首先是Server部分的代码:

    Server端主程序:

    [java] view plain copy package com.sean.server;    import java.net.InetSocketAddress;  import java.util.concurrent.Executors;    import org.jboss.netty.bootstrap.ServerBootstrap;  import org.jboss.netty.channel.Channel;  import org.jboss.netty.channel.ChannelFactory;  import org.jboss.netty.channel.group.ChannelGroup;  import org.jboss.netty.channel.group.ChannelGroupFuture;  import org.jboss.netty.channel.group.DefaultChannelGroup;  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;    public class Server {      private ChannelFactory factory;            public static ChannelGroup channelGroup = new DefaultChannelGroup();            public void start(){          // NioServerSocketChannelFactory用于创建基于NIO的服务端          // ServerSocketChannel。本身包含2种线程,boss线程和worker线程。          // 每个ServerSocketChannel会都会拥有自己的boss线程,          // 当一个连接被服务端接受(accepted),          // boss线程就会将接收到的Channel传递给一个worker线程处理,          // 而worker线程以非阻塞的方式为一个或多个Channel提供非阻塞的读写          factory = new NioServerSocketChannelFactory(                  Executors.newCachedThreadPool(),    // boss线程池                  Executors.newCachedThreadPool(),    // worker线程池                  8); // worker线程数                    // ServerBootstrap用于帮助服务器启动          ServerBootstrap bootstrap = new ServerBootstrap(factory);                    // 没有child.前缀,则该选项是为ServerSocketChannel设置          bootstrap.setOption("reuseAddress"true);          // 有child.前缀,则该选项是为Channel设置  //      bootstrap.setOption("child.tcpNoDelay", true);  //      bootstrap.setOption("child.keepAlive", true);                // 对每一个连接(channel),server都会调用          // ChannelPipelineFactory为该连接创建一个ChannelPipeline          ServerChannelPiplineFactory channelPiplineFactory =                  new ServerChannelPiplineFactory();          bootstrap.setPipelineFactory(channelPiplineFactory);                    // 这里绑定服务端监听的IP和端口          Channel channel = bootstrap.bind(new InetSocketAddress("127.0.0.1"8000));          Server.channelGroup.add(channel);                    System.out.println("Server is started...");      }            public void stop(){          // ChannelGroup为其管理的Channels提供一系列的批量操作          // 关闭的Channel会自动从ChannelGroup中移除          ChannelGroupFuture channelGroupFuture = Server.channelGroup.close();          channelGroupFuture.awaitUninterruptibly();          factory.releaseExternalResources();          System.out.println("Server is stopped.");      }        public static void main(String[] args) throws Exception {          Server server = new Server();          server.start();          Thread.sleep(30*1000);          server.stop();      }  }  

    PipelineFactory:

    [java] view plain copy package com.sean.server;    import java.util.concurrent.Executor;    import org.jboss.netty.channel.ChannelPipeline;  import org.jboss.netty.channel.ChannelPipelineFactory;  import org.jboss.netty.channel.Channels;  import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;    import com.sean.server.handler.ServerExecutionHandler;  import com.sean.server.handler.ServerLogicHandler;  import com.sean.server.handler.ServerReadDecoder;  import com.sean.server.handler.ServerWriteEncoder;    public class ServerChannelPiplineFactory implements ChannelPipelineFactory {        @Override      public ChannelPipeline getPipeline() throws Exception {          ServerReadDecoder serverReadDecoder = new ServerReadDecoder();          ServerWriteEncoder serverWriteEncoder = new ServerWriteEncoder();          Executor executor =                   new OrderedMemoryAwareThreadPoolExecutor(4200200);          ServerExecutionHandler serverExecutionHandler =                   new ServerExecutionHandler(executor);          ServerLogicHandler serverLogicHandler = new ServerLogicHandler();                    // ChannelPipeline的源码中的javadoc介绍的非常详细,很有必要看一下          // ChannelPipeline是一个处理ChannelEvent的handler链          // 如果为读操作,ChannelEvent事件会从前到后依次被          // Upstream的handler处理          // serverReadDecoder -> serverLogicHandler          // 如果为写操作,ChannelEvent事件会从后至前依次被          // Downstream的handler处理          // serverLogicHandler -> serverWriteEncoder          ChannelPipeline channelPipeline = Channels.pipeline();          channelPipeline.addLast("1", serverReadDecoder);          channelPipeline.addLast("2", serverWriteEncoder);          channelPipeline.addLast("3", serverExecutionHandler);          channelPipeline.addLast("4", serverLogicHandler);                    System.out.println(channelPipeline.hashCode());          return channelPipeline;      }    }  

    各个Handler:

    [java] view plain copy package com.sean.server.handler;    import java.util.concurrent.Executor;    import org.jboss.netty.handler.execution.ExecutionHandler;    // 提供一个线程池  public class ServerExecutionHandler extends ExecutionHandler{        public ServerExecutionHandler(Executor executor) {          super(executor);      }  }   [java] view plain copy package com.sean.server.handler;    import org.jboss.netty.channel.Channel;  import org.jboss.netty.channel.ChannelFuture;  import org.jboss.netty.channel.ChannelFutureListener;  import org.jboss.netty.channel.ChannelHandlerContext;  import org.jboss.netty.channel.ChannelStateEvent;  import org.jboss.netty.channel.ExceptionEvent;  import org.jboss.netty.channel.MessageEvent;  import org.jboss.netty.channel.SimpleChannelHandler;    import com.sean.server.Server;    // SimpleChannelHandler提供了很多基本的handler方法用来重写  // 通常情况下足够使用了  public class ServerLogicHandler extends SimpleChannelHandler {      @Override      public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)              throws Exception {          System.out.println("######channelConnected");          // channel group is thread safe          Server.channelGroup.add(e.getChannel());          System.out.println(e.getChannel().toString());      }        @Override      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {          System.out.println("######messageReceived");                    // 经过了ServerReadDecoder的处理,这里可以直接得到String类型的message          String msg = (String)e.getMessage();          System.out.println("The message sent by client is : " + msg);                    Channel ch = e.getChannel();          String str = "Hi, Client.";          // 由于IO操作是异步的,当方法返回时并不能保证IO操作一定完成了          // 因此返回一个ChannelFuture对象实例          // 该实例中保存了IO操作的状态信息          ChannelFuture cf = ch.write(str);          // 为ChannelFuture对象实例添加监听,如果数据发送完毕则关闭连接          cf.addListener(new ChannelFutureListener(){              @Override              public void operationComplete(ChannelFuture future)                      throws Exception {                  Channel ch = future.getChannel();                  ch.close();              }          });                    System.out.println("The message has sent to client.");      }         @Override      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {          e.getCause().printStackTrace();          Channel ch = e.getChannel();          ch.close();      }  }   [java] view plain copy package com.sean.server.handler;    import org.jboss.netty.buffer.ChannelBuffer;  import org.jboss.netty.channel.Channel;  import org.jboss.netty.channel.ChannelHandlerContext;  import org.jboss.netty.handler.codec.string.StringDecoder;  import org.jboss.netty.util.CharsetUtil;    // 解决接收流数据时,数据出现碎片化的问题  public class ServerReadDecoder extends StringDecoder{        @Override      protected Object decode(ChannelHandlerContext ctx, Channel channel,              Object msg) throws Exception {          System.out.println("######ServerReadDecoder");                    // 从msg中取出的数据类型是ChannelBuffer的          byte[] buffer = ((ChannelBuffer)msg).array();          byte last = buffer[buffer.length - 1];          // 46 is '.'          if(last == 46)              // 并将ChannelBuffer转为String              return new String(buffer, CharsetUtil.UTF_8);          return null;      }  }   [java] view plain copy package com.sean.server.handler;    import org.jboss.netty.buffer.ChannelBuffer;  import org.jboss.netty.buffer.ChannelBuffers;  import org.jboss.netty.channel.Channel;  import org.jboss.netty.channel.ChannelHandlerContext;  import org.jboss.netty.handler.codec.string.StringEncoder;  import org.jboss.netty.util.CharsetUtil;    public class ServerWriteEncoder extends StringEncoder{        @Override      protected Object encode(ChannelHandlerContext ctx, Channel channel,              Object msg) throws Exception {          System.out.println("######ServerWriteEncoder");                    String str = (String)msg;          // 通过ChannelBuffers工具,为指定编码的指定字符串分配缓存空间          // 并将String转为ChannelBuffer          ChannelBuffer channelBuffer =                   ChannelBuffers.copiedBuffer(str, CharsetUtil.UTF_8);          return channelBuffer;      }  }  

    然后是Client部分的代码:

    Client端主程序:

    [java] view plain copy package com.sean.client;    import java.net.InetSocketAddress;  import java.util.concurrent.Executors;    import org.jboss.netty.bootstrap.ClientBootstrap;  import org.jboss.netty.channel.ChannelFactory;  import org.jboss.netty.channel.ChannelPipeline;  import org.jboss.netty.channel.ChannelPipelineFactory;  import org.jboss.netty.channel.Channels;  import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;    import com.sean.client.handler.ClientLogicHandler;  import com.sean.client.handler.ClientReadDecoder;  import com.sean.client.handler.ClientWriteEncoder;    public class Client {      public static void main(String[] args){          // 同服务端相同,只是这里使用的是NioClientSocketChannelFactory          final ChannelFactory factory = new NioClientSocketChannelFactory(                  Executors.newCachedThreadPool(),                      Executors.newCachedThreadPool(),                      8);                     // ClientBootstrap用于帮助客户端启动          ClientBootstrap bootstrap = new ClientBootstrap(factory);          // 由于客户端不包含ServerSocketChannel,所以参数名不能带有child.前缀          bootstrap.setOption("tcpNoDelay"true);  //      bootstrap.setOption("keepAlive", true);                bootstrap.setPipelineFactory(new ChannelPipelineFactory(){              @Override              public ChannelPipeline getPipeline() throws Exception {                  ChannelPipeline channelPipeline =                          Channels.pipeline(new ClientReadDecoder(),                           new ClientWriteEncoder(), new ClientLogicHandler());                                    System.out.println(channelPipeline.hashCode());                  return channelPipeline;              }          });                    // 这里连接服务端绑定的IP和端口          bootstrap.connect(new InetSocketAddress("127.0.0.1"8000));          System.out.println("Client is started...");      }  }  

    各个Handler:

    [java] view plain copy package com.sean.client.handler;    import org.jboss.netty.channel.Channel;  import org.jboss.netty.channel.ChannelFuture;  import org.jboss.netty.channel.ChannelFutureListener;  import org.jboss.netty.channel.ChannelHandlerContext;  import org.jboss.netty.channel.ChannelStateEvent;  import org.jboss.netty.channel.ExceptionEvent;  import org.jboss.netty.channel.MessageEvent;  import org.jboss.netty.channel.SimpleChannelHandler;  import org.jboss.netty.channel.WriteCompletionEvent;    public class ClientLogicHandler extends SimpleChannelHandler {            @Override      public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)              throws Exception {          System.out.println("######channelConnected");                    Channel ch = e.getChannel();          String msg = "Hi, Server.";          ch.write(msg);      }        @Override      public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)              throws Exception {          System.out.println("######writeComplete");      }        @Override      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {          System.out.println("######messageReceived");                    String msg = (String)e.getMessage();          System.out.println("The message gotten from server is : " + msg);                    ChannelFuture channelFuture = e.getChannel().close();          channelFuture.addListener(ChannelFutureListener.CLOSE);      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {          e.getCause().printStackTrace();          Channel ch = e.getChannel();          ch.close();      }  }   [java] view plain copy package com.sean.client.handler;    import org.jboss.netty.buffer.ChannelBuffer;  import org.jboss.netty.channel.Channel;  import org.jboss.netty.channel.ChannelHandlerContext;  import org.jboss.netty.handler.codec.string.StringDecoder;  import org.jboss.netty.util.CharsetUtil;    public class ClientReadDecoder extends StringDecoder{        @Override      protected Object decode(ChannelHandlerContext ctx, Channel channel,              Object msg) throws Exception {          System.out.println("######ClientReadDecoder");                    byte[] buffer = ((ChannelBuffer)msg).array();          byte last = buffer[buffer.length - 1];          // 46 is '.'          if(last == 46)              return new String(buffer, CharsetUtil.UTF_8);          return null;      }  }   [java] view plain copy package com.sean.client.handler;    import org.jboss.netty.buffer.ChannelBuffer;  import org.jboss.netty.buffer.ChannelBuffers;  import org.jboss.netty.channel.Channel;  import org.jboss.netty.channel.ChannelHandlerContext;  import org.jboss.netty.handler.codec.string.StringEncoder;  import org.jboss.netty.util.CharsetUtil;    public class ClientWriteEncoder extends StringEncoder{        @Override      protected Object encode(ChannelHandlerContext ctx, Channel channel,              Object msg) throws Exception {          System.out.println("######ClientWriteEncoder");                    String str = (String)msg;          ChannelBuffer channelBuffer =                   ChannelBuffers.copiedBuffer(str, CharsetUtil.UTF_8);          return channelBuffer;      }  }  

    运行结果如下:

    Server端后台日志:

    [plain] view plain copy Server is started...  1257526899  ######channelConnected  [id: 0x88120865, /127.0.0.1:58887 => /127.0.0.1:8000]  ######ServerReadDecoder  ######messageReceived  The message sent by client is : Hi, Server.  ######ServerWriteEncoder  The message has sent to client.  Server is stopped.  

    Client端后台日志:

    [plain] view plain copy 1767582956  Client is started...  ######channelConnected  ######ClientWriteEncoder  ######writeComplete  ######ClientReadDecoder  ######messageReceived  The message gotten from server is : Hi, Client. 
    转载请注明原文地址: https://ju.6miu.com/read-8246.html

    最新回复(0)