Spark倾斜调优(建议收藏)

Spark 优化
一、概述
粉丝朋友们大家好!我是唐三少,大家在工作或者自己练习比较"幸运"的时候,会遇到数据倾斜或者shuffle优化等幸福的事情,今天三少就和大家一起讨论一下,遇到数据倾斜和shuffle优化时我们应该怎么去做,往哪个方向撞南墙。come on my fans!

二、shuffle优化
(一)性能调优
1.分配更多资源
在生产环境中,提交spark作业时,用的spark-submit shell脚本,里面调整对应的参数:

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \  配置executor的数量
--driver-memory 100m \  配置driver的内存(影响不大)
--executor-memory 100m \  配置每个executor的内存大小
--total-executor-cores 3 \  配置所有executor的cpu core数量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
2.设置并行度
spark.default.parallelism
SparkConf conf = new SparkConf()
    .set("spark.default.parallelism", "500")
3.使用Kryo序列化
Spark支持使用Kryo序列化机制。这种序列化机制,比默认的Java序列化机制速度要快,序列化后的数据更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。

具体生产中使用如下:

第一步,在SparkConf中设置一个属性,spark.serializer,org.apache.spark.serializer.KryoSerializer类。
第二步,注册你使用的需要通过Kryo序列化的一些自定义类,SparkConf.registerKryoClasses()。
项目中的使用:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
(二)Shuffle调优
在工作中使用Spark的时候,大家会经常用到算子开发,在算子开发过程中,部分算子涉及shuffle,主要是以下几个算子:groupByKey、reduceByKey、countByKey、特殊算子join(先groupByKey后再join是不会发生shuffle的)等等。那什么是shuffle呢?什么情况下会出现?哪些算子会产生shuffle?哇哦,那我们就一起唠唠吧。说一千道一万,不如实例来相见:

比如groupByKey算子:

要把分布在集群各个节点上的数据中的同一个key,对应的values,都要集中到一块儿,集中到集群中同一个节点上,更严密一点说,就是集中到一个节点的一个executor的一个task中。
然后集中一个key对应的values之后,才能交给我们来进行处理,<key, Iterable>。
总结:shuffle,一定是分为两个stage来完成的。因为这其实是个逆向的过程,不是stage决定shuffle,是shuffle决定stage。




既然有shuffle,我们优化的思路就可以从shuffle进行入手,那我们就唠唠怎么对shuffle进行调优吧

1.合并map端输出文件
如果不合并map端输出文件的话,会怎么样?
举例实际生产环境的条件:

环境:

(1)100个节点(每个节点一个executor):100个executor

(2)每个executor:2个cpu core

(3)总共1000个task:每个executor平均10个task

(4)每个节点,10个task,每个节点会输出多少份map端文件?10 * 1000=1万个文件

宝宝们思考下总共有多少份map端输出文件?

答案是:100 * 10000 = 100万。

分析:

(1)第一个stage,每个task,都会给第二个stage的每个task创建一份map端的输出文件。

(2)第二个stage,每个task,会到各个节点上面去,拉取第一个stage每个task输出的,属于自己的那一份文件。

(3)shuffle中的写磁盘的操作,基本上就是shuffle中性能消耗最为严重的部分。

通过上面的分析,一个普通的生产环境的spark job的一个shuffle环节,会写入磁盘100万个文件。

磁盘IO对性能和spark作业执行速度的影响,是极其惊人和吓人的。

基本上,spark作业的性能,都消耗在shuffle中了,虽然不只是shuffle的map端输出文件这一个部分,但是这里也是非常大的一个性能消耗点。

开启shuffle map端输出文件合并的机制:
new SparkConf().set("spark.shuffle.consolidateFiles", "true")

默认情况下,是不开启的,如果不开启的话会发生如上所述的大量map端输出文件的操作,严重影响性能。

实际在生产环境中,使用了spark.shuffle.consolidateFiles机制以后,实际的性能调优的效果是相当的可观的。spark作业可从5个小时 -> 2~3个小时。实际上,在数据量比较大,你自己本身做了前面的性能调优,executor上去->cpu core上去->并行度(task数量)上去,shuffle没调优,shuffle就很糟糕了。大量的map端输出文件的产生,对性能有比较恶劣的影响。这个时候,去开启这个机制,可以很有效的提升性能。
3. 调节map端内存缓冲与reduce端内存占比:
调节map task内存缓冲:
spark.shuffle.file.buffer,默认32k(spark 1.3.x不是这个参数,后面还有一个后缀,kb。
spark 1.5.x以后,变了,就是现在这个参数)
调节reduce端聚合内存占比:spark.shuffle.memoryFraction,0.2
PS:可以根据实际情况进行调整哦
那么我们什么时候需要调整呢?答案只有一个,当默认的满足不了的时候;那怎么算满足不了呢?调整完之后有什么好处呢?下面听三少给到大家一一解惑答疑

(1)首先需要看Spark UI,如果公司使用的是standalone模式,那么so easy,你的spark跑起来,会显示一个Spark UI的地址,进去观察每个stage的详情,有哪些executor,有哪些task,每个task的shuffle write和shuffle read的量,shuffle的磁盘和内存读写的数据量。如果是用的yarn模式来提交,从yarn的界面进去,点击对应的application,进入Spark UI查看详情。






(2)如果发现shuffle磁盘的write和read都很大。这个时候,就意味着最好调节一些shuffle的参数。首先当然是考虑开启map端输出文件合并机制。其次调节上面说的那两个参数。

SQL调节原则如下:
spark.shuffle.file.buffer每次扩大一倍,然后看效果;
spark.shuffle.memoryFraction每次提高0.1,然后看效果。
PS:不能调节的太大,因为内存资源是有限的,要做到所有资源的均衡分配。
(3)参数调整之后,通过将map task内存缓冲变大减少spill到磁盘文件的次数。通过reduce端聚合内存变大减少spill到磁盘的次数,从而减少后面聚合读取磁盘文件的数量。

(三)数据倾斜解决
1. 数据倾斜的现象及产生原因:
困扰我们的数据倾斜的问题相信大家都备感“幸福”,那么出现的原因是什么呢?出现时候有什么现象呢?

现象:

绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,998,999个task都在1分钟之内执行完了,但是剩余一两个task却要一两个小时才能执行完,这种现象是很常见的。

原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。

原因:

(1)在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

(2)出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出

(3)定位问题:

看算子:

你在自己的程序里面找找,哪些地方用了会产生shuffle的算子,groupByKey、countByKey、reduceByKey、join

看log :

log一般会报是在你的哪一行代码,导致了OOM异常。或者看log,看看是执行到了第几个stage。spark代码,是怎么划分成一个一个的stage的。哪一个stage生成的task特别慢,就能够自己用肉眼去对你的spark代码进行stage的划分,就能够通过stage定位到你的代码,到底哪里发生了数据倾斜。

2. 数据倾斜的解决方案
我们既然知道了原因和现象,怎么定位问题,那so good,集美们就可以对症下药搞定它了。

(1)聚合源数据:Spark元数据基本是来自hive表,数据倾斜,某个key对应的80万数据,某些key对应几百条,某些key对应几十条。现在咱们直接在生成hive表的hive etl中对数据进行聚合。比如按key来分组,将key对应的所有的values全部用一种特殊的格式拼接到一个字符串里面去。那么也就意味着,每个key就只对应一条数据。在spark中,就不需要再去执行groupByKey+map这种操作了。直接对每个key对应的values字符串进行map操作,进行你需要的操作即可。

比如“key=sessionid,
value: action_seq=1|user_id=1|search_keyword=火锅|category_id=001;
action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。
对key进行group,在spark中,拿到key=sessionid,values。hive etl中,直接对key进行了聚合。那么也就意味着,每个key就只对应一条数据。

在spark中,就不需要再去执行groupByKey+map这种操作了。直接对每个key对应的values字符串进行map操作,进行你需要的操作即可。

(2)过滤导致倾斜的key:

如果你能够接受某些数据在spark作业中直接就摒弃。比如说,总共有100万个key,只有2个key是数据量达到10万的,其他所有的key,对应的数量都是几十万。

这个时候,你自己可以去取舍,如果业务和需求可以理解和接受的话,在你从hive表查询源数据的时候,直接在sql中用where条件,过滤掉某几个key。这样之前有大量数据导致数据倾斜的key被过滤掉之后,自然就不会发生数据倾斜了。

(3)提高shuffle操作reduce并行度:

在调用我们的shuffle算子时候,比如groupByKey、countByKey、reduceByKey,传入进去一个参数。这个参数,就代表了shuffle操作的reduce端的并行度。
(4)使用随机数以及扩容表进行join:

当采用随机数和扩容表进行join解决数据倾斜的时候,就代表着,你的之前的数据倾斜的解决方案,都没法使用。这个方案是没办法彻底解决数据倾斜的,更多的,是一种对数据倾斜的缓解。具体如下:

选择一个RDD,要用flatMap,进行扩容,将每条数据,映射为多条数据,每个映射出来的数据,都带了一个n以内的随机数,通常来说会选择10。

将另外一个RDD,做普通的map映射操作,每条数据都打上一个10以内的随机数。

最后将两个处理后的RDD进行join操作。

作者:教你学懂大数据


欢迎关注微信公众号 :教你学懂大数据