How to efficiently write pandas melt and join in order to run inside containers without causing SystemOOM exceptions?

4/30/2019

I have a code which is running inside a docker container, on kubernetes cluster. The issue is that regardless of the memory limit set, sometimes, the code just fails, i.e. airflow task running the code gets failed.

I tried checking in on memory and CPU utilization. Both of them are under limits and pod never restarts. Note: my pod has only one container running and that is airflow's worker. It should be noted that CPU utilization reaches "tending to 1" and the task fails.

Since the dataset is transformed correctly, I am not posting the sample data. I am looking for efficiency in terms of resources.

My code is:

# Below code is required at the start of each script
from athena.api import get_pandas_df
import pandas as pd

last_correct_constant = 11


def remove_unwanted_cols(df, col_name):
    unwanted_cols = []
    for _col in df.columns:
        if _col.startswith("unnamed"):
            if int(_col.split("unnamed__")[-1]) > last_correct_constant:
                unwanted_cols.append(_col)
        else:
            if not _col.startswith(col_name):
                unwanted_cols.append(_col)
    df = df.drop(columns=unwanted_cols)
    return df


def sanitize_column_names(df):
    corrected_columns = []
    for column in df.columns:
        corrected_columns.append(
            column
            .replace("(", "_")
            .replace(")", "_")
            .replace(" ", "_")
            .replace("/", "_")
            .replace(".", "_")
            .replace(":", "_")
            .lower())
    df.columns = corrected_columns
    return df


def get_first_row_as_header(df):
    df.columns = df.iloc[0]
    print("Columns are: ")
    print(df.columns)
    print("Head is: ")
    df = df.iloc[1:]
    print(df.head(1))
    return df


def remove_cols_for_join(df, col_name):
    unwanted_cols = []
    for _col in df.columns:
        if _col != 'period' and (not _col.startswith(col_name)) and _col != 'Markets':
            unwanted_cols.append(_col)
    print("Unwanted cols are: ")
    print(unwanted_cols)
    df = df.drop(columns=unwanted_cols)
    return df


def main(*args, **kwargs):
    """ Put your main logic here.

    Help:
        To get pandas dataframe of upstream nodes.
        data_frame = get_pandas_df("<upstream_node_name>")

        Example: data_frame = get_pandas_df("S3")

    Return:
        output_data_frame {pandas.DataFrame}
        This data frame will be transferred to downstream nodes.
    """
    # read dataframes
    df = get_pandas_df("CSV")
    df = sanitize_column_names(df)
    df_sales = df
    df_gr_vol = df
    df_gr_val = df

    print("remove unwanted cols for individual melts")
    df = remove_unwanted_cols(df, 'value_offtake_000_rs__')
    df_sales = remove_unwanted_cols(df_sales, 'sales_volume__volume_litres__')
    df_gr_vol = remove_unwanted_cols(df_gr_vol, 'gr_vol_ya')
    df_gr_val = remove_unwanted_cols(df_gr_val, 'gr_val_ya')

    df = get_first_row_as_header(df)
    df_sales = get_first_row_as_header(df_sales)
    df_gr_vol = get_first_row_as_header(df_gr_vol)
    df_gr_val = get_first_row_as_header(df_gr_val)

    print("melting dataframes")
    table_columns = df.columns
    df = pd.melt(
        df, id_vars=table_columns[:last_correct_constant+1],
        value_vars=table_columns[last_correct_constant+1:], var_name='period',
        value_name='value_offtake_000_rs__')
    df = df[(df["Markets"] != '')]


    table_columns = df_sales.columns
    df_sales = pd.melt(
        df_sales, id_vars=table_columns[:last_correct_constant+1],
        value_vars=table_columns[last_correct_constant+1:], var_name='period',
        value_name='sales_volume__volume_litres__')
    df_sales = df_sales[(df_sales["Markets"] != '')]
    df_sales = remove_cols_for_join(df_sales, 'sales_volume__volume_litres__')


    table_columns = df_gr_vol.columns
    df_gr_vol = pd.melt(
        df_gr_vol, id_vars=table_columns[:last_correct_constant+1],
        value_vars=table_columns[last_correct_constant+1:], var_name='period',
        value_name='gr_vol_ya')
    df_gr_vol = df_gr_vol[(df_gr_vol["Markets"] != '')]
    df_gr_vol = remove_cols_for_join(df_gr_vol, 'gr_vol_ya')

    table_columns = df_gr_val.columns
    df_gr_val = pd.melt(
        df_gr_val, id_vars=table_columns[:last_correct_constant+1],
        value_vars=table_columns[last_correct_constant+1:], var_name='period',
        value_name='gr_val_ya')
    df_gr_val = df_gr_val[(df_gr_val["Markets"] != '')]
    df_gr_val = remove_cols_for_join(df_gr_val, 'gr_val_ya')

    print("Before merge: ")
    for _col in df.columns :
        print(_col)
    print("==================")
    for _col in df_sales.columns :
        print(_col)

    df = pd.merge(df, df_sales, on=['Markets', 'period'])
    df = pd.merge(df, df_gr_val, on=['Markets', 'period'])
    df = pd.merge(df, df_gr_vol, on=['Markets', 'period'])
    df = sanitize_column_names(df)
    return df

I expect this code to run with efficiently using memory and cpu. My current memory is set for 32GB and 10CPU cores.

The data contains 14 rows and 637 columns which I transform in an aforementioned way.

-- Aviral Srivastava
airflow
docker
kubernetes
pandas
python-3.x

0 Answers