Java并发编程(2)- FutureTask详解与池化思想的设计和实战一
作者:
修罗debug
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
在Java并发编程领域,FutureTask可以说是一个非常强大的利器,它通过实现RunnableFuture接口间接拥有了Runnable和Future接口的相关特性,既可以用于充当线程执行的任务(Runnable),也可以用于获取线程异步执行任务后返回的结果(Future);本文将通过剖析解读FutureTask底层相关的核心源码,并基于FutureTask自设计并实战一款“池容器”,即池化思想的设计和实战;
写在前面的话:debug最近又出了一本新书:《Spring
Boot企业级项目-入门到精通》 感兴趣的小伙伴可以前往各大商城平台(淘宝、天猫、当当、京东等)一睹为快!书籍的封面如下所示,后续debug会专门出篇文章专门介绍这本书(同时提供优惠购书渠道):
言归正传,在上篇文章中:Java并发编程(1): Callable、Future和FutureTask ,我们已经介绍并实战过了Java并发编程中Callable、Future以及FutureTask的相关基本概念以及API,本文将不再赘述;
值得一提的是,Future或者FutureTask需要通过线程池才能发挥出实际的功效,因此在实际应用中它跟线程池又有着千丝万缕的联系,本文将从源码的角度进行剖析,通过解读FutureTask底层相关的核心源码,并基于FutureTask自设计并实战一款“池容器”,即池化思想的设计和实战;
(1)在上篇文章中想必各位观看老爷们已经基本知道了Future、FutureTask需要结合线程池来使用,看下方代码:
ArrayBlockingQueue queue=new ArrayBlockingQueue(2);
ExecutorService executor=new ThreadPoolExecutor(2,4,1, TimeUnit.MINUTES,queue);
FutureTask<Map<String,Object>> futureTask=new FutureTask<Map<String, Object>>(new ProductThread());
executor.execute(futureTask);
Map<String,Object> resMap=futureTask.get();
System.out.println("--子线程执行任务后得到的结果:"+resMap);
简短解说:在上述该代码中,ProductThread是一个实现了Callable接口的类,其中的call()方法便是真正要执行的任务代码逻辑,在此就不贴出来了(在上篇文章有源码);然后通过它构造一FutureTask,最后便是提交给线程池的execute()方法进行执行,执行完成之后,通过futureTask的get()方法获取线程异步执行后返回的结果;
而本文我们将基于这一段“解说”进行核心源码的剖析。
(2)首先是new
FutureTask<Map<String, Object>>(new ProductThread()),即创建FutureTask,其底层源码如下所示:
//创建任务.等待被线程池中的线程执行
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
在上述源码中,值得一提的是,任务的运行状态变量state的设计个人觉得相当巧妙,采用volatile关键字进行定义,而volatile的作用想必有些小伙伴是比较熟悉的:
(1)保证线程之间可见性:即对 用volatile关键字修饰的变量 的可见性,即 “当一个线程修改了这个变量的值时,volatile 可以保证新值能立即同步到主内存,以及每次使用前立即从主内存刷新”
(2)禁止指令重排序:A.那么什么是“指令重排序”呢,我们写的代码最终都将转化为相应的指令交付给底层控制单元、计算单元执行,即CPU执行,重排序 则指的是CPU采用了允许将多条指令不按程序规定的顺序分开发送给各相应电路单元处理,这样做的弊端在于多核处理器下各处理器会发生乱序执行,从而导致我们所谓的 “并发安全”问题,而volatile关键词就禁止了这种现象,即通过在本地代码中插入许多内存屏障指令来保证处理器不发生乱序执行;
OK,回到线程待执行任务的运行状态变量state,其定义和取值如下所示(总共有6个状态的取值,其含义直接翻译过来就行了):
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
从官方的源码注释中可以看出 一个任务从创建到完毕,可能经历的状态变化为:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
(3)FutureTask任务定义好了之后,接下来就应该是交给线程准备执行了,即:executor.execute(futureTask) ,其底层源码如下所示:
public void execute(Runnable command){
//如果任务对象为null,抛异常
if (command == null)
throw new NullPointerException();
//先获取当前池中工作中的线程(活跃的、可用的线程数)
//如果当前池中的线程数小于核心线程数,就会调用addWorker检查运行状态和正在运行的
//线程数量
//通过return操作可以用于防止错误地添加线程、然后执行当前任务
//(因为随意的添加线程只会造成资源浪费)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//否则当 池中的线程数大于核心线程数的时候 且 任务可以被加入任务队列时
//我们需要来个双重判断,判断是否真的需要添加一个新的线程来执行这个任务,
//因为可能已经存在这样的情况:线程执行完毕任务后的那一刻可能处于空闲状态,
//这个时候该线程就可以直接复用;
//否则直接创建一个新的线程来执行此任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//判断池中是否有可用的线程,如果没有 而且 也无法将当前任务加入到任务队列时
//则拒绝执行当前的任务(拒绝的策略取决于创建线程池时指定的策略)
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果不能创建新的线程去执行新任务的话,就拒绝当前任务
else if (!addWorker(command, false))
reject(command);
}
在上述源码中,addWorker()方法很抢眼,因此不得不稍微介绍一番,因为该方法代码有点长,debug特意将其截成长图供各位看官老爷们阅读,如下所示:
在上图中有一小段代码便是真正触发“线程执行任务”的时机,即:
if (workerAdded) {
t.start();
workerStarted = true;
}
start()方法一调用,便最终会调用FutureTask中的run()方法,其底层源码如下所示:
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//将前面执行 new FutureTask(callable) 代码时传入该构造方法的任务对象
//callable引用 交给新的 c
//如果当前任务处于 NEW,即创建的状态,则执行callable原生定义的call()方法的
//代码逻辑,其实就是 ProductThread 中的 call()方法的代码逻辑
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
//执行完成之后,将得到的结果通过 set() 方法设置到 FutureTask 中的
//私有变量 outCome中
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
其中,if (ran) set(result); 表示将线程异步执行完之后得到的结果通过 set() 方法设置到 FutureTask 中的私有变量 outCome中,其代码如下所示:
//上述执行完run()方法后,会得到一个结果V,将该结果通过 set(V) 方法可以设置
//到 FutureTask
//中的私有变量 outCome中
//其中设置回去的过程其实也是加了锁,只不过是一个乐观锁,即通过cas机制来实现
//即“判断当前任务的旧状态old=stateOffset是否真的为New,如果是,//则将其设置为COMPLETING,
//代表任务正在执行中”,其他线程就执行不成功
//然后设置完成后,将该任务的运行状态设置为NORMAL,即最终态为//“任务已执行完成”
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
最后,便是通过futureTask的get()方法获取线程异步执行后返回的结果,即对应的业务代码为: Map<String,Object> resMap=futureTask.get(); 接下来一起研读get()方法底层的源码:
//获取线程异步执行后的结果,过程:先获取当前任务的运行状态
//如果是已完成,即Normal状态时,则直接report(s),即获取结果
//如果是未完成(即处于运行期间 : <= COMPLETING),
//则进入等待的逻辑,即awaitDone()方法里面的代码逻辑
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
而report()方法的底层源码也很简洁,如下所示:
//其中report()方法的核心逻辑,就是通过判断当前任务的运行状态(已完成)
//从而将线程的执行结果 result 返回:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
如果当前任务仍然处于运行中的状态时,则执行 awaitDone() 方法进入堵塞队列等待获取执行结果的代码逻辑,如下所示:
/**
* Awaits completion or aborts on interrupt or timeout.
* 等待任务执行完成、或者任务被终止、或者任务被中断、或者任务执行超时
* @param timed true if use timed waits
* @param nanos time to wait, if timed
*
* 如果设置timed为true,则代表在获取线程异步执行的结果时 可以等待一定的时间nanos
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//这其实也是一个死循环 - CAS自旋 + AQS 即同步队列控制器 的原理
for (;;) {
//看看执行任务的线程是不是被中断interrupt,如果是的话做出:
//1.在等待队列中移除这个调用get方法的线程结点WaitNode
//(怎么移除呢:很简单,只需要调整链表中的线程节点即可)
//2. 抛出异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
//表示任务执行完毕:可能正常完成也可能抛异常,总之就是结束了
//就把这个waitNode的执行线程thread指向null
if (q != null)
q.thread = null;
return s;
}
//如果state==COMPLETING,意味着基本完成但还没保存结果,就yield,//表示线程挂起
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//q == null时,即等待节点q为null,就创建等待节点,//这个节点后面会被插入阻塞队列
//第一次循环时一般会执行到
else if (q == null)
q = new WaitNode();
//一般第二次循环时会执行到
//大概的含义为:判断queued,即是否入队列成功,//这里是将创建的线程节点q加入队列头
//使用Unsafe的CAS方法,对waiters进行赋值,//waiters也是一个WaitNode节点,
//相当于队列头,
//或者理解为队列的头指针,通过WaitNode可以遍历整个阻塞队列//(头插法)
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//设置了超时的处理机制:设置超时时间之后,//调用get()的线程最多阻塞nanos 纳秒,
//就会从阻塞状态醒过来。
//如果最终真的超时的话,就移除 调用get()方法的线程wait结点,//并返回state
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//否则就进入堵塞等待状态(nanos纳秒),等待被唤醒
LockSupport.parkNanos(this, nanos);
}
else
//没有设置超时时间但任务又还没执行出结果,就直接进入阻塞状态,
//等待被其他线程唤醒
LockSupport.park(this);
}
}
至此,对于FutureTask的核心源码剖析我们已经撸完了,当然啦,还有像cancel()方法,即取消任务的执行就留给各位看官老爷们的研读了(很简单,通过CAS机制判断任务的运行状态 以及 mayInterruptIfRunning 参数决定最终是否可以中断该任务的执行线程,同时也改变了任务的运行状态: New-> INTERRUPTING->NORMAL ; New-> CANCELLED):
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
像finishCompletion()等方法的源码解读也是如此,就留给各位小伙伴细品了!本文我们就到这里吧!OK,打完收工,下期还有个高级案例实战,即“池化思想的设计与实战”,我们下期再见!!!
总结:
(1)代码下载:关注“程序员实战基地”微信公众号(扫描下图微信公众号即可),回复“100”,即可获取代码下载链接;欢迎关注debug的技术公众号一起学习干货技术吧!