Spark study notes: Spark Optimization

surendranadh kurra
Jul 16, 2020

Executor Memory is divided into 3 Segments

  1. Spark Memory
  2. User Memory
  3. Reserved Memory

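Spark Memory is further split into storage memory (cached data) and execution memory (shuffles, joins, sorts, and aggregations). Of all these regions, only execution memory is actually used for executing tasks. Configure these compartments properly so that tasks run efficiently and without failure.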
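
As a rough illustration of how these segments relate, the sketch below splits an executor heap using the defaults of Spark's unified memory model as I understand them (about 300MB reserved, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5). The function name and the 37GB heap are assumptions made for the example, not values Spark reports.

```python
RESERVED_MB = 300  # fixed reserve assumed from Spark's unified memory manager


def memory_segments(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Split an executor heap (in MB) into the segments described above."""
    usable = heap_mb - RESERVED_MB
    spark_memory = usable * memory_fraction        # execution + storage
    user_memory = usable - spark_memory            # user data structures, UDF objects
    storage = spark_memory * storage_fraction      # cached blocks
    execution = spark_memory - storage             # shuffles, joins, sorts
    return {
        "reserved_mb": RESERVED_MB,
        "spark_memory_mb": spark_memory,
        "user_memory_mb": user_memory,
        "storage_mb": storage,
        "execution_mb": execution,
    }


# Illustrative heap size: 37GB expressed in MB.
segments = memory_segments(37 * 1024)
for name, value in segments.items():
    print(f"{name}: {value:.1f}")
```

The boundary between storage and execution is soft in practice: either side can borrow from the other when it is idle, so treat these numbers as starting sizes rather than hard limits.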

Spark configuration parameters

spark.executor.memory – The memory size of the Spark Executor
spark.executor.cores – Number of vCores for an Executor
spark.driver.memory – The memory size of the driver
spark.driver.cores – Number of vCores for a Driver
spark.executor.instances – Number of Executors
spark.default.parallelism – The default number of partitions in resilient distributed datasets (RDDs) returned by transformations like join, reduceByKey, and parallelize when no partition number is set by the user.

Cluster Details

No. of Instances = 20
vCores per Instance = 48
Memory per Instance = 384GB
Total vCores = 20 * 48 = 960 vCores
Total Memory = 20 * 384 = 7680GB

Parameter Setting

Executor Cores

spark.executor.cores
Assigning a large number of virtual cores to each executor leads to a low number of executors and reduced parallelism. Assigning a low number of virtual cores leads to a high number of executors, which causes a larger amount of I/O operations. Based on historical data, we suggest 5 virtual cores for each executor to achieve optimal results in a cluster of any size.
spark.executor.cores = 5

Calculation

spark.executor.cores = 5

Executors per System
1 vCore is reserved for the system
Executors per system = (vCores per Instance - 1) / spark.executor.cores
Executors per system = (48 - 1) / 5 = 47 / 5 = 9 (rounded down)

Memory per Executor
1GB is reserved for system daemons
Total executor memory = (Total RAM per system - 1) / number of executors per system
Total executor memory = (384 - 1) / 9 = 42GB (rounded down)
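
The two calculations above can be sketched in a few lines of Python. The function names are my own, and the 1 vCore and 1GB reserves are the assumptions stated in these notes.

```python
def executors_per_node(vcores_per_node, executor_cores, reserved_vcores=1):
    """Executors that fit on one node after reserving vCores for the system."""
    return (vcores_per_node - reserved_vcores) // executor_cores


def total_memory_per_executor_gb(memory_per_node_gb, executors, reserved_gb=1):
    """Memory budget per executor (heap plus overhead), rounded down to whole GB."""
    return (memory_per_node_gb - reserved_gb) // executors


executors = executors_per_node(vcores_per_node=48, executor_cores=5)
total_gb = total_memory_per_executor_gb(memory_per_node_gb=384, executors=executors)
print(executors, total_gb)
```

Integer division does the rounding down, so leftover vCores and memory on each node simply stay unused.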

Executor Memory Overhead
This total executor memory includes both the executor memory (spark.executor.memory) and the overhead (spark.yarn.executor.memoryOverhead; renamed spark.executor.memoryOverhead from Spark 2.3 onward).
Assign 10 percent (or a minimum of 384MB) of this total executor memory to the memory overhead and the remaining 90 percent to executor memory.
spark.yarn.executor.memoryOverhead = total executor memory * 0.10
spark.yarn.executor.memoryOverhead = 42 * 0.1 = 5GB (rounded up)
spark.executor.memory = total executor memory * 0.90
spark.executor.memory = 42 * 0.9 = 37GB (rounded down)
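
A minimal sketch of the 90/10 split, assuming whole-gigabyte rounding as in the calculation above. The helper name is hypothetical. It rounds the overhead up and gives the remainder to the heap, which yields the same 37GB and 5GB here and guarantees that the two parts never exceed the total.

```python
import math


def split_executor_memory(total_gb, overhead_fraction=0.10, min_overhead_mb=384):
    """Return (heap_gb, overhead_gb) for one executor's total memory budget."""
    overhead_gb = max(
        math.ceil(total_gb * overhead_fraction),   # 10 percent, rounded up
        math.ceil(min_overhead_mb / 1024),         # never below the 384MB floor
    )
    heap_gb = total_gb - overhead_gb               # the rest goes to the executor heap
    return heap_gb, overhead_gb


heap_gb, overhead_gb = split_executor_memory(42)
print(f"executor memory = {heap_gb}g, memory overhead = {overhead_gb}g")
```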

Driver Memory

spark.driver.memory
It is recommended to set this equal to spark.executor.memory.
spark.driver.memory = spark.executor.memory
spark.driver.memory = 37GB

Driver Cores

spark.driver.cores
It is recommended to set this equal to spark.executor.cores.
spark.driver.cores = spark.executor.cores
spark.driver.cores = 5 vCores

Executor Instances

spark.executor.instances
Calculate this by multiplying the number of executors per system by the total number of systems. Leave one executor for the driver.
spark.executor.instances = (Executors per system * Number of systems) - 1 for the driver
spark.executor.instances = (9 * 20) - 1 = 179
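
The same arithmetic as a small Python sketch. The function name is an assumption of mine, and the inputs are the 9 executors per system and 20 systems from the cluster above.

```python
def executor_instances(executors_per_node, nodes, reserved_for_driver=1):
    """Cluster-wide executor count, leaving one executor slot for the driver."""
    return executors_per_node * nodes - reserved_for_driver


instances = executor_instances(executors_per_node=9, nodes=20)
total_executor_cores = instances * 5  # 5 vCores per executor, as chosen earlier
print(instances, total_executor_cores)
```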

Default Parallelism

spark.default.parallelism
Set this to 2-3 times the total number of executor vCores. Try increasing it to 4 times and check the performance.
spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2
spark.default.parallelism = 179 * 5 * 2 = 1,790

Warning: Although this calculation gives 1,790 partitions, we recommend that you estimate the size of each partition and adjust this number accordingly by using coalesce or repartition.
For DataFrames, configure the parameter spark.sql.shuffle.partitions along with spark.default.parallelism.
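
To compare the core-based figure with a size-based estimate, a sketch like the following may help. The 128MB target partition size and the 500GB input are assumptions chosen purely for illustration, and both function names are hypothetical.

```python
import math


def default_parallelism(executor_instances, executor_cores, factor=2):
    """Core-based partition count: instances * cores per executor * factor."""
    return executor_instances * executor_cores * factor


def partitions_for_size(total_input_gb, target_partition_mb=128):
    """Size-based partition count for a chosen target partition size."""
    return max(1, math.ceil(total_input_gb * 1024 / target_partition_mb))


by_cores = default_parallelism(179, 5)
by_size = partitions_for_size(total_input_gb=500)
print(by_cores, by_size)
```

When the two numbers differ widely, the size-based one is usually the better guide for repartition or coalesce, while the core-based one shows how many tasks the cluster can run per wave.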

Garbage Collector

Always configure the garbage collector explicitly when handling large volumes of data through Spark.
You can use several garbage collectors to evict old objects and place new ones into memory. However, the Garbage First Garbage Collector (G1GC) overcomes the latency and throughput limitations of the older garbage collectors. The parameter -XX:+UseG1GC specifies that the G1GC garbage collector should be used. (On Java 8, the default is -XX:+UseParallelGC.)
To understand the frequency and execution time of garbage collection, use the parameters -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps. (These are Java 8 flags; Java 9 and later replace them with -Xlog:gc*.)
To initiate garbage collection sooner, set InitiatingHeapOccupancyPercent to 35 (the default is 45).
Doing this helps avoid a full garbage collection of the entire heap, which can take a significant amount of time.
Example:
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",

"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
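
To see how the values derived in these notes fit together, the sketch below collects them into one dictionary and renders spark-submit --conf arguments. It is only an assembly helper under my own naming: driver memory is set equal to spark.executor.memory (37g), the overhead key uses the YARN-era name from these notes, and the GC options are a subset of the example above.

```python
import shlex

# Subset of the JVM options from the example above (Java 8 flags).
GC_OPTIONS = " ".join([
    "-XX:+UseG1GC",
    "-XX:InitiatingHeapOccupancyPercent=35",
    "-verbose:gc",
    "-XX:+PrintGCDetails",
    "-XX:+PrintGCDateStamps",
])

conf = {
    "spark.executor.cores": "5",
    "spark.executor.memory": "37g",
    # Named spark.executor.memoryOverhead from Spark 2.3 onward.
    "spark.yarn.executor.memoryOverhead": "5g",
    "spark.driver.memory": "37g",
    "spark.driver.cores": "5",
    "spark.executor.instances": "179",
    "spark.default.parallelism": "1790",
    "spark.executor.extraJavaOptions": GC_OPTIONS,
    "spark.driver.extraJavaOptions": GC_OPTIONS,
}


def to_spark_submit_args(settings):
    """Turn a settings dict into a flat ['--conf', 'key=value', ...] list."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


args = to_spark_submit_args(conf)
# shlex.quote protects values that contain spaces, such as the JVM options.
command = " ".join(shlex.quote(a) for a in ["spark-submit"] + args)
print(command)
```

Append the application JAR or script and its arguments to the printed command before running it.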
