Connector的作用是什么?它主要实现了什么功能?
tomcat中Catalina包括Connector和Container,Connector主要功能是接收请求、将请求交给container进行处理。下面看一下Connector的主要功能。
调用open方法,创建socket。
private ServerSocket open() throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException, UnrecoverableKeyException, KeyManagementException { // Acquire the server socket factory for this Connector ServerSocketFactory factory = getFactory(); // If no address is specified, open a connection on all addresses if (address == null) { // "HttpConnector Opening server socket on all host IP addresses" log(sm.getString("httpConnector.allAddresses")); try { return (factory.createSocket(port, acceptCount)); } catch (BindException be) { throw new BindException(be.getMessage() + ":" + port); } } // Open a server socket on the specified address try { InetAddress is = InetAddress.getByName(address); log(sm.getString("httpConnector.anAddress", address)); try { return (factory.createSocket(port, acceptCount, is)); } catch (BindException be) { throw new BindException(be.getMessage() + ":" + address + ":" + port); } } catch (Exception e) { log(sm.getString("httpConnector.noAddress", address)); try { return (factory.createSocket(port, acceptCount)); } catch (BindException be) { throw new BindException(be.getMessage() + ":" + port); } } }主要操作是: 获取socket
socket = serverSocket.accept();创建processor,将socket交给processor处理。
// Hand this socket off to an appropriate processor //如果得到processor为null,直接关闭该连接,忽略该请求 //用一个堆栈保存所有的processor,每次需要时,从堆栈中取最顶端的processor。且控制堆栈的大小在一定范围内。 HttpProcessor processor = createProcessor(); if (processor == null) { try { log(sm.getString("httpConnector.noProcessor")); socket.close(); } catch (IOException e) { ; } continue; } // if (debug >= 3) // log("run: Assigning socket to processor " + processor); processor.assign(socket); // The processor will recycle itself when it finishes }获取一个processor。用Stack processors保存所有的processor。
private HttpProcessor createProcessor() { synchronized (processors) { if (processors.size() > 0) { // if (debug >= 2) // log("createProcessor: Reusing existing processor"); return ((HttpProcessor) processors.pop()); } if ((maxProcessors > 0) && (curProcessors < maxProcessors)) { // if (debug >= 2) // log("createProcessor: Creating new processor"); return (newProcessor()); } else { if (maxProcessors < 0) { // if (debug >= 2) // log("createProcessor: Creating new processor"); return (newProcessor()); } else { // if (debug >= 2) // log("createProcessor: Cannot create new processor"); return (null); } } } }run()方法,获取socket,并处理。
public void run() { // Process requests until we receive a shutdown signal while (!stopped) { // Wait for the next socket to be assigned Socket socket = await(); if (socket == null) continue; // Process the request from this socket try { process(socket); } catch (Throwable t) { log("process.invoke", t); } // Finish up this request connector.recycle(this); } // Tell threadStop() we have shut ourselves down successfully synchronized (threadSync) { threadSync.notifyAll(); } }run()调用process(),process()主要逻辑是,由socket获取input,创建request(获取sessionid等)和response,调用connector的container.invoke()方法,将request和response交给container来处理。
boolean ok = true; boolean finishResponse = true; SocketInputStream input = null; OutputStream output = null; // Construct and initialize the objects we will need try { input = new SocketInputStream(socket.getInputStream(), connector.getBufferSize()); } catch (Exception e) { log("process.create", e); ok = false; } keepAlive = true; while (!stopped && ok && keepAlive) { finishResponse = true; try { request.setStream(input); request.setResponse(response); output = socket.getOutputStream(); response.setStream(output); response.setRequest(request); ((HttpServletResponse) response.getResponse()).setHeader ("Server", SERVER_INFO); } catch (Exception e) { log("process.create", e); ok = false; } // Parse the incoming request try { if (ok) { parseConnection(socket); parseRequest(input, output); if (!request.getRequest().getProtocol() .startsWith("HTTP/0")) parseHeaders(input); if (http11) { // Sending a request acknowledge back to the client if // requested. ackRequest(output); // If the protocol is HTTP/1.1, chunking is allowed. if (connector.isChunkingAllowed()) response.setAllowChunking(true); } } } catch (EOFException e) { // It's very likely to be a socket disconnect on either the // client or the server ok = false; finishResponse = false; } catch (ServletException e) { ok = false; try { ((HttpServletResponse) response.getResponse()) .sendError(HttpServletResponse.SC_BAD_REQUEST); } catch (Exception f) { ; } } catch (InterruptedIOException e) { if (debug > 1) { try { log("process.parse", e); ((HttpServletResponse) response.getResponse()) .sendError(HttpServletResponse.SC_BAD_REQUEST); } catch (Exception f) { ; } } ok = false; } catch (Exception e) { try { log("process.parse", e); ((HttpServletResponse) response.getResponse()).sendError (HttpServletResponse.SC_BAD_REQUEST); } catch (Exception f) { ; } ok = false; } // Ask our Container to process this request try { ((HttpServletResponse) response).setHeader ("Date", FastHttpDateFormat.getCurrentDate()); if (ok) { connector.getContainer().invoke(request, response); } } catch (ServletException e) { ... } ... }connector创建processor后,调用其await()方法,通知其处理某个socket。
synchronized void assign(Socket socket) { // Wait for the Processor to get the previous Socket //利用available变量通知processor它被调用了 while (available) { //当前processor正在处理前一个socket中,所以要等待 try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread this.socket = socket; available = true; notifyAll();// if ((debug >= 1) && (socket != null)) log(" An incoming request is being assigned"); }Connector处理请求的主要流程是:
1.connector获取socket,通过一个堆栈stack创建processor,通过processor.await(socket)交给processor
2.processor被创建时,就调用了processor.start()方法,所以如run()中所示,processor创建后就一直检查available变量(标识processor有没有被调用),并获取socket,然后进行处理。processor的处理,就是获取socket后,得到input等,创建request和response,传给connector.container进行处理。