Reading streaming http response with Python "requests" library

1/25/2015

I am trying to consume an event stream provided by the Kubernetes api using the requests module. I have run into what looks like a buffering problem: the requests module seems to lag by one event.

I have code that looks something like this:

r = requests.get('http://localhost:8080/api/v1beta1/watch/services',
                 stream=True)

for line in r.iter_lines():
    print 'LINE:', line

As Kubernetes emits event notifications, this code will only display the last event emitted when a new event comes in, which makes it almost completely useless for code that needs to respond to service add/delete events.

I have solved this by spawning curl in a subprocess instead of using the requests library:

p = subprocess.Popen(['curl', '-sfN',
                      'http://localhost:8080/api/watch/services'],
                     stdout=subprocess.PIPE,
                     bufsize=1)

for line in iter(p.stdout.readline, b''):
    print 'LINE:', line

This works, but at the expense of some flexibility. Is there a way to avoid this buffering problem with the requests library?

-- larsks
kubernetes
python
python-requests
stream

1 Answer

1/26/2015

This behavior is due to a buggy implementation of the iter_lines method in the requests library.

iter_lines iterates over the response content in chunk_size blocks of data using the iter_content iterator. If there are less than chunk_size bytes of data available for reading from the remote server (which will typically be the case when reading the last line of output), the read operation will block until chunk_size bytes of data are available.

I have written my own iter_lines routine that operates correctly:

import os


def iter_lines(fd, chunk_size=1024):
    '''Iterates over the content of a file-like object line-by-line.'''

    pending = None

    while True:
        chunk = os.read(fd.fileno(), chunk_size)
        if not chunk:
            break

        if pending is not None:
            chunk = pending + chunk
            pending = None

        lines = chunk.splitlines()

        if lines and lines[-1]:
            pending = lines.pop()

        for line in lines:
            yield line

    if pending:
        yield(pending)

This works because os.read will return less than chunk_size bytes of data rather than waiting for a buffer to fill.

-- larsks
Source: StackOverflow