I'm trying to scale a Structured Streaming pipeline using the Apache Spark 2.3 Scala API running on Kubernetes. The basic flow of the job is like this:
I have configured a cluster of 30 executors, each with 3 cores. Kafka currently streams 600,000 metrics per second (one metric per second from each of 600,000 distinct source ids) and is configured with 600 partitions. I am trying to aggregate all of them into 10 distinct outputs (i.e., each output aggregation covers 60,000 distinct source ids). The pipeline triggers every 10 seconds, processing ~6,000,000 records from Kafka per batch. My aggregation windows are 1 minute, non-overlapping, and my watermark is set to 30 seconds. Ideally I'd use a longer watermark to account for late-arriving data, but the drop-duplicates/watermark stage seems to be a bottleneck, particularly when the garbage collector is invoked. Here is some data from a recent run of the pipeline:
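For reference, here is a minimal sketch of that flow. The record layout (sourceId, outputId, metric, eventTime), the JSON payload, the Kafka settings, and the console sink are all assumptions for illustration; the real job's schema, topic, and sink will differ:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("metrics-aggregation").getOrCreate()
import spark.implicits._

// Hypothetical record layout: source id, output group, value, event time.
val metricSchema = new StructType()
  .add("sourceId", StringType)
  .add("outputId", StringType)
  .add("metric", DoubleType)
  .add("eventTime", TimestampType)

val metrics = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092") // placeholder brokers
  .option("subscribe", "metrics")                  // placeholder topic
  .load()
  .select(from_json($"value".cast("string"), metricSchema).as("m"))
  .select("m.*")

val aggregated = metrics
  .withWatermark("eventTime", "30 seconds")               // tolerate 30 s of lateness
  .dropDuplicates("sourceId", "eventTime")                // the drop-duplicates/watermark stage
  .groupBy(window($"eventTime", "1 minute"), $"outputId") // 1-minute tumbling windows, 10 outputs
  .agg(sum($"metric").as("total"))

aggregated.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("10 seconds")) // ~6,000,000 records per micro-batch
  .format("console")                             // placeholder sink
  .start()
```

Note that the watermark column must be part of the `dropDuplicates` key for Spark to be able to age deduplication state out of memory; without it, the state store grows without bound.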
[Graph: processed and input rows per second]
The graph shows that the pipeline keeps up with the input rate for about 8-9 minutes, but then the orange line drops below the green line (~10:01 on the time axis) and the pipeline struggles to keep up with the input data rate. I looked into the Spark UI for clues as to why the slowdown occurred and found that one executor was taking 55 seconds to perform a GC during the drop-duplicates/watermark stage. Here are the summary statistics from the stage and a zoomed-in view of the event timeline:
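As an aside on how I've been diagnosing this: beyond the per-task GC time in the Spark UI, detailed GC logs can be enabled on the executors so a tool like gceasy can break the pauses down by phase. A sketch, using standard JDK 8 HotSpot flags (the app name is hypothetical, and the same setting can be passed via `--conf` on spark-submit):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: turn on detailed GC logging in each executor JVM
// (JDK 8 HotSpot flags; on JDK 9+ use -Xlog:gc* instead).
val spark = SparkSession.builder()
  .appName("metrics-aggregation") // hypothetical app name
  .config("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps")
  .getOrCreate()
```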
I've tried a number of the techniques suggested here, with mixed results.
The rest of the memory-tuning advice amounts to "try modifying this parameter or that parameter", but it is impractical to try every permutation, and the advice rarely says what behavior to expect. Can someone point me toward the next steps? 55 seconds for a single GC seems unreasonable, and there should be some way to tune things so that my job is not held back by one executor.
I should have replied sooner, while the solution was still fresh in my mind, but I ended up making a few changes that together brought the garbage-collection time down. I don't remember all of the documentation sources that helped me resolve this, but I spent a lot of time researching on SO, the gceasy recommendations, and general Java GC literature. Here is the final set of JVM parameters I landed on after all that. I hope this helps.
-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:InitiatingHeapOccupancyPercent=35 -XX:+UseStringDeduplication -XX:ConcGCThreads=1 -XX:ParallelGCThreads=5
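For completeness, here is one way these flags can be wired into the job, with a one-line note on what each does. This is a sketch via spark.executor.extraJavaOptions (the app name is hypothetical); the flags can equally be passed on the spark-submit command line:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("metrics-aggregation") // hypothetical app name
  .config("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC " +                          // region-based, incremental collector
    "-XX:MaxGCPauseMillis=500 " +              // soft pause-time goal of 500 ms
    "-XX:InitiatingHeapOccupancyPercent=35 " + // start concurrent marking at 35% heap (default 45)
    "-XX:+UseStringDeduplication " +           // G1-only: deduplicate identical String backing arrays
    "-XX:ConcGCThreads=1 " +                   // threads for the concurrent marking phase
    "-XX:ParallelGCThreads=5")                 // threads for stop-the-world parallel phases
  .getOrCreate()
```

Lowering InitiatingHeapOccupancyPercent makes G1 start its concurrent cycle earlier, which trades a little throughput for a lower risk of falling behind and triggering a long full GC, which is the failure mode a 55-second pause usually points at.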