Long GC Pause on Apache Spark Structured Streaming on Kubernetes

8/27/2018

I'm trying to scale a Structured Streaming pipeline using the Apache Spark 2.3 Scala API running on Kubernetes. The basic flow of the job is like this (a rough code sketch follows the list):

  • Read a static Dataset that contains ~1,000,000 records that map individual source ids to an output aggregation
  • Read a streaming Dataset from Kafka that contains the time series metrics to be aggregated, keyed by their source id
  • Repartition each Dataset based on the source id
  • Join the two Datasets on source id (this maps the metrics to the correct output aggregation while also filtering out data from Kafka that should not be aggregated)
  • Apply watermark
  • Drop duplicates
  • Aggregate the data
  • Write to Kafka output sink
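
In code, the flow looks roughly like the sketch below. This is a simplified version rather than my actual job; the topic names, paths, JSON schema, and aggregation function are placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder().appName("metrics-aggregation").getOrCreate()
    import spark.implicits._

    // Static lookup of sourceId -> outputId (~1,000,000 rows); the path is a placeholder
    val lookup = spark.read.parquet("/data/source_to_output_lookup")
      .repartition($"sourceId")

    // Streaming metrics from Kafka; the JSON schema is an assumption
    val metricSchema = new StructType()
      .add("sourceId", StringType)
      .add("eventTime", TimestampType)
      .add("metricValue", DoubleType)

    val metrics = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "metrics-in")
      .load()
      .select(from_json($"value".cast("string"), metricSchema).as("m"))
      .select($"m.*")
      .repartition($"sourceId")

    // The join maps each metric to its output aggregation and drops sourceIds
    // that are not present in the lookup
    val joined = metrics.join(lookup, Seq("sourceId"))

    // Watermark, de-duplication, and non-overlapping 1-minute window aggregation
    val aggregated = joined
      .withWatermark("eventTime", "30 seconds")
      .dropDuplicates("sourceId", "eventTime")
      .groupBy(window($"eventTime", "1 minute"), $"outputId")
      .agg(sum($"metricValue").as("aggValue"))

    // Write the results back to Kafka, triggering every 10 seconds;
    // the output topic and checkpoint path are placeholders
    val query = aggregated
      .select(to_json(struct($"window", $"outputId", $"aggValue")).as("value"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("topic", "metrics-out")
      .option("checkpointLocation", "/checkpoints/metrics-agg")
      .outputMode("update")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()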

I'm running on Kubernetes and have configured a cluster with 30 executors, each with 3 cores. Kafka is currently streaming ~600,000 metrics per second (roughly one metric per source id per second) and is configured with 600 partitions. I am trying to aggregate all of them into 10 distinct outputs (i.e., each output aggregation covers 60,000 distinct source ids). The pipeline triggers every 10 seconds and processes ~6,000,000 records from Kafka per micro-batch. My aggregation windows are non-overlapping 1-minute windows and my watermark is set to 30 seconds. Ideally, I'd like a longer watermark to account for late-arriving data, but the drop duplicates/watermark stage seems to be a bottleneck, particularly when the garbage collector is invoked.
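
For reference, the sizing for this run, expressed as Spark config, looks roughly like the sketch below (in practice I pass these through spark-submit on Kubernetes, and the app name is just a placeholder):

    import org.apache.spark.sql.SparkSession

    // Rough sketch of the cluster sizing described above, expressed as Spark config.
    // 30 executors x 3 cores = 90 cores total reading a 600-partition Kafka topic,
    // so the Kafka read stage of each micro-batch runs ~600 tasks over 90 cores.
    val spark = SparkSession.builder()
      .appName("metrics-aggregation")           // placeholder name
      .config("spark.executor.instances", "30") // 30 executors ...
      .config("spark.executor.cores", "3")      // ... with 3 cores each
      .getOrCreate()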

Here is some data from a recent run of my pipeline:

[Graph: Processed and input rows per second]

The graph shows that the pipeline keeps up with the input rows per second for about 8-9 minutes, but then the orange line drops below the green line (~10:01 on the time axis) and the pipeline has a hard time keeping up with the input data rate. I looked in the Spark UI for clues as to why the slowdown occurred and found that one executor was taking 55 seconds to perform a GC during the drop duplicates/watermark stage. Here are the summary statistics from that stage and a zoom-in on the event timeline:

[Spark UI screenshots: stage summary statistics and event timeline]

I've tried a number of the techniques suggested here, with mixed results; the config I used to apply them is sketched after this list. In particular:

  • Kryo serialization seemed to have little effect.
  • Using the settings -XX:+UseG1GC -XX:MaxGCPauseMillis=500 reduces the frequency of long pauses, but they still occur.
  • I turned on the GC logs, processed them with gceasy, and tried to follow its recommendations. These indicated that the long pauses come from Full GC events, and the logs do not show the symptoms that would suggest increasing the number of GC threads would help. The average object creation rate is 182.18 MB/sec and the average promotion rate is 49.8 MB/sec.
  • I tried reducing the NewRatio to 1, but this resulted in more frequent long pauses with smaller durations (i.e., ~25 seconds per pause instead of 50+ seconds).
  • It's hard to know how much memory my streaming Dataset is using, because attempting to cache a streaming Dataset results in an error.
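
For reference, this is roughly how I applied the Kryo and GC-logging settings mentioned above. I normally pass these as --conf flags to spark-submit, but they're shown here as SparkSession config for readability; the GC log path is a placeholder:

    import org.apache.spark.sql.SparkSession

    // Kryo serialization plus the G1 and GC-logging flags (Java 8 flags) for the
    // executor JVMs; the gceasy analysis was done on the resulting GC log files.
    val spark = SparkSession.builder()
      .appName("metrics-aggregation")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.executor.extraJavaOptions",
        "-XX:+UseG1GC -XX:MaxGCPauseMillis=500 " +
        "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/executor-gc.log")
      .getOrCreate()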

The rest of the memory advice amounts to "try modifying this parameter or that parameter", but it's tough to try every permutation, and it doesn't tell me what behavior I should expect. Can someone point me in the direction of the next steps to follow? I feel like 55 seconds for a GC is unreasonable, and there should be some way to tune it so that my job is not hindered by one executor.

-- Sean O
apache-spark
apache-spark-sql
garbage-collection
jvm
kubernetes

1 Answer

12/10/2018

So I should have replied to this sooner while the solution was fresh in my mind, but I ended up doing a few things that contributed to decreasing the garbage collection time. I don't remember all of the documentation sources that helped me resolve this, but I spent a lot of time researching on SO, the gceasy recommendations, and general Java GC literature. Anyway, here's what ended up helping:

  • Limited the number of cores that participate in a full GC event: I believe this was the biggest contributor to increased performance. I noticed that certain executors would have large GC times during a given micro-batch, and other executors on the same Kubernetes VM would have large computation times that were close to (if not exactly) the duration of the GC pause. This correlation led me down a research path where I eventually discovered that the JVM (at least for Java 8) gets its GC defaults from the underlying Kubernetes VM rather than from the limited resources dedicated to the container in which the JVM runs. Since each container had its own JVM instance, each executor chose default GC parameters as if it were the only JVM running on the underlying Kubernetes VM. The GC parameter that specifies the number of threads available for a full GC event is ParallelGCThreads. The JVM sets it by default as a fraction of the total number of cores on the VM; for a 32-core Kubernetes VM it ended up being 23, if I recall correctly (see the note after this list for how that default is derived, as best I understand it). So when a full GC event occurred, the GC would cause contention on the CPUs being used by the other executors, which were doing normal computation. My theory is that this contention was pushing up the GC/computation runtimes that occurred on the same underlying Kubernetes VM. For my particular test, I ended up overriding the defaults and setting ConcGCThreads to 1 and ParallelGCThreads to 5, since I was running 6 executors per 32-core Kubernetes VM.
  • Increased the memory on each executor: The gceasy graphs never really showed the memory plateauing; it only increased as the pipeline continued to run. I ended up increasing the memory dedicated to each executor from 8 GB to ~15 GB and was seeing plateaus around ~10 GB after that. The actual amount of memory you need will probably depend on your code.
  • Enabled string deduplication: Most of my dataset was strings, so this helped decrease the overall memory footprint of my application.
  • Modified the initiating heap occupancy percent: This was recommended by gceasy as well as in some SO threads.
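
As a side note on where that 23 came from: my understanding (from general JVM documentation, so treat this as an approximation rather than anything Spark-specific) is that on a machine with more than 8 hardware threads, HotSpot derives the default roughly as shown below.

    // My reading of the HotSpot default for -XX:ParallelGCThreads on a machine
    // with more than 8 hardware threads: 8 + (n - 8) * 5/8, using integer math.
    def defaultParallelGCThreads(hwThreads: Int): Int =
      if (hwThreads <= 8) hwThreads
      else 8 + ((hwThreads - 8) * 5) / 8

    defaultParallelGCThreads(32)  // = 23, matching what I saw on the 32-core VM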

So here is the final set of JVM parameters I am using after all that. I hope this helps.

-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:InitiatingHeapOccupancyPercent=35 -XX:+UseStringDeduplication -XX:ConcGCThreads=1 -XX:ParallelGCThreads=5
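
For completeness, I hand these flags to the executor JVMs, along with the larger executor memory, via Spark config (normally as --conf flags to spark-submit):

    import org.apache.spark.sql.SparkSession

    // Final GC flags and the increased executor memory, applied to every executor JVM.
    val spark = SparkSession.builder()
      .appName("metrics-aggregation")
      .config("spark.executor.memory", "15g")
      .config("spark.executor.extraJavaOptions",
        "-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:InitiatingHeapOccupancyPercent=35 " +
        "-XX:+UseStringDeduplication -XX:ConcGCThreads=1 -XX:ParallelGCThreads=5")
      .getOrCreate()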
-- Sean O
Source: StackOverflow