Tomcat-热加载之ThreadLocal内存泄漏篇(二)


​ 通过举例深入的分析了热加载场景下ThreadLocal使用不当造成的内存泄漏。并剖析了Tomcat通过实现无效的ThreadLocalMap.Entry清除ThreadLocalLeakPreventionListener回收空闲worker线程TaskQueue对取任务逻辑的重写来兜底剩余worker线程回收这三种策略来优雅的解决ThreadLocal内存泄漏的问题

上一篇讨论了Tomcat热加载期间垃圾的产生与分类,并且探讨了对Context线程回收的必要性以及Tomcat的实现。而这篇会来分析一个更棘手的问题:如果项目中ThreadLocal如果使用不当,热加载后是如何引起内存泄漏的,并着重分析基于这个问题,Tomcat的考量以及如何优雅的解决这个问题

问题

​ Tomcat 在执行热加载时,会主动停止当前 Context 中由应用创建的线程(通过遍历 JVM 中所有线程并判断其 contextClassLoader 是否为当前 WebappClassLoader 来过滤)。但这并不包括用于处理 HTTP 请求的 Worker 线程。这些线程服务于整个 JVM 生命周期,不会被某个 Context 独占或绑定到某个特定的 WebappClassLoader

​ 问题在于,ThreadLocal是和线程绑定的。如果某个应用将其资源(比如 Spring 的上下文、Class、Bean 等)放入了 Worker 线程的 ThreadLocal 中,而该资源又直接或间接引用了 WebappClassLoader,就会导致这个 WebappClassLoader和其加载的所有Class无法被 GC 回收。最终,随着热加载次数增加,最终触发 OOM

换句话说:共享线程+ThreadLocal+Context类资源 = 热加载内存泄漏的元凶之一。

案例分析

所有案例中MyCounter一致,为计数器。目的是实现对Tomcat worker线程处理了多少次http请求的计数。

分析之前可以看看我的这篇文章,对ThreadLocal要有一个比较深刻的认识最好

demo1

public class LeakingServlet extends HttpServlet {

    public final static ThreadLocal<MyCounter> COUNTER_THREAD_LOCAL = ThreadLocal.withInitial(MyCounter::new);

    public static class MyCounter {
        private int count = 0;

        public void increment() {
            count++;
        }

        public int getCount() {
            return count;
        }
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        MyCounter counter = LeakingServlet.COUNTER_THREAD_LOCAL.get();
        counter.increment();
        // doBusiness...
    }
}

字段COUNTER_THREAD_LOCAL 是一个静态变量,被LeakingServlet.class对象给持有

​ 当请求进入后,会触发一个如下的引用链图。ThreadLocal本身被Entry这个虚引用对象持有,但还被另一边的强引用LeakingServlet.class也持有了,会导致ThreadLocal不能触发虚引用的回收作用,致使对应的value(MyCounter对象)也不会回收,而MyCounter间接持有的WebappClassLoader和其所加载的所有class对象也都不能被回收,引发了内存泄漏

tomcat-threadlocal-memory-leaking-sample-1.png

demo1引用链

demo2

public class LeakingServlet extends HttpServlet {

    public final ThreadLocal<MyCounter> COUNTER_THREAD_LOCAL = ThreadLocal.withInitial(MyCounter::new);

    public static class MyCounter {
        private int count = 0;

        public void increment() {
            count++;
        }

        public int getCount() {
            return count;
        }
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        MyCounter counter = COUNTER_THREAD_LOCAL.get();
        counter.increment();
        // doBusiness...
    }
}

​ 此时COUNTER_THREAD_LOCAL非静态,被LeakingServlet对象持有。引用链图如下,此案例中理论上不会有内存泄漏

持有ThreadLocal强引用的LeakingServlet对象会被置为null并被回收,回收后仅被虚引用持有的ThreadLocal就能被回收,那么其value(MyCounter对象)也能在后续ThreadLocalMap对Entry修改时被检测到并被手动释放后回收。上层引用链都断开,那么下面的WebappClassLoader也能被回收了

​ 但这并不意味着立即释放,虽然最终是可以回收的,但在回收检测发生之前,ThreadLocalMap中的 value(MyCounter)依旧存活,仍然间接引用着WebappClassLoader及其加载的所有类,就导致这段「悬挂时间」里依旧存在内存压力

tomcat-threadlocal-memory-leaking-sample-2.png

demo2引用链

demo3

public class LeakingServlet extends HttpServlet {

    public final TransmittableThreadLocal<MyCounter> COUNTER_THREAD_LOCAL = TransmittableThreadLocal.withInitial(MyCounter::new);

    public static class MyCounter {
        private int count = 0;

        public void increment() {
            count++;
        }

        public int getCount() {
            return count;
        }
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        MyCounter counter = COUNTER_THREAD_LOCAL.get();
        counter.increment();
        // doBusiness...
    }
}

此时COUNTER_THREAD_LOCAL还是非静态且被LeakingServlet对象持有,但ThreadLocal为第三方的TransmittableThreadLocal,其引用链图如下。核心分析其实和demo2一样,TransmittableThreadLocal对象还是能被回收,也同样具备因不能及时回收value而导致WebappClassLoader不能被回收的隐式风险

tomcat-threadlocal-memory-leaking-sample-3.png

demo3引用链

解决方案

清除ThreadLocalMap$Entry

核心方法在WebappClassLoaderBase#checkThreadLocalsForLeaks,调用链为:

  1. WebappLoader#stop
  2. WebappClassLoaderBase#stop
  3. WebappClassLoaderBase#clearReferences
  4. WebappClassLoaderBase#checkThreadLocalsForLeaks(需要clearReferencesThreadLocals=true,默认也为true)
private void checkThreadLocalsForLeaks() {
    // 获取JVM的所有线程
    Thread[] threads = getThreads();

    try {
        // 获取Thread中threadLocals与inheritableThreadLocals两个字段
        Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
        threadLocalsField.setAccessible(true);
        Field inheritableThreadLocalsField = Thread.class.getDeclaredField("inheritableThreadLocals");
        inheritableThreadLocalsField.setAccessible(true);
        // ThreadLocalMap.table 数组字段(用于存储 Entry 的数组)
        Class<?> tlmClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
        Field tableField = tlmClass.getDeclaredField("table");
        tableField.setAccessible(true);
        // ThreadLocalMap中删除过期Entry方法
        Method expungeStaleEntriesMethod = tlmClass.getDeclaredMethod("expungeStaleEntries");
        expungeStaleEntriesMethod.setAccessible(true);

        for (Thread thread : threads) {
            Object threadLocalMap;
            if (thread != null) {

                // 检查 threadLocals
                threadLocalMap = threadLocalsField.get(thread);
                if (null != threadLocalMap) {
                    // 主动触发清理过期Entry(即key被回收的Entry)
                    expungeStaleEntriesMethod.invoke(threadLocalMap);
                    checkThreadLocalMapForLeaks(threadLocalMap, tableField);
                }

                // 检查 inheritableThreadLocals
                threadLocalMap = inheritableThreadLocalsField.get(thread);
                if (null != threadLocalMap) {
                    expungeStaleEntriesMethod.invoke(threadLocalMap);
                    checkThreadLocalMapForLeaks(threadLocalMap, tableField);
                }
            }
        }
    } catch (Throwable t) {
        // 异常处理...
    }
}

private void checkThreadLocalMapForLeaks(Object map,
        Field internalTableField) throws IllegalAccessException,
        NoSuchFieldException {
    if (map != null) {
        Object[] table = (Object[]) internalTableField.get(map);
        if (table != null) {
            for (Object obj : table) {
                if (obj != null) {
                    boolean keyLoadedByWebapp = false;
                    boolean valueLoadedByWebapp = false;
                    // 检查key(即为ThreadLocal对象或其子类)
                    Object key = ((Reference<?>) obj).get();
                    // loadedByThisOrChild用于检测指定的对象(也支持集合)及其所有父类是否有当前WebappClassLoader所加载
                    if (this.equals(key) || loadedByThisOrChild(key)) {
                        keyLoadedByWebapp = true;
                    }
                    // 检查value(ThreadLocal里真正存放的对象)
                    Field valueField = obj.getClass().getDeclaredField("value");
                    valueField.setAccessible(true);
                    Object value = valueField.get(obj);
                    if (this.equals(value) || loadedByThisOrChild(value)) {
                        valueLoadedByWebapp = true;
                    }
                    // key 或 value 被当前 WebappClassLoader 加载,说明有潜在泄漏
                    if (keyLoadedByWebapp || valueLoadedByWebapp) {
                        Object[] args = new Object[5];
                        args[0] = getContextName();
                        if (key != null) {
                            args[1] = getPrettyClassName(key.getClass());
                            try {
                                args[2] = key.toString();
                            } catch (Exception e) {
                                // ...
                            }
                        }
                        if (value != null) {
                            args[3] = getPrettyClassName(value.getClass());
                            try {
                                args[4] = value.toString();
                            } catch (Exception e) {
                                // ...
                            }
                        }
                        // =========== 仅打日志。value泄漏用error打印,key泄漏则用debug打印 ================
                        if (valueLoadedByWebapp) {
                            log.error(sm.getString(
                                    "webappClassLoader.checkThreadLocalsForLeaks",
                                    args));
                        } else if (value == null) {
                            if (log.isDebugEnabled()) {
                                log.debug(sm.getString(
                                        "webappClassLoader.checkThreadLocalsForLeaksNull",
                                        args));
                            }
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug(sm.getString(
                                        "webappClassLoader.checkThreadLocalsForLeaksNone",
                                        args));
                            }
                        }
                    }
                }
            }
        }
    }
}

总结

其核心逻辑只有两个:

  1. 通过反射调用所有线程里ThreadLocalMap#expungeStaleEntry,来主动清除需要被GC的Entry。解决案例中demo2和demo3的隐式隐患
  2. 如果Entry里的key(ThreadLocal)和value的相关class由当前WebappClassLoader加载,也仅打日志

为什么仅打日志呢?

​ 因为ThreadLocal中存放的通常是业务线程正在使用的上下文信息(如用户信息、缓存等)。Tomcat 无法判断这些值是否仍然有效或在使用中,如果直接释放掉可能导致运行中的Web程序抛出相关的空指针,因此选择了保守处理,只警告不干预

线程回收

ThreadLocalLeakPreventionListener

​ Tomcat 默认启用了 ThreadLocalLeakPreventionListener(已在 server.xml 中配置)。在触发热加载时,该Listener 会监听 Lifecycle.AFTER_STOP_EVENT 事件,并在收到该事件后调用所有 Worker 线程池的 contextStopping() 方法,从而优雅地中断并清理空闲线程,避免ThreadLocal导致的内存泄漏问题

​ 可以先看看这篇线程池相关源码解析文章。ThreadPoolExecutor#contextStopping代码如下,核心就1件事:对空闲线程进行中断并退出(通过设置corePoolSize=0来保证)

public void contextStopping() {
    // 非常重要:记录当前 Context 停止的时间,用于后续判断线程是否需要回收
    this.lastContextStoppedTime.set(System.currentTimeMillis());

    int savedCorePoolSize = this.getCorePoolSize();
    TaskQueue taskQueue = getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
    if (taskQueue != null) {
        // 临时设置任务队列剩余容量为0
        // 但ThreadPoolExecutor#setCorePoolSize内部没发现哪里有检查queue.remainingCapacity() ==
        // 0的逻辑,可能是为了兼容其他逻辑吧(offer和poll)???
        taskQueue.setForcedRemainingCapacity(0);
    }

    // 将核心线程池设为0:内部会中断所有空闲线程并触发getTask()里的退出逻辑
    this.setCorePoolSize(0);

    // ======== 恢复线程池原始配置,避免影响http请求 ============

    if (taskQueue != null) {
        taskQueue.resetForcedRemainingCapacity();
    }
    this.setCorePoolSize(savedCorePoolSize);
}

TaskQueue

​ 单靠上述的 ThreadLocalLeakPreventionListener 并不足以关闭运行中的 worker 线程。为此,Tomcat 自定义了一个 TaskQueue 作为线程池的任务队列,并重写了几个关键方法,以精细控制 worker 线程的创建与回收策略。

​ 这里重点关注其重写的 polltake 方法 —— 它们是线程池中 worker 线程获取任务的主要途径。Tomcat 在此做了一个巧妙的设计:仅对需要被回收的线程(即线程创建时间早于当前 Context 的上次 stop 时间)应用 fallback 策略

  • take() 方法原本是阻塞式的,这里被 fallback 到poll(),即允许超时等待,从而避免永久阻塞
  • poll()若超时返回 null,则直接 fallback 到抛出异常,从而促使线程正常终止
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    // ================= 重写的offer方法,调整线程的创建策略 ====================

    /**
     * 目的:尽量优先创建线程,而不是把任务直接塞进队列
     * 原因:Tomcat 设计希望在线程数未满时尽可能创建新线程,提高吞吐量。
     */
    @Override
    public boolean offer(Runnable o) {
        if (parent == null) {
            return super.offer(o);
        }
        // 如果线程池已达到最大线程数,那只能乖乖入队列
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
            return super.offer(o);
        }
        // 还有空闲线程(已提交任务数 <= 当前线程数),也入队
        if (parent.getSubmittedCount() <= (parent.getPoolSize())) {
            return super.offer(o);
        }
        // 线程数没达到最大限制,并且提交的任务数已多余当前线程数(请求压力大)。则返回 false 触发线程池直接创建新建线程
        if (parent.getPoolSize() < parent.getMaximumPoolSize()) {
            return false;
        }
        // 默认入队
        return super.offer(o);
    }

    // ================= 重写poll和take方法(仅对需要被终结的线程施加fallback策略,进而调整了线程的回收策略) =========================

    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // 内部会判断当前线程是否需要被终结,需要的话会抛出异常,最终终结当前线程
            parent.stopCurrentThreadIfNeeded();
        }
        // 只要有任务,还是直接返回,不影响活跃线程
        return runnable;
    }

    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            // fallback为poll方法,即允许超时
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
        }
        // 正常线程还是阻塞等待
        return super.take();
    }
}

总结

ThreadLocalLeakPreventionListener 负责主动回收空闲的 worker 线程,而 TaskQueue 则通过重写 polltake 方法,利用线程池的任务获取机制,实现对线程回收的兜底策略。两者配合,仅在 线程空闲时 才进行处理,不影响活跃线程的正常工作。通过这种机制,Tomcat 能够在热加载后安全且优雅的清理掉旧的所有worker 线程,从而避免因ThreadLocal残留导致的内存泄漏。

相关链接