Async 异步注解支持异步上下文

App

// @EnableAsync // 禁止开启此注释,否则 AsyncContextAopConfig 会拦截上下文失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.bozhi.xiaoluo;

import com.bozhi.xiaoluo.modules.common.utils.SSLUtil;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.EnableAspectJAutoProxy;

@SpringBootApplication
@EnableCaching
// @EnableAsync // 禁止开启此注释,否则 AsyncContextAopConfig 会拦截上下文失败
@EnableAspectJAutoProxy(exposeProxy = true, proxyTargetClass = true)
// exposeProxy = true,可以 通过 AopContext.currentProxy() 获取代理类
// proxyTargetClass = true,使用 CGLIB 动态代理
public class XiaoluoApplication {

public static void main(String[] args) {
SSLUtil.ignoreSsl();
SpringApplication.run(XiaoluoApplication.class, args);
}

}

AsyncContextAopConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.bozhi.xiaoluo.config.aop;

import com.bozhi.xiaoluo.modules.common.utils.AsyncUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.concurrent.CompletionException;

/**
* 使 @Async 注解支持异步上下文
*/
@Component
@Aspect
@Slf4j
public class AsyncContextAopConfig {

/**
* @see Async
*/
@Pointcut("@annotation(org.springframework.scheduling.annotation.Async)")
private void pointcut() {
}

/**
* @see AsyncUtils#runAsync(Runnable) 异步方法代理
*/
@Around("pointcut()")
public Object aroundAdvice(ProceedingJoinPoint point) throws Throwable {

AsyncUtils.runAsync(() -> {
try {
point.proceed(point.getArgs());
} catch (Throwable e) {
throw new CompletionException(e);
}
});

return null;// @Async 标注的方法,总是返回 null
}

}

AsyncUtils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.bozhi.xiaoluo.modules.common.utils;

import cn.hutool.core.thread.AsyncUtil;
import cn.hutool.core.thread.NamedThreadFactory;
import com.bozhi.xiaoluo.modules.user.vo.UserInfoVo;
import com.google.common.base.Supplier;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* 异步工具类 --- 支持上下文
*/
public class AsyncUtils {

static Executor executor = new ThreadPoolExecutor(
20, 200,// 20核心线程,200最大线程(执行完毕 30s 取不到任务即销毁)
60L, TimeUnit.SECONDS,// 60秒到期时间
new SynchronousQueue<>(),// 同步队列,队列中不存放元素
new NamedThreadFactory("AsyncUtils-", false)
);

/**
* 异步处理 [有返回值]
*/
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier) {
final String traceId = MDCUtil.get();
final RequestAttributes req = RequestContextHolder.getRequestAttributes();
final UserInfoVo user = ContextUtils.loginUser();

return CompletableFuture.supplyAsync(() -> {
try {
MDCUtil.set(traceId);
RequestContextHolder.setRequestAttributes(req);
ContextUtils.setUser(user);
return supplier.get();
} finally {
RequestContextHolder.resetRequestAttributes();
ContextUtils.remove();
MDCUtil.remove();
}
}, executor);
}

/**
* 异步处理 [无返回值]
*/
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return supplyAsync(() -> {
runnable.run();
return null;
});
}

/**
* 异步处理列表
*
* @param list 需要处理的列表
* @param function 处理函数
* @param supplier 收集函数(用于收集处理后的结果)
* @param <T> list 的元素类型
* @param <R> 处理后的返回值
* @param <C> 把返回值收集为集合
*/
public static <T, R, C extends Collection<R>> C asyncList(List<T> list, Function<T, R> function, Supplier<C> supplier) {
// 1、遍历 list
// 2、异步处理 list ===> 每个元素 T 都会被 function 处理为 R 对象
CompletableFuture<R>[] completableFutures = list.stream()
.map(e -> supplyAsync(() -> function.apply(e)))
.toArray(CompletableFuture[]::new);

// 3、等待所有完成
waitAll(completableFutures);

// 4、获取异步处理结果 => 封装为 C extends Collection<R>
return Arrays.stream(completableFutures)
.map(AsyncUtils::asyncGet)
.collect(Collectors.toCollection(supplier));
}

/**
* 异步处理列表
*
* @param list 需要处理的列表
* @param consumer 处理函数
*/
public static <T> void asyncList(List<T> list, Consumer<T> consumer) {
// 1、遍历 list
// 2、异步处理 list ===> 每个元素 T 都会被 consumer 消费
CompletableFuture<Void>[] completableFutures = list.stream()
.map(e -> runAsync(() -> consumer.accept(e)))
.toArray(CompletableFuture[]::new);

// 3、等待所有完成
waitAll(completableFutures);
}

/**
* 等待所有任务执行完毕,包裹了异常
*
* @param tasks 并行任务
*/
public static void waitAll(CompletableFuture<?>... tasks) {
asyncGet(CompletableFuture.allOf(tasks));
}

/**
* 获取异步任务结果 => 对特殊异常做了处理
*
* @param <T> 任务返回值类型
* @param future 异步任务
* @return 任务返回值
*/
public static <T> T asyncGet(CompletableFuture<T> future) {
return AsyncUtil.get(future);
}
}