I have a Python Kafka consumer that sometimes reads extremely slowly from Bigtable. It reads a row from Bigtable, performs some calculations, occasionally writes some information back, and then moves on.
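Roughly, the per-message flow is the following (a minimal sketch with placeholder project/instance/table/topic names; the actual Kafka client library isn't specified, kafka-python is assumed here):

from google.cloud import bigtable
from kafka import KafkaConsumer   # assumed client library

client = bigtable.Client(project="my-project")                    # placeholder ids
bt_table = client.instance("my-instance").table("my-table")

consumer = KafkaConsumer("my-topic", bootstrap_servers=["kafka:9092"])

for message in consumer:
    row_key = message.key                                         # row key derived from the message
    row = bt_table.read_row(row_key)                              # one Bigtable read per message
    # ... perform some calculations on the row ...
    write_row = bt_table.row(row_key)                             # occasionally write something back
    write_row.set_cell("my-family", b"result", b"some value")     # placeholder family/column/value
    write_row.commit()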
The issue is that from a 1 vCPU VM in GCE it reads/writes extremely fast, the consumer chewing through 100-150 messages/s. No problem there.
However, when deployed on the production Kubernetes cluster (GKE), which is multi-zonal (europe-west1-b/c/d), it goes through something like 0.5 messages/s. Yes - 2 s per message.
Bigtable is in europe-west1-d, but pods scheduled on nodes in the same zone (d) have the same performance as pods on nodes in other zones, which is weird.
The pod is constantly hitting its CPU limit (1 vCPU). Profiling the program shows that most of the time (95%) is spent inside the PartialRowData.cells() function, in copy.py:132(deepcopy).
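For reference, a profile like that can be gathered with cProfile around the message loop; the consumer and process_message names below are placeholders, not the actual code:

import cProfile
import itertools
import pstats

profiler = cProfile.Profile()
profiler.enable()
for message in itertools.islice(consumer, 100):   # profile a bounded number of messages
    process_message(message)                      # the per-message Bigtable read + calculations
profiler.disable()

# Sort by cumulative time; in this case copy.py (deepcopy) dominated the output.
pstats.Stats(profiler).sort_stats("cumulative").print_stats(20)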
It uses the newest google-cloud-bigtable==0.29.0 package.
Now, I understand that the package is in alpha, but what factor could reduce the performance so dramatically, by roughly 300x?
The piece of code that reads the row data is this:
def _row_to_dict(cls, row):
    if row is None:
        return {}
    item_dict = {}
    if COLUMN_FAMILY in row.cells:
        structured_cells = {}
        for field_name in STRUCTURED_STATIC_FIELDS:
            if field_name.encode() in row.cells[COLUMN_FAMILY]:
                structured_cells[field_name] = row.cells[COLUMN_FAMILY][
                    field_name.encode()][0].value.decode()
        item_dict[COLUMN_FAMILY] = structured_cells
    return item_dict
where the row passed in comes from
row = self.bt_table.read_row(row_key, filter_=filter_)
and there might be about 50 STRUCTURED_STATIC_FIELDS.
Is the deepcopy really just taking ages to copy? Or is it waiting for data transfer from Bigtable? Am I misusing the library somehow? Any pointers on how to improve the performance?
Thanks a lot in advance.
It turns out the library defines the getter for row.cells as:
@property
def cells(self):
    """Property returning all the cells accumulated on this partial row.

    :rtype: dict
    :returns: Dictionary of the :class:`Cell` objects accumulated. This
              dictionary has two-levels of keys (first for column families
              and second for column names/qualifiers within a family). For
              a given column, a list of :class:`Cell` objects is stored.
    """
    return copy.deepcopy(self._cells)
So each access to row.cells was performing a deepcopy of the whole two-level cell dictionary, in addition to the look-up.
Adding a
row_cells = row.cells
and subsequently only referring to that fixed the issue.
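Applied to the function from the question, the fix amounts to caching the property once per row; a minimal sketch, not the exact production code:

def _row_to_dict(cls, row):
    if row is None:
        return {}
    item_dict = {}
    row_cells = row.cells                    # one deepcopy per row instead of one per access
    if COLUMN_FAMILY in row_cells:
        family_cells = row_cells[COLUMN_FAMILY]
        structured_cells = {}
        for field_name in STRUCTURED_STATIC_FIELDS:
            key = field_name.encode()
            if key in family_cells:
                structured_cells[field_name] = family_cells[key][0].value.decode()
        item_dict[COLUMN_FAMILY] = structured_cells
    return item_dict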
The difference in performance between the dev and prod environments was also due to the fact that the prod table already had a lot more timestamps/versions of the cells, whereas the dev table had only a couple. This made the returned dictionaries that had to be deep-copied a lot larger.
Chaining the existing filters with CellsColumnLimitFilter helped even further:
filter_ = RowFilterChain(filters=[filter_, CellsColumnLimitFilter(num_cells=1)])
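With num_cells=1, only the most recent version of each cell is returned, so the dictionary that row.cells deep-copies stays as small in prod as it was in dev, regardless of how many versions the table accumulates over time.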