大家都知道在Http中有长连接这个说话,但是长连接,指的是在一次TCP连接中完成多个HTTP请求,也就是说我们的每个请求仍然存在header,可不要小看这些header,因为在大多数的情况下header都是重复的基本都是没有什么作用,这就是导致了HTTP协议就会有很大的开销。对于我们的推送一般是采用poling或者Comet。什么是polling呢?也就是我们所说的长轮询,是指客户端不断主动的向服务器发送HTTP请求查询是否有新数据。这个模式有很大缺点,也是服务器和客户端在通信的过程中需要交换很多HTTP header,效率很低,这就让HTTP逐渐远离了低延迟应用的开发。所以H5就出现了,引进了全新技术,WebSocket!!!
WebSocket将网络套接字引入到客户端和服务端,浏览器和服务器之间可以通过套接字建立持久的链接,可以达到半双工。
WebSocket是HTML5开始提供的一种浏览器和服务器间进行全双工通信的网络技术,WebSocket API被W3C定为标准。 简单来说WebSocket中,客户端只需要通过第一个HTTP request和服务器端建立连接,后面交换数据的时候都不需要再发HTTP request了,使得这个长连接变成了一个真正的长连接。WebSocket基于TCP双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接受消息。当然不发送HTTP header,就导致我们不能利用原有的HTTP协议,就会导致我们升级才能实现。在Websocket中为了防止连接被GG,也会引用Ping/Pong来保持链路激活。 首先我们可以看一下一个典型的握手HTTP request是怎么样的:
GET /chat HTTP/1.1 Host: server.example.com Upgrade : websocket Connection: Upgrade Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw== Sec-WebSocket-Protocol: chat, superchat Sec-WebSocket-Version: 13 Origin: http://example.com在上面的头中:
Upgrade : websocket Connection: Upgrade表明了这是一个Websocket协议的请求
什么是Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw== ? 这是一个Base64 encode的值,这个是浏览器随机生成的,告诉服务器,需要让他验证服务器是否是真的webSocket服务器,然后服务器端会用这个数据构造一个SHA-1的信息摘要,”Sec-WebSocket-Key”加上一个魔幻字符串。然后进行BASE-64编码。也就是返回我们下面的东西:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk= Sec-WebSocket-Protocol: chat所有的讲解都会在注释里面,比较重要可能会提取出来:
package websocket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; /** * Created by lz on 2016/8/12. */ public class WebSocketServer { public void run(int port) throws Exception{ //创建两组线程,监听连接和工作 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ //Netty用于启动Nio服务端的启动类 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workerGroup) //注册NioServerSocketChannel .channel(NioServerSocketChannel.class) //注册处理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //用于Http请求的编码或者解码 pipeline.addLast("http-codec", new HttpServerCodec()); //把Http消息组成完整地HTTP消息 pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); //向客户端发送HTML5文件 pipeline.addLast("http-chunked", new ChunkedWriteHandler()); //实际处理的Handler pipeline.addLast("handler", new WebSocketServerHandler()); } }); Channel ch = b.bind(port).sync().channel(); System.out.println("Web socket server started at port " + port + '.'); System.out.println("Open your browser and navigate to http://localhost:" + port + '/'); ch.closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new WebSocketServer().run(port); } }真正的处理我们逻辑的Handler:
package websocket; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.CharsetUtil; import java.util.Date; import java.util.logging.Level; import java.util.logging.Logger; import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static io.netty.handler.codec.http.HttpHeaders.setContentLength; /** * Created by lz on 2016/8/12. */ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName()); private WebSocketServerHandshaker handshaker; @Override protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { //传统的HTTP接入 if (msg instanceof FullHttpRequest){ handleHttpRequest(ctx,(FullHttpRequest)msg); }//WebSocket接入 else if (msg instanceof WebSocketFrame){ handleWebSocketFrame(ctx,(WebSocketFrame)msg); } } private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { //判断是否关闭链路指令 if (frame instanceof CloseWebSocketFrame){ handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } //判断是否是Ping消息 if (frame instanceof PingWebSocketFrame){ ctx.channel().write(new PongWebSocketFrame(frame.content()).retain()); return; } //本例程仅支持文本信息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)){ throw new UnsupportedOperationException(String.format("%s frame types not supported",frame.getClass().getName())); } //返回应答消息 String request = ((TextWebSocketFrame) frame).text(); if (logger.isLoggable(Level.FINE)){ logger.fine(String.format("%s received %s",ctx.channel(),request)); } ctx.channel().write(new TextWebSocketFrame(request+"欢迎使用Netty WebSocket服务,现在时刻:" + new Date().toString())); } private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,FullHttpResponse res){ //返回应答给客户端 if (res.getStatus().code() != 200){ ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); setContentLength(res,res.content().readableBytes()); } //如果是非Keep-Alive,关闭连接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.getStatus().code() != 200){ f.addListener(ChannelFutureListener.CLOSE); } } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception{ //如果HTTP解码失败,返回HTTP异常 if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){ sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } //构造握手响应返回,本机测试 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket",null,false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null){ WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); }else { //把握手消息返回给客户端 handshaker.handshake(ctx.channel(),req); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }