Spark Memory Management

Starting with Apache Spark 1.6.0, the memory management model has changed. The old model is implemented by the StaticMemoryManager class and is now called “legacy”. Legacy mode is disabled by default, which means that the same code can behave differently on Spark 1.5.x and 1.6.0, so be careful with that. For compatibility, you can enable the legacy model with the spark.memory.useLegacyMode parameter, which is turned off by default.
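To make the switch concrete, here is a minimal sketch that renders a spark-submit command line with legacy mode turned on. The helper name spark_submit_command and the application file my_job.py are made up for illustration; only the --conf flag and the spark.memory.useLegacyMode parameter come from Spark itself.

```python
import shlex

def spark_submit_command(app, conf):
    """Render a spark-submit command line from a dict of Spark properties."""
    args = ["spark-submit"]
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    args.append(app)  # hypothetical application file
    return " ".join(shlex.quote(arg) for arg in args)

# Fall back to the pre-1.6 StaticMemoryManager behavior.
legacy_conf = {"spark.memory.useLegacyMode": "true"}
command = spark_submit_command("my_job.py", legacy_conf)
print(command)
```

The same dictionary could carry spark.memory.fraction and spark.memory.storageFraction if you stay on the new model and only want to tune it.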

Almost a year ago I described the “legacy” memory management model in this article about Spark Architecture. I have also written an article on Spark Shuffle implementations that briefly touches on memory management as well.

This article describes the new memory management model used in Apache Spark starting with version 1.6.0, which is implemented as UnifiedMemoryManager.

Long story short, the new memory management model looks like this:

[Figure: Apache Spark Unified Memory Manager introduced in v1.6.0+]

You can see 3 main memory regions on the diagram:

  1. Reserved Memory. This is the memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0 it is 300MB, which means that this 300MB of RAM does not participate in Spark memory region size calculations. Its size cannot be changed without recompiling Spark or setting spark.testing.reservedMemory, which is not recommended because it is a testing parameter not intended for production use. Be aware that this memory is only called “reserved”: Spark does not manage it as a pool, but it limits how much you can allocate for Spark usage. Even if you want to give the entire Java heap to Spark to cache your data, you won’t be able to, because this “reserved” part stays outside the pools (it is not really spare, as it holds lots of Spark internal objects). For your information, if you don’t give a Spark executor at least 1.5 * Reserved Memory = 450MB of heap, it will fail with a “please use larger heap size” error message.
  2. User Memory. This is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you how to use it. You can store your own data structures there for use in RDD transformations. For example, you can rewrite a Spark aggregation using the mapPartitions transformation and maintain your own hash table for the aggregation, which would consume this so-called User Memory. In Spark 1.6.0 the size of this pool is (“Java Heap” – “Reserved Memory”) * (1.0 – spark.memory.fraction), which by default equals (“Java Heap” – 300MB) * 0.25. For example, with a 4GB heap you would have 949MB of User Memory. Again, this is User Memory, and it is completely up to you what is stored in this RAM and how: Spark does no accounting of what you do there or whether you respect this boundary. Not respecting this boundary in your code might cause an OOM error.
  3. Spark Memory. Finally, this is the memory pool managed by Apache Spark. Its size is (“Java Heap” – “Reserved Memory”) * spark.memory.fraction, and with Spark 1.6.0 defaults this gives (“Java Heap” – 300MB) * 0.75. For example, with a 4GB heap this pool would be 2847MB in size. The whole pool is split into 2 regions, Storage Memory and Execution Memory, and the boundary between them is set by the spark.memory.storageFraction parameter, which defaults to 0.5. The advantage of the new memory management scheme is that this boundary is not static: under memory pressure it moves, i.e. one region grows by borrowing space from the other. I will discuss how this boundary moves a bit later; for now let’s focus on how this memory is used:
    1. Storage Memory. This pool is used both for storing Apache Spark cached data and as temporary space for “unrolling” serialized data. All “broadcast” variables are also stored there as cached blocks. In case you’re curious, here’s the code of unroll. As you may see, it does not require enough memory for the whole unrolled block to be available: if there is not enough memory to fit the whole unrolled partition, it is put directly on disk, provided the desired persistence level allows this. As for “broadcast”, all broadcast variables are stored in the cache with the MEMORY_AND_DISK persistence level.
    2. Execution Memory. This pool is used for storing the objects required during the execution of Spark tasks. For example, it stores the shuffle intermediate buffer on the map side, and it also stores the hash table for the hash aggregation step. This pool supports spilling to disk if not enough memory is available, but blocks from this pool cannot be forcefully evicted by other threads (tasks).
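The User Memory item above mentions rewriting an aggregation with mapPartitions and your own hash table. Below is a minimal sketch of that pattern; the function name sum_by_key and the sample data are made up for illustration. Note one assumption: in a Scala or Java job the equivalent hash map would sit on the executor heap in User Memory, while PySpark runs Python functions in separate worker processes, so the Python version only illustrates the shape of the code.

```python
from collections import defaultdict

def sum_by_key(partition):
    """Aggregate the (key, value) pairs of one partition in a plain dict."""
    totals = defaultdict(int)  # your own structure; Spark does not account for it
    for key, value in partition:
        totals[key] += value
    return iter(totals.items())

# With PySpark this would be applied as rdd.mapPartitions(sum_by_key),
# followed by a step that merges the per-partition results.
sample_partition = iter([("a", 1), ("b", 2), ("a", 3)])
result = dict(sum_by_key(sample_partition))
print(result)
```

The point is that the size of totals depends only on your data: if one partition has many distinct keys, nothing in Spark stops the table from outgrowing the User Memory pool.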

Ok, now let’s focus on the moving boundary between Storage Memory and Execution Memory. Due to the nature of Execution Memory, you cannot forcefully evict blocks from this pool, because this data is used in intermediate computations, and the process requiring it would simply fail if the block it refers to could not be found. This is not so for Storage Memory: it is just a cache of blocks stored in RAM. If we evict a block from there, we can just update the block metadata to reflect that the block was evicted to HDD (or simply removed), and when accessing this block Spark would read it from HDD (or recompute it if your persistence level does not allow spilling to HDD).

So, we can forcefully evict blocks from Storage Memory, but not from Execution Memory. When can the Execution Memory pool borrow space from Storage Memory? It happens when either:

  • There is free space available in the Storage Memory pool, i.e. cached blocks don’t use all the memory available there. Then the Storage Memory pool simply shrinks and the Execution Memory pool grows.
  • The Storage Memory pool size exceeds the initial Storage Memory region size and all of this space is in use. This causes forceful eviction of blocks from the Storage Memory pool until it shrinks back to its initial size.

In turn, the Storage Memory pool can borrow space from the Execution Memory pool only if the Execution Memory pool has some free space available.
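The borrowing rules above can be sketched as a toy model. This is not the UnifiedMemoryManager code: the class ToyUnifiedMemory and its methods are invented for illustration, sizes are plain numbers in MB, and the model assumes that storage never evicts its own older blocks to make room for new ones.

```python
class ToyUnifiedMemory:
    """Toy model of the Storage/Execution boundary; sizes are in MB."""

    def __init__(self, spark_memory, storage_fraction=0.5):
        self.total = spark_memory
        self.initial_storage = spark_memory * storage_fraction
        self.storage_used = 0.0
        self.execution_used = 0.0

    def free(self):
        return self.total - self.storage_used - self.execution_used

    def acquire_storage(self, size):
        """Cache a block only if currently unused memory can hold it."""
        if size > self.free():
            return False  # simplification: other cached blocks are not evicted
        self.storage_used += size
        return True

    def acquire_execution(self, size):
        """Grant up to `size`, evicting cached blocks above the initial storage size."""
        shortfall = max(0.0, size - self.free())
        evictable = max(0.0, self.storage_used - self.initial_storage)
        self.storage_used -= min(shortfall, evictable)
        granted = min(size, self.free())
        self.execution_used += granted
        return granted

mem = ToyUnifiedMemory(2847)           # Spark Memory for a 4GB heap
cached = mem.acquire_storage(2000)     # storage grows past its initial 1423.5MB
first = mem.acquire_execution(1000)    # takes the free space, evicts 153MB of cache
second = mem.acquire_execution(1000)   # storage shrinks to 1423.5MB, then no further
third = mem.acquire_execution(100)     # nothing left to borrow
print(cached, first, second, third, mem.storage_used)
```

In this run the second request is only partially granted and the third gets nothing, which is the point where a real task would have to spill to disk or wait.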

The initial Storage Memory region size, as you might remember, is calculated as “Spark Memory” * spark.memory.storageFraction = (“Java Heap” – “Reserved Memory”) * spark.memory.fraction * spark.memory.storageFraction. With default values, this is equal to (“Java Heap” – 300MB) * 0.75 * 0.5 = (“Java Heap” – 300MB) * 0.375. For a 4GB heap this results in 1423.5MB of RAM in the initial Storage Memory region.
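The formulas used so far can be collected into one small calculator. This is a sketch using the Spark 1.6.0 defaults quoted in this article; the function name memory_regions and the dictionary keys are my own, not Spark API.

```python
RESERVED_MB = 300  # hardcoded in Spark 1.6.0, as described above

def memory_regions(heap_mb, memory_fraction=0.75, storage_fraction=0.5):
    """Return the region sizes in MB for a given executor heap size."""
    if heap_mb < 1.5 * RESERVED_MB:
        raise ValueError("heap too small: need at least %d MB" % (1.5 * RESERVED_MB))
    usable = heap_mb - RESERVED_MB
    spark = usable * memory_fraction
    storage = spark * storage_fraction
    return {
        "reserved": RESERVED_MB,
        "user": usable * (1.0 - memory_fraction),
        "spark": spark,
        "storage_initial": storage,
        "execution_initial": spark - storage,
    }

regions = memory_regions(4096)  # 4GB heap
for name, size in regions.items():
    print(name, size)
```

For a 4GB heap this reproduces the numbers from the text: 949MB of User Memory, 2847MB of Spark Memory and 1423.5MB in the initial Storage Memory region.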

This implies that if we use the Spark cache and the total amount of data cached on an executor is at least as large as the initial Storage Memory region size, the storage region is guaranteed to be at least as big as its initial size, because execution cannot evict data from it below that size. However, if your Execution Memory region has grown beyond its initial size before you filled the Storage Memory region, you won’t be able to forcefully evict entries from Execution Memory, so you would end up with a smaller Storage Memory region while execution holds its blocks in memory.

I hope this article helped you better understand Apache Spark memory management principles and design your applications accordingly. If you have any questions, feel free to ask them in the comments.

25 thoughts on “Spark Memory Management”

  1. Pingback: Spark Memory Management | Filling the gaps in Big Data

  2. Alejandro

    Thanks for the very detailed article! It will be very helpful when using Spark. As a user, how do I know the current state of how memory is being used?

    For example, if I want to cache an RDD and I calculated that memory would be enough when allocating resources, how would I verify that my assumptions are still correct after the work I’ve done in the context?

    Thanks!

    1. 0x0FFF Post author

      Thank you.
      As for RDD cache status, you can always check it in the Spark WebUI, on the “Storage” tab. It shows the RDD storage level, the percentage of the RDD cached, the size of the cache in memory and the size of the cache on disk.
      If you want to get it from your code, you have to use the method SparkContext.getRDDStorageInfo(), which returns the same information (basically an array of RDDInfo http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.storage.RDDInfo). But this is a developer API, so the signature of the function might change in future releases.

  3. MANDAR VAIDYA

    Really nice explanation.

    When I am executing a Spark job, the GC (garbage collector) is called after every task and the job takes more time to execute. Is there any Spark configuration which can avoid this scenario?

    Regards,
    Mandar Vaidya.

    1. 0x0FFF Post author

      There is no Java setting to prevent garbage collection. But you should be able to analyze a heap dump of a running process to see which structures are causing heavy garbage collection. Most likely the root cause is your own code, for example code that creates objects for each row. This can be fixed by fixing your code.

  4. gagan mishra

    Thanks for the in-depth article. A few queries though.
    Do we need to tweak our Spark jobs again for Spark 1.6?
    With Spark 1.6 the memory management is simplified, so are all the other parameters (spark.storage.safetyFraction, shuffle.memory.fraction) affecting memory usage of no use in Spark 1.6?
    And does the shuffle work the same as before with the new changes?

    Thanks again.

    1. 0x0FFF Post author

      Yes, you need to tweak memory usage once again. But you can always turn on “spark.memory.useLegacyMode”, and then memory management will work the same way as in old Spark versions (prior to 1.6). Yes, the old parameters are of no use; you can see the new parameters in the latest official guide http://spark.apache.org/docs/latest/configuration.html#memory-management (I also described them in the article above).
      Yes, for the shuffle implementations you can refer to my article on shuffles.

  5. Jimin Hsieh

    Hi Alexey,

    Can I borrow your picture of Unified Memory Manager in my slides?
    I will leave this link in my slides as reference!
    Thanks!

  6. Wayne

    3. Spark Memory: “For example, with 4GB heap this pool would be 2847MB in size.” Should it be (4-0.3) * 0.75 = 2.775GB?

    Also, in my own cluster spark.executor.memory is set to 9G, but the executors page in the UI shows 0.0B / 6.2GB. I ran spark-shell on Spark 1.6.0. I cannot figure out where 6.2GB comes from; my calculation is (9-0.3) * 0.75 = 6.525.

    1. 0x0FFF Post author

      No, my calculation is correct. 4GB = 4096MB, which is why (4096 – 300) * 0.75 = 2847MB or 2.78GB.

      As for your exact environment, take a look at the other settings in the WebUI and also check the JVM process of the executor; it would give you a hint about where this number came from. I get 6.2GB only when I take your 9GB as 9’000’000KB = 8789MB, and (8789 – 300) * 0.75 = 6366MB or 6.2GB.
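The arithmetic in this reply can be checked with a few lines. This is only a sketch of the calculation above, not an explanation confirmed against the Spark UI code; the function name spark_memory_gb is made up, and the assumption that the heap ends up being 9’000’000KB is the guess from the reply.

```python
def spark_memory_gb(heap_kb, reserved_mb=300, memory_fraction=0.75):
    """Spark Memory in GB for a heap given in KB, using the article's formula."""
    heap_mb = heap_kb / 1024
    return (heap_mb - reserved_mb) * memory_fraction / 1024

guessed = round(spark_memory_gb(9_000_000), 1)         # 9GB read as 9'000'000KB
expected = round(spark_memory_gb(9 * 1024 * 1024), 2)  # 9GB read as 9216MB
print(guessed, expected)
```

The first value matches the 6.2GB shown in the UI, while a full 9216MB heap would give about 6.53GB, close to the commenter's own estimate.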

  7. Doug

    You just saved half of my hair! I couldn’t figure out why the executor got nuked by OOM even when given 2GB of RAM. It runs fine when I profile the mapPartitions function (even with only 1GB of heap) but fails in YARN mode.
    Thank you, sir!

  8. Pingback: Spark UnifiedMemoryManager和StaticMemoryManager | Codeba

  9. Nishan

    Hi,

    Thanks for sharing this wonderful article. I think it is one of the nicest articles I have read about Spark.

    However, can you please shed some light on how Spark switches memory management from RAM to HDD and how this transition happens?

    What are the disadvantages and the performance overhead of this type of switching between RAM and HDD?

    Thanks again for the nice article.

    1. 0x0FFF Post author

      What do you mean by “transition to HDD”? Is it about cache eviction to disk? You can read more about it here: https://spark.apache.org/docs/1.6.1/programming-guide.html#rdd-persistence
      To put it short, Spark stores partitions in an LRU cache in memory. When the cache hits its size limit, it evicts an entry (i.e. a partition) from it. When the partition has the “disk” attribute (i.e. your persistence level allows storing the partition on disk), it is written to HDD and the memory it consumed is freed until you request the partition again. When you request it, it is read back into memory, and if there is not enough memory, some other, older entries are evicted from the cache. If your partition does not have the “disk” attribute, eviction simply means destroying the cache entry without writing it to HDD.
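The eviction behavior described in this reply can be illustrated with a toy LRU cache. This is a deliberately simplified model, not Spark's block manager: the class ToyBlockCache, its methods and the block sizes are all invented for the example.

```python
from collections import OrderedDict

class ToyBlockCache:
    """Toy LRU cache of partitions; sizes are in MB."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = OrderedDict()  # block id -> (size, use_disk), oldest first
        self.disk = {}               # block id -> size

    def used(self):
        return sum(size for size, _ in self.memory.values())

    def put(self, block_id, size, use_disk):
        if size > self.capacity:     # can never fit in memory
            if use_disk:
                self.disk[block_id] = size
            return
        # Evict least recently used blocks until the new one fits.
        while self.used() + size > self.capacity:
            old_id, (old_size, old_use_disk) = self.memory.popitem(last=False)
            if old_use_disk:
                self.disk[old_id] = old_size  # written to disk, memory freed
            # without the "disk" attribute the entry is simply destroyed
        self.memory[block_id] = (size, use_disk)

    def get(self, block_id):
        if block_id in self.memory:
            self.memory.move_to_end(block_id)  # mark as most recently used
            return "memory"
        if block_id in self.disk:
            self.put(block_id, self.disk.pop(block_id), True)  # read back in
            return "disk"
        return "recompute"

cache = ToyBlockCache(100)
cache.put("a", 60, use_disk=True)
cache.put("b", 60, use_disk=False)  # evicts "a" to disk
cache.put("c", 60, use_disk=True)   # evicts "b", which is destroyed
where_a = cache.get("a")            # read from disk, pushing "c" out to disk
where_b = cache.get("b")            # gone, has to be recomputed
print(where_a, where_b, sorted(cache.disk))
```

The overhead asked about in the question shows up here as the "disk" and "recompute" outcomes: the first costs a disk read, the second costs rerunning the lineage of the partition.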

  10. Bushra

    Hello,
    Thanks a lot for your amazing, easy-to-understand article about Spark.

    I’ve used Spark 1.6.1 (MLlib) with Python 3.5.1 to classify 5 GB of Arabic text with naive Bayes and a decision tree, with the following steps:
    1- represent the data in tf-idf
    2- build the model
    3- test the model
    4- calculate metrics (precision, recall, etc.)

    It’s known that Spark should be faster than Hadoop, yet my work takes 3-5 hours on a standalone Spark cluster.

    My Python code is here:
    https://drive.google.com/file/d/0B0ZUuQ4M71d7a3ZfLXc1R1RLS1U/view?usp=sharing

    A sample file from my corpus:
    https://drive.google.com/file/d/0B0ZUuQ4M71d7Q3FiYUNxS2dxM2c/view?usp=sharing

    I used an Arabic corpus of 8 categories with text files of 30 kB each.
    My cluster nodes have 3 GB of RAM and 4 cores.
    Could the RAM size be the issue, or is it my code structure or the nature of the data?
    How can I reduce the execution time? What is going wrong?
    Could you give me some hints or guidance?

    Best Regards,

    1. 0x0FFF Post author

      This question is mainly about data science. Your first step is to find the bottleneck: add intermediate caching with actions on top, like “count()”, to get an idea of where the bottleneck is. If the bottleneck is model training, consider reducing the number of features, for example with SVD (see here: https://en.wikipedia.org/wiki/Latent_semantic_analysis). There might also be other problems, so you need to play with your data a bit more. Personally, for processing 5GB of data I would use a single machine and sklearn+nltk.

  11. Hanna

    Thank you for the explanation! Do I interpret correctly: if I get OOM errors (heap space), that’s because the executor runs out of user memory, since both storage and execution memory can be spilled on disk?

    1. 0x0FFF Post author

      No. Storage memory will be spilled to disk only if you specified a persistence level that allows storing the data on disk. If not, the data will just be destroyed. Execution memory, in turn, cannot be forcefully spilled to disk, as I described in the article.
      Also, you cannot evict all the storage memory if you need more execution memory. Execution can borrow memory from the storage pool if it has some free space available or if the storage pool has previously borrowed some memory from the execution pool. It cannot borrow memory if the storage memory is occupied by your cached blocks and its size is within the initial storage pool size; I provided the calculation for it in the article.

  12. Pingback: spark1.6内存管理 | Codeba

  13. Pingback: Spark1.6内存管理 | SEARU.ORG

  14. qi

    I have a similar issue. In my cluster, spark.executor.memory is set to 14G and the Spark UI shows 9.8GB. I read an article before that says: ”This is because of the runtime overhead imposed by Scala, which is usually around 3-7%, more or less.” This seems to apply to my calculation, as (14*1024-300)*0.75*0.95=10001MB or 9.8GB.

