SSE broken after deploying with Kubernetes and connecting via Ingress

10/25/2019

I have a ReactJS client that uses EventSource and a Go backend that implements SSE (Server-Sent Events).

Everything seemed to work when I connected my browser to the backend running on localhost, as well as when my backend was running on k8s with port forwarding.

As soon as I created an ingress with a hostname (so that I don't have to port-forward all the time), SSE stopped working. I can still see that the client sends the request and that the request is received and registered by the backend. However, when an event is sent, it never arrives at my ReactJS app.

I'm attaching the code for my backend SSE implementation:

package sse

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"

    "go.uber.org/zap"

    "github.com/talon-one/towers/controller/api/log"
)

// patience is the amount of time to wait when pushing a message to
// a slow client or a client that closed after `range clients` started.
// When it elapses, listen logs "Skipping client" and moves on to the next one.
const patience time.Duration = time.Second * 2

// customerStateUpdate is the JSON payload built by HandleCustomerStateChange.
// It embeds sseEvent, so EventType (JSON key "event_type") is part of it
// alongside the customer's name and state.
type customerStateUpdate struct {
    sseEvent
    CustomerName  string `json:"customer_name"`
    CustomerState string `json:"customer_state"`
}

// contentUpdate is the JSON payload built by HandleContentChange. It has no
// fields of its own beyond the embedded sseEvent.
type contentUpdate struct {
    sseEvent
}
// sseEvent holds the field shared by every payload type in this file:
// EventType, serialized as "event_type".
type sseEvent struct {
    EventType string `json:"event_type"`
}
// Broker distributes event payloads to connected SSE clients. Every client is
// represented by its own chan []byte. In the code shown here, the clients map
// is only read and written by the listen goroutine, which is driven by the
// three channels below.
type Broker struct {

    // Events are pushed to this channel by the main events-gathering routine
    // (HandleContentChange and HandleCustomerStateChange send on it).
    Notifier chan []byte

    // New client connections: ServeHTTP sends a client's message channel here
    // to register it.
    newClients chan chan []byte

    // Closed client connections: ServeHTTP sends a client's message channel
    // here when its handler exits.
    closingClients chan chan []byte

    // Client connections registry, keyed by message channel. listen only ever
    // stores true, so the map is used as a set.
    clients map[chan []byte]bool

    // log is the logger used for broker messages; NewBroker derives it with
    // the field component=SSE.
    log *log.Logger
}

// NewBroker creates a Broker and starts its listen goroutine before returning
// it.
//
// Notifier is buffered with capacity 1; the registration and deregistration
// channels are unbuffered. The supplied logger is tagged with component=SSE.
func NewBroker(log *log.Logger) (broker *Broker) {
    broker = new(Broker)

    // Channels and registry first, in the same order as the struct fields.
    broker.Notifier = make(chan []byte, 1)
    broker.newClients = make(chan chan []byte)
    broker.closingClients = make(chan chan []byte)
    broker.clients = make(map[chan []byte]bool)
    broker.log = log.With(zap.String("component", "SSE"))

    // Start listening for registrations and broadcasting events.
    go broker.listen()

    return broker
}

// HandleContentChange publishes a "contentUpdate" event.
//
// The event is marshaled to JSON and sent on broker.Notifier; the send blocks
// until the channel can accept the payload. A marshaling error is returned
// unchanged and nothing is sent in that case.
func (broker *Broker) HandleContentChange() error {
    payload, err := json.Marshal(&contentUpdate{
        sseEvent: sseEvent{EventType: "contentUpdate"},
    })
    if err != nil {
        return err
    }

    broker.Notifier <- payload
    return nil
}

// HandleCustomerStateChange publishes a "customerStateUpdate" event carrying
// the given customer name and state.
//
// It logs the attempt first, then marshals the event to JSON and sends it on
// broker.Notifier; the send blocks until the channel can accept the payload.
// A marshaling error is returned unchanged and nothing is sent in that case.
func (broker *Broker) HandleCustomerStateChange(name, state string) error {
    broker.log.Info("Sending SSE to registered clients", zap.String("name", name), zap.String("state", state))

    update := customerStateUpdate{CustomerName: name, CustomerState: state}
    // EventType is promoted from the embedded sseEvent.
    update.EventType = "customerStateUpdate"

    payload, err := json.Marshal(&update)
    if err != nil {
        return err
    }

    broker.Notifier <- payload
    return nil
}

// ServeHTTP streams broker events to a single client as Server-Sent Events.
//
// The handler registers a private message channel with the broker, writes every
// payload it receives as a "data: ...\n\n" frame and flushes after each one. It
// returns when the request context is cancelled (for example because the client
// disconnected) or when a write fails, and always deregisters its channel on
// the way out.
//
// If the ResponseWriter does not implement http.Flusher the request is rejected
// with 500, because events could not be pushed out as they happen.
func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
    // Make sure that the writer supports flushing.
    flusher, ok := rw.(http.Flusher)
    if !ok {
        http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
        return
    }

    header := rw.Header()
    header.Set("Content-Type", "text/event-stream")
    header.Set("Cache-Control", "no-cache")
    header.Set("Connection", "keep-alive")
    header.Set("Access-Control-Allow-Origin", "*")
    // Ask nginx-style reverse proxies (including the NGINX ingress controller)
    // not to buffer this response; with buffering on, events can sit in the
    // proxy instead of reaching the browser.
    header.Set("X-Accel-Buffering", "no")

    // The request context replaces the deprecated http.CloseNotifier, whose
    // unchecked type assertion could also panic on writers that lack it.
    done := req.Context().Done()

    // Each connection registers its own message channel with the Broker's
    // connections registry. Give up if the client leaves before the broker
    // gets around to accepting the registration.
    messageChan := make(chan []byte)
    select {
    case broker.newClients <- messageChan:
    case <-done:
        return
    }

    // Remove this client from the map of connected clients when this handler
    // exits. Messages the broker is still trying to hand to this client are
    // received and discarded, so the broker does not wait for its send timeout
    // on a client that is already gone.
    defer func() {
        for {
            select {
            case broker.closingClients <- messageChan:
                return
            case <-messageChan:
            }
        }
    }()

    // Send the status line and headers right away so the client (and any proxy
    // in between) sees the stream open before the first event arrives.
    flusher.Flush()

    for {
        select {
        case <-done:
            return
        case msg := <-messageChan:
            // Write to the ResponseWriter, Server-Sent Events compatible.
            // A failed write means the connection is unusable, so stop.
            if _, err := fmt.Fprintf(rw, "data: %s\n\n", msg); err != nil {
                return
            }

            // Flush the data immediately instead of buffering it for later.
            flusher.Flush()
        }
    }
}

// listen is the broker's event loop; NewBroker starts it in its own goroutine
// and it never returns.
//
// Each iteration handles exactly one of:
//   - a registration from newClients (the channel is added to clients),
//   - a deregistration from closingClients (the channel is removed),
//   - an event from Notifier, which is offered to every registered client in
//     turn. A client that does not take the event within patience is skipped
//     for that event but stays registered.
//
// While an event is being fanned out, registrations and deregistrations wait.
func (broker *Broker) listen() {
    for {
        select {
        case s := <-broker.newClients:
            // A new client has connected.
            // Register their message channel
            broker.clients[s] = true
            broker.log.Info("Client added", zap.Int("current_count", len(broker.clients)))

        case s := <-broker.closingClients:
            // A client has detached and we want to
            // stop sending them messages.
            delete(broker.clients, s)
            broker.log.Info("Client removed", zap.Int("current_count", len(broker.clients)))

        case event := <-broker.Notifier:
            // We got a new event from the outside!
            // Send event to all connected clients
            for clientMessageChan := range broker.clients {
                // An explicit timer (rather than time.After) can be stopped as
                // soon as the send succeeds, so a pending timer is not left
                // behind for every delivered message.
                timer := time.NewTimer(patience)
                select {
                case clientMessageChan <- event:
                    timer.Stop()
                case <-timer.C:
                    broker.log.Info("Skipping client")
                }
            }
        }
    }
}

And in my ReactJS app:

export default class CustomersTable extends Component {
  // Opens the SSE connection to '/v1/events' as soon as the component is
  // constructed. The message handler is attached later, in componentDidMount,
  // and the connection is closed in componentWillUnmount.
  constructor(props) {
    super(props)
    this.eventSource = new EventSource('/v1/events')
  }

  updateCustomerState(e) {
    let event = JSON.parse(e.data)
    switch (event.event_type) {
      case 'customerStateUpdate':
        let newData = this.state.customers.map(item => {
          if (item.name === event.customer_name) {
            item.k8sState = event.customer_state
          }
          return item
        })
        this.setState(Object.assign({}, { customers: newData }))
        break
      case 'contentUpdate':
        this.reload()
        break
      default:
        break
    }
  }

  // After mounting: flag the component as loading, register 'body' as the
  // ReactModal app element, trigger reload() (defined outside this excerpt;
  // presumably it fetches the customer list, TODO confirm), and only then
  // start routing SSE messages to updateCustomerState.
  componentDidMount() {
    this.setState({ isLoading: true })
    ReactModal.setAppElement('body')
    this.reload()
    this.eventSource.onmessage = e => this.updateCustomerState(e)
  }

  // Closes the SSE connection opened in the constructor when the component
  // is removed.
  componentWillUnmount() {
    this.eventSource.close()
  }
...
-- AlexGordon
eventsource
go
kubernetes
reactjs
server-sent-events

1 Answer

4/9/2020

I got my SSE app working behind the NGINX Ingress by adding these annotations:

   annotations:
    nginx.ingress.kubernetes.io/proxy-read-timeout: "21600"
    nginx.ingress.kubernetes.io/eventsource: "true"
-- Wagner Caixeta
Source: StackOverflow