Processing of Google Pub/Sub messages slows down (Python)

9/28/2017

Since switching to the newest Python client libraries, which are threading/callback based, we've been hitting slowness between our Pub/Sub producers and consumers. We're relatively new to Google's Pub/Sub, and we're wondering if others have experienced similar issues after the recent library changes, or know of settings we may have missed.

We see an unexpected slowdown from the time we publish messages to the time they're consumed by 3 workers (in Python). Our handlers take only a few milliseconds to process each request, and we also changed our code to call message.ack() before running our handler, e.g. self.sub_client.subscribe(subscription_path, callback=self.message_callback). These messages aren't duplicates. When we enqueue them we record the time in msecs to get an idea of how long they sit in the queue.
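
The "ACK start" log lines below come from a callback shaped roughly like this (a trimmed sketch: the logging details are simplified, and Timer is from our core.utils, shown further down):

import logging
import time

from core.utils import b64json_decode, Timer

logger = logging.getLogger(__name__)


def message_callback(message):
    ack_timer = Timer()
    message.ack()  # ack before the handler runs, as described above
    ack_msecs = ack_timer.get_msecs()
    decoded = b64json_decode(message.data)
    enqueue_ts = float(decoded.get('tstamp', 0))  # set by the publisher at enqueue time
    queued_secs = time.time() - enqueue_ts
    logger.info('ACK start: %s ms for %s (%s secs)', ack_msecs, enqueue_ts, queued_secs)
    # ... the actual handler runs here; it takes a few milliseconds ...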

for pod in fra-worker-staging-deployment-1003989621-2mx0n fra-worker-staging-deployment-1003989621-b6llt fra-worker-staging-deployment-1003989621-lx4gq; do echo == $pod ==; kubectl logs $pod -c fra-worker | grep 'ACK start'; done
== fra-worker-staging-deployment-1003989621-2mx0n ==
[2017-09-25 23:29:03,987] {pubsub.py:147} INFO - ACK start: 22 ms for 1506382143.88 (0.10699987411499023 secs)
[2017-09-25 23:29:04,966] {pubsub.py:147} INFO - ACK start: 3 ms for 1506382144.767 (0.19900012016296387 secs)
[2017-09-25 23:29:14,708] {pubsub.py:147} INFO - ACK start: 2 ms for 1506382144.219 (10.488999843597412 secs)
[2017-09-25 23:29:17,706] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382147.229 (10.476999998092651 secs)
[2017-09-25 23:29:37,767] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382144.782 (32.984999895095825 secs)
[2017-09-25 23:30:00,649] {pubsub.py:147} INFO - ACK start: 2 ms for 1506382146.257 (54.39199995994568 secs)
== fra-worker-staging-deployment-1003989621-b6llt ==
[2017-09-25 23:29:04,083] {pubsub.py:147} INFO - ACK start: 2 ms for 1506382143.957 (0.12599992752075195 secs)
[2017-09-25 23:29:05,261] {pubsub.py:147} INFO - ACK start: 3 ms for 1506382144.916 (0.3450000286102295 secs)
[2017-09-25 23:29:15,703] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382144.336 (11.367000102996826 secs)
[2017-09-25 23:29:25,630] {pubsub.py:147} INFO - ACK start: 2 ms for 1506382143.812 (21.818000078201294 secs)
[2017-09-25 23:29:38,706] {pubsub.py:147} INFO - ACK start: 2 ms for 1506382144.49 (34.21600008010864 secs)
[2017-09-25 23:30:01,752] {pubsub.py:147} INFO - ACK start: 3 ms for 1506382146.696 (55.055999994277954 secs)
== fra-worker-staging-deployment-1003989621-lx4gq ==
[2017-09-25 23:29:03,342] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382142.889 (0.4530000686645508 secs)
[2017-09-25 23:29:04,955] {pubsub.py:147} INFO - ACK start: 2 ms for 1506382143.907 (1.0469999313354492 secs)
[2017-09-25 23:29:14,704] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382143.888 (10.815999984741211 secs)
[2017-09-25 23:29:17,705] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382147.205 (10.5 secs)
[2017-09-25 23:29:37,767] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382144.197 (33.5699999332428 secs)
[2017-09-25 23:29:59,733] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382144.269 (55.46399998664856 secs)
[2017-09-25 23:31:18,870] {pubsub.py:147} INFO - ACK start: 1 ms for 1506382146.924 (131.94599986076355 secs)

Initially, messages take a short time from enqueuing to reading, but then they start arriving later and later, with delays of 10 s, 32 s, 55 s, and beyond (and these aren't duplicates, so this is not retry logic due to failed acks).
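
One setting we wonder about is the subscriber's client-side flow control, which bounds how many leased messages each client holds; if the defaults let one worker lease far more messages than it can process, that might look exactly like this growing delay. A sketch of what we believe the 0.28 API accepts (the field values here are guesses, not tuned numbers, and receive is the callback from our test worker below):

from google.cloud import pubsub_v1

client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path('== OUR PROJECT ==', 'pubsub-worker')

# FlowControl limits how many messages the client leases at once
# (field names per google-cloud-pubsub 0.28; max_messages=10 is illustrative).
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
client.subscribe(subscription_path, callback=receive, flow_control=flow_control)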

We wrote a small test which works quickly for a small number of senders and messages, but once we bump the messages up to 1500 and the senders to 3, the publish call often returns a future whose result is the exception PublishError('Some messages were not successfully published.'). The results show around 500 messages/sec, but more than 10% of the publish() calls raise this exception:

Done in 2929 ms, 512.12 qps (154 10.3%)
Done in 2901 ms, 517.06 qps (165 11.0%)
Done in 2940 ms, 510.20 qps (217 14.5%)

Although our senders finish in about 3 seconds (they run in parallel), the workers are receiving messages that were enqueued over 20 secs earlier:

Got message {'tstamp': '1506557436.988', 'msg': 'msg#393@982'} 20.289 sec

Here's the worker/listener:

import time

from google.api.core.exceptions import RetryError as core_RetryError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.policy import thread
from google.gax.errors import RetryError as gax_RetryError
import grpc

from core.utils import b64json_decode, b64json_encode, Timer


TOPIC = 'pubsub-speed-test'
NOTIFY_PROJECT = '== OUR PROJECT =='


def receive(message):
    decoded = b64json_decode(message.data)
    message.ack()
    took = time.time() - float(decoded.get('tstamp', 0))
    print(f'Got message {decoded} {took:0.3f} sec')


if __name__ == '__main__':
    client = pubsub_v1.SubscriberClient()
    topic_path = client.topic_path(NOTIFY_PROJECT, TOPIC)
    subs_path = client.subscription_path(NOTIFY_PROJECT, 'pubsub-worker')

    try:
        client.create_subscription(subs_path, topic_path)
    except Exception:
        # The subscription may already exist.
        pass
    print(f'Subscription: topic={topic_path} subscription={subs_path}')

    timer = Timer()
    client.subscribe(subs_path, callback=receive)
    time.sleep(120)
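
(For reference, the b64json_encode/b64json_decode helpers imported from our core.utils are thin JSON+base64 wrappers; this is a minimal equivalent sketch, not our exact code:)

import base64
import json


def b64json_encode(obj, raw=False):
    """JSON-encode obj, then base64 it. raw=True returns bytes for publish()."""
    encoded = base64.b64encode(json.dumps(obj).encode('utf-8'))
    return encoded if raw else encoded.decode('ascii')


def b64json_decode(data):
    """Inverse of b64json_encode; accepts bytes or str."""
    return json.loads(base64.b64decode(data).decode('utf-8'))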

And the sender/publisher:

import os
import time
import concurrent.futures

from google.api.core.exceptions import RetryError as core_RetryError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.policy import thread
from google.gax.errors import RetryError as gax_RetryError
import grpc

from core.utils import b64json_decode, b64json_encode, Timer

TOPIC = 'pubsub-speed-test'
NOTIFY_PROJECT = '== OUR PROJECT =='


def publish(topic_path, message, client):
    tstamp = f'{time.time():0.3f}'
    data = {'tstamp': tstamp, 'msg': message}
    future = client.publish(topic_path, b64json_encode(data, raw=True))
    future.add_done_callback(lambda x: print(f'Publishing done callback: {data}'))
    return future


if __name__ == '__main__':
    client = pubsub_v1.PublisherClient()
    topic_path = client.topic_path(NOTIFY_PROJECT, TOPIC)

    num = 1500
    pid = os.getpid()
    fs = []
    timer = Timer()
    for i in range(0, num):
        f = publish(topic_path, f'msg#{i}@{pid}', client)
        fs.append(f)
    print(f'Launched {len(fs)} futures in {timer.get_msecs()} ms')

    good = bad = 0
    for future in fs:
        try:
            data = future.result()
            # print(f'result: {data}')
            good += 1
        except Exception as exc:
            print(f'generated an exception: {exc} ({exc!r})')
            bad += 1
    took_ms = timer.get_msecs()
    pct = bad / num
    print(f'Done in {took_ms} ms, {num / took_ms * 1000:0.2f} qps ({bad} {pct:0.1%})')
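
Since the 0.28 publisher batches messages client-side before sending, we also wonder whether batch sizing interacts badly with 1500 near-simultaneous publish() calls. Batching is configurable when constructing the client; a sketch using the 0.28-era types.BatchSettings (these values are guesses, not tuned numbers):

from google.cloud import pubsub_v1

# BatchSettings fields per google-cloud-pubsub 0.28; the values below are
# illustrative guesses.
batch_settings = pubsub_v1.types.BatchSettings(
    max_messages=500,        # max messages per batch
    max_bytes=1024 * 1024,   # max batch size in bytes
    max_latency=0.05,        # seconds to wait before flushing a partial batch
)
client = pubsub_v1.PublisherClient(batch_settings)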

Here's the Timer class from our core.utils:

import sys
import time

import arrow

####################
# Time / Timing
####################


def utcnow():
    """Time now with tzinfo, mainly for mocking in unittests"""
    return arrow.utcnow()


def relative_time():
    """Relative time for finding timedeltas, depending on your Python version"""
    if sys.version_info[0] >= 3:
        return time.perf_counter()
    else:
        return time.time()


class Timer:
    def __init__(self):
        self.reset()

    def reset(self):
        self.start_time = relative_time()

    def get_msecs(self):
        return int((relative_time() - self.start_time) * 1000)

    def get_secs(self):
        return int((relative_time() - self.start_time))

Also, in our main code, we occasionally see IOErrors that the threads don't seem to recover from (in addition to DEADLINE_EXCEEDED, which can be ignored). To cope with that, we've wrapped the Policy, which lets us catch some exceptions and restart the clients as needed (though we're not sure that's working well):

class OurPolicy(thread.Policy):
    _exception_caught = None

    def __init__(self, *args, **kws):
        logger.info('Initializing our PubSub Policy Wrapper')
        super(OurPolicy, self).__init__(*args, **kws)

    def on_exception(self, exc):
        # If this is DEADLINE_EXCEEDED, then we want to retry by returning
        # None instead of raise-ing.
        deadline_exceeded = grpc.StatusCode.DEADLINE_EXCEEDED
        code_value = getattr(exc, 'code', lambda: None)()
        logger.error(f'Caught Exception in PubSub Policy Wrapper: code={code_value} exc={exc}')
        if code_value == deadline_exceeded:
            return
        OurPolicy._exception_caught = exc
        # This will just re-raise exc.
        return super(OurPolicy, self).on_exception(exc)

[...later...]

                while True:
                    time.sleep(1)
                    if OurPolicy._exception_caught:
                        exc = OurPolicy._exception_caught
                        OurPolicy._exception_caught = None
                        raise exc
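
For completeness, this is roughly how the wrapper is wired in: the 0.28 SubscriberClient takes a policy_class argument, and we recreate the client when the polling loop above re-raises a caught exception (run_worker is a name made up for this sketch; our real supervision code is messier):

import logging
import time

from google.cloud import pubsub_v1

logger = logging.getLogger(__name__)


def run_worker(subs_path, callback):
    while True:
        # policy_class is the 0.28-era hook for substituting a custom Policy.
        client = pubsub_v1.SubscriberClient(policy_class=OurPolicy)
        client.subscribe(subs_path, callback=callback)
        try:
            while True:
                time.sleep(1)
                if OurPolicy._exception_caught:
                    exc = OurPolicy._exception_caught
                    OurPolicy._exception_caught = None
                    raise exc
        except Exception:
            logger.exception('Subscriber died; recreating the client')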

And our versions:

pip freeze | grep goog
gapic-google-cloud-pubsub-v1==0.15.4
google-auth==1.0.2
google-cloud-core==0.27.1
google-cloud-pubsub==0.28.2
google-gax==0.15.14
googleapis-common-protos==1.5.2
grpc-google-iam-v1==0.11.1
proto-google-cloud-pubsub-v1==0.15.4
-- Charles Thayer
Tags: concurrent.futures, google-cloud-pubsub, google-kubernetes-engine, grpc, python
