How to pick the proper number of threads, workers, and processes for Dask when running in an ephemeral environment, as both a single machine and a cluster

1/23/2020

Our company is currently leveraging prefect.io for data workflows (ELT, report generation, ML, etc.). We have just started adding the ability to do parallel task execution, which is powered by Dask. Our flows are executed in ephemeral AWS Fargate containers, which use a Dask LocalCluster with a certain number of workers, threads, and processes passed into the LocalCluster object.

Our journey on Dask will look very much like this:

  1. Continue using a single-machine LocalCluster until we outgrow the maximum CPU/memory allowed for one container.
  2. When we outgrow a single container, spawn additional worker containers from the initial container (a la dask-kubernetes) and join them to the LocalCluster.

We're currently starting with containers that have 256 CPU units (.25 vCPU) and 512 MB of memory, and pinning the LocalCluster to n_workers=1 and threads_per_worker=3 to get a reasonable amount of parallelism. However, this really is guesswork: 1 worker since it's a machine with less than 1 vCPU, and 3 threads because that doesn't sound crazy to me based on my previous experience running other Python-based applications in Fargate. This seems to work fine in a very simple example that just maps a function against a list of items.

from pathlib import Path
from time import sleep

from prefect import Flow, task

# LOGGER, HANDLER, and SCHEDULE are defined elsewhere in the project.

RENEWAL_TABLES = [
    'Activity',
    'CurrentPolicyTermStaus',
    'PolicyRenewalStatus',
    'PolicyTerm',
    'PolicyTermStatus',
    'EndorsementPolicyTerm',
    'PolicyLifeState',
]

# Pair each table name with an offset from 1433 based on its position in the list.
RENEWAL_TABLES_PAIRS = [
    (i, 1433 + idx) for idx, i in enumerate(RENEWAL_TABLES)
]


@task(state_handlers=[HANDLER])
def dummy_step():
    LOGGER.info('Dummy Step...')
    sleep(15)


@task(state_handlers=[HANDLER])
def test_map(table):
    LOGGER.info('table: {}...'.format(table))
    sleep(15)


with Flow(Path(__file__).stem, SCHEDULE, state_handlers=[HANDLER]) as flow:
    first_step = dummy_step()
    test_map.map(RENEWAL_TABLES_PAIRS).set_dependencies(upstream_tasks=[first_step])

I see no more than 3 tasks executed at once, which lines up with 1 worker x 3 threads.
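
For context, here is roughly how the cluster is created and handed to Prefect (a minimal sketch; the exact executor wiring depends on the Prefect version, so treat that part as an assumption -- the relevant piece is the LocalCluster arguments):

from dask.distributed import Client, LocalCluster

# Pinned sizing for a 256 CPU-unit (.25 vCPU) / 512 MB Fargate task.
cluster = LocalCluster(
    n_workers=1,           # one worker process, since the container has less than 1 vCPU
    threads_per_worker=3,  # guess: allow ~3 concurrent, mostly I/O-bound tasks
)
client = Client(cluster)

# The Prefect flow is then run against this scheduler, e.g. by pointing the
# executor at cluster.scheduler_address (details vary by Prefect version).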

I would really like to understand how best to configure n_workers (single machine), threads, and processes as we expand the size of the single machine and eventually add remote workers. I know it depends on my workload, but you could see a combination of things in a single flow, where one task does an extract from a database to a CSV and another task runs a pandas computation. I have seen suggestions online that, per the documentation, threads should equal the number of CPUs requested, but it seems like you can still achieve parallelism with less than one CPU in Fargate.
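
To make the tradeoff concrete, here is a hypothetical sizing helper (the name, thresholds, and multipliers are mine, not from any documentation) contrasting the two shapes of work I expect in one flow:

from dask.distributed import LocalCluster

def make_cluster(vcpus, io_bound=True):
    # Hypothetical helper: pick worker/thread counts from the task's vCPU allotment.
    if io_bound:
        # I/O-bound work (e.g. database -> CSV extracts) spends most of its time
        # waiting, so oversubscribing threads relative to vCPUs is reasonable.
        return LocalCluster(n_workers=1, threads_per_worker=max(4, int(vcpus * 4)))
    # CPU-bound work (e.g. pandas computations) is limited by the GIL within a
    # process, so prefer one single-threaded worker process per vCPU.
    return LocalCluster(n_workers=max(1, int(vcpus)), threads_per_worker=1)

cluster = make_cluster(vcpus=0.25, io_bound=True)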

Any feedback would be appreciated and could help others looking to leverage Dask in more ephemeral environments.

Given that Fargate vCPU allotments step from .25 -> .50 -> 1 -> 2 -> 4, I think it's safe to go with a 1 worker per 1 vCPU setup. However, it would be helpful to understand how to choose a good upper limit for the number of threads per worker, given how the Fargate vCPU allotment works.
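
As a starting point for that upper limit, I could read the actual allotment from inside the container instead of hard-coding it. A rough sketch follows; the cgroup v1 paths and the fallback are assumptions on my part and may not hold on every Fargate platform version:

import os

def allotted_vcpus():
    # Hypothetical helper: read the CPU quota granted to this container (cgroup v1 paths assumed).
    try:
        with open('/sys/fs/cgroup/cpu/cpu.cfs_quota_us') as f:
            quota = int(f.read())
        with open('/sys/fs/cgroup/cpu/cpu.cfs_period_us') as f:
            period = int(f.read())
        if quota > 0:
            return quota / period  # e.g. 25000 / 100000 -> 0.25 vCPU
    except OSError:
        pass
    return float(os.cpu_count() or 1)  # fallback: host CPU count, which overstates a Fargate allotment

The idea would be to cap threads for CPU-bound work at roughly this number (rounded up to at least 1) and only go above it for I/O-heavy tasks.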

-- braunk
aws-fargate
dask
dask-distributed
dask-kubernetes

0 Answers