I am trying to consume an event stream provided by the Kubernetes API using the requests module. I have run into what looks like a buffering problem: the requests module seems to lag by one event.
I have code that looks something like this:
```python
import requests

r = requests.get('http://localhost:8080/api/v1beta1/watch/services',
                 stream=True)

for line in r.iter_lines():
    print 'LINE:', line
```
As Kubernetes emits event notifications, this code only displays an event when the *next* event comes in, which makes it almost completely useless for code that needs to respond promptly to service add/delete events.
I have solved this by spawning curl in a subprocess instead of using the requests library:
```python
import subprocess

p = subprocess.Popen(['curl', '-sfN',
                      'http://localhost:8080/api/watch/services'],
                     stdout=subprocess.PIPE,
                     bufsize=1)

for line in iter(p.stdout.readline, b''):
    print 'LINE:', line
```
This works, but at the expense of some flexibility. Is there a way to avoid this buffering problem with the requests library?
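For anyone unfamiliar with the `iter(p.stdout.readline, b'')` idiom used in the curl workaround above: `iter()` with a sentinel calls the given callable repeatedly and stops when it returns the sentinel value. Here is a self-contained illustration, using a short-lived Python child process as a stand-in for curl (the stand-in is purely for illustration; the real code would spawn curl):

```python
import subprocess
import sys

# A short-lived Python child stands in for curl so this example is
# self-contained; each print is one "event" line on its stdout.
p = subprocess.Popen(
    [sys.executable, '-c', "print('event-1'); print('event-2')"],
    stdout=subprocess.PIPE)

# iter() with a sentinel calls p.stdout.readline repeatedly and stops
# when it returns b'' (EOF, i.e. the child closed its stdout), so each
# line is delivered as soon as the child emits it.
lines = [line.rstrip() for line in iter(p.stdout.readline, b'')]
p.wait()
print(lines)   # [b'event-1', b'event-2']
```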
This behavior is due to a buggy implementation of the iter_lines method in the requests library. iter_lines iterates over the response content in chunk_size blocks of data using the iter_content iterator. If fewer than chunk_size bytes of data are available for reading from the remote server (which will typically be the case when reading the last line of output), the read operation will block until chunk_size bytes of data are available.
I have written my own iter_lines routine that operates correctly:
```python
import os

def iter_lines(fd, chunk_size=1024):
    '''Iterate over the content of a file-like object line-by-line.'''
    pending = None

    while True:
        # os.read returns as soon as any data is available, so we are
        # never stuck waiting for a full chunk_size buffer.
        chunk = os.read(fd.fileno(), chunk_size)
        if not chunk:
            break

        if pending is not None:
            chunk = pending + chunk
            pending = None

        lines = chunk.splitlines()

        # If the chunk did not end on a line boundary, hold the final
        # partial line back until the next read completes it.
        if lines and not chunk.endswith((b'\n', b'\r')):
            pending = lines.pop()

        for line in lines:
            yield line

    if pending:
        yield pending
```
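The pending logic is needed because a fixed-size read can split the stream in the middle of a line, and bytes.splitlines drops the line terminators, so the resulting list alone cannot tell you whether the final element was a complete line. A small illustration of why a separate check on the chunk's last byte is required:

```python
# A fixed-size read can end mid-line; splitlines drops the line
# terminators, so the list alone cannot say whether the final element
# was a complete line.
chunk = b'event-1\nevent-2\npartial'
print(chunk.splitlines())      # [b'event-1', b'event-2', b'partial']

# Checking the chunk's final byte is one way to decide whether the
# last element must be held back until more data arrives.
print(chunk.endswith(b'\n'))   # False, so b'partial' is incomplete
```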
This works because os.read will return fewer than chunk_size bytes of data rather than waiting for a buffer to fill.
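The short-read behavior of os.read is easy to observe with a plain pipe (used here in place of an HTTP response, since both are just file descriptors):

```python
import os

# Write fewer bytes into a pipe than the chunk size we will request.
r, w = os.pipe()
os.write(w, b'hello\n')

# os.read returns as soon as any data is available: we ask for up to
# 1024 bytes but get just the 6 that were written, without blocking.
data = os.read(r, 1024)
print(data)   # b'hello\n'

os.close(r)
os.close(w)
```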