Java线程池的使用

Java中创建线程池的方式一般有两种

  1. 通过Executors工厂方法创建
  2. 通过ThreadPoolExecutor自定义创建(推荐,可以指定线程池大小)

线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式的原因(阿里巴巴Java开发手册):

Executors工厂方法创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolExecutor {
public static void main(String[] args) {
// 1. 创建使用单个线程的线程池
ExecutorService es1 = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
es1.submit(() -> System.out.println(Thread.currentThread().getName() + "正在执行任务"));
}
es1.shutdown(); // 线程池不再接收新的线程,并执行完已经提交的线程,最后关闭线程池
while (!es1.isTerminated()) { // 等待线程执行结束,你也可以使用"executor.awaitTermination();"来等待
}
System.out.println("es1 Finished");

// 2.创建使用固定线程数的线程池
ExecutorService es2 = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
es2.submit(() -> System.out.println(Thread.currentThread().getName() + "正在执行任务"));
}
es2.shutdown();
while (!es2.isTerminated()) {
}
System.out.println("es2 Finished");

// 3. 创建一个会根据需要创建新线程的线程池
ExecutorService es3 = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
es3.submit(() -> System.out.println(Thread.currentThread().getName() + "正在执行任务"));
}
es3.shutdown();
while (!es3.isTerminated()) {
}
System.out.println("es3 Finished");

// 4. 创建拥有固定线程数量的定时线程任务的线程池
ScheduledExecutorService es4 = Executors.newScheduledThreadPool(2);
System.out.println("时间:" + System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
es4.schedule(() -> System.out.println("时间:" + System.currentTimeMillis() + "--" + Thread.currentThread().getName() + "正在执行任务"), 3, TimeUnit.SECONDS);
}
es4.shutdown();
while (!es4.isTerminated()) {
}
System.out.println("es4 Finished");

// 5. 创建只有一个线程的定时线程任务的线程池
ScheduledExecutorService es5 = Executors.newSingleThreadScheduledExecutor();
System.out.println("时间:" + System.currentTimeMillis());
for (int i = 0; i < 5; i++) {
es5.schedule(() -> System.out.println("时间:" + System.currentTimeMillis() + "--" + Thread.currentThread().getName() + "正在执行任务"), 3, TimeUnit.SECONDS);
}
es5.shutdown();
while (!es5.isTerminated()) {
}
System.out.println("es5 Finished");
}
}

ThreadPoolExecutor自定义方式创建线程池

生命周期

运行状态:

状态 描述
RUNNING 能够接受新提交的任务,并且也能处理阻塞队列里面的任务
SHUTDOWN 关闭状态,不在接受新提交的任务,但是可以处理阻塞队列里面已保存的任务
STOP 不在接受新的任务,也不处理阻塞队列里面的任务,会终端正在处理任务的线程
TIDYING 所有任务都已终止,有效线程数(workerCount)为0当线程池变为TIDYING状态时,会执行钩子函数terminated()
TERMINATED 在terminated()方法进入这个状态

生命周期:

重要参数

1
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) ;
  1. corePoolSize :核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads() 或者prestartCoreThread() 方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize 个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize 后,就会把到达的任务放到缓存队列当中;
  2. maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  3. workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
名称 描述
ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue 一个支持线程优先级排序的无界阻塞队列,默认自然序进行排列,也可以自定义上线compareTo()方法指定元素排列顺序,不能保证同优先级元素的顺序
DelayQueue 一个实现PriorityBlockingQueue实现延迟获取的无界阻塞队列,创建队列时,可以设置多久才能在队列中获取当前元素。只有延时期满后才能从获取到队列
SynchronousQueue 一个不存储元素的阻塞队列
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列,队列的头部和尾部都可以添加或移除元素,多线程并发时,可以将锁的竞争最多降到一半

ArrayBlockingQueuePriorityBlockingQueue使用较少,一般使用LinkedBlockingQueueSynchronousQueue 。线程池的排队策略与BlockingQueue 有关。

其他常见参数:

  1. keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize 时,keepAliveTime 才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize 时,如果一个线程空闲的时间达到keepAliveTime ,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于corePoolSize 时,keepAliveTime参数也会起作用,直到线程池中的线程数为0; unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    • TimeUnit.DAYS; // 天
    • TimeUnit.HOURS; // 小时
    • TimeUnit.MINUTES; // 分钟
    • TimeUnit.SECONDS; // 秒
    • TimeUnit.MILLISECONDS; // 毫秒
    • TimeUnit.MICROSECONDS; // 微妙
    • TimeUnit.NANOSECONDS; // 纳秒
  2. unit : keepAliveTime 参数的时间单位。

  3. threadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等

  4. handler:表示当拒绝处理任务时的策略,有以下四种取值:

名称 描述
AbortPolicy 直接抛出异常,默认
CallerRunsPolicy 只用调用者所在线程来运行任务
DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy 不处理,丢弃掉

也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务

配置合理的参数

出自《Java并发编程实战》

CPU 密集型指利用 CPU 计算能力的任务比如你在内存中对大量数据进行计算。 IO 密集型指网络读取,文件读取

  • CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
  • I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。

查看CPU核数

1
System.out.println(Runtime.getRuntime().availableProcessors());

任务调度

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadTest {
public static void main(String[] args) throws IOException {
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
RejectedExecutionHandler handler = new MyIgnorePolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, r -> new Thread(r, "Demo-Thread"), handler);
executor.prestartAllCoreThreads(); // 预启动所有核心线程

for (int i = 1; i <= 10; i++) {
MyTask task = new MyTask(String.valueOf(i));
executor.execute(task);
}
System.in.read(); // 阻塞主线程
}

static class NameTreadFactory implements ThreadFactory {

private final AtomicInteger mThreadNum = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}

public static class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}

private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println(r.toString() + " rejected");
}
}

static class MyTask implements Runnable {
private final String name;

public MyTask(String name) {
this.name = name;
}

@Override
public void run() {
try {
System.out.println(this.toString() + " is running!");
Thread.sleep(3000); //让任务执行慢点
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public String getName() {
return name;
}

@Override
public String toString() {
return "MyTask [name=" + name + "]";
}
}
}

其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。

判断线程池中的线程是否全部执行结束

CountDownLatch使一个线程等待其他线程各自执行完毕后再执行,通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Test {
public static void main(String[] args) {
int size = 10;
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 15,
0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
final CountDownLatch countDownLatch = new CountDownLatch(size);
for (int i = 0; i < size; i++) {
int finalI1 = i;
// 使用一个对象实现Runnable接口或者直接使用lambda表达式(JDK1.8之后)
threadPoolExecutor.submit(() -> {
try {
System.out.println(finalI1);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdownNow();
}
System.out.println("Finished all threads");
}
}

常见概念的对比

Runnable vs Callable

Callable在jdk1.5引入,目的是弥补Runnable不支持的用例,Runnable不会返回结果或抛出检查异常,但是Callable可以,所以,如果任务不需要返回结果或抛出异常推荐使用Runnable。

execute() vs submit()

  • execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行是否成功。
  • submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,可以通过 Future 的 get()方法来获取返回值,get() 方法会阻塞当前线程直到任务完成

shutdown() VS shutdownNow()

  • shutdown(): 线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,否则将会抛出RejectedExecutionException异常,但是此时线程池不会立刻退出,直到队列里的任务执行完毕才会退出
  • shutdownNow(): 线程池的状态立刻变成STOP状态,线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List

isTerminated() VS isShutdown()

  • isShutDown: 当调用 shutdown() 方法后返回为 true
  • isTerminated: 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true