【spark】spark调优-爱代码爱编程
目录
一、任务优化
1、调大分配资源
SparkC ontext,DAGScheduler,TaskScheduler,会将算子,切割成大量的task,提交到worker节点的executor上去执行
1、增加executor
如果executor数量比较少,那么,能够并行执行的task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。比如有3个executor,每个executor有2个cpucore,那么同时能够并行执行6个task。6个执行完以后,再换下一批6个task。增加了executor数量以后,那么,就意味着,能够并行执行的task数量,也就变多了。
2、增加每个executor的cpu core
相当于增加了执行的并行能力。原本20个executor,每个有2个Cpu core。能够并行执行的task数量是40个。现在每个executor的cpu core,增加到了10个。能够并行执行的task数量是100个。执行的速度,提升了5倍。
3、增加每个executor的内存量
在资源允许的情况下,增加了内存量以后,对性能的提升,有三点:
1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据
写入磁盘,甚至不写入磁盘,减少了磁盘IO。
2、对于shuffle操作,reduce端, 会需要内存来存放拉取的数据并进行聚合。如果内存不够
也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘,减少了磁盘IO,提升了性能。
3、对于task的执行, 可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收(速度很慢),内存加大以后,带来更少的GC,避免了速度变慢,提升性能。
4、增加driver的内存大小
增加了driver的内存大小,有利于driver端可以存储更多的数据,有利于避免OOM。
比如:提交任务时可能出现的异常信息java.lang.OutOfMemoryError: GC overhead limit exceeded 可能导致该异常的原因:
当前一个rdd的数据量非常的大,然后针对于这个rdd执行了collect算子操作,它会把
rdd的所有数据转换成数组,拉到driver端。由于driver内存有限,存不下这些数据,就会导致出现OOM异常,这个时候就可以增加driver的内存大小。
2、提高并行度
1、设置task的数量
至少设置成与spark Application 的总cpu core 数量相同。最理想情况,150个core,分配150task。官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍。
设置参数spark.default.parallelism,默认是没有值的,只有设置了值,它会在shuffle的过程才会起作用。可以通过在构建SparkConf对象的时候设置,例如:
new SparkConf().set("spark.defalut.parallelism","500")
2、提高sparksql运行的task数量
可以适当增大sparksql运行的task数量,来提高并行度。 比如设置为:
spark.sql.shuffle.partitions=500(默认为200)
3、给RDD重新设置partition的数量
使用rdd.repartition 来重新分区,该方法会生成一个新的rdd,并使其分区数变大。由于一个partition对应一个task,那么对应的task个数越多,通过这种方式也可以提高并行度。
3、RDD的重用和持久化
1、可以调用rdd的cache或者persist方法进行持久化
(1)cache方法默认是把数据持久化到内存中
(2)persist方法中有丰富的缓存级别
2、rdd持久化时可以采用序列化
(1)如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许会导致OOM内存溢出。
(2)当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个字节数组;序列化后,大大减少内存的空间占用。
(3)序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。但是可以减少占用的空间和便于网络传输
(4)如果序列化纯内存方式,还是导致OOM,内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。
(5)为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化。持久化的每个数据单元,存储一份副本,放在其他节点上面,从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。这种方式,仅仅针对你的内存资源极度充足。
4、使用广播变量
性能分析
假设:一个任务需要50个executor,1000个task,共享数据为100M。
(1)在不使用广播变量的情况下,1000个task,就需要该共享数据的1000个副本,也就是说有1000份数需要大量的网络传输和内存开销存储。耗费的内存大小1000*100M=100G
(2)使用了广播变量后,50个executor就只需要50个副本数据,而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executor的blockmanager上拉取广播变量副本,网络传输速度大大增加;内存开销 50*100M=5G
内存节约(task数量/excutor数量)倍
注意事项
(1)RDD使用广播
(2)广播变量只能在Driver端定义,不能在Executor端定义。
(3)在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
(4)如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
5、避免使用shuffle类算子
reduceByKey、join、distinct、repartition等算子都会产生shuffle
val rdd3 = rdd1.join(rdd2)
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
val rdd3 = rdd1.map(rdd2DataBroadcast...)
6、使用高性能的算子
- 使用reduceByKey/aggregateByKey替代groupByKey
- 使用mapPartitions替代普通map
- 使用foreachPartition替代foreach
- 使用filter之后进行coalesce操作
- 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
7、Kryo序列化
1、Java的序列化器
优点:处理起来方便,不需要我们手动做其他操作,只是在使用一个对象和变量的时候,需要实现Serializble接口。
缺点:默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。
2、Kryo序列化机制
比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。
3、Kryo序列化机制生效地方
算子函数中使用到的外部变量
持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER
产生shuffle的地方,也就是宽依赖
8、Spark内存模型调优
Executor的内存主要分为三块
1) task执行我们自己编写的代码时使用
2) task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用
3) RDD缓存时使用
缺点:配置完Storage内存区域和execution区域后,我们的一个任务假设execution内存不够用了,但是它的Storage内存区域是空闲的,两个之间不能互相借用,不够灵活
Storage内存和execution内存 可以相互借用
二、参数
1、动态资源分配
-
spark.dynamicAllocation.enabled
是否开启动态资源配置,根据工作负载来衡量是否应该增加或减少executor,默认false
spark.dynamicAllocation.minExecutors
动态分配最小executor个数,在启动时就申请好的,默认0
spark.dynamicAllocation.maxExecutors
动态分配最大executor个数,默认infinity
spark.dynamicAllocation.initialExecutors
动态分配初始executor个数,默认值=spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.executorIdleTimeout
当某个executor空闲超过这个设定值,就会被kill,默认60s
spark.dynamicAllocation.cachedExecutorIdleTimeout
当某个缓存数据的executor空闲时间超过这个设定值,就会被kill,默认infinity
spark.dynamicAllocation.schedulerBacklogTimeout
任务队列非空,资源不够,申请executor的时间间隔,默认1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
同schedulerBacklogTimeout,是申请了新executor之后继续申请的间隔,默认=schedulerBacklogTimeout
2、推测机制
spark.speculation | 如果设置为"true", 就会对tasks执行推测机制。就是说在一个stage下跑的慢的tasks将有机会被重新启动,默认false |
spark.speculation.interval | Spark检测tasks推测机制的间隔时间,默认100ms |
spark.speculation.quantile | 当一个stage下多少百分比的tasks运行完成才会开启推测机制,默认0.75 |
spark.speculation.multiplier | 一个task的运行时间是所有task的运行时间中位数的几倍(门限值)才会被认为该task需要重新启动,默认1.5 |
speculation工作流程