I want to re-partition a dataset and then write it to the destination path. However, my pods are getting evicted due to DiskPressure. Spark only reports that it lost a worker, but when I look at the events in my OpenShift console I can see that the pod (worker) was evicted.
Here is how I am re-partitioning:
df = df.repartition("created_year", "created_month", "created_day")
df.write.partitionBy("created_year", "created_month", "created_day").mode("overwrite").parquet(dest_path)
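For reference, repartition() also accepts an explicit partition count ahead of the columns; the value 200 below is only a placeholder for illustration, and the write call stays the same:
df = df.repartition(200, "created_year", "created_month", "created_day")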
There are around 38k partitions:
Job Id: 1
Description: parquet at NativeMethodAccessorImpl.java:0
Submitted: 2020/08/11 21:35:46
Duration: 1.5 h
Stages: Succeeded/Total: 0/2
Tasks (for all stages): Succeeded/Total: 2166/38281 (5633 failed)
Spark configurations are as follows:
import pyspark

def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
    # Note: executor_memory, executor_cores and max_cores are currently unused below.
    print('Spark cluster is: {}'.format(spark_cluster))
    sc_conf = (
        pyspark.SparkConf()
        .setMaster(spark_cluster)
        .set('spark.driver.host', HOSTNAME)  # HOSTNAME is defined elsewhere in the script
        .set('spark.driver.port', 42000)
        .set('spark.driver.bindAddress', '0.0.0.0')
        .set('spark.driver.blockManager.port', 42100)
        .set('spark.executor.memory', '5G')
        .set('spark.driver.memory', '3G')
        .set('spark.sql.parquet.enableVectorizedReader', True)
        .set('spark.sql.files.ignoreCorruptFiles', True)
    )
    return sc_conf
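For completeness, this is roughly how the returned configuration gets applied (the master URL here is only a placeholder, not my real cluster address):
from pyspark.sql import SparkSession

conf = create_spark_config('spark://my-spark-master:7077')
spark = SparkSession.builder.config(conf=conf).getOrCreate()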
I am not able to figure out what is causing the DiskPressure, or how I can stop it.
I have read some answers and articles about DiskPressure and how to handle it, but they were fairly generic and not specific to Spark.
The Spark cluster has 6 workers, each with 5 GB of memory and 6 cores.
DiskPressure is a condition where the containers' disk usage grows so much that the node running the pod is left with very little free disk, typically less than 5-10% of its total capacity.
When that happens, the kubelet sets the DiskPressure condition on the node (which in turn becomes unschedulable), so new pods are not scheduled onto it and existing pods are evicted (and re-scheduled onto other nodes with capacity) to preserve pod uptime.
The most common causes of DiskPressure are missing log rotation (e.g. verbose debug logs) and large volumes of data being written to a node with limited disk.
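In case it helps with debugging, the node condition can also be inspected programmatically; this is only a sketch and assumes the official kubernetes Python client is installed and that you have kubeconfig (or in-cluster) access:
from kubernetes import client, config

# Print each node's DiskPressure condition (status "True" means the node is under disk pressure).
config.load_kube_config()  # use config.load_incluster_config() when running inside a pod
v1 = client.CoreV1Api()
for node in v1.list_node().items:
    for cond in node.status.conditions:
        if cond.type == "DiskPressure":
            print(node.metadata.name, cond.status, cond.message)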
Edit: my answer is generic and not specific to the Spark scenario.