Java并发编程(3)- FutureTask详解与池化思想的设计和实战二

作者: 修罗debug
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。


Java并发编程领域,FutureTask可以说是一个非常强大的利器,它通过实现RunnableFuture接口间接拥有了RunnableFuture接口的相关特性,既可以用于充当线程执行的任务(Runnable),也可以用于获取线程异步执行任务后返回的结果(Future);本文将基于FutureTask实战一个高级案例:设计一款简化版的池容器,以此学习巩固池化思想. 

写在前面的话:debug最近又出了一本新书:Spring Boot企业级项目-入门到精通》感兴趣的小伙伴可以前往各大商城平台(淘宝、天猫、当当、京东等)一睹为快!书籍的封面如下所示,后续debug会专门出篇文章专门介绍这本书(同时提供优惠购书渠道):   


言归正传,在上篇文章中:Java并发编程(2)-FutureTask详解与池化思想的设计和实战一,我们已经从源码的角度结合多线程ThreadPoolExecuto,深入剖析并解读了FutureTask 的相关API,从任务的创建、到任务的执行 最后再到 线程执行完任务后异步获取执行结果;整个过程下来,想必各位看官老爷们应该收获颇丰。

而本文我们将趁热打铁,进一步介绍FutureTask在实际项目开发中的作用;总的来说,FutureTask在实际项目开发中起到的作用有两个:

(一)FutureTask执行多任务计算的场景

比如网站“程序员实战基地fightjava.com”的首页的数据是由多个功能模块组成的:轮播图、最新课程、最新博客、最新学习路线、最新资料、友情链接等模块数据;

这些模块由于具有独立性、互不相关性,因此可以开启多个FutureTask,然后交由多线程去执行,最终再统一通过get()方法获取多线程异步执行任务的结果返回给前端浏览器(这一场景在debug最新撸的课程:Java工程师核心技术-典型案例与面试实战系列二 就有重点介绍过,感兴趣的小伙伴可以前往观看学习!)


(二)在高并发场景下确保任务只被执行一次

在很多高并发的场景下,往往我们只需要某些任务被执行一次,这种情景FutureTask就能胜任(当然啦,可能还要借助像ConcurrentHashMap这样的组件辅助;而本文要介绍的便是在这一场景下FutureTask所发挥的作用!


按照惯例,我们还是先来介绍下这一场景/需求吧:假设有一个带Key的连接池,当Key存在时,则直接返回Key对应的连接对象;当Key不存在时,则创建一个 “连接”对象;


对于这样的应用场景,通常采用的方法是使用一个Map来存储Key和连接池对应的对应关系,而由于这是出于高并发的应用场景,因此稳妥的方式是采用ConcurrentHashMap,下面debug将采用N种方式对此进行实现,当然啦,最后的实现方式当然是FutureTask啦,毕竟大佬总是最后才出场的!!!


 

话不多说,进入代码实战环节

1)首先出场的是传统的实现方式,即按照常规的业务逻辑、算法实现的方式:   

@Component
@Slf4j
public class MyConnectionPool {
private ConcurrentHashMap<String,MyConnection> connMap=new ConcurrentHashMap<>();

//获取链接
public MyConnection getConn(final String key){
MyConnection conn=connMap.get(key);
if (conn!=null){
return conn;
}
conn=createConn(key);
connMap.putIfAbsent(key,conn);
return conn;
}
}

代码的含义应该不难理解哈,这但凡有点英语基础的闭着眼睛都能猜出来是啥意思,除非你英语是体育老师教的:

 

那么到底这种方式行不行呢?其实行不行并不是由你我说了算,而是得需要先经过压测哈,因为我们上面已经说了,必须要满足“高并发环境”的前提;OK,啪的一下打开JMeter.sh,然后设置QPS=1000,甚至10000 ,很快啊,如下图所示:


 

从上图中就可以看出,此种方式虽然最终是可以得到想要的结果,但是却产生了大量的、很有可能会被闲置的连接资源,因此这种方式不值得推荐!


2)第二种要出场的是“synchronized”,其实现代码如下所示:   

public synchronized MyConnection getConnA(final String key){
MyConnection conn=connMap.get(key);
if (conn!=null){
return conn;
}
conn=createConn(key);
connMap.putIfAbsent(key,conn);
return conn;
}

相对于第一种方式,虽然解决了安全性问题,但是却大大牺牲了性能,无法提高前端的并发量;即 synchronized这种同步方式,虽然牺牲了性能,但却没有浪费由于大量创建的连接所占用的空间资源(这其实是一种加  独占/悲观 锁的方式)

JMeter一通压测下来,发现确实没啥问题哈,如下图所示:


3)第三种实现方式仍然是加锁,只是在这里加的是ZooKeeper的分布式锁,话不多说,直接上代码:   

//基于zk的分布式锁 ~ 这种就集成、依赖了第三方的中间件(不具备独立对外提供服务的特性)
public MyConnection getConnB(final String key) throws Exception{
MyConnection conn=null;
conn=connMap.get(key);
if (conn!=null){
return conn;
}
//操作zookeeper的客户端实例
InterProcessMutex mutex=new InterProcessMutex(client,"/pool/V4");
try {
if (mutex.acquire(4,TimeUnit.SECONDS)){
conn=connMap.get(key);
if (conn!=null){
return conn;
}
conn=createConn(key);
connMap.putIfAbsent(key,conn);
}
}catch (Exception e){
}finally {
mutex.release();
}
return conn;
}

对于ZooKeeper不熟悉的小伙伴可以前往 “程序员实战基地fightjava.com”观看debug以前撸过的课程,比如“分布式锁实战视频教程(基于Spring Boot”等等。上述的实现方式经过压测后也没有啥问题:



虽然在高并发的场景下性能是没得说的、结果也是正确的;但是呢,它的缺陷也是很明显的,那就是集成、依赖了第三方的中间件ZooKeeper,不具备独立对外提供服务的特性,对外不友好 (所谓的“独立”,指的是最好能直接基于JDK、而不依赖于任何第三方组件,且可移植性良好的特点)

 

4)最后要登场的自然是FutureTask啦,老规矩,还是先上代码哈:   

private ConcurrentHashMap<String,FutureTask<MyConnection>> connHashMap=new ConcurrentHashMap<>();

//基于futureTask
public MyConnection getConnC(final String key) throws Exception{
FutureTask<MyConnection> futureTask=connHashMap.get(key);
if (futureTask!=null){
return futureTask.get();
}

//多线程访问这段代码都可以执行,即都创建了 “获取连接” 的任务,
//但是注意:此时还没真正地执行 获取连接对象实例 的代码
Callable<MyConnection> callable= () -> createConn(key);
FutureTask<MyConnection> newTask= new FutureTask<>(callable);

//同一时刻,对于同个key并发的多个线程只有一个可以成功此行代码,即 putIfAbsent
//当返回null时表示现在本地映射map还没有该key对应的task,否则调取get()方法
//堵塞式等待获取执行结果即可
futureTask=connHashMap.putIfAbsent(key,newTask);
if (futureTask==null){
futureTask=newTask;
futureTask.run();
}
return futureTask.get();
}


JMeter压测过后,观察最终的结果,发现也是没啥问题的,如下图所示:


 

有小伙伴可能会问,这是为啥呢?为啥这种方式也可行呢?这主要有两个原因:

A. ConcurrentHashMap<String,FutureTask<MyConnection>> 该方法从名字上就可以看出大概的意思:适用于高并发下的场景,它的Key具有唯一性,而且putIfAbsent() 方法的作用在于同一时间只会有一个线程执行该方法成功,当返回null时,表示还不存在该key,否则,表示已经存在该key了,你再put进去也没用了,某种程度上,这其实也是一个加锁的过程(Redis SETEX 也正有此种功效)

 

B.建立在A的基础上,只要保证FutureTask中任务的“创建”早于ConcurrentHashMapputIfAbsent()方法、而真正“执行其真正的代码逻辑”时则晚于ConcurrentHashMapputIfAbsent()方法即可,之所以要如此做,正是因为 A 中提到的 putIfAbsent() 起到了加锁的作用,同一时间将只会有一个线程趟过去,牛逼吧:


 

OK,至此,我们也就撸完了,收工!咱们下期再见!!!

总结

1)代码下载:文章涉及到的代码可以通过关注“程序员实战基地”微信公众号(扫描下图微信公众号即可),回复数字:100,即可获取代码下载链接:

我是debug,一个相信技术改变生活、技术成就梦想 的攻城狮;如果本文对你有帮助,欢迎你关注debug的技术公众号一起学习干货技术,并动动手指点赞、收藏以及转发,你的三连可是debug分享的动力哦