Spark study notes: Spark Optimization
Executor Memory is divided into 3 Segments
- Spark Memory
- User Memory
- Reserved Memory
Of these segments, only the Spark memory (which holds execution and storage memory) is actually used for executing tasks. These compartments should be properly configured so that tasks run efficiently and without failure.
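These segments come from Spark's unified memory manager. As a rough sketch of how a single executor heap divides up (the 300MB reserve and the default fractions spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5 are Spark's documented defaults):

```python
# Sketch of how a Spark executor heap divides up under the unified
# memory manager. Fractions below are the Spark defaults.
RESERVED_MB = 300  # fixed reserve for Spark internals

def memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    usable = heap_mb - RESERVED_MB              # heap left after the reserve
    spark_mem = usable * memory_fraction        # execution + storage memory
    user_mem = usable * (1 - memory_fraction)   # user data structures, UDFs
    storage_mem = spark_mem * storage_fraction  # cached blocks (soft boundary)
    execution_mem = spark_mem - storage_mem     # shuffles, joins, sorts
    return spark_mem, user_mem, storage_mem, execution_mem

# Example: a 10GB executor heap
spark_mem, user_mem, storage_mem, execution_mem = memory_regions(10 * 1024)
```

Note that the boundary between storage and execution memory is soft: execution can borrow from storage (and evict cached blocks) when it needs more room.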
Spark configuration parameters
- spark.executor.memory – The memory size of each Spark executor
- spark.executor.cores – Number of vCores per executor
- spark.driver.memory – The memory size of the driver
- spark.driver.cores – Number of vCores for the driver
- spark.executor.instances – Number of executors
- spark.default.parallelism – The default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user
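As an illustration, these keys can be passed to spark-submit as --conf flags. The rendering helper below is hypothetical and the values are placeholders, not recommendations; the keys themselves are the real Spark configuration names:

```python
# Hypothetical example: render the tuning parameters above as spark-submit
# --conf flags. Values are placeholders to be replaced by the calculations
# that follow; "my_job.py" is a made-up application name.
conf = {
    "spark.executor.memory": "37g",
    "spark.executor.cores": "5",
    "spark.driver.memory": "37g",
    "spark.driver.cores": "5",
    "spark.executor.instances": "179",
    "spark.default.parallelism": "1790",
}
flags = " ".join(f"--conf {key}={value}" for key, value in conf.items())
command = f"spark-submit {flags} my_job.py"
```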
Cluster Details
No. of Instance = 20
vCores per Instance = 48
Memory per Instance = 384GB
Total vCores = 20 * 48 = 960 vCores
Total Memory = 20 * 384 = 7680GB
Parameter Setting
Executor Memory
spark.executor.cores
Assigning executors a large number of virtual cores leads to a low number of executors and reduced parallelism. Assigning a low number of virtual cores leads to a high number of executors, causing a larger amount of I/O operations. Based on historical data, we suggest 5 virtual cores per executor to achieve optimal results in any sized cluster.
Calculation
spark.executor.cores = 5
Executors per System
1 vCore is reserved for the system
Executors per system = (vCores per Instance - 1) / spark.executor.cores
Executors per system = (48 - 1) / 5 = 47 / 5 = 9 (rounded down)
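The arithmetic above as a short sketch, using integer division for the round-down:

```python
vcores_per_instance = 48
system_reserved_vcores = 1   # 1 vCore kept for the system
executor_cores = 5           # spark.executor.cores

# Integer division rounds down, matching the calculation above
executors_per_instance = (vcores_per_instance - system_reserved_vcores) // executor_cores
print(executors_per_instance)  # 9
```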
Memory per Executor
1GB is reserved for System daemons
Total executor memory = Total RAM per system / number of executors per system
Total executor memory = (384 - 1) / 9 = 42GB (rounded down)
Executor Memory Overhead
This total executor memory includes both the executor memory and the memory overhead (spark.yarn.executor.memoryOverhead).
Assign 10 percent (or a minimum of 384MB) of the total executor memory to the memory overhead, and the remaining 90 percent to the executor memory.
spark.yarn.executor.memoryOverhead = total executor memory * 0.10 = 42 * 0.1 = 5GB (rounded up)
spark.executor.memory = total executor memory * 0.90 = 42 * 0.9 = 37GB (rounded down)
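The memory split above can be checked with a short calculation, rounding the overhead up and the executor memory down as described:

```python
import math

memory_per_instance_gb = 384
system_reserved_gb = 1       # 1GB reserved for system daemons
executors_per_instance = 9

# Integer division rounds down, matching the calculation above
total_executor_memory_gb = (memory_per_instance_gb - system_reserved_gb) // executors_per_instance
overhead_gb = math.ceil(total_executor_memory_gb * 0.10)          # rounded up
executor_memory_gb = math.floor(total_executor_memory_gb * 0.90)  # rounded down

print(total_executor_memory_gb, overhead_gb, executor_memory_gb)  # 42 5 37
```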
Driver Memory
spark.driver.memory
It is recommended to set this equal to spark.executor.memory.
spark.driver.memory = spark.executor.memory
spark.driver.memory = 37GB
Driver Cores
spark.driver.cores
It is recommended to set this equal to spark.executor.cores.
spark.driver.cores = spark.executor.cores
spark.driver.cores = 5 vCores
Executor Instances
spark.executor.instances
Calculate this by multiplying the number of executors per system by the total number of systems, leaving one executor slot for the driver.
spark.executor.instances = (Executors per system * Number of systems) - 1 (for the driver)
spark.executor.instances = (9 * 20) - 1 = 179
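As a sketch of the same calculation:

```python
executors_per_instance = 9
num_instances = 20

# One executor slot is left for the driver
executor_instances = executors_per_instance * num_instances - 1
print(executor_instances)  # 179
```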
Default Parallelism
spark.default.parallelism
Set this to 2-3 times the total number of assigned vCores; you can try increasing it to 4 times and checking the performance.
spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2
spark.default.parallelism = 179 * 5 * 2 = 1,790
Warning: Although this calculation gives 1,790 partitions, we recommend that you estimate the size of each partition and adjust this number accordingly by using coalesce or repartition.
For DataFrames, configure the parameter spark.sql.shuffle.partitions along with spark.default.parallelism.
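Putting the last two settings together as a sketch (1,790 is the example value derived above, not a universal recommendation; spark.default.parallelism and spark.sql.shuffle.partitions are the real Spark configuration keys):

```python
executor_instances = 179
executor_cores = 5

# 2x the total number of assigned vCores, per the guideline above
default_parallelism = executor_instances * executor_cores * 2

# Render both settings as spark-submit --conf flags
flags = " ".join([
    f"--conf spark.default.parallelism={default_parallelism}",
    f"--conf spark.sql.shuffle.partitions={default_parallelism}",
])
print(default_parallelism)  # 1790
```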
Garbage Collector
Always set up a garbage collector when handling large volumes of data through Spark.
Several garbage collectors are available for evicting old objects and placing new ones into memory. However, the Garbage First Garbage Collector (G1GC) overcomes the latency and throughput limitations of the older garbage collectors. The parameter -XX:+UseG1GC specifies that the G1GC collector should be used. (The default is -XX:+UseParallelGC.)
To understand the frequency and execution time of the garbage collection, use the parameters -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps.
To initiate garbage collection sooner, set InitiatingHeapOccupancyPercent to 35 (the default is 45).
Doing this helps avoid potential garbage collection for total memory, which can take a significant amount of time.
Example:
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",