Edit from 2015/12/17: The memory model described in this article is deprecated starting with Apache Spark 1.6. The new memory model is based on UnifiedMemoryManager and is described in this article
Recently I’ve answered a series of questions about Apache Spark architecture on StackOverflow. All of them seem to be caused by the absence of a good general description of the Spark architecture on the internet. Even the official guide does not have that many details, and of course it lacks good diagrams. The same goes for the “Learning Spark” book and the materials of the official workshops.
In this article I will try to fix this and provide a one-stop guide to Spark architecture in general and to some of the most popular questions about its concepts. This article is not for complete beginners: it will not give you an insight into the main Spark programming abstractions (RDD and DAG), but requires knowledge of them as a prerequisite.
This is the first article in a series. The second one regarding shuffle is available here. The third one about new memory management model is available here.
Let’s start with the official picture available at http://spark.apache.org/docs/1.3.0/cluster-overview.html:
As you can see, it introduces many terms at the same time: “executor”, “task”, “cache”, “Worker Node” and so on. When I started to learn the Spark concepts some time ago, it was almost the only picture of Spark architecture available on the internet, and things have not changed much since. I personally don’t really like it because it does not show some important concepts, or shows them not in the best way.
Let’s start from the beginning. Any Spark process that would ever work on your cluster or local machine is a JVM process. As with any JVM process, you can configure its heap size with the -Xmx and -Xms flags of the JVM. How does this process use its heap memory and why does it need it at all? Here’s the diagram of Spark memory allocation inside of the JVM heap:
By default, Spark starts with a 512MB JVM heap. To be on the safe side and avoid OOM errors, Spark allows you to utilize only 90% of the heap, which is controlled by the spark.storage.safetyFraction parameter. You might have heard of Spark as an in-memory tool, and Spark does allow you to store some data in memory. If you have read my article here https://0x0fff.com/spark-misconceptions/, you should understand that Spark is not really an in-memory tool, it just utilizes the memory for its LRU cache (http://en.wikipedia.org/wiki/Cache_algorithms). So some amount of memory is reserved for caching the data you are processing, and this part is by default 60% of the safe heap, which is controlled by the spark.storage.memoryFraction parameter. So if you want to know how much data you can cache in Spark, you should take the sum of the heap sizes of all the executors and multiply it by spark.storage.safetyFraction and by spark.storage.memoryFraction. By default this is 0.9 * 0.6 = 0.54, or 54% of the total heap size you allow Spark to use.
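The cache capacity arithmetic above can be written down as a small helper. This is only a sketch of the calculation, not Spark code: the function name cacheable_memory_mb is made up for illustration, and the defaults are the 0.9 and 0.6 values quoted above.

```python
def cacheable_memory_mb(executor_heaps_mb, safety_fraction=0.9, memory_fraction=0.6):
    """Memory available to the storage (LRU cache) region across executors, in MB.

    executor_heaps_mb: heap size of every executor JVM, in MB.
    safety_fraction:   stands for spark.storage.safetyFraction.
    memory_fraction:   stands for spark.storage.memoryFraction.
    """
    return sum(executor_heaps_mb) * safety_fraction * memory_fraction


# A single process with the default 512MB heap.
single = cacheable_memory_mb([512])
print(f"{single:.2f} MB")
```

With the defaults, 54% of the summed heap is reported as cacheable, whatever the number of executors.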
Now a bit more about the shuffle memory. It is calculated as “Heap Size” * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction. Default value for spark.shuffle.safetyFraction is 0.8 or 80%, default value for spark.shuffle.memoryFraction is 0.2 or 20%. So in the end you can use up to 0.8 * 0.2 = 0.16 or 16% of the JVM heap for the shuffle. But how does Spark use this memory? You can get more details on this here (https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala), but in general Spark uses this memory for the exact task it is named after: the shuffle. When the shuffle is performed, sometimes you need to sort the data as well. When you sort the data, you usually need a buffer to store the sorted data (remember, you cannot modify the data in the LRU cache in place, as it is there to be reused later). So Spark needs some amount of RAM to store the sorted chunks of data. What happens if you don’t have enough memory to sort the data? There is a wide range of algorithms usually referred to as “external sorting” (http://en.wikipedia.org/wiki/External_sorting) that allow you to sort the data chunk by chunk and then merge the final result together.
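To make the “sort chunk by chunk, then merge” idea concrete, here is a minimal external sort sketch. It is a generic illustration of the technique and not how Spark implements its shuffle sorter; the function name external_sort and the chunk_size parameter (standing in for the limited sort buffer) are assumptions made for the example.

```python
import heapq
import os
import tempfile


def external_sort(values, chunk_size):
    """Sort integers while buffering at most chunk_size of them at a time."""
    run_paths = []
    buffer = []

    def spill():
        # Sort the in-memory buffer and write it to disk as one sorted run.
        buffer.sort()
        fd, path = tempfile.mkstemp(suffix=".run")
        with os.fdopen(fd, "w") as f:
            f.writelines(f"{v}\n" for v in buffer)
        run_paths.append(path)
        buffer.clear()

    for v in values:
        buffer.append(v)
        if len(buffer) >= chunk_size:
            spill()
    if buffer:
        spill()

    # Merge the sorted runs; heapq.merge reads the files lazily, line by line.
    files = [open(p) for p in run_paths]
    try:
        # The result is collected into a list only to make it easy to inspect.
        return [int(line) for line in heapq.merge(*files, key=int)]
    finally:
        for f in files:
            f.close()
        for p in run_paths:
            os.remove(p)


print(external_sort([5, 3, 9, 1, 7, 2, 8], chunk_size=3))
```

The buffer never holds more than chunk_size values during the sort phase, which is the role the shuffle memory region plays in the description above.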
The last part of RAM I haven’t covered yet is “unroll” memory. The amount of RAM that is allowed to be utilized by the unroll process is spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction, which with the default values equals 0.2 * 0.6 * 0.9 = 0.108 or 10.8% of the heap. This is the memory that can be used when you are unrolling a block of data into memory. Why do you need to unroll it at all? Spark allows you to store the data both in serialized and deserialized form. The data in serialized form cannot be used directly, so you have to unroll it before using it, and this is the RAM that is used for unrolling. It is shared with the storage RAM, which means that if you need some memory to unroll the data, this might cause some of the partitions stored in the Spark LRU cache to be dropped.
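Putting the three formulas together, the following sketch computes the size of each region for a given heap. The function name legacy_heap_regions is hypothetical; the default fractions are the ones quoted in the text, and the unroll region is treated as part of storage rather than as extra memory, as described above.

```python
def legacy_heap_regions(heap_mb,
                        storage_safety=0.9, storage_fraction=0.6,
                        unroll_fraction=0.2,
                        shuffle_safety=0.8, shuffle_fraction=0.2):
    """Region sizes (in MB) for the pre-1.6 static memory model."""
    storage = heap_mb * storage_safety * storage_fraction
    unroll = storage * unroll_fraction  # carved out of storage, not added to it
    shuffle = heap_mb * shuffle_safety * shuffle_fraction
    return {"storage": storage, "unroll": unroll, "shuffle": shuffle}


regions = legacy_heap_regions(1000)
for name, size in regions.items():
    print(f"{name}: {size:.1f} MB")
```

For a 1000MB heap this gives the 54%, 10.8% and 16% shares from the text.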
This is great, because now you know exactly what a Spark process is and how it utilizes the memory of its JVM. Now let’s switch to the cluster mode: when you start a Spark cluster, what does it really look like? I like YARN, so I will cover how it works in YARN, but in general it is the same for any cluster manager you use:
When you have a YARN cluster, it has a YARN Resource Manager daemon that controls the cluster resources (practically, memory) and a series of YARN Node Managers running on the cluster nodes and controlling node resource utilization. From the YARN standpoint, each node represents a pool of RAM that you have control over. When you request some resources from the YARN Resource Manager, it tells you which Node Managers you can contact to bring up the execution containers for you. Each execution container is a JVM with the requested heap size. JVM locations are chosen by the YARN Resource Manager and you have no control over them: if the node has 64GB of RAM controlled by YARN (yarn.nodemanager.resource.memory-mb setting in yarn-site.xml) and you request 10 executors with 4GB each, all of them can easily be started on a single YARN node even if you have a big cluster.
When you start a Spark cluster on top of YARN, you specify the number of executors you need (--num-executors flag or spark.executor.instances parameter), the amount of memory to be used by each of the executors (--executor-memory flag or spark.executor.memory parameter), the number of cores each executor is allowed to use (--executor-cores flag or spark.executor.cores parameter), and the number of cores dedicated to each task’s execution (spark.task.cpus parameter). You also specify the amount of memory to be used by the driver application (--driver-memory flag or spark.driver.memory parameter).
When you execute something on a cluster, the processing of your job is split up into stages, and each stage is split into tasks. Each task is scheduled separately. You can consider each of the JVMs working as executors as a pool of task execution slots: each executor gives you spark.executor.cores / spark.task.cpus execution slots for your tasks, with a total of spark.executor.instances executors. Here’s an example. Take a cluster with 12 nodes running YARN Node Managers, with 64GB of RAM and 32 CPU cores each (16 physical cores with hyper-threading). On each node you can start 2 executors with 26GB of RAM each (leaving some RAM for system processes, YARN NM and DataNode), each executor with 12 cores to be utilized for tasks (leaving some cores for system processes, YARN NM and DataNode). So in total your cluster would handle 12 machines * 2 executors per machine * 12 cores per executor / 1 core for each task = 288 task slots. This means that your Spark cluster would be able to run up to 288 tasks in parallel, thus utilizing almost all the resources you have on this cluster. The amount of RAM you can use for caching your data on this cluster is 0.9 (spark.storage.safetyFraction) * 0.6 (spark.storage.memoryFraction) * 12 machines * 2 executors per machine * 26 GB per executor = 336.96 GB. Not that much, but in most cases it is enough.
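The example cluster can be checked with a few lines of arithmetic. This is a sketch only: the helper names task_slots and cluster_cache_gb are invented for the example, and integer division is used for the slots per executor on the assumption that a partial slot cannot run a task.

```python
def task_slots(nodes, executors_per_node, executor_cores, task_cpus=1):
    """Total number of tasks the cluster can run at the same time."""
    slots_per_executor = executor_cores // task_cpus
    return nodes * executors_per_node * slots_per_executor


def cluster_cache_gb(nodes, executors_per_node, executor_memory_gb,
                     safety_fraction=0.9, memory_fraction=0.6):
    """Total RAM available for cached data across all executors, in GB."""
    total_heap = nodes * executors_per_node * executor_memory_gb
    return total_heap * safety_fraction * memory_fraction


slots = task_slots(nodes=12, executors_per_node=2, executor_cores=12, task_cpus=1)
cache = cluster_cache_gb(nodes=12, executors_per_node=2, executor_memory_gb=26)
print(slots, f"{cache:.2f} GB")
```

Changing task_cpus or the per-executor memory shows directly how the slot count and the cache capacity trade off against each other.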
So far so good: now you know how Spark uses its JVM’s memory and what execution slots you have on your cluster. As you might have already noticed, I didn’t go into detail on what the “task” really is. This will be the subject of the next article, but basically it is a single unit of work performed by Spark, and it is executed as a thread in the executor JVM. This is the secret behind Spark’s low job startup time: forking an additional thread inside of the JVM is much faster than bringing up a whole JVM, which is what happens when you start a MapReduce job in Hadoop.
Now let’s focus on another Spark abstraction called “partition”. All the data you work with in Spark is split into partitions. What is a single partition and how is it determined? Partition size completely depends on the data source you use. For most of the methods that read data in Spark you can specify the number of partitions you want to have in your RDD. When you read a file from HDFS, you use Hadoop’s InputFormat to do it. By default each input split returned by InputFormat is mapped to a single partition in the RDD. For most files on HDFS a single input split is generated for a single block of data stored on HDFS, which equals approximately 64MB or 128MB of data. Approximately, because the data in HDFS is split on exact block boundaries in bytes, but when it is processed it is split on record boundaries. For a text file the splitting character is the newline char, for a sequence file it is the block end, and so on. The only exception to this rule is compressed files: if you have the whole text file compressed, then it cannot be split into records and the whole file becomes a single input split and thus a single partition in Spark, and you have to repartition it manually.
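As a rough rule of thumb, the number of partitions you get for a file can be estimated from its size and the HDFS block size. The sketch below is a deliberate simplification of the rule described above (one split per block, one split for a non-splittable compressed file); real InputFormat implementations apply additional rules of their own, and the function name estimate_partitions is hypothetical.

```python
import math

MB = 1024 * 1024


def estimate_partitions(file_size_bytes, block_size_bytes, splittable=True):
    """Approximate number of partitions for one non-empty input file."""
    if file_size_bytes <= 0:
        raise ValueError("file_size_bytes must be positive")
    if not splittable:
        # A file compressed as a whole becomes a single split and partition.
        return 1
    return math.ceil(file_size_bytes / block_size_bytes)


print(estimate_partitions(1000 * MB, 128 * MB))                    # plain text file
print(estimate_partitions(1000 * MB, 128 * MB, splittable=False))  # whole-file compression
```

The second call shows why a large compressed text file usually needs a manual repartition before any parallel processing.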
And what we have now is really simple: to process a single partition of data Spark spawns a single task, which is executed in a task slot located close to the data you have (Hadoop block location, Spark cached partition location).
This information is more than enough for a single article. In the next one I will cover how Spark splits the execution process into stages and stages into tasks, how Spark shuffles the data through the cluster, and some more useful things.
Nice observation. I feel that enough RAM or enough nodes will save you, despite using an LRU cache. I think incorporating Tachyon helps a little too, with things like de-duplicating in-memory data and some other unrelated features like speed, sharing and safety. It is just my opinion.
Of course, the more RAM you have, the faster your jobs would be: more data is cached in the Linux cache, more data is loaded to RAM in Spark, etc. But in general Tachyon can speed things up only in terms of IO when you persist the data, and that is not the main bottleneck in Spark (writing shuffle data to local filesystems usually is).
And I didn’t get your point about deduplication: you can cache the data in Spark or persist it to HDFS, HBase, Cassandra, Tachyon, etc., or do both. It is up to you, and the persistence level cannot help you with deduplication.
Thanks for the wonderful explanation sir.
Does spark.shuffle.memoryFraction reserve space in the heap’s old generation (like spark.storage.memoryFraction)?
It depends on your JVM settings. Spark does not control whether the memory is allocated in the old or the young generation. During the shuffle phase the memory is allocated just as a contiguous array of Longs (in chunks of 128MB, see https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala through https://github.com/apache/spark/blob/branch-1.4/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java and https://github.com/apache/spark/blob/branch-1.4/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java), and it is up to the JVM to decide whether it can allocate this space in the young generation or should allocate it directly in the old one.
Very nice description.
Thank you for your very good description of the heap, I have been looking for it for some time.
Be careful with the heap image though, because it is a little misleading. As you said in the text just below the picture, Spark has two different safetyFraction parameters for shuffle and storage memory, with default values of 0.8 and 0.9. In the figure it seems that both are affected by the same safetyFraction. The text is correct anyway.
Moreover, the shuffle memory is divided statically between the tasks that run on the executor, so each task will have “Shuffle Memory” / “Executor Cores” of memory available.
Finally, another parameter that I found useful in some situations (big block sizes) is spark.kryoserializer.buffer.max, which defines the maximum size of the buffer used by Kryo to serialize blocks. The default value is 64 MB, but you might want to increase it if you have big blocks. Again, each task will have its own serializer buffer, so at most “Executor Cores” * “Kryo Max Buffer” will be used. The memory used by Kryo will reside in the free heap.
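Taking the comment’s description at face value (a static, even split of shuffle memory between task threads, and one Kryo buffer per task), the two per-executor estimates can be sketched as follows. Both helper names are hypothetical, the even split is the commenter’s simplification rather than a statement about Spark’s internals, and the 64 MB default is the value quoted in the comment.

```python
def per_task_shuffle_mb(heap_mb, executor_cores,
                        shuffle_safety=0.8, shuffle_fraction=0.2):
    """Shuffle memory per task, assuming an even split across task threads."""
    return heap_mb * shuffle_safety * shuffle_fraction / executor_cores


def max_kryo_buffers_mb(executor_cores, kryo_buffer_max_mb=64):
    """Upper bound on heap used by Kryo buffers if every task fills its own."""
    return executor_cores * kryo_buffer_max_mb


print(f"{per_task_shuffle_mb(4096, 4):.2f} MB of shuffle memory per task")
print(f"{max_kryo_buffers_mb(4)} MB of Kryo buffers at most")
```

Comparing the two numbers gives a feel for how much of the free heap large serializer buffers could take on an executor with many cores.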
Another interesting source about this topic is this article by Cloudera:
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
I have a query. How many threads run in the executor JVM to execute these tasks? Who runs these threads and how? Does the “executor-cores” parameter also have a say in deciding how many threads are run? Also, does the number of threads depend on virtual cores or physical cores, and how?
As for the number of threads, it depends. For instance, the executor running a single task (spark.executor.cores = 1) on my cluster runs 30 threads. Of course, there is only one task execution thread; all the others are supplementary ones. The task execution thread itself is started by the Executor scheduler thread running in the executor, which in turn is accessed by the scheduler running on the Driver.
The number of threads that execute tasks in each executor JVM can be set with the “spark.executor.cores” parameter. Its name is ambiguous: in fact it has nothing to do with the number of cores and just sets the number of threads to use for user task processing. It is named so because the best practice for setting this parameter is to use a value equal to the number of cores on the machine this executor is running on (in the case of 1 executor per machine).
The number of task executor threads has nothing to do with the number of cores on the machine: you can set spark.executor.cores to 10 and have 10 threads processing the data on your VM with 1 vCPU.
I came across this article when I searched for information related to the meaning of executor-core on Yarn. Thanks for the clarification above!
I wonder how I can actually allocate 1 core per task if an executor core is actually mapped to a thread and not an actual vcore? Thank you!
When the execution is managed by YARN, in the latest versions it uses CGroups to control the CPU usage of the executors to ensure they won’t overcommit the CPU resources they’ve requested: more info. But there is no way to do this for a specific task: it is completely up to you what will be executed in a specific task and how. You can write code in your function that spawns a number of threads, and it would still be a single task, so you would hurt the CPU resource allocation for other tasks running on the same executor.
Thanks a lot for this nice article !
Love this series of articles.
Can you write something about Blocks ? How are blocks defined in spark?
What do you mean by “blocks”? BlockManager and its internals, partitions? I’m thinking about writing an article on BlockManager, but wondering whether it would be too in-depth to be useful
Thanks very much!
I have some questions and am hoping for help.
How can I measure the memory usage of a Spark application?
Can I measure the memory usage of every stage in an application?
Or can I get the specific memory numbers before and after a stage is executed?
I have been confused for a long time and am sincerely looking forward to your help.
Really thanks very much!
There is no clear way of doing this. You should:
1. Use the latest version of Spark. With each new version memory monitoring is improved
2. Execution stages are usually separated by shuffles. You can view shuffle statistics in Spark web UI
3. Dump executor heap with jmap and analyze it
4. You can try to force GC before stage, write memory statistics to the log and do the same after the stage execution
Thank you very much! Could you please explain the Spark raw and serialized caches?
Why does Spark core allow selecting the allocation strategy with the persist(allocation_strategy) method, while Spark SQL has only the table.cache() method with MEMORY_ONLY allocation by default?
Spark RDD cache and Spark DF cache are two different things. For a DataFrame, “cache” causes data to be cached in an optimized columnar structure (InMemoryRelation, check here), while for an RDD “cache” is just a mark that the data of this RDD should be persisted in a specific way. As you might see, after putting the data into the columnar structure, DF “persist” just calls “persist” for the underlying RDD. So the latest version of Spark allows you to persist a DF with any strategy.
Thanks! How can I control the actual DataFrame “cache” size and usage? There is nothing in the “Storage” section of the WebUI.
It uses the same mechanics as the RDD cache, so tune spark.storage.memoryFraction and spark.storage.safetyFraction. Cache usage is simply LRU: when processing an RDD partition that has a persistence attribute and there is not enough memory to store it, the engine simply evicts the least recently used cached partition from memory.
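The eviction behaviour described in the answer can be illustrated with a small byte-budgeted LRU cache. This is a generic sketch of the LRU policy and not Spark’s MemoryStore; the class name LruPartitionCache and its methods are invented for the example.

```python
from collections import OrderedDict


class LruPartitionCache:
    """Keeps partitions within a byte budget, evicting least recently used ones."""

    def __init__(self, capacity_bytes):
        self.capacity_bytes = capacity_bytes
        self.used_bytes = 0
        self._blocks = OrderedDict()  # partition_id -> bytes, oldest first

    def get(self, partition_id):
        if partition_id not in self._blocks:
            return None
        self._blocks.move_to_end(partition_id)  # mark as most recently used
        return self._blocks[partition_id]

    def put(self, partition_id, data):
        """Cache a partition and return the ids evicted to make room for it."""
        if len(data) > self.capacity_bytes:
            return []  # too big to cache at all; nothing is evicted
        if partition_id in self._blocks:
            self.used_bytes -= len(self._blocks.pop(partition_id))
        evicted = []
        while self.used_bytes + len(data) > self.capacity_bytes:
            old_id, old_data = self._blocks.popitem(last=False)
            self.used_bytes -= len(old_data)
            evicted.append(old_id)
        self._blocks[partition_id] = data
        self.used_bytes += len(data)
        return evicted


cache = LruPartitionCache(capacity_bytes=10)
cache.put("a", b"1234")
cache.put("b", b"1234")
cache.get("a")                        # "a" becomes the most recently used
evicted = cache.put("c", b"1234")     # no room for three blocks, so "b" goes
print(evicted)
```

Reading partition "a" before inserting "c" is what saves it from eviction, which is the essence of the LRU policy.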
By the way, for DF the default persistence level is MEMORY_AND_DISK (function cacheQuery here)
To size the LRU cache, I want to understand how much memory is needed for a table in the InMemoryColumnar structure. Say I run “select * from some_table”; where can I then see how much memory it takes? Since there is no serialized MEMORY_AND_DISK strategy, does it take memory with ~K=3.7?
The best option here is to try it on your specific table. It is the same for all columnar compressed storages: typically they deliver from 2x to 8x compression depending on your data. So you cannot get a fair estimate without testing it on your specific data. Moreover, reordering your data and changing the partitioning (sharding) logic might affect the compression rate for the same data. So experimenting is the only option for you. You can see how much data is cached in the Spark WebUI.
The issue is that “Memory Used” says “0.0 B / 707.0 MB” and the “Storage” section is empty in the case of the LRU cache. The Storage section contains info only in the case of a forced “cache table my_table;”
Is there any other way to check memory consumption? Spark 1.3.1
Again, the best place for detailed questions of this kind is StackOverflow. I usually monitor it and sometimes answer the questions. The second best is the Spark User List (user@spark.apache.org); I’m subscribed to it as well.
Regarding your case, I suspect that you are missing the point of “transformation” vs “action”. “Cache table …” in SparkSQL is an action. “cache()” in DF is a transformation. A transformation is a lazy computation, so it has no immediate effect unless you call an action on top of this DF.
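The difference between marking data for caching and actually materializing it can be shown with a toy model. This is an analogy in plain Python, not the Spark API: the class LazyDataset and its methods are made up, with cache() playing the role of the lazy transformation and collect() the role of an action.

```python
class LazyDataset:
    """Toy model: cache() only sets a flag, collect() does the actual work."""

    def __init__(self, compute):
        self._compute = compute        # function producing the records
        self._cache_requested = False
        self._cached = None
        self.compute_calls = 0

    def cache(self):
        # Lazy: nothing is computed or stored here.
        self._cache_requested = True
        return self

    @property
    def is_materialized(self):
        return self._cached is not None

    def collect(self):
        # Action: computes the data and fills the cache if it was requested.
        if self._cached is not None:
            return self._cached
        self.compute_calls += 1
        result = list(self._compute())
        if self._cache_requested:
            self._cached = result
        return result


ds = LazyDataset(lambda: range(3)).cache()
before = ds.is_materialized   # nothing cached yet, as in an empty Storage tab
first = ds.collect()          # the first action fills the cache
second = ds.collect()         # served from the cache, no recomputation
print(before, ds.is_materialized, ds.compute_calls)
```

In this model the cache stays empty until the first collect(), which mirrors why nothing shows up as cached before an action has run.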
Hi, nice article. I am very confused about how threads launched from the driver are assigned by the YARN scheduler. I have a Spark job where I spawn around 12 threads from an ExecutorService pool in the driver. I am using --executor-cores 4 --num-executors 20 --executor-memory 20G. I have YARN nodes with 30 GB and 8 cores, and my YARN cluster has 100 such nodes. My Spark job runs slowly, and many times my executors get killed by YARN because of memory issues. How do I give optimal CPU/memory resources to my job? Please guide. My Spark job has around 1TB of skewed data to process and involves an unavoidable group by and a huge shuffle. I am not caching anything, so spark.storage.memory is 0.1 and spark.shuffle.memory is 0.5. Please guide, thanks much in advance.
The best place for questions of this kind is StackOverflow. Try to include as many details as you can: the exact error message, the settings of YARN, the settings of Spark, etc. In general, with the right configuration your executor containers would not be killed by YARN. Check yarn.nodemanager.resource.memory-mb, yarn.scheduler.minimum-allocation-mb, yarn.scheduler.increment-allocation-mb, spark.yarn.executor.memoryOverhead, yarn.nodemanager.vmem-pmem-ratio
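To see how the settings listed above interact, it helps to estimate the size of the container YARN actually allocates for one executor. The sketch below rests on assumptions that are not stated in the thread: that the container request is the executor heap plus spark.yarn.executor.memoryOverhead, and that YARN rounds the request up to a multiple of an allocation step (which setting provides the step depends on the scheduler in use). The function names and the sample overhead of 2048 MB are hypothetical.

```python
import math


def yarn_container_mb(executor_memory_mb, overhead_mb, allocation_step_mb):
    """Container size after adding overhead and rounding up to the step."""
    requested = executor_memory_mb + overhead_mb
    return math.ceil(requested / allocation_step_mb) * allocation_step_mb


def executors_per_node(node_memory_mb, container_mb):
    """How many such containers fit into the memory YARN manages on a node."""
    return node_memory_mb // container_mb


# 20G executors; the overhead, the step and the full 30GB given to YARN
# are illustrative assumptions, not values taken from the question.
container = yarn_container_mb(20 * 1024, overhead_mb=2048, allocation_step_mb=1024)
fit = executors_per_node(30 * 1024, container)
print(container, fit)
```

Under these assumptions only one executor fits on a 30 GB node and roughly a quarter of the node’s memory stays unused, which is the kind of mismatch worth checking against the real cluster settings.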
Hi. A very informative post indeed, as it explains Spark’s internals in more detail than the official site, and in a concise manner. Following the above logic, I have increased my spark-shell’s memory to 1g, but I can still see the cacheable memory as 265.1 MB. Is this the right behaviour or am I missing something here? I am using Spark 1.4.
You are missing something. You can check “storage memory” on the “executors” tab of the Spark Web UI. It is calculated as spark.storage.memoryFraction * spark.storage.safetyFraction * Runtime.getRuntime.maxMemory. Judging by the number you get, it seems that you run Spark with 512MB of heap
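The reasoning in this answer can be reproduced by inverting the formula: divide the reported storage memory by the two fractions to get the heap the JVM actually sees. This is a sketch with a made-up function name, using the default fractions of 0.6 and 0.9. Note that Runtime.getRuntime.maxMemory typically reports somewhat less than the -Xmx value, so the result is expected to land a little below the configured heap.

```python
def implied_max_memory_mb(storage_memory_mb,
                          memory_fraction=0.6, safety_fraction=0.9):
    """Heap size (as seen by the JVM) implied by a reported storage memory."""
    return storage_memory_mb / (memory_fraction * safety_fraction)


implied = implied_max_memory_mb(265.1)
print(f"{implied:.1f} MB")
```

The result is just under 512MB and nowhere near 1g, which supports the conclusion that the memory setting did not take effect.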
Amazing!!! I am just halfway through, but could not stop myself from thanking you for this wonderful write-up. Hats off… One of the best written articles on any subject.
Thank you for the post!
Could you please clarify the partitioning implementation? Is there a guarantee that all pieces of data related to a given partition (suppose I partitioned my data with user % number of partitions) will be handled on the same node and in a single thread at a time in Spark? If that is true, I don’t need to implement any kind of additional synchronization or atomic operations. Or must I keep in mind that pieces of data related to the same partition can be handled completely in parallel?
It really depends on your use case. When you create a PairRDD with a key of “user” as you proposed and force a shuffle on this field (by calling the “partitionBy” function, for example), then you guarantee that all the data points for the same key will be in the same partition, and you don’t need to implement anything additional. What is your use case?
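The guarantee discussed here comes from the partitioning function being deterministic: the same key always maps to the same partition. Below is a plain Python sketch of the “user % number of partitions” scheme from the question; it is not the Spark partitionBy implementation, and the function names are invented for the example.

```python
def partition_for(user_id, num_partitions):
    """The partitioning rule from the question: user % number of partitions."""
    return user_id % num_partitions


def partition_by_user(records, num_partitions):
    """Distribute (user_id, value) pairs into partitions by user id."""
    partitions = [[] for _ in range(num_partitions)]
    for user_id, value in records:
        partitions[partition_for(user_id, num_partitions)].append((user_id, value))
    return partitions


records = [(1, "a"), (5, "b"), (1, "c"), (2, "d"), (5, "e")]
partitions = partition_by_user(records, num_partitions=4)
for index, part in enumerate(partitions):
    print(index, part)
```

Users 1 and 5 share a partition here, so a partition can contain several keys, but no key is ever spread across two partitions.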
My question is very close to above so I’ll post it here:
1) Let’s say I use rdd.mapPartitions(func). Should I implement the func logic as thread safe?
2) spark.task.cpus and tasks are not covered in the next post (about shuffle), so I’ll ask here: how does a Spark task utilize these cpus?
1) Yes, because you don’t know how many parallel tasks will be executed inside a single JVM. The best practice is not to have any shared state at all, or to keep it in an external system like an RDBMS
2) Spark does not utilize them in any way. If you tell Spark that your executor has 8 cores and spark.task.cpus for your job is 2, then it will run 4 concurrent tasks on this executor. If you set spark.task.cpus to 3, it will run only 2 concurrent ones. It will not enforce parallelism for these “cpus” in any way; it is just a declaration of how your own code will work and how much resource it needs
It is calculated as “Heap Size” * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction. Default value for spark.shuffle.safetyFraction is 0.8 or 80%, default
Shouldn’t the 0.8 be 0.9 or 90% ?
Please, take a look here: spark source code – at the moment spark.shuffle.safetyFraction is 0.8 by default
Clear, concise explanation. Love the diagrams
In the documentation (http://spark.apache.org/docs/latest/configuration.html)
I found that
spark.storage.unrollFraction is a fraction of spark.storage.memoryFraction,
i.e.
“unroll” memory = 0.2 * 0.6
Is that true?
Thank you for your comment. Yes, I think it was mostly a misprint for the article, because the picture clearly states that unroll is 20% of storage, but in the text I missed this somehow. Thank you for noting this. You can check the related code here: https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
Thank you for your great article. There is one part which I do not understand.
You mentioned
spark.storage.memoryFraction controls how much data we can cache,
and
the amount of RAM that is allowed to be utilized by unroll process is spark.storage.unrollFraction * spark.storage.memoryFraction * spark.storage.safetyFraction
I think there is some problem in the memory setup, as it is necessary to have memory for unroll, but not necessary to have memory for caching.
What if I set spark.storage.memoryFraction to 0, as I do not cache or persist any data? Will Spark crash because there will be no memory for unrolling data in serialized form?
Spark 1.5.x and older would tell you in this case: “Failed to reserve initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory” (https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala).
Unroll memory is allocated in the storage memory pool, so if your pool is empty it would fail to acquire enough memory
But you should pay attention to spark.memory.useLegacyMode – the things have changed and now Spark has a different memory manager called UnifiedMemoryManager (https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala), which does not have a static boundary between storage and shuffle memory, and just allocates unroll memory from storage pool on the same conditions like the other storage pool allocation requests (for caching new block for example), so there is no border and unrolling process might evict everything from the cache
Thank you for your clear explanation
Thanks for great article and keeping answering questions!
I’m trying to understand Spark memory regions (e.g. how the heap is divided), and I’m kind of confused because of the different information here, in the Cloudera article (http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/) and in Learning Spark.
It’s clear that:
1) there is RDD storage: the place where CACHED RDDs reside. It’s calculated like this: RddCacheMem = spark.executor.memory * spark.storage.safetyFraction * spark.storage.memoryFraction
2) there is Shuffle and aggregation buffer: the place where shuffle data reside and intermediate results of aggregation. If I’m not mistaken it’s calculated like this:
ShuffleMem = spark.executor.memory * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction
3) this is the place of my confusion:
In Learning Spark it is said that all the rest of the heap is devoted to ‘User code’ (20% by default). In your article there is no such part of memory. But you have unrollFraction (which is part of RDD storage, UnrollMem = spark.executor.memory * spark.storage.safetyFraction * spark.storage.memoryFraction * spark.storage.unrollFraction). And in the Cloudera article there is a formula for task memory: TaskMem = (spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores.
Can you please clarify what these 3 points are about, and how I can change the ‘User code’ memory? Thanks!
As I described in the previous comment, things have changed since my article. Please find the configuration parameters for the new memory manager (http://spark.apache.org/docs/1.6.0/configuration.html) and the related source code (https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala)
Regarding your questions:
1) Yes, you are right about this. This is valid for Spark prior to 1.6.0. Starting with 1.6.0 you get a single memory space for both cache and shuffle, and the amount is calculated as (spark.executor.memory - RESERVED_SYSTEM_MEMORY_BYTES * 1.5) * spark.memory.fraction, where RESERVED_SYSTEM_MEMORY_BYTES is hardcoded to 300MB
2) Yes, you are right, this is how it was prior to 1.6.0. It is also worth mentioning that by default spark.storage.safetyFraction is not equal to spark.shuffle.safetyFraction, which might be a bit confusing
3) Learning Spark is right. Prior to 1.6.0, you got UsableRam = TotalRam - ShuffleMem - RddCacheMem. If you leave out the “safety” part, cache and shuffle would be equal to 60% and 20% of your RAM by default, which means the usable part would be 100%-60%-20%=20%. But taking safety into account, you would get 1 - 0.6*0.9 - 0.2*0.8 = 0.3 or 30%. If you take a look at the diagram I drew for the memory, you will see blocks for “Storage” and “Shuffle”; everything beyond them is for the use of your application code. Cloudera’s article has a mistake there: this memory is available for “shuffle”, and as you might see from my article regarding different shuffle implementations in Spark, it is utilized in the shuffle phase by Spark. And the second mistake in their formula is that they don’t take spark.task.cpus into account.
Starting with Spark 1.6.0, the amount of usable space is UsableRam = spark.executor.memory - (spark.executor.memory - RESERVED_SYSTEM_MEMORY_BYTES * 1.5) * spark.memory.fraction = spark.executor.memory * (1.0 - spark.memory.fraction) + 1.5 * RESERVED_SYSTEM_MEMORY_BYTES * spark.memory.fraction. With the default value of spark.memory.fraction = 0.75, you would get UsableRam = 0.25 * spark.executor.memory + 337.5MB
You should also understand that this memory is not always used. For instance, if you have no cached RDDs, your cache space would be completely empty and you can use up to 100% of your RAM (but shuffle would still require some)
To extend the user code memory, simply shrink the memory allocated to caching and shuffling by reducing spark.storage.memoryFraction and spark.shuffle.memoryFraction (for Spark < 1.6.0), or by reducing spark.memory.fraction (for Spark >= 1.6.0)
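For the pre-1.6.0 layout from point 3, the share of the heap left to user code can be computed directly from the four fractions. This is a sketch of that arithmetic only; the function name user_memory_share is hypothetical, and the defaults are the values quoted in this thread.

```python
def user_memory_share(storage_fraction=0.6, shuffle_fraction=0.2,
                      storage_safety=0.9, shuffle_safety=0.8):
    """Fraction of the heap left for user code in the pre-1.6.0 memory model."""
    return (1.0
            - storage_fraction * storage_safety
            - shuffle_fraction * shuffle_safety)


default_share = user_memory_share()
no_safety_share = user_memory_share(storage_safety=1.0, shuffle_safety=1.0)
smaller_cache_share = user_memory_share(storage_fraction=0.4)
print(f"{default_share:.2f} {no_safety_share:.2f} {smaller_cache_share:.2f}")
```

The three values correspond to the default 30%, the 20% obtained when the safety fractions are ignored, and the larger share you get after reducing spark.storage.memoryFraction to 0.4.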
Great thanks, now everything in its place)
Hi, this is the best article I’ve read on Spark resource usage, thanks for sharing!
I’m a PySpark user and interested in your diagram of PySpark cluster here https://0x0fff.com/wp-content/uploads/2015/11/Spark-Architecture-JD-Kiev-v04.pdf.
Since as you pictured the drivers and executors in PySpark and in “original” Spark have different architectures, would you mind explaining a bit more on:
1. What do the Worker Monitors (for both driver and executors) do during job execution? What about the Python Workers?
2. When is Python Workers’ memory used (spark.python.worker.memory)? How is it different from spark.executor.memory or spark.driver.memory?
3. Is there any difference in Worker Monitors’ / Python Workers’ role when I use DataFrame as opposed to RDD?
Thanks again!
These are not the easiest questions to answer. I’ll give it a try
1. The Python Driver maintains the Spark context and works tightly with the Java Driver process, integrating with it using Py4J (see here: https://github.com/apache/spark/blob/master/python/pyspark/context.py), which is basically a solution to pass calls to Java through a socket and receive results from it through another socket. For example, the “version()” call is actually passed to Java (https://github.com/apache/spark/blob/master/python/pyspark/context.py#L316). Regarding the Python Workers on the executors: these are the processes managed by Scala PythonRDD (https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L67), which creates Python processes (https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L101) to process the data in Python with the functions you specified, and it also brings up a Monitor to monitor the status of the Python process (https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala#L307). And here is the code of the Python Worker that is invoked when some processing in Python is required (https://github.com/apache/spark/blob/branch-1.6/python/pyspark/worker.py)
2. Python Worker memory is used in the functions you pass to the transformations as parameters, but such a function is executed just as an iterator (https://github.com/apache/spark/blob/branch-1.6/python/pyspark/worker.py#L105), so the memory is mostly not used unless you have big broadcast variables or maintain a big global state in the function (even though that is not recommended)
3. Yes, when you use DataFrame all the code is passed to Java Executor and is being processed there, no data is streamed to Python. Plus DataFrame goes through optimization steps just like SQL queries, this is why it works faster. Plus DataFrame is stored in special columnar format with “zone map”-like index, so its processing happens even faster. In general, when you use PySpark with DataFrame calculations, you might not have Python workers on the executor nodes at all, and thus no monitors for them
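To illustrate point 2, here is a minimal pure-Python sketch (it is not Spark's actual worker code) of why running a function over an iterator keeps memory use low: records are pulled one at a time, so the only thing that stays resident is the state you hold on purpose, such as a big lookup table standing in for a broadcast variable. The names `run_task`, `user_function` and `lookup` are hypothetical and exist only for this example.

```python
from typing import Callable, Dict, Iterable, Iterator


def run_task(func: Callable[[Iterator[int]], Iterator[int]],
             records: Iterable[int]) -> Iterator[int]:
    """Apply a user function to a stream of records lazily (hypothetical helper)."""
    return func(iter(records))


# Stand-in for a broadcast variable: this is the part that really occupies memory.
lookup: Dict[int, int] = {i: i * i for i in range(5)}


def user_function(partition: Iterator[int]) -> Iterator[int]:
    # Generator: each record is handled and yielded before the next one is read.
    for record in partition:
        yield lookup.get(record, -1)


# range() is lazy as well, so the full partition is never materialized here.
result = run_task(user_function, range(7))
first_three = [next(result) for _ in range(3)]  # only three records consumed so far
rest = list(result)                             # the remaining records
```

If `user_function` collected the whole partition into a list before returning, the memory picture would be very different, which is the case the answer above warns about.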
Thanks for the detailed answers. Now I have more knowledge to keep in mind when writing efficient PySpark programs!
When I create a table using a file from S3, how can I determine the distribution of that data across separate nodes? Can I distribute data based on a particular column of a table?
When you read data in Spark, regardless of the source (HDFS, S3, etc.), you cannot control the distribution of data across separate nodes in any way. It depends entirely on the InputFormat used to read the data, and InputFormat reads the data by InputSplits, which carry no information about the data contents. But after the data is read into a DataFrame, you can use these tips to repartition it as you need: http://stackoverflow.com/questions/30995699/how-to-define-partitioning-of-a-spark-dataframe
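The idea behind repartitioning by a column can be sketched in plain Python: every row is sent to the partition chosen by a hash of its key, so all rows with the same key end up together. This is only an illustration under my own assumptions: the `(country, amount)` schema and the helper names are made up, and `zlib.crc32` is a stand-in for whatever hash function Spark uses internally. In PySpark itself, as far as I know, the corresponding call since version 1.6 is `DataFrame.repartition(numPartitions, *cols)`.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[str, int]  # (country, amount): made-up schema for illustration


def partition_for(key: str, num_partitions: int) -> int:
    # crc32 is stable across runs, unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def repartition_by_key(rows: List[Row], num_partitions: int) -> Dict[int, List[Row]]:
    """Group rows into partitions by the hash of their first column."""
    partitions: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[partition_for(row[0], num_partitions)].append(row)
    return dict(partitions)


rows = [("UA", 10), ("US", 5), ("UA", 7), ("DE", 3), ("US", 1)]
parts = repartition_by_key(rows, num_partitions=4)
```

Note that two different keys can still land in the same partition, and a very frequent key makes its partition much bigger than the others.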
Thanks for sharing, I have a question: how does Spark store an RDD in memory in a cluster environment?
Does it distribute it into the memory of each node, or is there shared memory in the cluster that all the nodes can access?
If it is in each node’s memory, how can a task on node B access node A’s memory? Is there some way to manage the memory of different nodes, like the way HDFS manages disks?
No, there is no shared memory. And Spark is not an in-memory tool, by default it does not store anything in memory. It can, however, cache some RDD partitions in memory. A partition is in fact a chunk of the source file’s data. Each node stores some partitions in its own memory, and only the driver knows which node stores which part of the source file. HDFS has no direct relation to the way Spark manages memory. However, when you read data from HDFS you use the related InputFormats, and in most cases they generate a single partition for a single HDFS block of data, and this partition will most likely be processed on the same node where the block physically resides in HDFS. Any of these statements can be false under specific conditions, though, so you need to read the related guides to understand it better
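As a rough sketch of the “one partition per HDFS block” rule and of the driver’s bookkeeping, here is some simple arithmetic in Python. The 128MB block size, the node names and the round-robin assignment are my assumptions for the example. Real placement prefers the node that physically holds the block, which this sketch does not model.

```python
import math
from typing import Dict, List

BLOCK_SIZE = 128 * 1024 * 1024  # assumed HDFS block size; your cluster may differ


def num_partitions(file_size: int, block_size: int = BLOCK_SIZE) -> int:
    """One partition per block, the common case described above."""
    return max(1, math.ceil(file_size / block_size))


def assign(partitions: int, nodes: List[str]) -> Dict[int, str]:
    # Round-robin is a simplification made for this example only.
    return {p: nodes[p % len(nodes)] for p in range(partitions)}


file_size = 1000 * 1024 * 1024  # a 1000MB file
count = num_partitions(file_size)
# Only the driver holds this partition-to-node map; nodes do not share memory.
driver_view = assign(count, ["node-a", "node-b", "node-c"])
```

Here a 1000MB file gives 8 partitions, and a task that needs partition 7 is scheduled using the driver’s map rather than by reaching into another node’s memory.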
Thanks for explanation 🙂
Thanks for this. I came here searching for the communication protocol between nodes (obviously not a JMS message queue) and learned from some links that it uses Netty. If you could add that to the architecture picture, it would be great
I will consider this, but at the moment the diagram does not show the layer responsible for communication between the processes
I read your blog. It is wonderful and very informative. I am working on Spark, and my concern is security. On which layer is security implemented, and where can we improve it further? Could you please share your analysis?
Unfortunately, I’m far from being a security expert. But I can say that, like many distributed systems, Spark was not designed with security in mind from the beginning. Usually, security is an enterprise requirement, while Spark started as a university research project.
After all, I think the best place to start is official documentation: https://spark.apache.org/docs/latest/security.html
Great post. I wonder, when Spark reads partition data from HDFS, where is the data stored? Is it in the spark.storage.safetyFraction RAM?
It is not stored. Spark requests a specific input split, the input split translates the read request into a block-offset address and passes it to the HDFS daemon, and the HDFS daemon opens the specific block file, seeks to the specific offset and starts reading the data, passing it directly to the Spark process. The Spark process in turn processes it in place and transfers the processing results downstream. Nothing is cached unless you manually specify that you want to cache something
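The flow above can be sketched with plain Python generators: seek to the split’s offset, read record by record, and hand each record to the processing step without keeping the data around. This is a simplified illustration and not Spark or HDFS code. The helpers `read_split` and `process` are hypothetical, an in-memory `BytesIO` stands in for the block file, and real input formats also handle records that cross split boundaries.

```python
import io
from typing import BinaryIO, Iterator


def read_split(stream: BinaryIO, offset: int, length: int) -> Iterator[bytes]:
    """Seek to the split's offset and yield one line at a time (simplified)."""
    stream.seek(offset)
    consumed = 0
    while consumed < length:
        line = stream.readline()
        if not line:  # end of the block file
            break
        consumed += len(line)
        yield line.rstrip(b"\n")


def process(lines: Iterator[bytes]) -> Iterator[int]:
    # Stand-in for the task logic: results go downstream as soon as they are ready.
    for line in lines:
        yield len(line)


block = io.BytesIO(b"alpha\nbeta\ngamma\ndelta\n")
# The split starts at byte 6 ("beta") and covers 11 bytes ("beta\n" + "gamma\n").
lengths = list(process(read_split(block, offset=6, length=11)))
```

Nothing from the block is retained after a record has been processed, which mirrors the statement that nothing is cached unless you ask for it.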
Regarding “Spark process in turn processes it in place”: which memory does “in place” occupy? What is the workflow of a task processing partition data? Does it read all the partition data and then process it all together, or does it read one line, process it, and then transfer the processing results downstream?
Thanks a lot!
I have one question which I cannot find the answer to: “Why do spark and hadoop use JVM as their execution environment?”
Thank you again in advance.
Hello
Many many thanks for your deeply-detailed articles, they are very valuable.
In your example I’m wondering what is the point of using “2 executors with 26GB of RAM each […] each executor with 12 cores” instead of a big one with 24 CPU & 52 GB RAM ?
Thanks for this wonderful explanation, it is very useful for us to do the spark tuning:-)
Hi,
Thanks for such an explanatory post.
I have a question on the number of executors and worker nodes.
As per my assumption, the worker nodes are the cluster machines we declare in the slaves, and an executor is a JVM process inside each worker node.
Please suggest.
Great post! help me a lot. Thanks.
Thanks for the nice post series!
I have a question about the ‘reduce’ phase. Does it fetch data in the same fashion as MapReduce (fetches into memory, saves to disk if it doesn’t fit, merges and so on) or in some other way? I’m asking because I often observe OOM during the reduce phase, as if it doesn’t spill data to disk
Hi, if storage and shuffle memory take 60% + 20% of the safe memory (90%), does it mean that only 10% of the safe memory remains for Java objects and task computations?
Thanks !
Hi,
I was going through your articles on Spark memory management, Spark architecture, etc., and I got confused about one thing.
In this article you said that
“if the node has 64GB of RAM controlled by YARN (yarn.nodemanager.resource.memory-mb setting in yarn-site.xml) and you request 10 executors with 4GB each, all of them can be easily started on a single YARN node even if you have a big cluster.”
But say I have created an RDD with 10 partitions (so 10 worker nodes) and asked for 10 executors with 4GB each. In this case, will the YARN resource manager provide 10 containers (JVMs), one executed on each of the worker nodes, or will all 10 partitions reside on a single worker node with these 10 containers executed in parallel on that same node?
Please help me with a reply. And thanks a ton for your detailed explanations, which helped me a lot!
You are the master of spark Alexey Grishchenko
Thank you
Great article and well written. Thank you. I have a question: does the explanation still apply to Spark 2.X?
On my running Spark instances, I am not able to find the properties mentioned in the article, e.g., spark.storage.safetyFraction
It seems this article is a little outdated in terms of memory configuration, as most of the memory configuration parameters are deprecated. Do you have an updated article?