Spring WebFlux SSE connection kept alive by Kubernetes NGINX Ingress after client disconnects

11/20/2021

I have a Spring Boot service that streams updates to the client using Server-Sent Events (SSE). The endpoint to which the client connects is implemented using Spring WebFlux.

To clean up resources (delete an AMQP queue) my service needs to detect when a client closes the EventSource, i.e. terminates the connection. To do so, I register a callback via FluxSink#onDispose(Disposable). Naturally, my SSE Flux sends regular heartbeats to not only prevent the connection from timing out but also to trigger onDispose once the client has disconnected.

@Nonnull
@Override
public Flux<ServerSentEvent<?>> subscribeToNotifications(@Nonnull String queueName) {
    final var queue = createQueue(queueName);
    final var listenerContainer = createListenerContainer(queueName);
    final var notificationStream = createNotificationStream(queueName, listenerContainer);
    return notificationStream
            .mergeWith(heartbeatStream)
            .map(NotificationServiceImpl::toServerSentEvent);
}

@Nonnull
private Flux<NotificationDto> createNotificationStream(
        @Nonnull String queueName,
        @Nonnull MessageListenerContainer listenerContainer) {
    return Flux.create(emitter -> {
        listenerContainer.setupMessageListener(message -> handleAmqpMessage(message, emitter));
        emitter.onRequest(consumer -> listenerContainer.start());
        emitter.onDispose(() -> {
            final var deleted = amqpAdmin.deleteQueue(queueName);
            if (deleted) {
                LOGGER.info("Queue {} successfully deleted", queueName);
            } else {
                LOGGER.warn("Failed to delete queue {}", queueName);
            }
            listenerContainer.stop();
        });
    });
}

This works like a charm locally; the queue is deleted/messages are logged once the client disconnects.

However, when deploying this service to my Kubernetes cluster, onDispose is never called. The SSE stream still works flawlessly, i.e. the client receives all data from the server and the connection is kept alive by the heartbeat.

I'm using a NGINX Ingress Controller to expose my service and it seems as the connection between NGINX and my service is kept alive even after the client disconnects, causing onDispose to never be called. Hence I tried setting the upstream keep-alive connections to 0, but it didn't solve the problem - the service is never notified about the client having closed the connection:

# Source: ingress-nginx/templates/controller-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    helm.sh/chart: ingress-nginx-4.0.6
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/instance: ingress-nginx
    app.kubernetes.io/version: 1.0.4
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/component: controller
  name: ingress-nginx-controller
  namespace: ingress-nginx
data:
  allow-snippet-annotations: 'true'
  http-snippet: |
    server{
      listen 2443;
      return 308 https://$host$request_uri;
    }
  proxy-real-ip-cidr: 192.168.0.0/16
  use-forwarded-headers: 'true'
  upstream-keepalive-connections: '0' # added this line

What am I missing?


-- IggyBlob
kubernetes
nginx
nginx-ingress
spring
spring-webflux

0 Answers