Spark OOM问题总结
Spark 内存模型
Spark 在一个 Executor 的内存主要分位三块:
- execution 内存, 执行内存,
join、aggregate、map
都在这部分内存执行,shuffle 的数据也会先在这里缓存,满了再写入磁盘,减少IO - storage 内存,
broadcast、persist、cache
存储数据的地方 - other 内存, 程序执行时预留给自己的内存
OOM 问题原因
Executor OOM
- map 类操作产生大量数据, 包括 map、filter、mapPartitionsd等操作
- shuffle 后产生数据倾斜
Driver OOM
- 读取数据太大
- 数据回传
Driver Heap OOM的三大原因
一、用户在 Driver 端生成大对象,比如创建一个很大的集合数据结构
解决思路:
- 考虑将该对象转换为 Spark 数据结构,转换成 Executor 端加载,比如调用
sc.textFile()
方法等 - 如若无法避免,合理评估资源,适当调大 Driver 端内存
二、从 Executor 拉取数据到 Driver 端, 比如 Collect()
操作
某个 stage 中 Executor 端发回到 Driver 的数据量不能超过 spark.driver.maxResultSize
, 默认 1g, 如果增加该值,driver-memory
也需要相应调整。 同时 resultSize
只是数据序列化之后的 Size,如果是 Collect 操作会将这些数据反序列化收集,此时所需内存还会膨胀更多。
// directSend = sending directly back to the driver
// maxResultSize = spark.driver.maxResultSize
// maxDirectResultSize = spark.task.maxDirectResultSize
val serializedResult: ByteBuffer = {
if (maxResultSize > 0 && resultSize > maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId,
new ChunkedByteBuffer(serializedDirectResult.duplicate()),
StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}
解决思路:
- 本身不建议将大数据拉取到 Driver 端,尽量使用 RDD 完成数据操作
三、Spark 本身框架内存占用
主要由 Spark UI
数据占用,取决于任务复杂度,比如 Task 个数, 所以也会出现 Spark UI 打不开的情况,这个时候就需要考虑是否有 OOM 了
解决思路:
- 考虑缩小
partition
个数,例如从HDFS load
的数据会自动计算partitions
, 但是假如后续用户操作中做了类似过滤等操作大大减少了数据量的情况下可以考虑缩小partition
个数 - 调整
spark.ui.retainedJobs
、spark.ui.retainedStages
、spark.ui.retainedTasks
参数, 控制 Spark UI 记录保留限制
常见内存优化场景
一、map 过程产生大量对象 或 调用 coalesce 导致 OOM
调用 repartition() 增大分区,生成更多任务, 减少每个 task 的大小
二、shuffle 内存溢出
Shufflel类算子发生shuffle时,需要传入一 个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分 区数这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions),
spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就 不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢 出,就需要从partitioner的代码增加partitions的数量。
解决方法: 提高任务并行度, 或者代码层面增加partitions的数量
四、在RDD中,共用对象能够减少OOM的情况
如果RDD中有大量的重复数据,或者Array中需要存大量重复数据的时候我们都可以将重复数据转化为String,能够 有效的减少内存使用
比如: rdd.flatMap(x=>for(i <— 1 to 1000) yield(“key”,“value”))
会导致OOM,但是在同样的情况下,使用rdd.flatMap(x=>for(i <—1 to 1000) yield “key”+ “value”)
就 不会有OOM的问题,这是因为每次(“key”,“value”)都产生一个Tuple对象,而“key”+“value”,不管多少个,都只有 一个对象,指向常量池。
五、算子层面优化
-
使用mapPartitions代替大部分map操作,或者连续使用的map操作: 整个分区的操作,减少了中间结果的输出,避免了频繁的创建了对象。
-
DataFrame 代替RDD, 任务被划分成多个stage,在每个 stage内部,RDD是无法自动优化的,而DataFrame 使用sql查询,自带sql优化器,可自动找到最优方案
-
broadcast join 替代普通join:在大数据分布式系统中,大量数据的移动对性能的影响也是巨大的。基于这个思想,在两个RDD进行join操作的时候,如果其中一个RDD相对小很多,可以将小的RDD进行collect操作然后设置为broadcast变量,这样做之后,另一个RDD就可以使用map操作进行join,这样能够有效的减少相对大很多的那个RDD的数据移动。
-
先filter再join:这个就是谓词下推,这个很显然,filter之后再join,shuffle的数据量会减少,这里提一点是spark—sql的优化器已经对这部分有优化了,不需要用户显示的操作,个人实现rdd的计算的时候需要注意这个。
-
partitonBy优化:如果一个RDD需要多次在join(特别是迭代)中使用,那么事先使用partitionBy对RDD进行分区,可以减少大量的shuffle.
-
combineByKey的使用:因为combineByKey是Spark中一个比较核心的高级函数,其他一些高阶键值对函数底层都是用它实现的。诸如groupByKey,reduceByKey等等
-
在内存不足的使用,使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache():
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache0的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免,重算,只是消耗点IO时间
六、参数层面优化
-
spark.driver.memory (default 1g)
这个参数用来设置Driver的内存。在Spark程序中, SparkContext, DAGScheduler都是运行在Driver端的。对应 rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消 耗的是Driver的内存,这个时候就需要调大Driver的内存。
-
spark.rdd.compress (default false)
这个参数在内存吃紧的时候,又需要persist数据有良好的性能,就可以设置这个参数为true,这样在使用
persist(StorageLevel.MEMORY_ONLY_SER)
的时候,就能够压缩内存中的rdd数据。减少内存消耗,就是在使用的 时候会占用CPU的解压时间 -
spark.memory.storageFraction (default 0.5)
这个参数设置内存表示Executor内存中
storage/(storage+execution)
,虽然spark1.6.0+
的版本内存storage和 execution的内存已经是可以互相借用的了,但是借用和赎回也是需要消耗性能的,所以如果明知道程序中storage是多是少就可以调节一下这个参数