1. Spring 默认的异步执行器(SimpleAsyncTaskExecutor)不会复用线程,每次执行任务都会 new 一个新的线程,效率不高,所以我们需要自定义一个线程池。
2. 自定义线程池有两种方法:第一种是自己定义一个线程池 Bean,使用时显式指定这个自定义的线程池;第二种是重写 Spring 默认的线程池,之后使用的就是自己重写过的线程池。
一:自定义线程池
1.1 创建线程池
/**
 * Spring Boot configuration that exposes a custom, bounded thread pool as a bean.
 *
 * @author liu
 */
@Configuration
public class ThreadPoolConfig {

    /** Used for both the core and the maximum pool size. */
    private static final int POOL_SIZE = 4;

    /** Capacity of the bounded queue that holds tasks waiting for a worker. */
    private static final int QUEUE_CAPACITY = 100;

    /** Keep-alive time, in minutes, passed to the pool. */
    private static final long KEEP_ALIVE_MINUTES = 1L;

    /**
     * Creates the thread pool registered under the bean name {@code customThreadExecutor}.
     *
     * <p>The pool uses the default thread factory and {@code CallerRunsPolicy}: a task that
     * cannot be accepted is run by the thread that submitted it instead of being discarded.
     *
     * @return the configured executor service
     */
    @Bean(name = "customThreadExecutor")
    public ExecutorService executor() {
        return new ThreadPoolExecutor(
                POOL_SIZE,
                POOL_SIZE,
                KEEP_ALIVE_MINUTES,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
1.2 使用它
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
 * Demonstrates the different ways of handing work to the {@code customThreadExecutor} pool.
 */
@Component
public class Consumer1 {

    // @Resource selects the bean by name, so no separate @Qualifier annotation is needed.
    @Resource(name = "customThreadExecutor")
    private ExecutorService executorService;

    /**
     * Prints {@code msg} from the calling thread, then prints it again from pool threads using
     * three submission styles: a {@code Callable} via {@code submit}, a {@code Runnable} via
     * {@code execute}, and a {@code Runnable} via {@code submit}.
     *
     * <p>The method blocks until the {@code Callable} has finished, because it waits for the
     * result; the two {@code Runnable} tasks are not waited for.
     *
     * @param msg text printed after the thread name
     */
    public void test(String msg) {
        System.out.println(Thread.currentThread().getName() + ":" + msg);

        // Variant 1: a Callable, whose return value is obtained through the Future.
        Future<String> result = executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + ":" + msg);
            return "处理成功!";
        });
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            System.out.println(e);
        } catch (Exception e) {
            System.out.println(e);
        }

        Runnable printTask = () -> System.out.println(Thread.currentThread().getName() + ":" + msg);

        // Variant 2: a Runnable passed to execute(), which produces no result.
        executorService.execute(printTask);

        // Variant 3: a Runnable can also be passed to submit().
        executorService.submit(printTask);
    }
}
或者这样使用也可以
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
 * REST endpoint that runs a task on the {@code customThreadExecutor} pool and waits for it.
 */
@RestController
@RequestMapping("/custom")
@Slf4j
public class ThreadPoolTest {

    @Autowired
    @Qualifier(value = "customThreadExecutor")
    ExecutorService executorService;

    /**
     * Handles {@code GET /custom/thread}: submits one {@code Callable} to the pool, blocks until
     * it has completed, and answers with a fixed string.
     *
     * @return always {@code "ok"}, including when the task failed or the wait was interrupted
     */
    @RequestMapping(value = "/thread", method = RequestMethod.GET)
    public String task() {
        Future<String> result = executorService.submit(() -> {
            System.out.println(Thread.currentThread().getName() + ":");
            return "处理成功!";
        });
        try {
            result.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently consuming the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "ok";
    }
}
二:配置默认的线程池
2.1 第一种方式的线程池在使用时总要显式指定线程池(例如加注解 @Async("taskExecutor")),而这种方式是重写 Spring 默认的线程池,使用的时候只需要加 @Async 注解就可以,不用再去指定线程池。
2.2 线程池参数配置类 TaskThreadPoolConfig(提供 corePoolSize、maxPoolSize、queueCapacity、keepAliveSeconds 等配置项)和上面的相同,这里不再重复。
2.3 NativeAsyncTaskExecutePool.java 装配线程池
**
* 原生(Spring)异步任务线程池装配类,实现AsyncConfigurer重写他的两个方法,这样在使用默认的
* 线程池的时候就会使用自己重写的
*/
@Slf4j
@Configuration
public class NativeAsyncTaskExecutePool implements AsyncConfigurer{
//注入配置类
@Autowired
TaskThreadPoolConfig config;
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//队列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//线程名字前缀
executor.setThreadNamePrefix("NativeAsyncTaskExecutePool-");
// setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
/**
* 异步任务中异常处理
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable arg0, Method arg1, Object... arg2) {
log.error("=========================="+arg0.getMessage()+"=======================", arg0);
log.error("exception method:"+arg1.getName());
}
};
}
}
2.4 测试service方法
/**
 * Test service whose method is marked {@code @Async}.
 */
@Service
public class ThreadPoolService2 {

    private static final Logger logger = LoggerFactory.getLogger(ThreadPoolService2.class);

    /**
     * Logs a start message, prints the name of the executing thread, sleeps for one second and
     * logs an end message.
     *
     * <p>The {@code @Async} annotation does not name a specific executor bean.
     */
    @Async
    public void executeAsync() {
        logger.info("start executeAsync");
        System.out.println("当前运行的线程名称:" + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning pool can see the interruption.
            Thread.currentThread().interrupt();
            logger.warn("executeAsync was interrupted while sleeping", e);
        }
        logger.info("end executeAsync");
    }
}
2.5 测试controller
/**
 * Test controller that triggers {@link ThreadPoolService2#executeAsync()} over HTTP.
 */
@RestController
@RequestMapping("/threadPoolController2")
@Api(description = "测试控制类22222")
public class ThreadPoolController2 {

    @Autowired
    private ThreadPoolService2 asyncService;

    /**
     * Handles {@code GET /threadPoolController2/test}: invokes the service method and answers
     * with a fixed greeting.
     *
     * @return the greeting text
     */
    @RequestMapping(method = RequestMethod.GET, value = "/test")
    @ResponseBody
    @ApiOperation(value = "测试方法")
    public String threadPoolTest() {
        asyncService.executeAsync();
        final String reply = "hello word!";
        return reply;
    }
}
三:另外一种方式
import com.example.recordlog.threadFactory.CustomThreadFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.*;
/**
 * Spring Boot configuration that exposes a custom thread pool whose rejection handler blocks
 * the submitter until the queue has room.
 *
 * @author liu
 */
@Configuration
public class ThreadPoolConfig {

    /**
     * Creates the thread pool registered under the bean name {@code customThreadExecutor}:
     * 4 core threads, at most 8 threads, a bounded queue of 5 tasks, and threads named by
     * {@code CustomThreadFactory} with the pool name {@code custom-Thread-pool}.
     *
     * @return the configured executor service
     */
    @Bean(name = "customThreadExecutor")
    public ExecutorService nlpFeignExecutor() {
        return new ThreadPoolExecutor(
                4, 8,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5),
                new CustomThreadFactory("custom-Thread-pool"),
                ThreadPoolConfig::blockUntilQueued);
    }

    /**
     * Rejection handler: instead of discarding a rejected task, blocks the submitting thread
     * until the task fits into the pool's queue.
     *
     * @param task the rejected task
     * @param pool the executor that rejected it
     * @throws RejectedExecutionException if the pool has been shut down, or if the submitting
     *         thread is interrupted while waiting for queue space
     */
    private static void blockUntilQueued(Runnable task, ThreadPoolExecutor pool) {
        // Nothing drains the queue of a terminated pool, so a task enqueued there would never
        // run and anyone waiting on its Future would hang.
        if (pool.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down, rejected " + task);
        }
        try {
            pool.getQueue().put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Report the failure to the submitter rather than silently losing the task.
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
        // The pool may have been shut down while this thread was blocked in put().
        if (pool.isShutdown() && pool.remove(task)) {
            throw new RejectedExecutionException("Executor has been shut down, rejected " + task);
        }
    }
}
自定义实现ThreadFactory
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Thread factory that creates daemon threads named {@code [<poolName>-<n>]}, where {@code n}
 * is a counter starting at 1 that is incremented for every thread created by this factory.
 */
public class CustomThreadFactory implements java.util.concurrent.ThreadFactory {

    /** Closing part of every generated thread name. */
    private static final String NAME_END = "]";

    /** Sequence number for the next thread; atomic so concurrent callers get distinct numbers. */
    private final AtomicInteger threadNo = new AtomicInteger(1);

    /** Leading part of every generated thread name: {@code "[" + poolName + "-"}. */
    private final String nameStart;

    /**
     * @param poolName pool name embedded in the name of every thread this factory creates
     */
    public CustomThreadFactory(String poolName) {
        this.nameStart = "[" + poolName + "-";
    }

    /**
     * Creates a new, not yet started daemon thread with normal priority.
     *
     * @param r the task the thread will run
     * @return the new thread
     */
    @Override
    public Thread newThread(Runnable r) {
        String threadName = nameStart + threadNo.getAndIncrement() + NAME_END;
        Thread newThread = new Thread(r, threadName);
        newThread.setDaemon(true);
        // A new thread inherits the creating thread's priority; reset it to the default.
        if (newThread.getPriority() != Thread.NORM_PRIORITY) {
            newThread.setPriority(Thread.NORM_PRIORITY);
        }
        return newThread;
    }
}
使用自定义线程池
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.concurrent.ExecutorService;
/**
 * Controller that hands several tasks to the {@code customThreadExecutor} pool.
 */
@Controller
@RequestMapping("/custom")
@Slf4j
public class ThreadPoolTest {

    @Autowired
    @Qualifier(value = "customThreadExecutor")
    ExecutorService executorService;

    /**
     * Handles {@code GET /custom/thread}: submits five tasks to the pool, each printing the
     * thread that runs it, and answers without waiting for them to finish.
     *
     * @return the literal response body {@code "OK"}
     */
    // Without @ResponseBody a plain @Controller treats the returned string as a view name.
    @ResponseBody
    @RequestMapping(value = "/thread", method = RequestMethod.GET)
    public String task() {
        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                // Task logic goes here.
                System.out.println(Thread.currentThread() + "正在执行");
            });
        }
        return "OK";
    }
}