I have a Spring Boot service that streams updates to clients via Server-Sent Events (SSE). The endpoint the client connects to is implemented with Spring WebFlux.

To clean up resources (delete an AMQP queue), the service needs to detect when a client closes the `EventSource`, i.e. terminates the connection. To that end, I register a callback via `FluxSink#onDispose(Disposable)`. Naturally, my SSE `Flux` sends regular heartbeats, both to keep the connection from timing out and to trigger `onDispose` once the client has disconnected.
```java
@Nonnull
@Override
public Flux<ServerSentEvent<?>> subscribeToNotifications(@Nonnull String queueName) {
    final var queue = createQueue(queueName);
    final var listenerContainer = createListenerContainer(queueName);
    final var notificationStream = createNotificationStream(queueName, listenerContainer);
    return notificationStream
            .mergeWith(heartbeatStream)
            .map(NotificationServiceImpl::toServerSentEvent);
}

@Nonnull
private Flux<NotificationDto> createNotificationStream(
        @Nonnull String queueName,
        @Nonnull MessageListenerContainer listenerContainer) {
    return Flux.create(emitter -> {
        listenerContainer.setupMessageListener(message -> handleAmqpMessage(message, emitter));
        emitter.onRequest(consumer -> listenerContainer.start());
        emitter.onDispose(() -> {
            final var deleted = amqpAdmin.deleteQueue(queueName);
            if (deleted) {
                LOGGER.info("Queue {} successfully deleted", queueName);
            } else {
                LOGGER.warn("Failed to delete queue {}", queueName);
            }
            listenerContainer.stop();
        });
    });
}
```
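For completeness, the `heartbeatStream` merged in above is a simple interval-based `Flux`. A minimal sketch (assumption: the real code maps each tick to a `NotificationDto`; the `"HEARTBEAT"` string stands in for that DTO here):

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

class HeartbeatSketch {
    // Emits a placeholder payload every 5 seconds so the SSE connection
    // stays active and a write is attempted after the client disconnects,
    // which is what ultimately triggers onDispose.
    final Flux<String> heartbeatStream =
            Flux.interval(Duration.ofSeconds(5)) // one tick every 5 s
                .map(tick -> "HEARTBEAT");       // placeholder for NotificationDto
}
```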
This works like a charm locally: the queue is deleted and the log messages appear once the client disconnects.
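For anyone wanting to reproduce this, the disconnect handling can be exercised locally with curl (the endpoint path below is just an example, not my actual route):

```shell
# Open the SSE stream (-N disables curl's output buffering), then press
# Ctrl-C to close the connection; the "Queue ... successfully deleted"
# log line should show up on the server.
curl -N http://localhost:8080/notifications/my-queue
```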
However, when the service is deployed to my Kubernetes cluster, `onDispose` is never called. The SSE stream itself still works flawlessly: the client receives all data from the server, and the connection is kept alive by the heartbeat.

I expose the service through an NGINX Ingress Controller, and it seems the connection between NGINX and my service stays open even after the client disconnects, so `onDispose` is never triggered. I therefore tried setting the number of upstream keep-alive connections to `0`, but that didn't solve the problem; the service is still never notified that the client has closed the connection:
```yaml
# Source: ingress-nginx/templates/controller-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    helm.sh/chart: ingress-nginx-4.0.6
    app.kubernetes.io/name: ingress-nginx
    app.kubernetes.io/instance: ingress-nginx
    app.kubernetes.io/version: 1.0.4
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/component: controller
  name: ingress-nginx-controller
  namespace: ingress-nginx
data:
  allow-snippet-annotations: 'true'
  http-snippet: |
    server {
      listen 2443;
      return 308 https://$host$request_uri;
    }
  proxy-real-ip-cidr: 192.168.0.0/16
  use-forwarded-headers: 'true'
  upstream-keepalive-connections: '0' # added this line
```
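For context, ingress-nginx also exposes per-Ingress annotations that affect long-lived connections; I have only changed the controller-level ConfigMap so far. A sketch of the SSE-relevant annotations (the Ingress name and timeout values are examples, not my current config):

```yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: notification-service   # example name
  annotations:
    # Disable response buffering so events are flushed to the client immediately.
    nginx.ingress.kubernetes.io/proxy-buffering: "off"
    # Allow the upstream to stay quiet between events without NGINX timing out.
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
```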
What am I missing?