Starting with Apache Spark version 1.6.0, the memory management model has changed. The old memory management model is implemented by the StaticMemoryManager class and is now called “legacy”. “Legacy” mode is disabled by default, which means that running the same code on Spark 1.5.x and 1.6.0 can result in different behavior, so be careful with that. For compatibility, you can enable the “legacy” model with the spark.memory.useLegacyMode parameter.
I described the “legacy” model of memory management in this article about Spark Architecture almost one year ago. I have also written an article on Spark Shuffle implementations that briefly touches on the memory management topic.
This article describes the new memory management model used in Apache Spark starting with version 1.6.0, which is implemented as UnifiedMemoryManager.
Long story short, the new memory management model looks like this:
You can see 3 main memory regions on the diagram:
- Reserved Memory. This is the memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0, its value is 300MB, which means that this 300MB of RAM does not participate in Spark memory region size calculations. Its size cannot be changed without recompiling Spark or setting spark.testing.reservedMemory, which is not recommended because it is a testing parameter not intended for production use. Be aware that this memory is only called “reserved”: in fact Spark does not manage it in any way, but it sets the limit on what you can allocate for Spark usage. Even if you want to give all the Java Heap to Spark to cache your data, you won’t be able to do so, as this “reserved” part would remain spare (not really spare, since it would store lots of Spark internal objects). For your information, if you don’t give the Spark executor at least 1.5 * Reserved Memory = 450MB of heap, it will fail with a “please use larger heap size” error message.
- User Memory. This is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you to use it in any way you like. You can store your own data structures there that would be used in RDD transformations. For example, you can rewrite Spark aggregation by using a mapPartitions transformation that maintains a hash table for the aggregation, which would consume so-called User Memory. In Spark 1.6.0 the size of this memory pool can be calculated as (“Java Heap” – “Reserved Memory”) * (1.0 – spark.memory.fraction), which is by default equal to (“Java Heap” – 300MB) * 0.25. For example, with a 4GB heap you would have 949MB of User Memory. Again, this is User Memory, and it is completely up to you what is stored in this RAM and how; Spark does no accounting at all of what you do there or whether you respect this boundary. Not respecting this boundary in your code might cause an OOM error.
- Spark Memory. Finally, this is the memory pool managed by Apache Spark. Its size can be calculated as (“Java Heap” – “Reserved Memory”) * spark.memory.fraction, and with Spark 1.6.0 defaults it gives us (“Java Heap” – 300MB) * 0.75. For example, with a 4GB heap this pool would be 2847MB in size. This whole pool is split into 2 regions, Storage Memory and Execution Memory, and the boundary between them is set by the spark.memory.storageFraction parameter, which defaults to 0.5. The advantage of this new memory management scheme is that this boundary is not static: under memory pressure the boundary moves, i.e. one region grows by borrowing space from the other. I will discuss how this boundary moves a bit later; for now let’s focus on how this memory is used:
- Storage Memory. This pool is used both for storing Apache Spark cached data and as temporary space for “unrolling” serialized data. All the “broadcast” variables are also stored there as cached blocks. In case you’re curious, here’s the code of unroll. As you may see, it does not require enough memory for the whole unrolled block to be available: if there is not enough memory to fit the whole unrolled partition, it is put directly on disk, provided the desired persistence level allows this. As for “broadcast”, all the broadcast variables are stored in cache with the MEMORY_AND_DISK persistence level.
- Execution Memory. This pool is used for storing the objects required during the execution of Spark tasks. For example, it is used to store the intermediate shuffle buffer on the Map side in memory, and it is also used to store the hash table for the hash aggregation step. This pool also supports spilling to disk if not enough memory is available, but the blocks from this pool cannot be forcefully evicted by other threads (tasks).
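The region sizes described above can be reproduced with a few lines of arithmetic. This is only a sketch of the formulas from the list, not Spark code: the function name memory_regions and the dictionary keys are made up for illustration, and the defaults are the Spark 1.6.0 values quoted in this article.

```python
RESERVED_MB = 300  # hardcoded Reserved Memory in Spark 1.6.0, per the article


def memory_regions(heap_mb, memory_fraction=0.75, storage_fraction=0.5):
    """Return the sizes (in MB) of the regions described above.

    memory_fraction stands for spark.memory.fraction and
    storage_fraction for spark.memory.storageFraction.
    """
    if heap_mb < 1.5 * RESERVED_MB:
        # Mirrors the 450MB minimum mentioned in the article.
        raise ValueError("heap is below 1.5 * Reserved Memory, use a larger heap size")
    usable = heap_mb - RESERVED_MB
    spark_memory = usable * memory_fraction
    storage_initial = spark_memory * storage_fraction
    return {
        "reserved": RESERVED_MB,
        "user": usable * (1.0 - memory_fraction),
        "spark": spark_memory,
        "storage_initial": storage_initial,
        "execution_initial": spark_memory - storage_initial,
    }


regions = memory_regions(4096)  # a 4GB heap taken as 4096MB
for name, size in regions.items():
    print(f"{name:>18}: {size:.1f} MB")
```

For a 4GB heap this gives the 949MB of User Memory and 2847MB of Spark Memory quoted above, with the Spark Memory initially split evenly between storage and execution.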
Ok, so now let’s focus on the moving boundary between Storage Memory and Execution Memory. Due to the nature of Execution Memory, you cannot forcefully evict blocks from this pool, because this is data used in intermediate computations, and the process requiring this memory would simply fail if the block it refers to is not found. This is not so for Storage Memory: it is just a cache of blocks stored in RAM, and if we evict a block from there we can just update the block metadata to reflect that the block was evicted to HDD (or simply removed). When it tries to access this block, Spark reads it from HDD (or recalculates it if your persistence level does not allow spilling to HDD).
So, we can forcefully evict a block from Storage Memory, but cannot do so from Execution Memory. When can the Execution Memory pool borrow some space from Storage Memory? It happens when either:
- There is free space available in the Storage Memory pool, i.e. cached blocks don’t use all the memory available there. Then it just reduces the Storage Memory pool size, increasing the Execution Memory pool.
- The Storage Memory pool size exceeds the initial Storage Memory region size and all this space is utilized. This situation causes forceful eviction of blocks from the Storage Memory pool until it shrinks back to its initial size.
In turn, the Storage Memory pool can borrow space from the Execution Memory pool only if there is free space available in the Execution Memory pool.
The initial Storage Memory region size, as you might remember, is calculated as “Spark Memory” * spark.memory.storageFraction = (“Java Heap” – “Reserved Memory”) * spark.memory.fraction * spark.memory.storageFraction. With default values, this is equal to (“Java Heap” – 300MB) * 0.75 * 0.5 = (“Java Heap” – 300MB) * 0.375. For a 4GB heap this results in 1423.5MB of RAM in the initial Storage Memory region.
This implies that if we use the Spark cache and the total amount of data cached on an executor is at least the same as the initial Storage Memory region size, we are guaranteed that the storage region is at least as big as its initial size, because execution won’t be able to evict data from it to make it smaller. However, if your Execution Memory region has grown beyond its initial size before you filled the Storage Memory region, you won’t be able to forcefully evict entries from Execution Memory, so you would end up with a smaller Storage Memory region while execution holds its blocks in memory.
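The borrowing rules above can be illustrated with a toy bookkeeping model. This is a deliberately simplified sketch, not Spark's UnifiedMemoryManager: the class UnifiedPoolModel and its methods are hypothetical names, it tracks only megabyte counters rather than real blocks, and it ignores that a storage request can also evict other cached blocks.

```python
class UnifiedPoolModel:
    """Toy model of the Storage/Execution boundary rules described above."""

    def __init__(self, spark_memory_mb, storage_fraction=0.5):
        self.total = spark_memory_mb
        # Initial Storage Memory region: cached data below this size is safe
        # from eviction by execution.
        self.storage_initial = spark_memory_mb * storage_fraction
        self.storage_used = 0.0
        self.execution_used = 0.0

    def free(self):
        return self.total - self.storage_used - self.execution_used

    def acquire_storage(self, mb):
        """Storage may only take free space; it never evicts execution."""
        granted = min(mb, self.free())
        self.storage_used += granted
        return granted

    def acquire_execution(self, mb):
        """Execution takes free space first, then evicts cached data,
        but only the part of storage above its initial region size."""
        shortfall = mb - self.free()
        if shortfall > 0:
            evictable = max(0.0, self.storage_used - self.storage_initial)
            self.storage_used -= min(shortfall, evictable)
        granted = min(mb, self.free())
        self.execution_used += granted
        return granted


# Scenario 1: the cache fills first, then execution pushes it back.
pool = UnifiedPoolModel(2847.0)           # Spark Memory for a 4GB heap
pool.acquire_storage(2000.0)              # cache grows past its initial 1423.5MB
first = pool.acquire_execution(1200.0)    # evicts part of the borrowed cache space
second = pool.acquire_execution(500.0)    # only partly granted
print(first, second, pool.storage_used)

# Scenario 2: execution grows first, so the cache gets less than 1423.5MB.
pool2 = UnifiedPoolModel(2847.0)
pool2.acquire_execution(2000.0)
cached = pool2.acquire_storage(1423.5)
print(cached)
```

In the first scenario execution can push storage back only to its initial region size, after which further requests are granted only partially. In the second scenario storage ends up below its initial size because execution memory cannot be evicted.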
I hope this article helped you better understand Apache Spark memory management principles and design your applications accordingly. If you have any questions, feel free to ask them in comments.
Thanks for the very detailed article! It will be very helpful when using Spark. As a user, how do I know the current state of how memory is being used?
For example, if I want to cache an RDD and I calculated that memory would be enough when allocating resources, how would I verify that my assumptions are still correct after the work I’ve done in the context?
As for RDD cache status, you can always check it in the Spark WebUI, on the “Storage” tab. It shows the RDD storage level, the percentage of the RDD cached, the size of the cache in memory, and the size of the cache on disk.
If you want to get it from your code, you have to use the method SparkContext.getRDDStorageInfo(), which returns the same information (basically an array of RDDInfo http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.storage.RDDInfo). But this is a developer API, so the signature of the function might change in future releases.
it was a very nice explanation !!!
Could you please help me on the below queries :
1. how to view the array of RDDInfo return by SparkContext.getRDDStorageInfo() methods
2. is it possible to view the executor memory in spark web ui.
Suppose if my memory is insufficient spark pushes data into hdd how will it move to other executors . My entire data should be in either RAM or hdd but can it be contained in two places ?
Thanks Nice explanation
really nice explanation.
When I am executing a Spark job, GC (garbage collector) is called after every task and the job takes more time to execute. Is there any Spark configuration which can avoid this scenario?
There is no Java setting to prevent garbage collection. But you should be able to analyze heap dump of a running process to see which structures are causing big garbage collection. Most likely the root cause of this is your code that creates objects for each row, for example. This can be fixed by fixing your code
Thanks for the indepth article. Few queries though
Do we need to tweak our spark jobs again for spark 1.6 ?
With Spark 1.6 the memory management is simplified, so are all the other parameters (spark.storage.safetyFraction, shuffle.memory.fraction) affecting memory usage of no use in Spark 1.6?
And does the shuffle works same as old with new changes?
Yes, you need to tweak memory usage once again. But you can always turn on “spark.memory.useLegacyMode”, this way memory management would work the same as in old Spark version (prior to 1.6). Yes, old parameters are of no use, you can see the new parameters in the latest official guide http://spark.apache.org/docs/latest/configuration.html#memory-management (and also I described them in the article above)
Yes, for the shuffle implementations you can refer to my article on shuffles
Can I borrow your picture of Unified Memory Manager in my slides?
I will leave this link in my slides as reference!
Yes, feel free to do this keeping the reference to original
3. Spark Memory — For example, with 4GB heap this pool would be 2847MB in size. should it be (4-0.3) * 0.75 = 2.775GB?
Also in my own cluster, spark.executor.memory is set to 9G,but the executors in UI shows 0.0B / 6.2GB. I ran spark-shell on spark 1.6.0. I cannot figure out where 6.2GB come from, my calculation is (9-0.3) * 0.75 = 6.525.
No, my calculation is correct. 4GB = 4096MB, this is why (4096 – 300) * 0.75 = 2847MB or 2.78GB
In case of your exact environment, take a look at the other settings in the WebUI and also check the JVM process of the executor; it would give you a hint about where this number came from. I get 6.2GB only when I take your 9GB as 9’000’000KB = 8789MB, and (8789 – 300) * 0.75 = 6366MB or 6.2GB
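The arithmetic in this reply depends on which units the heap size is read in. A small sketch of the same calculation follows; the function name spark_memory_mb is made up for illustration, and 300MB and 0.75 are the Spark 1.6.0 defaults from the article.

```python
def spark_memory_mb(heap_mb, reserved_mb=300, memory_fraction=0.75):
    """(Java Heap - Reserved Memory) * spark.memory.fraction, in MB."""
    return (heap_mb - reserved_mb) * memory_fraction


four_gb = spark_memory_mb(4 * 1024)               # 4GB read as 4096MB
nine_binary = spark_memory_mb(9 * 1024)           # 9GB read as 9216MB
nine_decimal = spark_memory_mb(9_000_000 / 1024)  # 9GB read as 9'000'000KB

for label, mb in [("4GB", four_gb),
                  ("9GB as 9216MB", nine_binary),
                  ("9GB as 9'000'000KB", nine_decimal)]:
    print(f"{label:>20}: {mb:.0f} MB = {mb / 1024:.2f} GB")
```

Only the last interpretation lands near the 6.2GB shown in the UI; reading 9GB as 9216MB gives about 6.5GB instead.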
This is because the storage memory shown in the web UI is maxHeapMemory – onHeapExecutionMemoryPool.memoryUsed. maxHeapMemory is (9-0.3) * 0.75 = 6.525, but since you’ve already used some memory for broadcast values such as jar files, the storage memory is smaller than the ideal value.
You just saved half of my hairs! I wouldn’t figure out why giving 2GB of RAM and the executor just got nuked by OOM. It runs fine when I profile the mapPartitions function (even with only 1GB of heap) but fails in YARN mode.
Thank you, sir!
Very Informative, thanks 🙂
Thanks for sharing wonderful article and I think this is one of the nice article I have read about about Spark.
However, can you please throw some lights how spark switching memory management from RAM to HDD and how this transition happened?
what are the disadvantageous, performance overhead during this type of memory switching between RAM to HDD?
Thanks again for nice article.
What do you mean by “transition to HDD”? Is it about cache eviction to disk? You can read more about it here: https://spark.apache.org/docs/1.6.1/programming-guide.html#rdd-persistence
To put it short, Spark stores partitions in an LRU cache in memory. When the cache hits its size limit, it evicts an entry (i.e. a partition) from it. When the partition has the “disk” attribute (i.e. your persistence level allows storing the partition on disk), it is written to HDD and the memory consumed by it is freed until you request it again. When you request it, it is read back into memory, and if there is not enough memory, some other, older entries are evicted from the cache. If your partition does not have the “disk” attribute, eviction simply means destroying the cache entry without writing it to HDD.
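The eviction behavior described in this reply can be sketched as a toy LRU cache. This is an illustration of the idea only, not Spark's block manager: ToyBlockCache, its methods, and the use_disk flag (standing in for a persistence level that allows disk) are all hypothetical.

```python
from collections import OrderedDict


class ToyBlockCache:
    """Toy LRU cache of partitions with an optional 'disk' attribute."""

    def __init__(self, capacity_mb):
        self.capacity_mb = capacity_mb
        self.memory = OrderedDict()  # block_id -> (size_mb, use_disk), oldest first
        self.disk = {}               # block_id -> size_mb

    def _used(self):
        return sum(size for size, _ in self.memory.values())

    def put(self, block_id, size_mb, use_disk):
        self.memory.pop(block_id, None)  # replace any existing in-memory copy
        if size_mb > self.capacity_mb:
            # Block can never fit in memory: keep it on disk if allowed.
            if use_disk:
                self.disk[block_id] = size_mb
            return
        # Evict least recently used blocks until the new one fits.
        while self._used() + size_mb > self.capacity_mb:
            victim, (victim_size, victim_disk) = self.memory.popitem(last=False)
            if victim_disk:
                self.disk[victim] = victim_size  # spilled to disk
            # otherwise the entry is simply destroyed
        self.memory[block_id] = (size_mb, use_disk)

    def get(self, block_id):
        """Return where the block was found: memory, disk, or recompute."""
        if block_id in self.memory:
            self.memory.move_to_end(block_id)  # mark as recently used
            return "memory"
        if block_id in self.disk:
            self.put(block_id, self.disk[block_id], True)  # read back into memory
            return "disk"
        return "recompute"


cache = ToyBlockCache(100)
cache.put("a", 60, use_disk=True)
cache.put("b", 40, use_disk=False)
cache.put("c", 50, use_disk=True)   # evicts "a" to disk
hit_b = cache.get("b")
hit_a = cache.get("a")              # read back from disk, evicts "c" to disk
cache.put("d", 70, use_disk=False)  # evicts "b" (destroyed) and "a"
miss_b = cache.get("b")
print(hit_b, hit_a, miss_b)
```

A block without the disk flag that gets evicted can only be recomputed, which matches the last sentence of the reply.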
Thanks a lot for your amazing easy to understand article about spark
I’ve used spark 1.6.1 (mllib) with python 3.5.1 to classify Arabic text of 5 GB in size with naive Bayes and decision tree with following steps:
1- represent data in tf-idf
2 – the build the model
3- test the model
4- calculate metrics (precision, recall, etc)
its known that spark would be faster than Hadoop except that my work takes 3-5 hours using standalone spark cluster
my python code in here :
sample file from my corpus:
used arabic corpus of 8 categories with text files of 30 kB each
my cluster nodes of 3 GB RAM and 4 cores
Is RAM size could consider an issue or my code structure or nature of used data?
how to reduce the execution time ?what goes wrong?
Will you give me hints, guides?
This question is mainly about data science. Your first step is to find the bottleneck – add intermediate caching and actions on top of them like “count()” to get the idea of where the bottleneck is. If the bottleneck is model training, consider reducing the number of features, for example with SVD (see here: https://en.wikipedia.org/wiki/Latent_semantic_analysis). There might also be other problems, so you need to play with your data a bit more. Personally, for processing 5GB of data I would use a single machine and sklearn+nltk
Thank you for the explanation! Do I interpret correctly: if I get OOM errors (heap space), that’s because the executor runs out of user memory, since both storage and execution memory can be spilled on disk?
No. Storage memory will be spilled to disk only if you specified persistence level that allows storing the data on disk. If not, it will be just destroyed. Execution memory in turn cannot be forcefully spilled to disk, which I described in the article.
Also, you cannot evict all the storage memory if you need more execution memory. Execution can borrow memory from the storage pool if the pool has free space available or if the storage pool has previously borrowed memory from the execution pool. It cannot borrow memory if storage memory is occupied by your cached blocks and its size is within the initial storage pool size; I provided the calculation for it in the article.
Thanks again, that clarified it!
I am seeing OOM errors (heap space) when running Spark on YARN. Could these be caused by the Java non-heap memory that is set using spark.yarn.executor/driver.memoryOverhead?
I have similar issue. In my cluster, spark.executor.memory is set to 14G, the spark UI shows 9.8GB. I read an article before, it says:”This is because of the runtime overhead imposed by Scala, which is usually around 3-7%, more or less.” This seems apply to my calculation as (14*1024-300)*0.75*0.95=10001mb or 9.8GB.
Thanks for excellent article. How do you measure execution memory region utilisation at runtime? For storage region this information is available on the Spark UI and also exposed via JMX. But I did not find any way to check memory utilisation for execution region.
did you ever get an answer to this question? I too would like to be able to monitor the execution memory usage but have not found a solution.
I would be delighted to know this. How Spark allocates memory to container. T
I am using spark on hadoop and want to know how SPARK allocates the virtual memory to executor/container.
As per YARN vmem-pmem it gives 2.1 times virtual memory to container.
Hence- If XMX is 1GB then –> 1 GB * 2.1 = 2.1 GB is allocated to the container.
My question is how it works on SPARK. and whether the below statement is correct-
If I give Executor memory = 1GB then,
Total virtual memory = 1GB * 2.1 * spark.yarn.executor.memoryOverhead. Is this true?
If not then please elaborate how virtual memory for an executor is calculated in SPARK.
As per spark.apache.org/docs/latest/running-on-yarn.html executorMemory * 0.10, with minimum of 384. The question that I have is whether in case of SPARK, does YARN uses vmem-pmem (default 2.1) to calculate the virtual memory and then adds the overhead to that memory?
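One way to reason about this question is sketched below, under stated assumptions rather than as a confirmed answer from the article. The assumptions are: Spark asks YARN for a container of executor memory plus memoryOverhead (by default the larger of 384MB and 10% of executor memory, as quoted above), and YARN's virtual memory check multiplies the container's physical size by the vmem-pmem ratio (2.1 by default). The function name is hypothetical, and the rounding of requests to YARN's allocation increments is ignored.

```python
def yarn_executor_container(executor_memory_mb, overhead_mb=None,
                            overhead_factor=0.10, min_overhead_mb=384,
                            vmem_pmem_ratio=2.1):
    """Estimate container limits for one executor (sketch, see assumptions)."""
    if overhead_mb is None:
        # Default overhead: executorMemory * 0.10, with a minimum of 384MB.
        overhead_mb = max(min_overhead_mb, int(executor_memory_mb * overhead_factor))
    physical_mb = executor_memory_mb + overhead_mb
    return {
        "overhead_mb": overhead_mb,
        "physical_limit_mb": physical_mb,
        # Assumption: the ratio applies to the whole container, overhead included.
        "virtual_limit_mb": physical_mb * vmem_pmem_ratio,
    }


limits = yarn_executor_container(1024)  # executor memory = 1GB
print(limits)
```

Under these assumptions the overhead is added to the physical size first and the 2.1 ratio is applied afterwards, so the overhead is a sum term, not a multiplier as in the formula proposed above.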
Thanks for a great post!
I see contradictions of what you say about Storage Memory and what is written in Spark docs(https://spark.apache.org/docs/1.6.0/tuning.html#memory-management-overview).
You say that “…this boundary is not static, and in case of memory pressure the boundary would be moved, i.e. one region would grow by borrowing space from another one”, which as I understand means that, for example, if I cached RDD and then a lot of shuffle happens, then cached data (stored in Storage Memory) will be evicted, and shuffle data will sit in that place. But Spark docs says: that “spark.memory.storageFraction expresses the size of R as a fraction of M (default 0.5). R is the storage space within M where cached blocks immune to being evicted by execution”, which as I understand says there is a region of storage memory that is unevictable.
Finally, is there a region that is unevictable in Storage Memory or not?
hi, I have a problem: I use the mapPartitions function heavily, but I don’t know how to manage the memory in “user memory”, so I guess it accumulates data all the time, and finally the worker was killed by YARN because “real memory use” > “core memory” + “yarn overhead”. Is there some solution or some useful doc? thx.
Really nice overview! So where do memory errors (in Pyspark) potentially come from, if so many parts can spill to disk? A summary of this would be incredibly useful!
I am trying to understand the memory management in pyspark when it works with YARN for resource allocation
In my use-case, I have a spark streaming application written in python that reads data from a partitioned Kafka topic. For Kafka integration I use the spark-kafka direct streaming API. My processing for each message is a single map operation that analyses the message and saves eventual results in a database, the code is as simple as:
dstream.foreachRDD(lambda rdd: rdd.foreach(process_message))
I am trying to achieve memory limitation that
– each message should be processed with at most 1GB of memory
– YARN should kill the container if it consumes more than some amount of memory
I start the pyspark application with spark.executor.memory=5GB and after a container for an executor is requested and granted by YARN, I see in the operating system something like:
(j1) java parent process ———– (Xms and Xmx are set to 5GB)
(p1)python parent process———–
(p_1_1) python worker process
(p_1_2) python worker process
(p_1_3) python worker process
(p_1_4) python worker process
What I observe then is:
– the java process memory slowly increases up to 5GB
– all python worker processes have some memory consumption of 500MB
– eventually YARN kills the container for exceeding the 5GB limit
What I do not understand:
1) why the java process grows slowly to 5GB when I have only one map operation containing python code, which I assume is executed in the python worker processes?
2) Is there a way to achieve the desired memory limitation? I am wondering if setting spark.executor.memory=1GB and spark.executor.memoryOverhead=4GB will do the trick?
3) I assume that I have 4 python workers, because the executor is handling 4 partitions for each RDD. is this correct and is there a way to know how many python worker processes will be created for an executor?
Quick question about the User memory: Lets say I define a custom mapPartitions functions and use a bunch of data structures in python. eg. heap or something. That heap should be allocated on the user memory right?
Your article helped a lot to understand internals of SPARK.
I am running Spark in standalone mode on my local machine with 16 GB RAM. I have configured spark with 4G Driver memory, 12 GB executor memory with 4 cores.
I am using default configuration of memory management as below:
According to the calculation given in the article, storage memory should be (12*1024-300)*0.6*0.5 = 3596.4 MB, but it shows 2.1 GB.
Please help me understand, if I am missing something.
Thanks for your nice blog. Learn a lot about Spark memory management from your blog. But there is still one problem here.
About the parameter
spark.memory.storageFraction, I think it’s the least fraction for storage memory, from this post http://spark.apache.org/docs/1.6.0/tuning.html#memory-management-overview, but in your article, storage memory can be lower than this threshold?
Can you elaborate about the user space memory like what is the specific operations we do there ?
Nice Explanation..Actually i am stuck in one log which says :
“18/09/27 13:37:02 INFO MemoryStore: MemoryStore started with capacity 366.3 MB”
Now i am not sure why this is happening and how i can override this.Kindly help
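A possible way to read this log line is to invert the article's formula and see which heap size it implies. This is a sketch under assumptions that the log itself does not confirm: that the reported capacity is (“Java Heap” – 300MB) * spark.memory.fraction, and that spark.memory.fraction is 0.6 (the default in Spark 2.x, the same value used in the 12GB calculation earlier in this thread). The function name is made up for illustration.

```python
RESERVED_MB = 300.0  # Reserved Memory from the article


def implied_heap_mb(memory_store_capacity_mb, memory_fraction=0.6):
    """Invert capacity = (heap - reserved) * fraction to recover the heap size."""
    return memory_store_capacity_mb / memory_fraction + RESERVED_MB


heap = implied_heap_mb(366.3)
print(f"implied JVM heap: {heap:.1f} MB")
```

Under these assumptions the implied heap is a little under 1GB, which would be consistent with a process running with roughly a 1GB heap. If that is the case here, giving the process a larger heap (for example through spark.driver.memory or spark.executor.memory) or changing spark.memory.fraction would change the reported capacity.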
Very informative and detailed. Thanks!
Have a question to ask. When you shuffle data after caching (For joins/aggregation) is it the cached blocks that gets shuffled or is it the copy of these cached blocks.
An extension of the question would be, if we prune columns post cache, what would be the actual data that is transferred? The cached block or modified cached block?