AWS SQS consumers in Go

11/11/2021

We have AWS SQS consumers running as goroutines alongside the application service's HTTP server. To limit concurrency we use a buffered channel of size 60. The service runs in a Kubernetes pod with an initial CPU of 50m, i.e. 0.05 core.

Code for reading SQS messages (in batches of 10):

```go
	for {
		select {
		case <-ctx.Done():
			logrus.Debug("Context closed. Stopping to read from queue " + config.Queue)
			return nil
		default:
			result, err := getQueueMessages(ctx, config)
			if ok := validateError(err); !ok {
				return err
			}
			if err == nil {
				processMessages(ctx, config, concurrencyLimiter, result.Messages)
			} else {
				logrus.Warnf("Could not get messages from queue. Err = %s", err)
			}
		}
	}
```

Code for processing messages:

```go
	deDupMap := map[string]bool{}
	for _, message := range messages {
		if _, present := deDupMap[*message.MessageId]; present {
			logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Info("DUPLICATE_MESSAGE_FOUND_IN_BATCH")
		} else {
			logrus.WithFields(logrus.Fields{
				"message_id":                       *message.MessageId,
				"body":                             *message.Body,
				"senderId":                         message.Attributes["SenderId"],
				"ApproximateFirstReceiveTimestamp": message.Attributes["ApproximateFirstReceiveTimestamp"],
				"ApproximateReceiveCount":          message.Attributes["ApproximateReceiveCount"],
				"SentTimestamp":                    message.Attributes["SentTimestamp"],
			}).Info("MESSAGE_PROCESSING_STARTED")

			if len(concurrencyLimiter) == cap(concurrencyLimiter) {
				logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Warn("CHANNEL_LEN_REACHED_CAP")
				continue
			}
			deDupMap[*message.MessageId] = true
			concurrencyLimiter <- true
			logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Warn("OUTSIDE_GO_ROUTINE")
			go func() {
				logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Warn("INSIDE_GO_ROUTINE")
				defer func() {
					if r := recover(); r != nil {
						logrus.Errorf("Panic during SQS message processing: %v", r)
					}
					<-concurrencyLimiter
				}()

				logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Info("MESSAGE_HANDLER_INIT")
				err := runHandler(config, message.Body, message.MessageId)
				logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body, "err": err}).Info("MESSAGE_HANDLER_RESPONSE")
				if err == nil && config.SevereMessage {
					logrus.WithFields(logrus.Fields{"message_id": *message.MessageId, "body": *message.Body}).Info("MESSAGE_PROCESSED_SUCCESSFULLY")
					deleteMessage(ctx, config.Client, config.Queue, *message.ReceiptHandle, *message.MessageId)
				}
			}()
		}
	}
}
```

For many SQS messages, the logs from the goroutines created in `processMessages` are missing, i.e. those goroutines never seem to run. The affected messages are eventually consumed successfully, but only after 4-5 retries.

We tried reducing the number of concurrent SQS consumers on the 0.05-CPU Kubernetes pod from 60 to 10, and we also tried increasing the CPU to 0.5 core. This brought the number of retries down from 4-5 to 2-3.

The frequency of goroutines not being executed went down, but the issue still persists.

Please help!

-- Vipul Tak
go
goroutine
kubernetes
