#apache-spark #data-skew #salting

Data Skewness 📊

Data skew refers to a significant imbalance in the distribution of data within specific grouping units, such as partitions, keys, or (primary/secondary) indexes.

Specifically, in the context of big data and distributed computing with Apache Spark, data skew can have a significant impact. Apache Spark, like most engines that process data in parallel, splits data into chunks (partitions) and distributes them across all available executors.

This uneven distribution can lead to poor performance, with some resources left underutilized. In the scenario shown in Figure 1, four executors process a few hundred rows each (best case) while the other four deal with millions of rows, a difference of several orders of magnitude.

Figure 1: Obligatory xkcd-style figure depicting the distribution ($\log_{10}$) of data across different partitions.

These tasks are visible in Spark's UI, where the discrepancy in their total durations becomes apparent. The summary metrics table resembles the following.

| Metric   | Min | 25th percentile | Median | 75th percentile | Max |
|----------|-----|-----------------|--------|-----------------|-----|
| Duration | …   | X s             | Y s    | Z s             | W s |
| …        | …   | …               | …      | …               | …   |

From the percentiles, we can identify the discrepancy, if any. Ideally, the Median should be as close as possible to the Max duration, which indicates that no single task lags far behind the rest. We can also examine tasks individually in the corresponding table, sorting the Duration column (ascending and descending) to spot outliers.
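Beyond the UI, we can quantify the skew programmatically. A minimal sketch (assuming a DataFrame `df`) counts the rows that land in each partition using `spark_partition_id`:

import org.apache.spark.sql.functions.{spark_partition_id, count}

// Group by the physical partition id and count the rows per partition;
// a heavily skewed DataFrame shows counts spanning orders of magnitude.
df.groupBy(spark_partition_id().as("partition"))
  .agg(count("*").as("rows"))
  .orderBy("rows")
  .show()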

An overarching but important theme is storage: skewed partitions will also differ greatly in file size. Following our previous example, the corresponding file sizes range from ~7.5 KB (the smallest partition) to ~251 MB (the largest). Under Spark's default configuration settings, small partitions lead to excessive task-scheduling overhead, while large ones lead to memory issues such as out-of-memory (OOM) errors and disk spill.
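To verify the imbalance on disk, we can list the sizes of the written part files. A sketch, assuming the dataset was written to a hypothetical path /tmp/skewed_output:

import org.apache.hadoop.fs.{FileSystem, Path}

// List every part file of the (hypothetical) output directory with its size
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path("/tmp/skewed_output"))
  .filter(_.getPath.getName.startsWith("part-"))
  .sortBy(_.getLen)
  .foreach(s => println(s"${s.getPath.getName}: ${s.getLen} bytes"))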

Salting 🧂

The most obvious approach to tackling data skew is to compose new custom partitions that manually mitigate the issue, or to test how Adaptive Query Execution (AQE) performs, even on a reduced subset of the data (while maintaining the same distribution profile).
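For reference, a minimal sketch for enabling AQE and its skew-join handling (available since Spark 3.x); the threshold values below are illustrative, not recommendations:

// Enable Adaptive Query Execution and let it split skewed partitions at join time
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// A partition is considered skewed if it exceeds skewedPartitionFactor times
// the median partition size and the byte threshold below
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")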

Alternatively, a common approach is salting: deriving a new column from the current keys by injecting a random factor. A simple algorithm follows.

import org.apache.spark.sql.functions.rand

val shufflePartitions = 8 // number of cores available locally
spark.conf.set("spark.sql.shuffle.partitions", shufflePartitions)

// Append a random salt key in [0, shufflePartitions) to each row
val saltedDf = df.withColumn("saltKey", (rand() * shufflePartitions).cast("int"))

The resulting data distribution will look something like the following figure: roughly uniform across 8 partitions, as many as the locally available cores. If possible, any subsequent operations should use the newly composed key; see the sketch after Figure 2.

Figure 2: Data distribution based on the newly-derived salted key (id). More or less, the data are uniformly distributed across 8 partitions (in this case, as many as the available executors).
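For example, a skewed aggregation can be rewritten as a two-stage aggregation over the salted key. A sketch, assuming hypothetical columns key and value:

import org.apache.spark.sql.functions.{col, sum}

// Stage 1: pre-aggregate on (key, saltKey), spreading each hot key
// across up to shufflePartitions tasks
val partial = saltedDf
  .groupBy(col("key"), col("saltKey"))
  .agg(sum("value").as("partialSum"))

// Stage 2: merge the partial results back into one row per original key
val result = partial
  .groupBy(col("key"))
  .agg(sum("partialSum").as("totalSum"))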