I am trying to consume an event stream provided by the Kubernetes api using the requests module. I have run into what looks like a buffering problem: the requests module seems to lag by one event.
I have code that looks something like this:
    r = requests.get('http://localhost:8080/api/v1beta1/watch/services',
                     stream=True)

    for line in r.iter_lines():
        print 'LINE:', line

As Kubernetes emits event notifications, this code only displays an event once the *next* event comes in, which makes it almost completely useless for code that needs to respond to service add/delete events.
I have solved this by spawning curl in a subprocess instead of using the requests library:
    p = subprocess.Popen(['curl', '-sfN',
                          'http://localhost:8080/api/watch/services'],
                         stdout=subprocess.PIPE,
                         bufsize=1)

    for line in iter(p.stdout.readline, b''):
        print 'LINE:', line

This works, but at the expense of some flexibility. Is there a way to avoid this buffering problem with the requests library?
This behavior is due to a buggy implementation of the iter_lines method in the requests library.

iter_lines iterates over the response content in chunk_size blocks of data using the iter_content iterator. If there are fewer than chunk_size bytes of data available from the remote server (which will typically be the case when reading the last line of output), the read operation blocks until a full chunk_size bytes have accumulated, so a line that has already arrived is not handed to your code until later data pushes the buffer over the threshold.
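The difference is easy to see in isolation. This minimal sketch (Python 3 syntax) uses a local socket pair to stand in for the HTTP connection; the socketpair and the payload are illustrative choices, not part of the original code. os.read returns as soon as any data is available, whereas a buffered read of the same size would wait for the full count:

```python
import os
import socket

# A connected pair of sockets standing in for the HTTP connection.
a, b = socket.socketpair()
a.sendall(b'partial line')  # no trailing newline, far fewer than 1024 bytes

# os.read returns whatever is available, even though we asked for
# up to 1024 bytes...
data = os.read(b.fileno(), 1024)
print(data)  # → b'partial line'

# ...whereas a buffered file-object read(1024) on the same socket
# would block here until 1024 bytes arrived or the peer closed.
a.close()
b.close()
```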
I have written my own iter_lines routine that operates correctly:

    import os

    def iter_lines(fd, chunk_size=1024):
        '''Iterates over the content of a file-like object line-by-line.'''
        pending = None
        while True:
            chunk = os.read(fd.fileno(), chunk_size)
            if not chunk:
                break
            if pending is not None:
                chunk = pending + chunk
                pending = None
            lines = chunk.splitlines()
            # If the chunk does not end with a newline, the final line is
            # incomplete; hold it back until the next chunk arrives.
            if lines and not chunk.endswith(b'\n'):
                pending = lines.pop()
            for line in lines:
                yield line
        if pending:
            yield pending

This works because os.read will return fewer than chunk_size bytes of data rather than waiting for a buffer to fill.
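To check the routine end-to-end without a live apiserver, it can be driven from a local socket pair (anything exposing fileno() works); the two-line payload below is purely illustrative (Python 3 syntax):

```python
import os
import socket

def iter_lines(fd, chunk_size=1024):
    '''Iterate line-by-line over any object exposing fileno().'''
    pending = None
    while True:
        chunk = os.read(fd.fileno(), chunk_size)
        if not chunk:
            break
        if pending is not None:
            chunk = pending + chunk
            pending = None
        lines = chunk.splitlines()
        # Hold back an incomplete final line until more data arrives.
        if lines and not chunk.endswith(b'\n'):
            pending = lines.pop()
        for line in lines:
            yield line
    if pending:
        yield pending

# Simulate a server that sends one complete event per line.
a, b = socket.socketpair()
a.sendall(b'event one\nevent two\n')
a.close()  # EOF ends the iteration

lines = list(iter_lines(b))
print(lines)  # → [b'event one', b'event two']
b.close()
```

Against a real streaming response you would pass the raw urllib3 response object underlying the requests response, e.g. `for line in iter_lines(r.raw):` after `requests.get(url, stream=True)`.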