Spring-其他重要的BeanPostProcesor
AsyncAnnotationBeanPostProcessor和ScheduledAnnotationBeanPostProcessor源码解析
AsyncAnnotationBeanPostProcessor
@EnableAsync
其主要功能是向容器中注册AsyncAnnotationBeanPostProcessor这个BeanDefinition,其继承了AbstractBeanFactoryAwareAdvisingPostProcessor,可以处理指定的Advisor以此实现@Async的代理,来开启异步
主要属性
属性 | 说明 |
---|---|
annotation |
自定义的异步注解,可以用来定制化 |
proxyTargetClass |
是否代理目标类。默认为false,即走jdk代理;为true则用cglib代理 |
mode |
AOP 模式,取值有:PROXY (默认):使用 Spring 的代理机制ASPECTJ :使用 AspectJ(需要额外依赖与编译时/加载时织入配置) |
order |
AsyncAnnotationBeanPostProcessor的执行顺序,默认最后才执行 |
AsyncAnnotationAdvisor
在AsyncAnnotationBeanPostProcessor#setBeanFactory(bean初始化阶段的一个hook方法,用于给bean填充BeanFactory)中设置的Advisor
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
private Advice advice;
private Pointcut pointcut;
public AsyncAnnotationAdvisor(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
// 支持@Async
asyncAnnotationTypes.add(Async.class);
try {
// 也支持EJB的@Asynchronous
asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous",
AsyncAnnotationAdvisor.class.getClassLoader()));
} catch (ClassNotFoundException ex) {
// 类路径里不存在@javax.ejb.Asynchronous,也无所谓
}
this.advice = buildAdvice(executor, exceptionHandler);
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
/**
* 构建异步拦截器 Advice:用于真正执行异步调用逻辑
*/
protected Advice buildAdvice(
@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
interceptor.configure(executor, exceptionHandler);
return interceptor;
}
/**
* 将asyncAnnotationTypes所有注解构造成一个ComposablePointcut,只要class或method上存在任意一个asyncAnnotationType,即可进行代理
*/
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
ComposablePointcut result = null;
for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
// AnnotationMatchingPointcut是用于注解进行匹配的Pointcut
// cpc用于class匹配(只要class上存在asyncAnnotationType注解就行)
Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
// mpc用于方法匹配(只要method上存在asyncAnnotationType注解就行)
Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
// ComposablePointcut为注解组合,或逻辑。只要内部的ClasaFilter匹配一个就为true。MethodMatcher匹配一个就为true
if (result == null) {
result = new ComposablePointcut(cpc);
} else {
result.union(cpc);
}
result = result.union(mpc);
}
return (result != null ? result : Pointcut.TRUE);
}
}
AnnotationAsyncExecutionInterceptor
拦截器核心,用于解析指定的执行器,并异步执行目标方法
public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// 根据@Async的value来获取容器中对象的执行器,准备在这个Executor中执行代理逻辑
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
// 构建Callable
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 提交任务到Executor中
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
/**
* 可知对于CompletableFuture,ListenableFuture,Future这三种返回值都是支持的,但对于其他返回值都直接返回null了
*/
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
executor.submit(task);
return null;
}
}
}
总结
- 方法或类上只要存在@Async或@Asynchronous即可走异步
- 可使用@Async的value字段指定异步线程池(前提是要这个线程池在容器中)
- 如未指定value,则使用容器中的AsyncConfigurer提供的Executor
- 如果容器中没有AsyncConfigurer,则使用默认的SimpleAsyncTaskExecutor。其每次execute都会新开一个线程
- 如未指定value,则使用容器中的AsyncConfigurer提供的Executor
- 只支持CompletableFuture,ListenableFuture,Future这三种返回值。其余返回值都直接返回null
- 异步切面会在这个方法中的所有切面里最先执行(因为其AsyncAnnotationBeanPostProcessor.beforeExistingAdvisors = true)
ScheduledAnnotationBeanPostProcessor
@EnableScheduling
向容器中注册ScheduledAnnotationBeanPostProcessor这个BeanDefinition,用于@Schedules和@Scheduled的解析和注册
ScheduledAnnotationBeanPostProcessor
核心源码
public class ScheduledAnnotationBeanPostProcessor
implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
/**
* bean初始化完成后的hook,只做定时任务的解析和封装,这里并不执行
*/
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
// 解析方法上的@Schedules和@Scheduled
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
} else {
// 存在注解,对每个定时任务方法进行处理
annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods
.forEach(scheduled -> processScheduled(scheduled, method, bean)));
}
}
return bean;
}
/**
* 解析@Scheduled注解,可知其定时字段优先级:cron > fixedDelay > fixedDelayString > fixedRate > fixedRateString
*/
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
// 将方法封装为可直接运行的Runnable(方法不能有参数)
Runnable runnable = createRunnable(bean, method);
// 是否已处理过(一个@Scheduled只能使用一个定时)
boolean processedSchedule = false;
String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// 解析初始延时
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
} catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString
+ "\" - cannot parse into long");
}
}
}
// 优先解析cron表达式
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
// 校验cron不支持初始延时
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
} else {
timeZone = TimeZone.getDefault();
}
// 构造为CronTask,先缓存起来
tasks.add(this.registrar
.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
if (initialDelay < 0) {
initialDelay = 0;
}
// 解析固定周期
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(
this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
} catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
// 构造为FixedDelayTask,先缓存起来
tasks.add(this.registrar
.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// 最后才解析固定频率
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
} catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
// 构造为FixedRateTask,先缓存起来
tasks.add(
this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// 校验使用了注解的必须有定时任务
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
} catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
/**
* ContextRefreshed事件,表示容器刷新完毕,可以真正的注册定时任务了
*/
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
finishRegistration();
}
}
/**
* 开始真正的投递定时任务到线程池中。
* 定时线程池优先级:
* 1. scheduler
* 2. TaskScheduler(beanName为taskScheduler) bean
* 3. 任意TaskScheduler bean
* 4. ScheduledExecutorService(beanName为taskScheduler) bean
* 5. 任意ScheduledExecutorService bean
* 6. ScheduledThreadPoolExecutor(线程数为1)
*/
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory)
.getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// 先获取beanName为taskScheduler的TaskScheduler bean
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
} catch (NoUniqueBeanDefinitionException ex) {
try {
// 再按class获取TaskScheduler bean
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
} catch (NoSuchBeanDefinitionException ex2) {
}
} catch (NoSuchBeanDefinitionException ex) {
try {
// 再获取beanName为taskScheduler的ScheduledExecutorService bean
this.registrar.setScheduler(
resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
} catch (NoUniqueBeanDefinitionException ex2) {
try {
// 再按class获取ScheduledExecutorService bean
this.registrar.setScheduler(
resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
} catch (NoSuchBeanDefinitionException ex3) {
}
} catch (NoSuchBeanDefinitionException ex2) {
}
}
}
// 到这了如果taskScheduler还没有,内部则会创建一个容量为1的ScheduledThreadPoolExecutor去执行定时任务
// 将bean解析完成的所有定时任务投递到定时任务线程吃
this.registrar.afterPropertiesSet();
}
}
总结
- bean初始化完成后解析才@Scheduled和@Schedules(只是解析,并不投递),缓存到统一的ScheduledTaskRegistrar中
- @Scheduled只能解析为一个定时任务,优先级:
- cron
- fixedDelay
- fixedDelayString
- fixedRate
- fixedRateString
- 要定时成多个任务需要使用@Schedules
- @Scheduled只能解析为一个定时任务,优先级:
- ContextRefreshedEvent事件触发,开始投递任务
- 先获取任务定时器,定时线程池优先级:
- ScheduledAnnotationBeanPostProcessor#scheduler(只能为TaskScheduler或ScheduledExecutorService的子类)
- 任意TaskScheduler bean
- TaskScheduler(beanName为taskScheduler) bean
- 任意ScheduledExecutorService bean
- ScheduledExecutorService(beanName为taskScheduler) bean
- ScheduledThreadPoolExecutor(线程数为1),即默认线程池
- 将解析阶段ScheduledTaskRegistrar内缓存的Task都投递到定时线程池中
- 先获取任务定时器,定时线程池优先级:
- 重点了解下cron,它会被封装为ReschedulingRunnable,每次执行完后才根据cron计算下一次任务的执行时间,这样循环动态的计算执行时间