代码编织梦想

目录

一、任务优化

1、调大分配资源

2、提高并行度

3、RDD的重用和持久化

4、使用广播变量

5、避免使用shuffle类算子

6、使用高性能的算子

7、Kryo序列化

8、Spark内存模型调优

二、参数

1、动态资源分配

2、推测机制 


一、任务优化


1、调大分配资源

        SparkC ontext,DAGScheduler,TaskScheduler,会将算子,切割成大量的task,提交到worker节点的executor上去执行

1、增加executor
        如果executor数量比较少,那么,能够并行执行的task数量就比较少,就意味着,我们的Application的并行执行的能力就很弱。比如有3个executor,每个executor有2个cpucore,那么同时能够并行执行6个task。6个执行完以后,再换下一批6个task。增加了executor数量以后,那么,就意味着,能够并行执行的task数量,也就变多了。

2、增加每个executor的cpu core
        相当于增加了执行的并行能力。原本20个executor,每个有2个Cpu core。能够并行执行的task数量是40个。现在每个executor的cpu core,增加到了10个。能够并行执行的task数量是100个。执行的速度,提升了5倍。

3、增加每个executor的内存量
        在资源允许的情况下,增加了内存量以后,对性能的提升,有三点:
        1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据
写入磁盘,甚至不写入磁盘,减少了磁盘IO。
        2、对于shuffle操作,reduce端, 会需要内存来存放拉取的数据并进行聚合。如果内存不够
也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘,减少了磁盘IO,提升了性能。
        3、对于task的执行, 可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收(速度很慢),内存加大以后,带来更少的GC,避免了速度变慢,提升性能。

4、增加driver的内存大小
        增加了driver的内存大小,有利于driver端可以存储更多的数据,有利于避免OOM。
        比如:提交任务时可能出现的异常信息java.lang.OutOfMemoryError: GC overhead limit exceeded 可能导致该异常的原因:
        当前一个rdd的数据量非常的大,然后针对于这个rdd执行了collect算子操作,它会把
rdd的所有数据转换成数组,拉到driver端。由于driver内存有限,存不下这些数据,就会导致出现OOM异常,这个时候就可以增加driver的内存大小。


2、提高并行度

1、设置task的数量

        至少设置成与spark Application 的总cpu core 数量相同。最理想情况,150个core,分配150task。官方推荐,task数量,设置成spark Application 总cpu core数量的2~3倍。

        设置参数spark.default.parallelism,默认是没有值的,只有设置了值,它会在shuffle的过程才会起作用。可以通过在构建SparkConf对象的时候设置,例如:

        new SparkConf().set("spark.defalut.parallelism","500")

2、提高sparksql运行的task数量

        可以适当增大sparksql运行的task数量,来提高并行度。 比如设置为:

        spark.sql.shuffle.partitions=500(默认为200)

3、给RDD重新设置partition的数量

        使用rdd.repartition 来重新分区,该方法会生成一个新的rdd,并使其分区数变大。由于一个partition对应一个task那么对应的task个数越多,通过这种方式也可以提高并行度。


3、RDD的重用和持久化

1、可以调用rddcache或者persist方法进行持久化

1cache方法默认是把数据持久化到内存中

2persist方法中有丰富的缓存级别

2、rdd持久化时可以采用序列化

1)如果正常将数据持久化在内存中,那么可能会导致内存的占用过大,这样的话,也许会导致OOM内存溢出。

2)当纯内存无法支撑公共RDD数据完全存放的时候,就优先考虑使用序列化的方式在纯内存中存储。将RDD的每个partition的数据,序列化成一个字节数组;序列化后,大大减少内存的空间占用。

3)序列化的方式,唯一的缺点就是,在获取数据的时候,需要反序列化。但是可以减少占用的空间和便于网络传输

4)如果序列化纯内存方式,还是导致OOM内存溢出;就只能考虑磁盘的方式,内存+磁盘的普通方式(无序列化)。

5)为了数据的高可靠性,而且内存充足,可以使用双副本机制,进行持久化持久化的每个数据单元,存储一份副本,放在其他节点上面,从而进行容错;一个副本丢了,不用重新计算,还可以使用另外一份副本。这种方式,仅仅针对你的内存资源极度充足。


4、使用广播变量

性能分析 

        假设:一个任务需要50executor1000task共享数据为100M

        (1)在不使用广播变量的情况下,1000task就需要该共享数据的1000个副本,也就是说有1000份数需要大量的网络传输和内存开销存储。耗费的内存大小1000*100M=100G

        (2)使用了广播变量后,50executor就只需要50个副本数据,而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executorblockmanager上拉取广播变量副本,网络传输速度大大增加;内存开销 50*100M=5G

        内存节约(task数量/excutor数量)倍

注意事项

(1RDD使用广播                  

2)广播变量只能在Driver端定义,不能在Executor端定义。

3)在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

4)如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。


5、避免使用shuffle类算子

        reduceByKey、join、distinct、repartition等算子都会产生shuffle

val rdd3 = rdd1.join(rdd2)

val rdd2Data = rdd2.collect()

val rdd2DataBroadcast = sc.broadcast(rdd2Data)

val rdd3 = rdd1.map(rdd2DataBroadcast...)


6、使用高性能的算子

  • 使用reduceByKey/aggregateByKey替代groupByKey
    • 使用mapPartitions替代普通map
    • 使用foreachPartition替代foreach
    • 使用filter之后进行coalesce操作
    • 使用repartitionAndSortWithinPartitions替代repartitionsort类操作

7、Kryo序列化

1Java的序列化器

优点:处理起来方便,不需要我们手动做其他操作,只是在使用一个对象和变量的时候,需要实现Serializble接口。

缺点:默认的序列化机制的效率不高,序列化的速度比较慢;序列化以后的数据,占用的内存空间相对还是比较大。

2Kryo序列化机制

比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少;在集群中耗费的内存资源大大减少。

3Kryo序列化机制生效地方

算子函数中使用到的外部变量

持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER

产生shuffle的地方,也就是宽依赖


8、Spark内存模型调优

Executor的内存主要分为三块
1) task执行我们自己编写的代码时使用
2) task通过shuffle过程拉取了上一个stagetask的输出后,进行聚合等操作时使用
3) RDD缓存时使用

缺点:配置完Storage内存区域和execution区域后,我们的一个任务假设execution内存不够用了,但是它的Storage内存区域是空闲的,两个之间不能互相借用,不够灵活

Storage内存和execution内存 可以相互借用


二、参数

1、动态资源分配

  • spark.dynamicAllocation.enabled

    是否开启动态资源配置,根据工作负载来衡量是否应该增加或减少executor,默认false

    spark.dynamicAllocation.minExecutors

    动态分配最小executor个数,在启动时就申请好的,默认0

    spark.dynamicAllocation.maxExecutors

    动态分配最大executor个数,默认infinity

    spark.dynamicAllocation.initialExecutors

    动态分配初始executor个数,默认值=spark.dynamicAllocation.minExecutors

    spark.dynamicAllocation.executorIdleTimeout

    当某个executor空闲超过这个设定值,就会被kill,默认60s

    spark.dynamicAllocation.cachedExecutorIdleTimeout

    当某个缓存数据的executor空闲时间超过这个设定值,就会被kill,默认infinity

    spark.dynamicAllocation.schedulerBacklogTimeout

    任务队列非空,资源不够,申请executor的时间间隔,默认1s

    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout

    同schedulerBacklogTimeout,是申请了新executor之后继续申请的间隔,默认=schedulerBacklogTimeout

    2、推测机制 

spark.speculation如果设置为"true", 就会对tasks执行推测机制。就是说在一个stage下跑的慢的tasks将有机会被重新启动,默认false
spark.speculation.intervalSpark检测tasks推测机制的间隔时间,默认100ms
spark.speculation.quantile当一个stage下多少百分比的tasks运行完成才会开启推测机制,默认0.75
spark.speculation.multiplier一个task的运行时间是所有task的运行时间中位数的几倍(门限值)才会被认为该task需要重新启动,默认1.5

 speculation工作流程

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_43744420/article/details/129671102

spark streaming 调优实践_god_war的博客-爱代码爱编程

在使用 Spark 和 Spark Streaming 时,当我们将应用部署在集群上时,可能会碰到运行慢、占用过多资源、不稳定等问题,这时需要做一些优化才能达到最好的性能。有时候一个简单的优化可以起到化腐朽为神奇的作用,使得程序能够更加有效率,也更加节省资源。本文我们就来介绍一些能够提高应用性能的参数和配置。 另外需要指出的是,优化本身是一个具体性很强的

Spark性能优化调优根据Spark UI进行调优记录-爱代码爱编程

SPARK-SQL优化三剑客:1内存2并发3CPU 1、内存: spark的dirver和executor内存及对应spark作业参数 涉及内存调优就三个参数:spark.driver.memory ,-executor-memory 和 spark.yarn.executor.memoryOverhead 2、并发:提高有shuffle(join,

spark 调优参数-爱代码爱编程

Spark.reducer.maxSizeInFlight 默认值:48m 参数说明:该参数用于设置shuffle read任务的buff缓冲区大小,该缓冲区决定一次可以拉取多少数据。 调整建议:如果可用内存资源足够,则可以增加参数的大小(例如96m),从而减少拉取数据的次数,这可以减少网络传输的次数并提高性能。 在实践中发现,合理调整参数后,性能会

HiveonSpark调优-爱代码爱编程

前言 之前在Hive on Spark跑测试时,100g的数据量要跑⼗⼏个⼩时,⼀看CPU和内存的监控,发现 POWER_TEST阶段(依次执⾏30个查询)CPU只⽤了百分之⼗⼏,也就是没有把整个集群的性能利⽤起来,导致跑得很慢。因此,如何调整参数,使整个集群发挥最⼤性能显得尤为重要。 Spark作业运⾏原理 详细原理见上图。我们使⽤spark

面试官:spark任务如何调优-爱代码爱编程

如果面试时被问到spark任务如何调优,我们该如何回答呐? 下面我们从四大方面回答这个问题,保证吊打面试官。 一、spark性能调优 1、分配更多的资源 比如增加执行器个数(num_executor)、增加执行器个数(executor_cores)、增加执行器内存(executor_memory) 2、调节并行度 spark.default.p

spark 任务如何调优_星空下的那个人影的博客-爱代码爱编程

spark 性能调优 a. 分配更多资源——第一步要做的 比如增加 executor个数(num_executor)、增加 executor 的 cpu 核数(executor_cores)、增加 executor 的内存量(executor_memory) 增加 executor个数 和 executor 的 cpu 核数是为了增加执行的并行能力(能够

hive on spark 相关调优_何时java大成的博客-爱代码爱编程

一、yarn相关调优 需要调整的Yarn参数均与CPU、内存等资源有关,核心配置参数如下 (1)yarn.nodemanager.resource.memory-mb 该参数的含义是,一个NodeManager节点分配给Container使用的内存。该参数的配置,取决于NodeManager所在节点的总内存容量和该节点运行的其他服务的数量。 考虑上

[spark]二spark性能调优|spark任务监控|程序调优|资源调优_胖胖学编程的博客-爱代码爱编程

1.Spark任务监控 对Spark性能的调优离不开对任务的监控,只有在运行过程中,通过监控手段发现问题,才能迅速定位问题所在。 SparkUI使用 在运行Spark应用程序时,默认会在Driver节点的4040端口启动WebUI服务,通过此WebUI可对Spark的应用程序的Job划分、Stage划分、Task执行缓存的使用等各个方面进行了监控。