1. 程式人生 > >Tomcat原始碼分析 (八)----- HTTP請求處理過程(一)

Tomcat原始碼分析 (八)----- HTTP請求處理過程(一)

終於進行到Connector的分析階段了,這也是Tomcat裡面最複雜的一塊功能了。Connector中文名為聯結器,既然是聯結器,它肯定會連線某些東西,連線些什麼呢?

Connector用於接受請求並將請求封裝成Request和Response,然後交給Container進行處理,Container處理完之後再交給Connector返回給客戶端。

要理解Connector,我們需要問自己4個問題。

  • (1)Connector如何接受請求的?
  • (2)如何將請求封裝成Request和Response的?
  • (3)封裝完之後的Request和Response如何交給Container
    進行處理的?
  • (4)Container處理完之後如何交給Connector並返回給客戶端的?

先來一張Connector的整體結構圖

【注意】:不同的協議、不同的通訊方式,ProtocolHandler會有不同的實現。在Tomcat8.5中,ProtocolHandler的類繼承層級如下圖所示。

 

針對上述的類繼承層級圖,我們做如下說明:

  1. ajp和http11是兩種不同的協議
  2. nio、nio2和apr是不同的通訊方式
  3. 協議和通訊方式可以相互組合。

ProtocolHandler包含三個部件:EndpointProcessorAdapter

  1. Endpoint用來處理底層Socket的網路連線,Processor用於將Endpoint接收到的Socket封裝成Request,Adapter用於將Request交給Container進行具體的處理。
  2. Endpoint由於是處理底層的Socket網路連線,因此Endpoint是用來實現TCP/IP協議的,而Processor用來實現HTTP協議的,Adapter將請求適配到Servlet容器進行具體的處理。
  3. Endpoint的抽象實現類AbstractEndpoint裡面定義了AcceptorAsyncTimeout兩個內部類和一個Handler介面Acceptor用於監聽請求,AsyncTimeout
    用於檢查非同步Request的超時,Handler用於處理接收到的Socket,在內部呼叫Processor進行處理。

至此,我們已經明白了問題(1)、(2)和(3)。至於(4),當我們瞭解了Container自然就明白了,前面章節內容已經詳細分析過了。

Connector原始碼分析入口

 我們在Service標準實現StandardService的原始碼中發現,其init()start()stop()destroy()方法分別會對Connectors的同名方法進行呼叫。而一個Service對應著多個Connector

Service.init()

@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();

    if (engine != null) {
        engine.init();
    }

    // Initialize any Executors
    for (Executor executor : findExecutors()) {
        if (executor instanceof JmxEnabled) {
            ((JmxEnabled) executor).setDomain(getDomain());
        }
        executor.init();
    }

    // Initialize mapper listener
    mapperListener.init();

    // Initialize our defined Connectors
    synchronized (connectorsLock) {
        for (Connector connector : connectors) {
            try {
                connector.init();
            } catch (Exception e) {
                String message = sm.getString(
                        "standardService.connector.initFailed", connector);
                log.error(message, e);

                if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"))
                    throw new LifecycleException(message);
            }
        }
    }
}

Service.start()

@Override
protected void startInternal() throws LifecycleException {
    if(log.isInfoEnabled())
        log.info(sm.getString("standardService.start.name", this.name));
    setState(LifecycleState.STARTING);

    // Start our defined Container first
    if (engine != null) {
        synchronized (engine) {
            engine.start();
        }
    }

    synchronized (executors) {
        for (Executor executor: executors) {
            executor.start();
        }
    }

    mapperListener.start();

    // Start our defined Connectors second
    synchronized (connectorsLock) {
        for (Connector connector: connectors) {
            try {
                // If it has already failed, don't try and start it
                if (connector.getState() != LifecycleState.FAILED) {
                    connector.start();
                }
            } catch (Exception e) {
                log.error(sm.getString(
                        "standardService.connector.startFailed",
                        connector), e);
            }
        }
    }
}

我們知道Connector實現了Lifecycle介面,所以它是一個生命週期元件。所以Connector的啟動邏輯入口在於init()start()

Connector構造方法

在分析之前,我們看看server.xml,該檔案已經體現出了tomcat中各個元件的大體結構。

<?xml version='1.0' encoding='utf-8'?>
<Server port="8005" shutdown="SHUTDOWN">
  <Listener className="org.apache.catalina.startup.VersionLoggerListener" />
  <Listener className="org.apache.catalina.core.AprLifecycleListener" SSLEngine="on" />
  <Listener className="org.apache.catalina.core.JreMemoryLeakPreventionListener" />
  <Listener className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener" />
  <Listener className="org.apache.catalina.core.ThreadLocalLeakPreventionListener" />

  <GlobalNamingResources>
    <Resource name="UserDatabase" auth="Container"
              type="org.apache.catalina.UserDatabase"
              description="User database that can be updated and saved"
              factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
              pathname="conf/tomcat-users.xml" />
  </GlobalNamingResources>

  <Service name="Catalina">
    <Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
    <Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />

    <Engine name="Catalina" defaultHost="localhost">
      <Realm className="org.apache.catalina.realm.LockOutRealm">
        <Realm className="org.apache.catalina.realm.UserDatabaseRealm"
               resourceName="UserDatabase"/>
      </Realm>

      <Host name="localhost"  appBase="webapps"
            unpackWARs="true" autoDeploy="true">
        <Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log" suffix=".txt"
               pattern="%h %l %u %t &quot;%r&quot; %s %b" />
      </Host>
    </Engine>
  </Service>
</Server>

在這個檔案中,我們看到一個Connector有幾個關鍵屬性,portprotocol是其中的兩個。server.xml預設支援兩種協議:HTTP/1.1AJP/1.3。其中HTTP/1.1用於支援http1.1協議,而AJP/1.3用於支援對apache伺服器的通訊。

接下來我們看看構造方法。

public Connector() {
    this(null); // 1. 無參構造方法,傳入引數為空協議,會預設使用`HTTP/1.1`
}

public Connector(String protocol) {
    setProtocol(protocol);
    // Instantiate protocol handler
    // 5. 使用protocolHandler的類名構造ProtocolHandler的例項
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }

    if (Globals.STRICT_SERVLET_COMPLIANCE) {
        uriCharset = StandardCharsets.ISO_8859_1;
    } else {
        uriCharset = StandardCharsets.UTF_8;
    }
}

@Deprecated
public void setProtocol(String protocol) {
    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    // 2. `HTTP/1.1`或`null`,protocolHandler使用`org.apache.coyote.http11.Http11NioProtocol`,不考慮apr
    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        }
    }
    // 3. `AJP/1.3`,protocolHandler使用`org.apache.coyote.ajp.AjpNioProtocol`,不考慮apr
    else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    }
    // 4. 其他情況,使用傳入的protocol作為protocolHandler的類名
    else {
        setProtocolHandlerClassName(protocol);
    }
}

從上面的程式碼我們看到構造方法主要做了下面幾件事情:

  1. 無參構造方法,傳入引數為空協議,會預設使用HTTP/1.1
  2. HTTP/1.1null,protocolHandler使用org.apache.coyote.http11.Http11NioProtocol,不考慮apr
  3. AJP/1.3,protocolHandler使用org.apache.coyote.ajp.AjpNioProtocol,不考慮apr
  4. 其他情況,使用傳入的protocol作為protocolHandler的類名
  5. 使用protocolHandler的類名構造ProtocolHandler的例項

Connector.init()

@Override
protected void initInternal() throws LifecycleException {
    super.initInternal();

    // Initialize adapter
    // 1. 初始化adapter
    adapter = new CoyoteAdapter(this);
    protocolHandler.setAdapter(adapter);

    // Make sure parseBodyMethodsSet has a default
    // 2. 設定接受body的method列表,預設為POST
    if (null == parseBodyMethodsSet) {
        setParseBodyMethods(getParseBodyMethods());
    }

    if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
        throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                getProtocolHandlerClassName()));
    }
    if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
            protocolHandler instanceof AbstractHttp11JsseProtocol) {
        AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                (AbstractHttp11JsseProtocol<?>) protocolHandler;
        if (jsseProtocolHandler.isSSLEnabled() &&
                jsseProtocolHandler.getSslImplementationName() == null) {
            // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
            jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
        }
    }

    // 3. 初始化protocolHandler
    try {
        protocolHandler.init();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
    }
}

init()方法做了3件事情

  1. 初始化adapter
  2. 設定接受body的method列表,預設為POST
  3. 初始化protocolHandler

ProtocolHandler類繼承層級我們知道ProtocolHandler的子類都必須實現AbstractProtocol抽象類,而protocolHandler.init();的邏輯程式碼正是在這個抽象類裡面。我們來分析一下。

@Override
public void init() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
    }

    if (oname == null) {
        // Component not pre-registered so register it
        oname = createObjectName();
        if (oname != null) {
            Registry.getRegistry(null, null).registerComponent(this, oname, null);
        }
    }

    if (this.domain != null) {
        rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
        Registry.getRegistry(null, null).registerComponent(
                getHandler().getGlobal(), rgOname, null);
    }

    // 1. 設定endpoint的名字,預設為:http-nio-{port}
    String endpointName = getName();
    endpoint.setName(endpointName.substring(1, endpointName.length()-1));
    endpoint.setDomain(domain);
    
    // 2. 初始化endpoint
    endpoint.init();
}

我們接著分析一下Endpoint.init()裡面又做了什麼。該方法位於AbstactEndpoint抽象類,該類是基於模板方法模式實現的,主要呼叫了子類的bind()方法。

public abstract void bind() throws Exception;
public abstract void unbind() throws Exception;
public abstract void startInternal() throws Exception;
public abstract void stopInternal() throws Exception;

public void init() throws Exception {
    // 執行bind()方法
    if (bindOnInit) {
        bind();
        bindState = BindState.BOUND_ON_INIT;
    }
    if (this.domain != null) {
        // Register endpoint (as ThreadPool - historical name)
        oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
        Registry.getRegistry(null, null).registerComponent(this, oname, null);

        ObjectName socketPropertiesOname = new ObjectName(domain +
                ":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
        socketProperties.setObjectName(socketPropertiesOname);
        Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);

        for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
            registerJmx(sslHostConfig);
        }
    }
}

繼續分析bind()方法,我們終於看到了我們想要看的東西了。關鍵的程式碼在於serverSock.socket().bind(addr,getAcceptCount());,用於繫結ServerSocket到指定的IP和埠。

@Override
public void bind() throws Exception {

    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        //繫結ServerSocket到指定的IP和埠
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }

    serverSock.configureBlocking(true); //mimic APR behavior

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    selectorPool.open();
}

好了,我們已經分析完了init()方法,接下來我們分析start()方法。關鍵程式碼就一行,呼叫ProtocolHandler.start()方法。

Connector.start()

@Override
protected void startInternal() throws LifecycleException {

    // Validate settings before starting
    if (getPort() < 0) {
        throw new LifecycleException(sm.getString(
                "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
    }

    setState(LifecycleState.STARTING);

    try {
        protocolHandler.start();
    } catch (Exception e) {
        throw new LifecycleException(
                sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
    }
}

我們深入ProtocolHandler.start()方法。

  1. 呼叫Endpoint.start()方法
  2. 開啟非同步超時執行緒,執行緒執行單元為Asynctimeout
@Override
public void start() throws Exception {
    if (getLog().isInfoEnabled()) {
        getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
    }

    // 1. 呼叫`Endpoint.start()`方法
    endpoint.start();

    // Start async timeout thread
    // 2. 開啟非同步超時執行緒,執行緒執行單元為`Asynctimeout`
    asyncTimeout = new AsyncTimeout();
    Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
    int priority = endpoint.getThreadPriority();
    if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
        priority = Thread.NORM_PRIORITY;
    }
    timeoutThread.setPriority(priority);
    timeoutThread.setDaemon(true);
    timeoutThread.start();
}

這兒我們重點關注Endpoint.start()方法

public final void start() throws Exception {
    // 1. `bind()`已經在`init()`中分析過了
    if (bindState == BindState.UNBOUND) {
        bind();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

@Override
public void startInternal() throws Exception {
    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        // Create worker collection
        // 2. 建立工作者執行緒池
        if ( getExecutor() == null ) {
            createExecutor();
        }
        
        // 3. 初始化連線latch,用於限制請求的併發量
        initializeConnectionLatch();

        // Start poller threads
        // 4. 開啟poller執行緒。poller用於對接受者執行緒生產的訊息(或事件)進行處理,poller最終呼叫的是Handler的程式碼
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }
        // 5. 開啟acceptor執行緒
        startAcceptorThreads();
    }
}

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}
  1. bind()已經在init()中分析過了
  2. 建立工作者執行緒池
  3. 初始化連線latch,用於限制請求的併發量
  4. 建立輪詢Poller執行緒。poller用於對接受者執行緒生產的訊息(或事件)進行處理,poller最終呼叫的是Handler的程式碼
  5. 建立Acceptor執行緒

Connector請求邏輯

分析完了Connector的啟動邏輯之後,我們就需要進一步分析一下http的請求邏輯,當請求從客戶端發起之後,需要經過哪些操作才能真正地得到執行?

Acceptor

Acceptor執行緒主要用於監聽套接字,將已連線套接字轉給Poller執行緒。Acceptor執行緒數由AbstracEndPoint的acceptorThreadCount成員變數控制,預設值為1

AbstractEndpoint.Acceptor是AbstractEndpoint類的靜態抽象類,實現了Runnable介面,部分程式碼如下:
public abstract static class Acceptor implements Runnable {
    public enum AcceptorState {
        NEW, RUNNING, PAUSED, ENDED
    }

    protected volatile AcceptorState state = AcceptorState.NEW;
    public final AcceptorState getState() {
        return state;
    }

    private String threadName;
    protected final void setThreadName(final String threadName) {
        this.threadName = threadName;
    }
    protected final String getThreadName() {
        return threadName;
    }
}

NioEndpoint的Acceptor成員內部類繼承了AbstractEndpoint.Acceptor:

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            // 1. 執行過程中,如果`Endpoint`暫停了,則`Acceptor`進行自旋(間隔50毫秒) `       
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
            // 2. 如果`Endpoint`終止運行了,則`Acceptor`也會終止
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                // 3. 如果請求達到了最大連線數,則wait直到連線數降下來
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    // 4. 接受下一次連線的socket
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (running && !paused) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 5. `setSocketOptions()`這兒是關鍵,會將socket以事件的方式傳遞給poller
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

從以上程式碼可以看到:

  • countUpOrAwaitConnection函式檢查當前最大連線數,若未達到maxConnections則加一,否則等待;
  • socket = serverSock.accept()這一行中的serverSock正是NioEndpoint的bind函式中開啟的ServerSocketChannel。為了引用這個變數,NioEndpoint的Acceptor類是成員而不再是靜態類;
  • setSocketOptions函式呼叫上的註釋表明該函式將已連線套接字交給Poller執行緒處理。

setSocketOptions方法接著處理已連線套接字:

protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // 將channel註冊到poller,注意關鍵的兩個方法,`getPoller0()`和`Poller.register()`
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}
  • 從NioChannel棧中出棧一個,若能重用(即不為null)則重用物件,否則新建一個NioChannel物件;
  • getPoller0方法利用輪轉法選擇一個Poller執行緒,利用Poller類的register方法將上述NioChannel物件註冊到該Poller執行緒上;
  • 若成功轉給Poller執行緒該函式返回true,否則返回false。返回false後,Acceptor類的closeSocket函式會關閉通道和底層Socket連線並將當前最大連線數減一。

Poller

Poller執行緒主要用於以較少的資源輪詢已連線套接字以保持連線,當資料可用時轉給工作執行緒。

Poller執行緒數由NioEndPoint的pollerThreadCount成員變數控制,預設值為2與可用處理器數二者之間的較小值。
Poller實現了Runnable介面,可以看到建構函式為每個Poller打開了一個新的Selector。

public class Poller implements Runnable {
    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events =
            new SynchronizedQueue<>();
    // 省略一些程式碼
    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public Selector getSelector() { return selector;}
    // 省略一些程式碼
}

將channel註冊到poller,注意關鍵的兩個方法,getPoller0()Poller.register()。先來分析一下getPoller0(),該方法比較關鍵的一個地方就是以取模的方式對poller數量進行輪詢獲取。

/**
 * The socket poller.
 */
private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);
/**
 * Return an available poller in true round robin fashion.
 *
 * @return The next poller in sequence
 */
public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

接下來我們分析一下Poller.register()方法。因為Poller維持了一個events同步佇列,所以Acceptor接受到的channel會放在這個佇列裡面,放置的程式碼為events.offer(event);

public class Poller implements Runnable {

    private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

    /**
     * Registers a newly created socket with the poller.
     *
     * @param socket    The newly created socket
     */
    public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getSocketProperties().getSoTimeout());
        ka.setWriteTimeout(getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        addEvent(r);
    }

    private void addEvent(PollerEvent event) {
        events.offer(event);
        if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
    }
}

PollerEvent

接下來看一下PollerEvent,PollerEvent實現了Runnable介面,用來表示一個輪詢事件,程式碼如下:

public static class PollerEvent implements Runnable {
    private NioChannel socket;
    private int interestOps;
    private NioSocketWrapper socketWrapper;

    public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
        reset(ch, w, intOps);
    }

    public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
        socket = ch;
        interestOps = intOps;
        socketWrapper = w;
    }

    public void reset() {
        reset(null, null, 0);
    }

    @Override
    public void run() {
        if (interestOps == OP_REGISTER) {
            try {
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    socket.socketWrapper.getEndpoint().countDownConnection();
                    ((NioSocketWrapper) socket.socketWrapper).closed = true;
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if (socketWrapper != null) {
                        //we are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {}
            }
        }
    }

}

在run函式中:

  • 若感興趣集是自定義的OP_REGISTER,則說明該事件表示的已連線套接字通道尚未被輪詢執行緒處理過,那麼將該通道註冊到Poller執行緒的Selector上,感興趣集是OP_READ,通道註冊的附件是一個NioSocketWrapper物件。從Poller的register方法新增事件即是這樣的過程;
  • 否則獲得已連線套接字通道註冊到Poller執行緒的Selector上的SelectionKey,為key新增新的感興趣集。

重訪Poller

上文提到Poller類實現了Runnable介面,其重寫的run方法如下所示。

public boolean events() {
    boolean result = false;
    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
        result = true;
        try {
            //直接呼叫run方法
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error("",x);
        }
    }
    return result;
}

@Override
public void run() {
    // Loop until destroy() is called
    while (true) {
        boolean hasEvents = false;

        try {
            if (!close) {
                /執行PollerEvent的run方法
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        // 獲取當前選擇器中所有註冊的“選擇鍵(已就緒的監聽事件)”
        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        // 對已經準備好的key進行處理
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                // 真正處理key的地方
                processKey(sk, attachment);
            }
        }//while

        //process timeouts
        timeout(keyCount,hasEvents);
    }//while

    getStopLatch().countDown();
}
  • 若佇列裡有元素則會先把佇列裡的事件均執行一遍,PollerEvent的run方法會將通道註冊到Poller的Selector上;
  • 對select返回的SelectionKey進行處理,由於在PollerEvent中註冊通道時帶上了NioSocketWrapper附件,因此這裡可以用SelectionKey的attachment方法得到,接著呼叫processKey去處理已連線套接字通道。

我們接著分析processKey(),該方法又會根據key的型別,來分別處理讀和寫。

  1. 處理讀事件,比如生成Request物件
  2. 處理寫事件,比如將生成的Response物件通過socket寫回客戶端
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // 1. 處理讀事件,比如生成Request物件
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    // 2. 處理寫事件,比如將生成的Response物件通過socket寫回客戶端
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}

我們繼續來分析方法processSocket()

  1. processorCache裡面拿一個Processor來處理socket,Processor的實現為SocketProcessor
  2. Processor放到工作執行緒池中執行
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        // 1. 從`processorCache`裡面拿一個`Processor`來處理socket,`Processor`的實現為`SocketProcessor`
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // 2. 將`Processor`放到工作執行緒池中執行
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

dispatch引數表示是否要在另外的執行緒中處理,上文processKey各處傳遞的引數都是true。

  • dispatch為true且工作執行緒池存在時會執行executor.execute(sc),之後是由工作執行緒池處理已連線套接字;
  • 否則繼續由Poller執行緒自己處理已連線套接字。

AbstractEndPoint類的createSocketProcessor是抽象方法,NioEndPoint類實現了它:

@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
    return new SocketProcessor(socketWrapper, event);
}

接著我們分析SocketProcessor.doRun()方法(SocketProcessor.run()方法最終呼叫此方法)。該方法將處理邏輯交給Handler處理,當event為null時,則表明是一個OPEN_READ事件。

該類的註釋說明SocketProcessor與Worker的作用等價。

/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {

    public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
        super(socketWrapper, event);
    }

    @Override
    protected void doRun() {
        NioChannel socket = socketWrapper.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    if (socket.isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        event = SocketEvent.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                // 將處理邏輯交給`Handler`處理,當event為null時,則表明是一個`OPEN_READ`事件
                if (event == null) {
                    state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                } else {
                    state = getHandler().process(socketWrapper, event);
                }
                if (state == SocketState.CLOSED) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == SelectionKey.OP_READ){
                socketWrapper.registerReadInterest();
            } else if (handshake == SelectionKey.OP_WRITE){
                socketWrapper.registerWriteInterest();
            }
        } catch (CancelledKeyException cx) {
            socket.getPoller().cancelledKey(key);
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            socket.getPoller().cancelledKey(key);
        } finally {
            socketWrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}

Handler的關鍵方法是process(),雖然這個方法有很多條件分支,但是邏輯卻非常清楚,主要是呼叫Processor.process()方法。

@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    try {
     
        if (processor == null) {
            processor = getProtocol().createProcessor();
            register(processor);
        }

        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        // Associate the processor with the connection
        connections.put(socket, processor);

        SocketState state = SocketState.CLOSED;
        do {
            // 關鍵的程式碼,終於找到你了
            state = processor.process(wrapper, status);

        } while ( state == SocketState.UPGRADING);
        return state;
    } 
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return SocketState.CLOSED;
}

Processor

createProcessor 

protected Http11Processor createProcessor() {                          
    // 構建 Http11Processor
    Http11Processor processor = new Http11Processor(
            proto.getMaxHttpHeaderSize(), (JIoEndpoint)proto.endpoint, // 1. http header 的最大尺寸
            proto.getMaxTrailerSize(),proto.getMaxExtensionSize());
    processor.setAdapter(proto.getAdapter());
    // 2. 預設的 KeepAlive 情況下, 每個 Socket 處理的最多的 請求次數
    processor.setMaxKeepAliveRequests(proto.getMaxKeepAliveRequests());
    // 3. 開啟 KeepAlive 的 Timeout
    processor.setKeepAliveTimeout(proto.getKeepAliveTimeout());      
    // 4. http 當遇到檔案上傳時的 預設超時時間 (300 * 1000)    
    processor.setConnectionUploadTimeout(
            proto.getConnectionUploadTimeout());                      
    processor.setDisableUploadTimeout(proto.getDisableUploadTimeout());
    // 5. 當 http 請求的 body size超過這個值時, 通過 gzip 進行壓縮
    processor.setCompressionMinSize(proto.getCompressionMinSize());  
    // 6. http 請求是否開啟 compression 處理    
    processor.setCompression(proto.getCompression());                  
    processor.setNoCompressionUserAgents(proto.getNoCompressionUserAgents());
    // 7. http body裡面的內容是 "text/html,text/xml,text/plain" 才會進行 壓縮處理
    processor.setCompressableMimeTypes(proto.getCompressableMimeTypes());
    processor.setRestrictedUserAgents(proto.getRestrictedUserAgents());
    // 8. socket 的 buffer, 預設 9000
    processor.setSocketBuffer(proto.getSocketBuffer());       
    // 9. 最大的 Post 處理尺寸的大小 4 * 1000    
    processor.setMaxSavePostSize(proto.getMaxSavePostSize());          
    processor.setServer(proto.getServer());
    processor.setDisableKeepAlivePercentage(
            proto.getDisableKeepAlivePercentage());                    
    register(processor);                                               
    return processor;
}

這兒我們主要關注的是Processor對於讀的操作,也只有一行程式碼。呼叫service()方法。

public abstract class AbstractProcessorLight implements Processor {

    @Override
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
                // 呼叫`service()`方法
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
}

Processor.service()方法比較重要的地方就兩點。該方法非常得長,也超過了200行,在此我們不再拷貝此方法的程式碼。

  1. 生成Request和Response物件
  2. 呼叫Adapter.service()方法,將生成的Request和Response物件傳進去

Adapter

Adapter用於連線ConnectorContainer,起到承上啟下的作用。Processor會呼叫Adapter.service()方法。我們來分析一下,主要做了下面幾件事情:

  1. 根據coyote框架的request和response物件,生成connector的request和response物件(是HttpServletRequest和HttpServletResponse的封裝)
  2. 補充header
  3. 解析請求,該方法會出現代理伺服器、設定必要的header等操作
  4. 真正進入容器的地方,呼叫Engine容器下pipeline的閥門
  5. 通過request.finishRequest 與 response.finishResponse(刷OutputBuffer中的資料到瀏覽器) 來完成整個請求
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {

    // 1. 根據coyote框架的request和response物件,生成connector的request和response物件(是HttpServletRequest和HttpServletResponse的封裝)
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);

    if (request == null) {
        // Create objects
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);

        // Link objects
        request.setResponse(response);
        response.setRequest(request);

        // Set as notes
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);

        // Set query string encoding
        req.getParameters().setQueryStringCharset(connector.getURICharset());
    }

    // 2. 補充header
    if (connector.getXpoweredBy()) {
        response.addHeader("X-Powered-By", POWERED_BY);
    }

    boolean async = false;
    boolean postParseSuccess = false;

    req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        // 3. 解析請求,該方法會出現代理伺服器、設定必要的header等操作
        // 用來處理請求對映 (獲取 host, context, wrapper, URI 後面的引數的解析, sessionId )
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            // 4. 真正進入容器的地方,呼叫Engine容器下pipeline的閥門
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                    request, response);
        }
        if (request.isAsync()) {
            async = true;
            ReadListener readListener = req.getReadListener();
            if (readListener != null && request.isFinished()) {
                // Possible the all data may have been read during service()
                // method so this needs to be checked here
                ClassLoader oldCL = null;
                try {
                    oldCL = request.getContext().bind(false, null);
                    if (req.sendAllDataReadEvent()) {
                        req.getReadListener().onAllDataRead();
                    }
                } finally {
                    request.getContext().unbind(false, oldCL);
                }
            }

            Throwable throwable =
                    (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

            // If an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isAsyncCompleting() && throwable != null) {
                request.getAsyncContextInternal().setErrorState(throwable, true);
            }
        } else {
            //5. 通過request.finishRequest 與 response.finishResponse(刷OutputBuffer中的資料到瀏覽器) 來完成整個請求
            request.finishRequest();
            //將 org.apache.catalina.connector.Response對應的 OutputBuffer 中的資料 刷到 org.apache.coyote.Response 對應的 InternalOutputBuffer 中, 並且最終呼叫 socket對應的 outputStream 將資料刷出去( 這裡會組裝 Http Response 中的 header 與 body 裡面的資料, 並且刷到遠端 )
            response.finishResponse();
        }

    } catch (IOException e) {
        // Ignore
    } finally {
        AtomicBoolean error = new AtomicBoolean(false);
        res.action(ActionCode.IS_ERROR, error);

        if (request.isAsyncCompleting() && error.get()) {
            // Connection will be forcibly closed which will prevent
            // completion happening at the usual point. Need to trigger
            // call to onComplete() here.
            res.action(ActionCode.ASYNC_POST_PROCESS,  null);
            async = false;
        }

        // Access log
        if (!async && postParseSuccess) {
            // Log only if processing was invoked.
            // If postParseRequest() failed, it has already logged it.
            Context context = request.getContext();
            // If the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. That thread will have updated the access
            // log so it is OK not to update the access log here in that
            // case.
            if (context != null) {
                context.logAccess(request, response,
                        System.currentTimeMillis() - req.getStartTime(), false);
            }
        }

        req.getRequestProcessor().setWorkerThreadName(null);

        // Recycle the wrapper request and response
        if (!async) {
            request.recycle();
            response.recycle();
        }
    }
}

請求預處理

postParseRequest方法對請求做預處理,如對路徑去除分號表示的路徑引數、進行URI解碼、規格化(點號和兩點號)

 

protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
        org.apache.coyote.Response res, Response response) throws IOException, ServletException {
    // 省略部分程式碼
    MessageBytes decodedURI = req.decodedURI();

    if (undecodedURI.getType() == MessageBytes.T_BYTES) {
        // Copy the raw URI to the decodedURI
        decodedURI.duplicate(undecodedURI);

        // Parse the path parameters. This will:
        //   - strip out the path parameters
        //   - convert the decodedURI to bytes
        parsePathParameters(req, request);

        // URI decoding
        // %xx decoding of the URL
        try {
            req.getURLDecoder().convert(decodedURI, false);
        } catch (IOException ioe) {
            res.setStatus(400);
            res.setMessage("Invalid URI: " + ioe.getMessage());
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Normalization
        if (!normalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
        // Character decoding
        convertURI(decodedURI, request);
        // Check that the URI is still normalized
        if (!checkNormalize(req.decodedURI())) {
            res.setStatus(400);
            res.setMessage("Invalid URI character encoding");
            connector.getService().getContainer().logAccess(
                    request, response, 0, true);
            return false;
        }
    } else {
        /* The URI is chars or String, and has been sent using an in-memory
            * protocol handler. The following assumptions are made:
            * - req.requestURI() has been set to the 'original' non-decoded,
            *   non-normalized URI
            * - req.decodedURI() has been set to the decoded, normalized form
            *   of req.requestURI()
            */
        decodedURI.toChars();
        // Remove all path parameters; any needed path parameter should be set
        // using the request object rather than passing it in the URL
        CharChunk uriCC = decodedURI.getCharChunk();
        int semicolon = uriCC.indexOf(';');
        if (semicolon > 0) {
            decodedURI.setChars
                (uriCC.getBuffer(), uriCC.getStart(), semicolon);
        }
    }

    // Request mapping.
    MessageBytes serverName;
    if (connector.getUseIPVHosts()) {
        serverName = req.localName();
        if (serverName.isNull()) {
            // well, they did ask for it
            res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
        }
    } else {
        serverName = req.serverName();
    }

    // Version for the second mapping loop and
    // Context that we expect to get for that version
    String version = null;
    Context versionContext = null;
    boolean mapRequired = true;

    while (mapRequired) {
        // This will map the the latest version by default
        connector.getService().getMapper().map(serverName, decodedURI,
                version, request.getMappingData());
        // 省略部分程式碼
    }
    // 省略部分程式碼
}

以MessageBytes的型別是T_BYTES為例:

  • parsePathParameters方法去除URI中分號表示的路徑引數;
  • req.getURLDecoder()得到一個UDecoder例項,它的convert方法對URI解碼,這裡的解碼只是移除百分號,計算百分號後兩位的十六進位制數字值以替代原來的三位百分號編碼;
  • normalize方法規格化URI,解釋路徑中的“.”和“..”;
  • convertURI方法利用Connector的uriEncoding屬性將URI的位元組轉換為字元表示;
  • 注意connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()) 這行,之前Service啟動時MapperListener註冊了該Service內的各Host和Context。根據URI選擇Context時,Mapper的map方法採用的是convertURI方法解碼後的URI與每個Context的路徑去比較

容器處理

如果請求可以被傳給容器的Pipeline即當postParseRequest方法返回true時,則由容器繼續處理,在service方法中有connector.getService().getContainer().getPipeline().getFirst().invoke(request, response)這一行:

  • Connector呼叫getService返回StandardService;
  • StandardService呼叫getContainer返回StandardEngine;
  • StandardEngine呼叫getPipeline返回與其關聯的StandardPipeline;

 後續處理流程請看下一篇文章