Tomcat-Server和Service


​ 对Tomcat中Server,Service的init和start方法 + 核心字段进行了分析,并重点分析了Connector-NioEndpoint内部的Acceptor和Poller线程,并用图文的方法做了总结。并简单总结了Mapper和MapperListener的作用

Server

Server 表示 Tomcat 的顶层容器,默认实现为 StandardServer。它的职责相对简单,主要包括

  • 负责管理整个 Tomcat 的生命周期事件(如 start()stop() 等),并将其传递给所有子组件 Service

  • 监听关闭端口:默认会在 8005 端口监听本地连接,用于接收特定的 SHUTDOWN 命令,从而实现优雅关闭

  • 监听行为可通过配置项(如 portshutdown)启用、关闭或修改shutdown命令

  • 当监听到SHUTDOWN命令,await()则返回,main线程会退出。可以推测出,tomcat启动的其他线程池应该是daemon线程的,否则整个进程是不会退出的。后续我会陆续验证这个猜想

核心源码

​ 我这里只展示了部分优化后的核心源码,不重要的我都删了

public final class StandardServer extends LifecycleMBeanBase implements Server {

    // shutdown监听端口
    private int port = 8005;

    private String address = "localhost";

    // 内部所有的Service
    private Service services[] = new Service[0];
    private final Object servicesLock = new Object();

    // shutdown命令,默认就是SHUTDOWN
    private String shutdown = "SHUTDOWN";

    private volatile boolean stopAwait = false;

    private Catalina catalina = null;

    // 监听shutdown端口的ServerSocket
    private volatile ServerSocket awaitSocket = null;

    @Override
    protected void startInternal() throws LifecycleException {

        fireLifecycleEvent(CONFIGURE_START_EVENT, null);
        setState(LifecycleState.STARTING);

        globalNamingResources.start();

        // 依次启动内部所有的Service
        synchronized (servicesLock) {
            for (Service service : services) {
                service.start();
            }
        }
    }

    @Override
    public void await() {
        // -2:完全不等待(可用于嵌入式)
        if (port == -2) {
            return;
        }
        // -1:无限睡眠轮询(用于不想用端口,但主线程又不能退出的场景)
        if (port == -1) {
            try {
                awaitThread = Thread.currentThread();
                while (!stopAwait) {
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException ex) {
                        // continue and check the flag
                    }
                }
            } finally {
                awaitThread = null;
            }
            return;
        }

        // 启动ServerSocket,监听本地端口(默认8005),用于接收 SHUTDOWN 命令
        try {
            awaitSocket = new ServerSocket(port, 1, InetAddress.getByName(address));
        } catch (IOException e) {
            return;
        }

        try {
            awaitThread = Thread.currentThread();
            // 循环
            while (!stopAwait) {
                ServerSocket serverSocket = awaitSocket;
                if (serverSocket == null) {
                    break;
                }

                Socket socket = null;
                StringBuilder command = new StringBuilder();
                try {
                    InputStream stream;
                    long acceptStartTime = System.currentTimeMillis();
                    // 阻塞,等待连接建立(当返回时表示已获取到客户端的连接)
                    socket = serverSocket.accept();
                    socket.setSoTimeout(10 * 1000); // Ten seconds
                    stream = socket.getInputStream();

                    // 安全防御:预设最大读取长度,防止 DOS 攻击
                    int expected = 1024;
                    while (expected < shutdown.length()) {
                        if (random == null) {
                            random = new Random();
                        }
                        expected += (random.nextInt() % 1024);
                    }
                    // 读取字符,直到遇到控制字符/EOF
                    while (expected > 0) {
                        int ch = stream.read();
                        if (ch < 32 || ch == 127)
                            break;
                        command.append((char) ch);
                        expected--;
                    }
                } finally {
                    // 接受完一次请求就关闭客户端的连接
                    if (socket != null) {
                        socket.close();
                    }
                }

                // 判断是否是合法的 SHUTDOWN 命令
                boolean match = command.toString().equals(shutdown);
                if (match) { // 跳出循环,结束main
                    break;
                }
            }
        } finally {
            // 清理资源
            ServerSocket serverSocket = awaitSocket;
            awaitThread = null;
            awaitSocket = null;

            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    // Ignore
                }
            }
        }
    }

}

Service

Service 是 Tomcat 中的一个核心组件,其默认实现类为 StandardService,表示一个具体的服务实例。它与最顶层的容器 Engine 是一对一的

一个 Service 通常包含多个 Connector,每个 Connector 监听一个本地端口,支持特定协议(如 HTTP、AJP),接收客户端请求。请求经过 Mapper 映射到对应的 HostContextWrapper,最终交由目标 Servlet 处理

核心生命周期方法

  • initInternal()
    • 初始化绑定的 Engine 容器
    • 等待 Engine 及其子容器完成初始化后,注册相关 MBean(用于 JMX 监控)
    • 初始化所有 Connector主要是是创建并绑定 ServerSocketChannel 到指定端口
  • startInternal()
    • 启动关联的 Engine
    • 启动 MapperListener(用于监听容器结构变化并更新路由)
    • 启动所有 Connector主要是启动Acceptor和Poller线程,开始监听端口并接受请求

核心源码


public class StandardService extends LifecycleMBeanBase implements Service {
    // 当前Service名名
    private String name = null;
    // 关联的Server
    private Server server = null;
    // 当前 Service 管理的所有 Connector(支持 HTTP、AJP 等协议)
    protected Connector connectors[] = new Connector[0];
    private final Object connectorsLock = new Object();
    // 当前 Service 下配置的所有线程池 Executor(一般为空,由 Connector 自带)
    protected final ArrayList<Executor> executors = new ArrayList<>();
    // 当前 Service 使用的 Engine(容器的顶层容器,代表整个虚拟主机群)
    private Engine engine = null;

    /**
     * 我把它叫请求路由器,用于将请求映射到对应的 Wrapper(最终的 Servlet)。
     * 由 Connector 配置协议解析后交给 Mapper 做 URI 到容器层级的映射。
     */
    protected final Mapper mapper = new Mapper();

    // Mapper 的生命周期监听器。主要注册 MBean,不参与请求处理。
    protected final MapperListener mapperListener = new MapperListener(this);

    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();

        // 初始化容器结构
        if (engine != null) {
            engine.init();
        }

        // Service下的Executor节点,默认没有
        for (Executor executor : findExecutors()) {
            if (executor instanceof JmxEnabled) {
                ((JmxEnabled) executor).setDomain(getDomain());
            }
            executor.init(); // 没做啥有用的事,仅仅注册个MBean
        }

        // 没做啥有用的事,仅仅注册个MBean
        mapperListener.init();

        // Initialize our defined Connectors
        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                try {
                    // 构建ProtocolHandler和Endpoint,绑定服务器端口
                    connector.init();
                } catch (Exception e) {
                    if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
                        throw new LifecycleException(message);
                    }
                }
            }
        }
    }

    @Override
    protected void startInternal() throws LifecycleException {

        setState(LifecycleState.STARTING);

        // 启动容器 Engine,进而递归启动 Host、Context、Wrapper
        if (engine != null) {
            synchronized (engine) {
                engine.start();
            }
        }

        // 启动所有线程池 Executor(默认为空)
        synchronized (executors) {
            for (Executor executor : executors) {
                executor.start();
            }
        }

        mapperListener.start();

        synchronized (connectorsLock) {
            for (Connector connector : connectors) {
                if (connector.getState() != LifecycleState.FAILED) {
                    // 启动底层 ProtocolHandler 和 Acceptor 线程
                    connector.start();
                }
            }
        }
    }

}

Connector

​ 用来处理Socket的抽象,主要关注内部的ProtocolHandler和Adapter

ProtocolHandler

​ 用来处理具体协议,默认处理http1.1协议使用的是Http11NioProtocol这个类,内部通过NioEndpoint来真正操作Soeket,封装了Socket的相关操作Mapper

NioEndpoint
Acceptor(连接处理器)

​ 一个处理连接的专用线程(默认线程名为 http-nio-8080-Acceptor),负责监听客户端连接,职责类似 Netty 中的 ServerBootstrapAcceptor。其核心流程如下:

  • 阻塞监听客户端连接请求(调用 ServerSocketChannel.accept()
  • 建立连接后,封装为 SocketChannel,并设置相应的 socket 参数(如非阻塞、超时等)
  • 构造 PollerEvent 对象,设为对 OP_READ 事件感兴趣
  • 将事件投递至 Poller 线程,由其负责后续的事件注册和处理

补充说明:Acceptor 只负责连接建立,不处理任何 I/O 数据交互,也不触发SocketChannel向Selector注册感兴趣的事件

// Acceptor线程核心源码,只保留了一些核心逻辑

public void run() {
    // 主循环:阻塞接收客户端连接请求
    while (running) {
        try {

            // 连接限制计数(默认10000)
            countUpOrAwaitConnection();

            SocketChannel socket = null;

            // 阻塞等待客户端连接(accept)
            socket = serverSock.accept();

            if (running && !paused) {
                // 初始化客户端连接并注册到 Poller
                // 若失败则关闭 socket 并增加可用的LimitLatch
                if (!setSocketOptions(socket)) {
                    closeSocket(socket);
                }
            } else {
                closeSocket(socket);
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
        }
    }
}

protected boolean setSocketOptions(SocketChannel socket) {
    try {
        // 设置客户端 socket 为非阻塞模式
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        // 设置Socket参数
        socketProperties.setProperties(sock);

        // 从缓存中获取或创建 NioChannel(用于封装 SocketChannel + Buffer)
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else { // 有缓存,就重置
            channel.setIOChannel(socket);
            channel.reset();
        }
        // 注册到 Poller,由其负责后续事件驱动处理
        getPoller0().register(channel);
    } catch (Throwable t) {
        return false;
    }
    return true;
}

public void register(final NioChannel socket) {
    // 绑定所属 poller
    socket.setPoller(this);
    // 包装为 NioSocketWrapper,承载 socket 的 I/O 状态管理
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    // socket参数设置(超时等)
    ka.setReadTimeout(getSocketProperties().getSoTimeout());
    ka.setWriteTimeout(getSocketProperties().getSoTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    PollerEvent r = eventCache.pop();
    // 设置其能监听的可读事件(当SocketChannel可读时触发)
    ka.interestOps(SelectionKey.OP_READ);
    if (r == null) {
        r = new PollerEvent(socket, ka, OP_REGISTER);
    } else {
        r.reset(socket, ka, OP_REGISTER);
    }
    // 投递到 Poller 的事件队列,等待 selector 执行注册
    addEvent(r);
}
Poller(事件轮询器)

Poller 是 NIO 模型中负责事件监听的线程,线程名一般为 http-nio-8080-ClientPoller。它持续轮询 Selector,将感兴趣的 IO 事件分发出去,核心流程如下:

  • 接收来自 Acceptor 的 SocketChannel,将其注册到 Selector 中,关注 OP_READ 等事件
  • 循环调用 Selector.select() 等待事件就绪
  • 对于就绪事件,将对应连接封装为 SocketProcessor,提交给 Worker 线程池处理(负责请求解析与响应)
// Poller关键部分源码
public class Poller implements Runnable {

    private Selector selector;
    // PollerEvent事件队列
    private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

    private AtomicLong wakeupCounter = new AtomicLong(0);

    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    /**
     * 添加 PollerEvent 到事件队列,并根据 wakeupCounter 唤醒 Selector,
     * 以确保 selector 能及时处理注册/取消等变更
     */
    private void addEvent(PollerEvent event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) {
            selector.wakeup();
        }
    }

    @Override
    public void run() {
        // 循环处理事件监听
        while (true) {
            boolean hasEvents = false;

            if (!close) {
                // 处理事件队列中的注册/取消请求
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    // wakeupCounter值大于0,代表Acceptor向事件队列里push了新对象,
                    // 说明有新的连接,则立即执行非阻塞 select
                    keyCount = selector.selectNow();
                } else {
                    // 否则阻塞等待一定时间(默认1秒)以监听 I/O 事件
                    keyCount = selector.select(selectorTimeout);
                }
                // 重置
                wakeupCounter.set(0);
            }
            // 如果没有事件触发,检查是否还有待处理的注册事件
            if (keyCount == 0) {
                hasEvents = (hasEvents | events());
            }

            // 遍历触发的事件(如 OP_READ)
            Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                iterator.remove();
                NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                if (socketWrapper != null) {
                    /**
                     * 将就绪事件交由 Worker 线程处理(例如封装为 SocketProcessor 执行 OP_READ)
                     * 实际业务处理在 Processor 中完成
                     */
                    processKey(sk, socketWrapper);
                }
            }
            // 扫描并关闭超时连接
            timeout(keyCount, hasEvents);
        }

        getStopLatch().countDown();
    }

}
init()

初始化阶段会触发的方法,调用路径:StandardService#init -> Connector#init -> ProtocolHandler#init -> AbstractEndpoint#init -> NioEndpoint#bind,核心作用如下

  • 创建 ServerSocketChannel,并绑定至指定端口(默认 8080)
  • ServerSocketChannel 配置为 阻塞模式accept() 会阻塞直到有连接)
  • 初始化 Selector 池,管理 I/O 多路复用
  • 启动 BlockPoller 后台线程,异步检测空闲连接、关闭已超时 socket 等后台任务
public void bind() throws Exception {

    // 若未启用继承的 channel(默认如此),则创建新的 ServerSocketChannel
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    // 配置为阻塞模式,accept时期阻塞(重要)
    serverSock.configureBlocking(true);

    // 合理化接收线程数(默认为 1)
    if (acceptorThreadCount == 0) {
        acceptorThreadCount = 1;
    }
    // 合理化Poller线程数
    if (pollerThreadCount <= 0) {
        pollerThreadCount = 1;
    }
    // 初始化关闭控制器,用于 await Poller 线程退出
    setStopLatch(new CountDownLatch(pollerThreadCount));

    initialiseSsl();

    // 初始化 Selector 池,启动BlockPoller线程
    selectorPool.open();
}
start()

启动阶段触发的核心方法,调用路径:StandardService#start -> Connector#start -> ProtocolHandler#start -> AbstractEndpoint#start,核心步骤如下

  • 用栈的数据结构初始化Processor、Event、Nio缓存池
  • 初始化 Worker 线程池(事件处理线程池)
  • 初始化LimitLatch,用来限制连接个数
  • 创建并启动Poller线程(Poller是事件轮询器,SocketChannel会将READ事件注册,并让其轮询)
  • 创建并启动Acceptor线程(连接接收器),专用来处理连接
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;

        // =========== 初始化各类缓存栈(预分配池),提升对象复用效率,减少 GC 压力 ================
        // 请求处理器缓存池
        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache());
        // 事件对象缓存池
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache());
        // NIO Channel 缓存池
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool());

        /*
         * 初始化工作线程池(Worker Thread Pool)
         * - 若 server.xml里的Connector 节点未指定 executor,则使用默认实现。
         * - 实际线程为 daemon,线程名格式:http-nio-端口号-exec-索引
         * - 线程池大小由 AbstractEndpoint 的 minSpareThreads 和 maxThreads 控制
         */
        if (getExecutor() == null) {
            createExecutor();
        }
        // 初始化连接限制器(内部用 Semaphore 控制最大连接数)
        initializeConnectionLatch();

        // 启动 Poller 线程(其数量可通过 server.xml 中 Connector 属性 `pollerThreadCount`配置)
        pollers = new Poller[getPollerThreadCount()];
        for (int i = 0; i < pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-" + i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true); // 也是daemon线程
            pollerThread.start();
        }

        // 启动 Acceptor 线程
        startAcceptorThreads();
    }
}

MapperListener

​ 这个组件理解其作用就行,源码不值得深入分析,价值不大。它是一个用于监听容器结构变化、并同步更新到Mapper 组件中映射关系的辅助组件。其核心作用如下:

  • 启动时注册
    • startInternal() 中:
      • 将自身注册为 EngineHostContextWrapper 容器的 ContainerListener
      • 遍历并将当前 Engine 及其所有子容器的路由信息(如 host name、context path、wrapper mapping 等)注册进 Mapper
  • 监听容器事件
    • 实现 containerEvent() 方法,监听容器结构变化事件(如 Host 增加 alias、Context 增加/移除 Wrapper 等),当结构变更时,自动将变化同步更新到 Mapper,保证路由信息的时效性

Mapper

​ 同上面的MapperListener,理解作用即可,源码深入分析的价值不大。我把它叫做路由器,其负责将 HTTP 请求路径快速匹配到对应的 Servlet。其本身不参与请求处理,仅提供路径与容器结构之间的高性能映射机制。

​ 内部通过 MapperListener 监听容器层级(Engine → Host → Context → Wrapper)的结构变更,动态构建和维护内部的映射模型。其内部将Container抽象为轻量级MapElement结构体并按层级关系嵌套组织,构建出一棵用于快速匹配的树状结构。当请求到来时,根据url从左至右快速的查找到对应唯一的Host -> Context -> Wrapper,让它们来处理对应的请求。

总结

tomcat-connector-handle.png