Java中创建线程池的方式一般有两种
通过Executors
工厂方法创建
通过ThreadPoolExecutor
自定义创建(推荐,可以指定线程池大小)
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式的原因(阿里巴巴Java开发手册):
Executors工厂方法创建线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class TestThreadPoolExecutor { public static void main (String[] args) { ExecutorService es1 = Executors.newSingleThreadExecutor(); for (int i = 0 ; i < 10 ; i++) { es1.submit(() -> System.out.println(Thread.currentThread().getName() + "正在执行任务" )); } es1.shutdown(); while (!es1.isTerminated()) { } System.out.println("es1 Finished" ); ExecutorService es2 = Executors.newFixedThreadPool(3 ); for (int i = 0 ; i < 10 ; i++) { es2.submit(() -> System.out.println(Thread.currentThread().getName() + "正在执行任务" )); } es2.shutdown(); while (!es2.isTerminated()) { } System.out.println("es2 Finished" ); ExecutorService es3 = Executors.newCachedThreadPool(); for (int i = 0 ; i < 20 ; i++) { es3.submit(() -> System.out.println(Thread.currentThread().getName() + "正在执行任务" )); } es3.shutdown(); while (!es3.isTerminated()) { } System.out.println("es3 Finished" ); ScheduledExecutorService es4 = Executors.newScheduledThreadPool(2 ); System.out.println("时间:" + System.currentTimeMillis()); for (int i = 0 ; i < 5 ; i++) { es4.schedule(() -> System.out.println("时间:" + System.currentTimeMillis() + "--" + Thread.currentThread().getName() + "正在执行任务" ), 3 , TimeUnit.SECONDS); } es4.shutdown(); while (!es4.isTerminated()) { } System.out.println("es4 Finished" ); ScheduledExecutorService es5 = Executors.newSingleThreadScheduledExecutor(); System.out.println("时间:" + System.currentTimeMillis()); for (int i = 0 ; i < 5 ; i++) { es5.schedule(() -> System.out.println("时间:" + System.currentTimeMillis() + "--" + Thread.currentThread().getName() + "正在执行任务" ), 3 , TimeUnit.SECONDS); } es5.shutdown(); while (!es5.isTerminated()) { } System.out.println("es5 Finished" ); } }
ThreadPoolExecutor自定义方式创建线程池 生命周期 运行状态:
状态
描述
RUNNING
能够接受新提交的任务,并且也能处理阻塞队列里面的任务
SHUTDOWN
关闭状态,不在接受新提交的任务,但是可以处理阻塞队列里面已保存的任务
STOP
不在接受新的任务,也不处理阻塞队列里面的任务,会终端正在处理任务的线程
TIDYING
所有任务都已终止,有效线程数(workerCount)为0当线程池变为TIDYING状态时,会执行钩子函数terminated()
TERMINATED
在terminated()方法进入这个状态
生命周期:
重要参数 1 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) ;
corePoolSize
:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()
或者prestartCoreThread()
方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize
个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize
后,就会把到达的任务放到缓存队列当中;
maximumPoolSize
:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
workQueue
:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
名称
描述
ArrayBlockingQueue
一个由数组结构组成的有界阻塞队列
LinkedBlockingQueue
一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue
一个支持线程优先级排序的无界阻塞队列,默认自然序进行排列,也可以自定义上线compareTo()方法指定元素排列顺序,不能保证同优先级元素的顺序
DelayQueue
一个实现PriorityBlockingQueue实现延迟获取的无界阻塞队列,创建队列时,可以设置多久才能在队列中获取当前元素。只有延时期满后才能从获取到队列
SynchronousQueue
一个不存储元素的阻塞队列
LinkedTransferQueue
一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque
一个由链表结构组成的双向阻塞队列,队列的头部和尾部都可以添加或移除元素,多线程并发时,可以将锁的竞争最多降到一半
ArrayBlockingQueue
和PriorityBlockingQueue
使用较少,一般使用LinkedBlockingQueue
和SynchronousQueue
。线程池的排队策略与BlockingQueue
有关。
其他常见参数:
keepAliveTime
:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize
时,keepAliveTime
才会起作用,直到线程池中的线程数不大于corePoolSize
,即当线程池中的线程数大于corePoolSize
时,如果一个线程空闲的时间达到keepAliveTime
,则会终止,直到线程池中的线程数不超过corePoolSize
。但是如果调用了allowCoreThreadTimeOut(boolean)
方法,在线程池中的线程数不大于corePoolSize
时,keepAliveTime
参数也会起作用,直到线程池中的线程数为0;
unit
:参数keepAliveTime
的时间单位,有7种取值,在TimeUnit
类中有7种静态属性:
TimeUnit.DAYS; // 天
TimeUnit.HOURS; // 小时
TimeUnit.MINUTES; // 分钟
TimeUnit.SECONDS; // 秒
TimeUnit.MILLISECONDS; // 毫秒
TimeUnit.MICROSECONDS; // 微妙
TimeUnit.NANOSECONDS; // 纳秒
unit : keepAliveTime 参数的时间单位。
threadFactory
:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等
handler
:表示当拒绝处理任务时的策略,有以下四种取值:
名称
描述
AbortPolicy
直接抛出异常,默认
CallerRunsPolicy
只用调用者所在线程来运行任务
DiscardOldestPolicy
丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy
不处理,丢弃掉
也可以根据应用场景需要来实现RejectedExecutionHandler
接口自定义策略。如记录日志或持久化不能处理的任务
配置合理的参数 出自《Java并发编程实战》
CPU 密集型指利用 CPU 计算能力的任务比如你在内存中对大量数据进行计算。 IO 密集型指网络读取,文件读取
CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU
核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU
就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU
来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O
密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
查看CPU核数
1 System.out.println(Runtime.getRuntime().availableProcessors());
任务调度
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 import java.io.IOException;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;public class ThreadTest { public static void main (String[] args) throws IOException { int corePoolSize = 2 ; int maximumPoolSize = 4 ; long keepAliveTime = 10 ; TimeUnit unit = TimeUnit.SECONDS; BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue <>(2 ); RejectedExecutionHandler handler = new MyIgnorePolicy (); ThreadPoolExecutor executor = new ThreadPoolExecutor (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, r -> new Thread (r, "Demo-Thread" ), handler); executor.prestartAllCoreThreads(); for (int i = 1 ; i <= 10 ; i++) { MyTask task = new MyTask (String.valueOf(i)); executor.execute(task); } System.in.read(); } static class NameTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger (1 ); @Override public Thread newThread (Runnable r) { Thread t = new Thread (r, "my-thread-" + mThreadNum.getAndIncrement()); System.out.println(t.getName() + " has been created" ); return t; } } public static class MyIgnorePolicy implements RejectedExecutionHandler { @Override public void rejectedExecution (Runnable r, ThreadPoolExecutor e) { doLog(r, e); } private void doLog (Runnable r, ThreadPoolExecutor e) { System.err.println(r.toString() + " rejected" ); } } static class MyTask implements Runnable { private final String name; public MyTask (String name) { this .name = name; } @Override public void run () { try { System.out.println(this .toString() + " is running!" ); Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } } public String getName () { return name; } @Override public String toString () { return "MyTask [name=" + name + "]" ; } } }
其中线程线程1-4先占满了核心线程和最大线程数量,然后4、5线程进入等待队列,7-10线程被直接忽略拒绝执行,等1-4线程中有线程执行完后通知4、5线程继续执行。
判断线程池中的线程是否全部执行结束 CountDownLatch使一个线程等待其他线程各自执行完毕后再执行,通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class Test { public static void main (String[] args) { int size = 10 ; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (10 , 15 , 0L , TimeUnit.SECONDS, new LinkedBlockingQueue <>(10 )); final CountDownLatch countDownLatch = new CountDownLatch (size); for (int i = 0 ; i < size; i++) { int finalI1 = i; threadPoolExecutor.submit(() -> { try { System.out.println(finalI1); } catch (Exception e) { System.out.println(e.getMessage()); } finally { countDownLatch.countDown(); } }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { threadPoolExecutor.shutdownNow(); } System.out.println("Finished all threads" ); } }
常见概念的对比 Runnable vs Callable Callable在jdk1.5引入,目的是弥补Runnable不支持的用例,Runnable不会返回结果或抛出检查异常,但是Callable可以,所以,如果任务不需要返回结果或抛出异常推荐使用Runnable。
execute() vs submit()
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行是否成功。
submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,可以通过
Future 的 get()方法来获取返回值,get()
方法会阻塞当前线程直到任务完成
shutdown() VS shutdownNow()
shutdown(): 线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,否则将会抛出RejectedExecutionException异常,但是此时线程池不会立刻退出,直到队列里的任务执行完毕才会退出
shutdownNow(): 线程池的状态立刻变成STOP状态,线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的
List
isTerminated() VS isShutdown()
isShutDown: 当调用 shutdown() 方法后返回为 true
isTerminated: 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true