主要组成部分
任务
1
被执行的任务需要实现的接口:Runnable接口或Callable接口
任务的执行
1
2任务执行的核心接口:Executor,以及继承它的ExecutorService接口。
Executor框架有两个关键的类实现了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor。Executor
Executor 是一个接口,它是Executor框架的基础,他将任务的提交与任务的执行分离。
只用关心怎么提交任务,不用关心任务怎么又线程来执行(Executor框架已经帮你做好了)
ThreadPoolExecutor
是线程池的核心实现类,用来执行被提交的任务
1
2ThreadPoolExecutor 通常是使用工厂类Executors来创建。Executors提供三种可以满足常见应用场景的线程池:SingleThreadExecutor,FixedThreadExecutor和CachedThreadPool;
本质是调用了ThreadPoolExecutor的构造函数创建。通过ThreadPoolExecutor的execute()或submit()方法添加任务。- ThreadPoolExecutor构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public ThreadPoolExecutor( int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//存活时间
TimeUnit unit,//存活时间的单位
BlockingQueue<Runnable> workQueue,//工作队列
ThreadFactory threadFactory,//线程工厂
RejectedExecutionHandler handler//饱和策略
)
//工作队列
1.SynchronousQueue --一个不存储元素的阻塞队列 Executors.newCachedThreadPool
2.LinkedBlockingQueue --基于链表的阻塞队列 Executors.newFixedThreadPool
3.ArrayBlockingQueue --基于数组的阻塞队列
4.PriorityBlockQueue --基于优先级的队列
//线程工厂
1. Executors.defaultThreadFactory()
2. 实现ThreadFactory接口
//饱和策略
1. ThreadPoolExecutor.AbortPolicy --抛出运行异常 默认
2. ThreadPoolExecutor.CallerRunsPolicy --只用调用者所在的线程运行任务
3. ThreadPoolExecutor.DiscardPolicy --丢弃
4. ThreadPoolExecutor.DiscardOldestPolicy --丢弃队列里面最近的一个任务,执行当前的任务(一) SingleThreadExecutor
创建一个线程来执行提交的任务,适用于顺序的执行任务,任意时间不会出现多个线程执行任务。
使用LinkedBlockingQueue 基于链表的无界队列
(二) FixedThreadExecutor
创建固定线程数线程池。适用于为了满足资源管理器,限制线程数场景,适用于负载比较重的服务器
使用LinkedBlockingQueue 基于链表的无界队列。
(三) CachedThreadPool
创建一个无界的线程池。使用SynchronousQueue没有容量的队列,允许线程空闲一定时间后才回收。
适用于短期异步任务,偶尔任务比较多的负载较轻的服务器。
ScheduledThreadPoolExecutor
是线程池的核心实现类,可以在给定延时后或周期执行任务,使用DelayQeue的无界队列(对PriorityQueue的封装)
1
2
3通常使用Executors创建常见两种常用的线程池:ScheduledThreadPoolExecutor,SingleThreadScheduledThreadPoolExecutor
使用ScheduledThreadPoolExecutor的scheduleAtFixedRate()或scheduleWithFixedDelay()方法添加任务,返回ScheduledFuture类型任务。
原理:通过DelayQeue.get()从队列中获取任务;DelayQeue.add()添加任务(一)SingleThreadScheduledExecutor
使用与多个后台线程执行周期任务。同时线程线程数的场景。
使用无界的DelayQueue 延时队列
(二)ScheduledThreadPoolExecutor
适用于单个后台线程执行周期任务,同时保证顺序执行各个任务
使用无界的DelayQueue 延时队列
异步计算的结果
Future
Future接口和实现了Future接口的FutureTask,代表异步运算的结果
Future 如何拿到ThreadPoolExecutor.submit()的结果?
ThreadPoolExecutor.submit(submit(Callable
task))实际调用的是AbstractExecutorService的submit submit()
1
2
3
4
5
6public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}本质是调用了Executor的execute()方法,看一下newTaskFor(task)
newTaskFor(task)
1
2
3protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}调用了FutrureTask的构造函数
new FutureTask
(callable) 1
2
3
4
5
6public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}在FutureTask的构造函数中初始化了FutureTask的变量。
run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用任务的call()方法获取运行结果
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes通过调用任务的call()方法,获取任务的返回值;再调用set()方法,将结果赋给FutureTask的outcome
再看一下FutureTask的get方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}通过get方法调用report方法,获取outcome的值,作为返回值。
Future 的使用
通过ExecutorService.submit()方法提交任务,返回Futrue对象;再调用Future的get()方法获取返回值
原理
1
2
3FutureTask 的实现是基于AQS。get()方法调用的是重写了AQS的tryAcquireShared()
cancel(),run()方法调用了重写了AQS的tryReleaseShare()方法
done()如果线程不是执行完成RAN或取消状态,将在AQS的等待队列中等待。