We have AWS SQS consumers running as goroutines alongside the application service's HTTP server. They use a buffered channel of size 60 as a concurrency limiter, and they run on a Kubernetes pod with an initial CPU request of 50m (i.e. 0.05 core).
Code for reading SQS messages (in batches of 10 messages):
// One polling step: stop when ctx is cancelled, otherwise fetch a batch from
// config.Queue and dispatch it. This fragment sits inside an enclosing loop
// that is not shown here.
select {
case <-ctx.Done():
	logrus.Debug("Context closed. Stopping to read from queue " + config.Queue)
	return nil
default:
	switch result, err := getQueueMessages(ctx, config); {
	case !validateError(err):
		// validateError reporting false ends this consumer with that error.
		return err
	case err != nil:
		// Error accepted by validateError: log it; presumably the enclosing
		// loop (not shown) then polls again.
		logrus.Warnf("Could not get messages from queue. Err = %s", err)
	default:
		processMessages(ctx, config, concurrencyLimiter, result.Messages)
	}
}
}```
Code for processing messages:
deDupMap := map[string]bool{}
for _, message := range messages {
if _, present := deDupMap[*message.MessageId]; present {
logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Info("DUPLICATE_MESSAGE_FOUND_IN_BATCH")
} else {
logrus.WithFields(logrus.Fields{
"message_id": *message.MessageId,
"body": *message.Body,
"senderId": message.Attributes["SenderId"],
"ApproximateFirstReceiveTimestamp": message.Attributes["ApproximateFirstReceiveTimestamp"],
"ApproximateReceiveCount": message.Attributes["ApproximateReceiveCount"],
"SentTimestamp": message.Attributes["SentTimestamp"],
}).Info("MESSAGE_PROCESSING_STARTED")
if len(concurrencyLimiter) == cap(concurrencyLimiter) {
logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Warn("CHANNEL_LEN_REACHED_CAP")
continue
}
deDupMap[*message.MessageId] = true
concurrencyLimiter <- true
logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Warn("OUTSIDE_GO_ROUTINE")
go func() {
logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Warn("INSIDE_GO_ROUTINE")
defer func() {
if r := recover(); r != nil {
logrus.Error("Panic during SQS message processing: %s", r)
}
<-concurrencyLimiter
}()
logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Info("MESSAGE_HANDLER_INIT")
err := runHandler(config, message.Body, message.MessageId)
logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body, "err": err}).Info("MESSAGE_HANDLER_RESPONSE")
if err == nil && config.SevereMessage {
logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Info("MESSAGE_PROCESSED_SUCCESSFULLY")
deleteMessage(ctx, config.Client, config.Queue, *message.ReceiptHandle, *message.MessageId)
}
}()
}
}
}```
The problem: for some SQS messages, the logs from the goroutines created in `processMessages` are missing, as if those goroutines never ran. These messages are only consumed successfully after 4–5 redeliveries.
We tried reducing the number of concurrent SQS consumers on the 0.05-CPU Kubernetes pod from 60 to 10. We also tried increasing the CPU to 0.5 core. Together these brought the number of retries down from 4–5 to 2–3.
The goroutines now fail to run less often, but the issue still persists.
please help!!