线程池配置
- Java线程池实现原理及其在美团业务中的实践:https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
- SpringBoot 线程池 配置使用:https://www.cnblogs.com/dudou/p/15136180.html
- 线程池参数及配置:https://blog.csdn.net/xinpz/article/details/110132365
线程池 ThreadPoolExecutor
JDK 中提供的 ThreadPoolExecutor 类
使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
= 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
自定义线程池执行异步任务
线程池配置
### 自定义配置
task.pool.core-size = 8
task.pool.maxSize = 8
task.pool.keepAliveSeconds = 60
task.pool.queueCapacity = 20
自定义线程池
@Data
@Configuration
@ConfigurationProperties(prefix = "task.pool")
public class ThreadPoolConfig {
private int coreSize;
private int maxSize;
private int keepAliveSeconds;
private int queueCapacity;
@Bean("customizeThreadPool")
public Executor doConfigCustomizeThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(coreSize);
//最大线程数
executor.setMaxPoolSize(maxSize);
//队列容量
executor.setQueueCapacity(queueCapacity);
//活跃时间
executor.setKeepAliveSeconds(keepAliveSeconds);
//线程名字前缀
executor.setThreadNamePrefix("customize-thread-");
/*
当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)
CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
重写 SpringBoot 默认异步线程池
@Data
@Configuration
@ConfigurationProperties(prefix = "task.pool")
public class OverrideDefaultThreadPoolConfig implements AsyncConfigurer {
private static final Logger log = LoggerFactory.getLogger(OverrideDefaultThreadPoolConfig.class);
private int coreSize;
private int maxSize;
private int keepAliveSeconds;
private int queueCapacity;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(coreSize);
//最大线程数
executor.setMaxPoolSize(maxSize);
//队列容量
executor.setQueueCapacity(queueCapacity);
//活跃时间
executor.setKeepAliveSeconds(keepAliveSeconds);
//线程名字前缀
executor.setThreadNamePrefix("default-thread-");
/*
当poolSize已达到maxPoolSize,如何处理新任务(是拒绝还是交由其它线程处理)
CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) -> {
log.error("==========================" + ex.getMessage() + "=======================", ex);
log.error("exception method:" + method.getName());
};
}
}
定义异步方法
- 异步方法来执行异步任务
@EnableAsync
@Component
public class ThreadPoolHandler {
@Async("customizeThreadPool")
public void doStatusAnalyseHandle(String start, String end) {
int sleepSeconds = new Random().nextInt(3) + 1;
if(sleepSeconds == 3){
throw new RuntimeException("333");
}
try {
Thread.sleep(sleepSeconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("在自定义线程" + Thread.currentThread().getName() + "执行了" + sleepSeconds + "秒");
}
@Async
public void doStatusAnalyseHandle(String end) {
int sleepSeconds = new Random().nextInt(3) + 1;
if(sleepSeconds == 3){
throw new RuntimeException("333");
}
try {
Thread.sleep(sleepSeconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("在默认线程" + Thread.currentThread().getName() + "执行了" + sleepSeconds + "秒");
}
}
测试异步任务执行
@SpringBootTest
public class ThreadPoolTest {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private ThreadPoolHandler threadPoolHandler;
@Test
void testThreadPool(){
applicationContext.getBean(ThreadPoolHandler.class).doStatusAnalyseHandle(null,null);
applicationContext.getBean(ThreadPoolHandler.class).doStatusAnalyseHandle(null);
threadPoolHandler.doStatusAnalyseHandle(null);
threadPoolHandler.doStatusAnalyseHandle(null,null);
}
}