Spring: Other Important BeanPostProcessors


Source code walkthrough of AsyncAnnotationBeanPostProcessor and ScheduledAnnotationBeanPostProcessor

AsyncAnnotationBeanPostProcessor

@EnableAsync

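@EnableAsync registers the AsyncAnnotationBeanPostProcessor BeanDefinition in the container. This post-processor extends AbstractBeanFactoryAwareAdvisingPostProcessor, which applies a given Advisor to eligible beans; that is how @Async methods get proxied and run asynchronously.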

Main attributes

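Attribute          Description
annotation         A custom async annotation type to detect, for customization
proxyTargetClass   Whether to proxy the target class. Defaults to false (JDK dynamic proxies); true uses CGLIB proxies
mode               AOP mode, one of:
                   PROXY (default): use Spring's proxy mechanism
                   ASPECTJ: use AspectJ (requires extra dependencies plus compile-time or load-time weaving configuration)
order              Execution order of AsyncAnnotationBeanPostProcessor; by default it runs last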

AsyncAnnotationAdvisor

The Advisor installed in AsyncAnnotationBeanPostProcessor#setBeanFactory (a hook called during bean initialization to hand the bean its BeanFactory).


public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

    private Advice advice;

    private Pointcut pointcut;

    public AsyncAnnotationAdvisor(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        // Support @Async
        asyncAnnotationTypes.add(Async.class);
        try {
            // Also support EJB's @Asynchronous
            asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous",
                    AsyncAnnotationAdvisor.class.getClassLoader()));
        } catch (ClassNotFoundException ex) {
            // javax.ejb.Asynchronous is not on the classpath; safe to ignore
        }
        this.advice = buildAdvice(executor, exceptionHandler);
        this.pointcut = buildPointcut(asyncAnnotationTypes);
    }

    /**
     * Builds the async interceptor Advice, which performs the actual asynchronous invocation
     */
    protected Advice buildAdvice(
            @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

        AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
        interceptor.configure(executor, exceptionHandler);
        return interceptor;
    }

    /**
     * Combines all asyncAnnotationTypes into one ComposablePointcut: a bean is proxied as soon as any of these annotations is present on the class or on a method
     */
    protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
        ComposablePointcut result = null;
        for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            // AnnotationMatchingPointcut is a Pointcut that matches on annotations
            // cpc matches at class level (the class carries asyncAnnotationType)
            Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            // mpc matches at method level (the method carries asyncAnnotationType)
            Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
            // ComposablePointcut combines them with OR semantics: a match by either cpc or mpc is enough
            if (result == null) {
                result = new ComposablePointcut(cpc);
            } else {
                result.union(cpc);
            }
            result = result.union(mpc);
        }
        return (result != null ? result : Pointcut.TRUE);
    }

}
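The OR semantics of buildPointcut can be illustrated without Spring. The following is a minimal plain-JDK sketch, not Spring's implementation: MyAsync, ClassLevel and MethodLevel are hypothetical names invented for the illustration, and unlike AnnotationMatchingPointcut with checkInherited = true, this sketch ignores superclasses, interfaces and meta-annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AsyncPointcutSketch {

    // Hypothetical stand-in for Spring's @Async; not a Spring type.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface MyAsync {}

    // Annotated at class level: every method should match.
    @MyAsync
    static class ClassLevel {
        public void work() {}
    }

    // Annotated at method level: only the annotated method should match.
    static class MethodLevel {
        @MyAsync
        public void annotated() {}

        public void plain() {}
    }

    // Union semantics: a class-level match OR a method-level match is enough.
    static boolean matches(Class<?> targetClass, String methodName) {
        try {
            Method method = targetClass.getMethod(methodName);
            boolean classMatch = targetClass.isAnnotationPresent(MyAsync.class);
            boolean methodMatch = method.isAnnotationPresent(MyAsync.class);
            return classMatch || methodMatch;
        } catch (NoSuchMethodException ex) {
            throw new IllegalArgumentException("No public no-arg method: " + methodName, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches(ClassLevel.class, "work"));
        System.out.println(matches(MethodLevel.class, "annotated"));
        System.out.println(matches(MethodLevel.class, "plain"));
    }
}
```

Only the last call reports no match, since neither MethodLevel nor its plain method carries the annotation.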

AnnotationAsyncExecutionInterceptor

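The core of the interceptor: it resolves the executor to use and runs the target method asynchronously on it. In the Spring source, invoke is declared in the parent class AsyncExecutionInterceptor and doSubmit in AsyncExecutionAspectSupport; they are shown flattened into one class here for readability.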

public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {

    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        // Resolve the executor: the bean named by the value of @Async, or the default executor when no value is given
        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        // Build the Callable
        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };

        // Submit the task to the Executor
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

    /**
     * CompletableFuture, ListenableFuture and Future return types are supported; for any other return type the caller simply gets null
     */
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            executor.submit(task);
            return null;
        }
    }
}
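The return-type dispatch above can be tried out with nothing but the JDK. This is a minimal sketch under stated assumptions, not Spring's code: it uses a JDK dynamic proxy in place of Spring AOP, Greeter and GreeterImpl are hypothetical types, and the ListenableFuture branch is left out because that type is Spring-specific.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncDispatchSketch {

    // Hypothetical service interface, used only for this illustration.
    interface Greeter {
        CompletableFuture<String> greetFuture(String name);

        String greetPlain(String name);
    }

    static class GreeterImpl implements Greeter {
        @Override
        public CompletableFuture<String> greetFuture(String name) {
            return CompletableFuture.completedFuture("hello " + name);
        }

        @Override
        public String greetPlain(String name) {
            return "hello " + name;
        }
    }

    // Wraps the target so that every call runs on the executor, mirroring the doSubmit branches.
    static Greeter asyncProxy(Greeter target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Same shape as the task in invoke(): unwrap a Future result, otherwise yield null.
            Callable<Object> task = () -> {
                Object result = method.invoke(target, args);
                return (result instanceof Future) ? ((Future<?>) result).get() : null;
            };
            Class<?> returnType = method.getReturnType();
            if (CompletableFuture.class.isAssignableFrom(returnType)) {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        return task.call();
                    } catch (Exception ex) {
                        throw new CompletionException(ex);
                    }
                }, executor);
            } else if (Future.class.isAssignableFrom(returnType)) {
                return executor.submit(task);
            } else {
                // Any other return type: fire and forget, the caller gets null.
                executor.submit(task);
                return null;
            }
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);
    }

    // Calls both methods through the proxy and reports what the caller observed.
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Greeter greeter = asyncProxy(new GreeterImpl(), executor);
            String viaFuture = greeter.greetFuture("spring").join();
            String plain = greeter.greetPlain("spring");
            return viaFuture + " / " + plain;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello spring / null"
    }
}
```

The second half of the output is the practical consequence of the last branch: a method that returns a plain value still runs, but its result never reaches the caller.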

Summary

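  • A method runs asynchronously as soon as @Async or @Asynchronous is present on the method or on its class
  • The value attribute of @Async selects the executor (thread pool) to use, provided that executor is a bean in the container
    • If value is not set, the Executor supplied by an AsyncConfigurer in the container is used
      • Without an AsyncConfigurer, Spring looks for a unique TaskExecutor bean or an Executor bean named taskExecutor; if neither exists, it falls back to SimpleAsyncTaskExecutor, which starts a new thread on every execute call
  • Only CompletableFuture, ListenableFuture and Future return values are supported; with any other return type the caller gets null
  • The async advice runs first among all the advice applied to the method (because AsyncAnnotationBeanPostProcessor sets beforeExistingAdvisors = true)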
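To make the cost of the SimpleAsyncTaskExecutor fallback concrete, here is a rough plain-JDK sketch of a thread-per-task executor. ThreadPerTaskExecutor is a hypothetical class written for this illustration; it only imitates the "new thread for every execute" behaviour described above and is not Spring's implementation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

class ThreadPerTaskSketch {

    // Hypothetical executor: starts a brand-new thread for every task, with no pooling or reuse.
    static class ThreadPerTaskExecutor implements Executor {
        @Override
        public void execute(Runnable task) {
            new Thread(task).start();
        }
    }

    // Runs taskCount tasks and returns how many distinct threads executed them.
    static int distinctThreads(int taskCount) {
        Executor executor = new ThreadPerTaskExecutor();
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                threads.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return threads.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreads(5)); // prints 5: one thread per task
    }
}
```

Under load this means an unbounded number of threads, which is why declaring a pooled executor bean is usually preferable to relying on the fallback.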

ScheduledAnnotationBeanPostProcessor

@EnableScheduling

Registers the ScheduledAnnotationBeanPostProcessor BeanDefinition in the container; it parses and registers @Schedules and @Scheduled.

ScheduledAnnotationBeanPostProcessor

Core source code

public class ScheduledAnnotationBeanPostProcessor
        implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
        Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
        SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {

    /**
     * Hook invoked after bean initialization; it only parses and wraps the scheduled tasks, it does not run them
     */
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            // Parse @Schedules and @Scheduled on the methods
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
            } else {
                // Annotations found: process each scheduled method
                annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods
                        .forEach(scheduled -> processScheduled(scheduled, method, bean)));
            }
        }
        return bean;
    }

    /**
     * Parses the @Scheduled annotation. Exactly one trigger attribute is allowed; they are checked in the order cron, fixedDelay, fixedDelayString, fixedRate, fixedRateString
     */
    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
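            // Wrap the method in a directly runnable Runnable (the method must take no parameters)
            Runnable runnable = createRunnable(bean, method);
            // Whether a trigger has already been processed (one @Scheduled may define only one trigger)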
            boolean processedSchedule = false;
            String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

            // Parse the initial delay
            long initialDelay = scheduled.initialDelay();
            String initialDelayString = scheduled.initialDelayString();
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                if (StringUtils.hasLength(initialDelayString)) {
                    try {
                        initialDelay = parseDelayAsLong(initialDelayString);
                    } catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString
                                        + "\" - cannot parse into long");
                    }
                }
            }

            // Parse the cron expression first
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    // cron triggers do not support an initial delay
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        } else {
                            timeZone = TimeZone.getDefault();
                        }
                        // Wrap it as a CronTask and cache it for now
                        tasks.add(this.registrar
                                .scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }

            if (initialDelay < 0) {
                initialDelay = 0;
            }

            // Parse the fixed delay
            long fixedDelay = scheduled.fixedDelay();
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(
                        this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = parseDelayAsLong(fixedDelayString);
                    } catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    // Wrap it as a FixedDelayTask and cache it for now
                    tasks.add(this.registrar
                            .scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
            }

            // Parse the fixed rate last
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = parseDelayAsLong(fixedRateString);
                    } catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    // Wrap it as a FixedRateTask and cache it for now
                    tasks.add(
                            this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
            }

            // An annotated method must end up with a scheduled task
            Assert.isTrue(processedSchedule, errorMessage);

            // Finally register the scheduled tasks
            synchronized (this.scheduledTasks) {
                Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                regTasks.addAll(tasks);
            }
        } catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }

    /**
     * ContextRefreshedEvent means the container has finished refreshing, so the scheduled tasks can now actually be registered
     */
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext() == this.applicationContext) {
            finishRegistration();
        }
    }

    /**
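     * Actually submits the scheduled tasks to the scheduler.
     * Scheduler resolution order:
     * 1. the scheduler set on this post-processor, or one set on the registrar by a SchedulingConfigurer
     * 2. the unique TaskScheduler bean (looked up by type)
     * 3. if several TaskScheduler beans exist, the one named taskScheduler
     * 4. if no TaskScheduler bean exists, the unique ScheduledExecutorService bean (looked up by type)
     * 5. if several ScheduledExecutorService beans exist, the one named taskScheduler
     * 6. a single-threaded ScheduledThreadPoolExecutor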
     */
    private void finishRegistration() {
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }

        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory)
                    .getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }

        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
            try {
                // First look up a unique TaskScheduler bean by type (byName = false)
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
            } catch (NoUniqueBeanDefinitionException ex) {
                try {
                    // Several TaskScheduler beans exist: fall back to the one named taskScheduler (byName = true)
                    this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
                } catch (NoSuchBeanDefinitionException ex2) {

                }
            } catch (NoSuchBeanDefinitionException ex) {
                try {
                    // No TaskScheduler bean at all: look up a unique ScheduledExecutorService bean by type
                    this.registrar.setScheduler(
                            resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
                } catch (NoUniqueBeanDefinitionException ex2) {
                    try {
                        // Several ScheduledExecutorService beans exist: fall back to the one named taskScheduler
                        this.registrar.setScheduler(
                                resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                    } catch (NoSuchBeanDefinitionException ex3) {

                    }
                } catch (NoSuchBeanDefinitionException ex2) {

                }
            }
        }
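        // If no scheduler has been resolved by this point, the registrar creates a single-threaded ScheduledThreadPoolExecutor to run the tasks
        // Submit all scheduled tasks parsed from the beans to the scheduler's thread pool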
        this.registrar.afterPropertiesSet();
    }

}
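The "exactly one trigger" rule that processScheduled enforces through processedSchedule can be condensed into a few lines. The sketch below is a simplification written for illustration only: resolveTrigger and rejects are hypothetical helpers, the String variants (fixedDelayString, fixedRateString) and placeholder resolution are omitted, and -1 or an empty string stands for "not set", as in the annotation's defaults.

```java
class ScheduleValidationSketch {

    private static final String ERROR =
            "Exactly one of the 'cron', 'fixedDelay' or 'fixedRate' attributes is required";

    // Checks the attributes in the same order as processScheduled and
    // fails as soon as a second trigger is found, or if none is set.
    static String resolveTrigger(String cron, long fixedDelay, long fixedRate) {
        String trigger = null;
        if (cron != null && !cron.trim().isEmpty()) {
            trigger = "cron";
        }
        if (fixedDelay >= 0) {
            if (trigger != null) {
                throw new IllegalStateException(ERROR);
            }
            trigger = "fixedDelay";
        }
        if (fixedRate >= 0) {
            if (trigger != null) {
                throw new IllegalStateException(ERROR);
            }
            trigger = "fixedRate";
        }
        if (trigger == null) {
            throw new IllegalStateException(ERROR);
        }
        return trigger;
    }

    // Convenience wrapper: true when the combination is rejected.
    static boolean rejects(String cron, long fixedDelay, long fixedRate) {
        try {
            resolveTrigger(cron, fixedDelay, fixedRate);
            return false;
        } catch (IllegalStateException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveTrigger("0 0 * * * *", -1, -1)); // cron
        System.out.println(resolveTrigger("", -1, 500));           // fixedRate
        System.out.println(rejects("0 0 * * * *", 1000, -1));      // true: two triggers
        System.out.println(rejects("", -1, -1));                   // true: no trigger
    }
}
```

The check order therefore does not pick a winner among several attributes; a second attribute is an error rather than a lower-priority fallback.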

Summary

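  • After a bean is initialized, its @Scheduled and @Schedules annotations are parsed (parsed only, not yet submitted) and the resulting tasks are cached in a shared ScheduledTaskRegistrar
    • Each @Scheduled resolves to exactly one task. Only one trigger attribute may be set (setting more than one fails the assertion); the attributes are checked in this order:
      1. cron
      2. fixedDelay
      3. fixedDelayString
      4. fixedRate
      5. fixedRateString
    • To schedule one method with several triggers, use @Schedules (or repeat @Scheduled, which is a repeatable annotation)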
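  • When ContextRefreshedEvent fires, the tasks are submitted
    • The scheduler is resolved first, in this order:
      1. ScheduledAnnotationBeanPostProcessor#scheduler (must be a TaskScheduler or a ScheduledExecutorService)
      2. the unique TaskScheduler bean, looked up by type
      3. the TaskScheduler bean named taskScheduler, when several TaskScheduler beans exist
      4. the unique ScheduledExecutorService bean, looked up by type (only when there is no TaskScheduler bean)
      5. the ScheduledExecutorService bean named taskScheduler, when several ScheduledExecutorService beans exist
      6. a single-threaded ScheduledThreadPoolExecutor, i.e. the default pool
    • All tasks cached in the ScheduledTaskRegistrar during parsing are then submitted to that scheduler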
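  • cron deserves a closer look: a cron task is wrapped in a ReschedulingRunnable, which computes the next execution time from the cron expression only after each run completes, so the schedule is recalculated dynamically in a loop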
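The self-rescheduling idea behind ReschedulingRunnable can be sketched with a plain ScheduledExecutorService. This is an illustration under stated assumptions rather than Spring's class: SelfRescheduling is a hypothetical name, a LongSupplier returning a delay in milliseconds stands in for the cron trigger, and a CountDownLatch bounds the number of runs so the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class ReschedulingSketch {

    // Hypothetical wrapper: runs the delegate, then asks the trigger for the next delay
    // and schedules itself again as a one-shot task.
    static class SelfRescheduling implements Runnable {
        private final Runnable delegate;
        private final LongSupplier nextDelayMillis; // stand-in for a cron trigger
        private final ScheduledExecutorService executor;
        private final CountDownLatch remaining;     // bounds the number of runs

        SelfRescheduling(Runnable delegate, LongSupplier nextDelayMillis,
                         ScheduledExecutorService executor, CountDownLatch remaining) {
            this.delegate = delegate;
            this.nextDelayMillis = nextDelayMillis;
            this.executor = executor;
            this.remaining = remaining;
        }

        void schedule() {
            executor.schedule(this, nextDelayMillis.getAsLong(), TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            delegate.run();
            remaining.countDown();
            if (remaining.getCount() > 0) {
                // The next execution time is computed only after this run has finished.
                schedule();
            }
        }
    }

    // Runs a counter task the given number of times and returns the final count.
    static int runTimes(int times) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(times);
        AtomicInteger counter = new AtomicInteger();
        new SelfRescheduling(counter::incrementAndGet, () -> 10L, executor, remaining).schedule();
        try {
            remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            executor.shutdownNow();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runTimes(3)); // prints 3
    }
}
```

Because each run schedules the next one, a slow run delays its successor instead of piling up overlapping executions, which is the behaviour the summary describes for cron tasks.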