I have an app deployed on GKE, split into several microservices. One of the microservices, let's call it "worker", receives tasks to execute from Pub/Sub messages.
The tasks can take up to 1 hour to execute. Since the regular acknowledgement deadline for Google Pub/Sub messages is pretty short, we renew the deadline 10 seconds before it expires. Here is the piece of code responsible for that:
import logging
import time

from google.cloud.pubsub_v1 import SubscriberClient

LOGGER = logging.getLogger(__name__)  # assumed; the logger setup is not shown


def watchdog(businessDoneEvent, subscription, ack_deadline, message, ack_id):
    '''
    Prevents message from being republished as long as computation is
    running
    '''
    while True:
        # Wait (ack_deadline - 10) seconds before renewing if ack_deadline
        # is > 10 seconds; renew every second otherwise
        sleepTime = ack_deadline - 10 if ack_deadline > 10 else 1
        startTime = time.time()
        while time.time() - startTime < sleepTime:
            LOGGER.info('Sleeping time: {} - ack_deadline: {}'.format(
                time.time() - startTime, ack_deadline))
            # Stop as soon as the business code signals completion
            if businessDoneEvent.is_set():
                LOGGER.info('Business done!')
                return
            time.sleep(1)
        subscriber = SubscriberClient()
        LOGGER.info('Modifying ack deadline for message ' +
                    str(message.data) + ' processing to ' +
                    str(ack_deadline))
        subscriber.modify_ack_deadline(subscription, [ack_id],
                                       ack_deadline)
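To make the timing rule easier to reason about, the wait computation can be isolated into a small pure function. The following is only a sketch: `renewal_interval` and `run_watchdog` are hypothetical names, and `renew` is a placeholder for the `modify_ack_deadline` call above. It also swaps the one-second polling loop for `Event.wait(timeout)`, which is a substitution and not what the code above does.

```python
import threading


def renewal_interval(ack_deadline):
    """Seconds to wait before renewing: 10 s before expiry, or 1 s for short deadlines."""
    return ack_deadline - 10 if ack_deadline > 10 else 1


def run_watchdog(done_event, ack_deadline, renew):
    """Call renew() once per interval until done_event is set; return the renewal count."""
    renewals = 0
    # Event.wait returns True as soon as the event is set, False on timeout.
    while not done_event.wait(timeout=renewal_interval(ack_deadline)):
        renew()  # placeholder for subscriber.modify_ack_deadline(...)
        renewals += 1
    return renewals


if __name__ == "__main__":
    done = threading.Event()
    done.set()  # pretend the task has already finished
    print(renewal_interval(60), renewal_interval(10),
          run_watchdog(done, 60, lambda: None))
```

With a 60-second deadline this renews every 50 seconds; with a deadline of 10 seconds or less it renews every second.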
Once the execution is over, we reach this piece of code:
import traceback

from google.api_core.exceptions import ServiceUnavailable
from google.api_core.retry import Retry, if_exception_type


def callbackWrapper(callback,
                    subscription,
                    message,
                    ack_id,
                    endpoint,
                    context,
                    subscriber,
                    postAcknowledgmentCallback=None):
    '''
    Pub/sub message acknowledgment if everything ran correctly
    '''
    try:
        callback(message.data, endpoint, context, **message.attributes)
    except Exception:
        LOGGER.info(message.data)
        LOGGER.error(traceback.format_exc())
        # Re-raise without acknowledging so the message gets redelivered
        raise
    else:
        LOGGER.info("Trying to acknowledge...")
        # Retry the acknowledgement for up to 1 hour on ServiceUnavailable
        my_retry = Retry(predicate=if_exception_type(ServiceUnavailable), deadline=3600)
        subscriber.acknowledge(subscription, [ack_id], retry=my_retry)
        LOGGER.info(str(ack_id) + ' has been acknowledged')
        if postAcknowledgmentCallback is not None:
            postAcknowledgmentCallback(message.data,
                                       **message.attributes)
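How the watchdog and this wrapper are wired together is not shown here. Purely as an illustration, one possible arrangement is to signal the event in a `finally` block and join the watchdog thread before acknowledging, so that no deadline renewal is still running when the acknowledgement is sent. `process_with_watchdog`, `work`, `renew`, and `ack` are hypothetical stand-ins, not Pub/Sub API calls, and whether this ordering has anything to do with the delay described below is not established.

```python
import threading


def process_with_watchdog(work, renew, ack, interval=1.0):
    """Run work() while a watchdog thread calls renew(); ack() only after the watchdog stops."""
    done = threading.Event()

    def watchdog():
        # Renew every `interval` seconds until the work is done.
        while not done.wait(timeout=interval):
            renew()

    thread = threading.Thread(target=watchdog, daemon=True)
    thread.start()
    try:
        result = work()
    finally:
        done.set()     # stop the watchdog whether work() succeeded or raised
        thread.join()  # make sure no renewal is still in progress
    ack()              # reached only if work() did not raise
    return result
```

If `work()` raises, the exception propagates and `ack()` is never called, which mirrors the try/except/else structure of `callbackWrapper`.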
Note that we also use this code in most of our microservices and it works just fine.
My problem is that, although this code raises no error and the acknowledgement request appears to be sent properly, the message is actually acknowledged only later. For example, according to the GCP console, I currently have 8 unacknowledged messages when I should have only 3. For an hour it also reported 12 when I should have had only 5.
I have a horizontal pod autoscaler driven by the Pub/Sub metric. When the pods finish their work, they are not scaled down, or only an hour or more later. This creates unnecessary costs that I would like to avoid.
Does anyone have an idea about why this is happening?