I have a Dask expression, shown below, where I'm trying to run a SQLAlchemy query in a distributed way. However, it references a .pem key file that is passed in via the connect_args parameter. How do I get this key file onto the Dask cluster/workers so that the SQLAlchemy query can run?
def execute_query(q):
    conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                         connect_args={'protocol': 'https',
                                       'requests_kwargs': {'verify': key}})
    return pd.read_sql(q, conn)

df = dd.from_delayed([
    delayed(execute_query)(q) for q in queries])
I tried using client.upload_file to send the local file to the cluster, but it complains that it cannot find the path to the .pem key:
OSError: Could not find a suitable TLS CA certificate bundle, invalid path: hdsj1ptc001.pem
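This is roughly what I tried (a sketch; client here is my existing dask.distributed.Client, and the scheduler address is a placeholder):

from dask.distributed import Client

client = Client('scheduler-address:8786')  # placeholder scheduler address
client.upload_file('hdsj1ptc001.pem')      # copies the file into each worker's local directory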
While Dask can handle some maneuvering of files for you (see client.upload_file), you should use your own methods for distributing sensitive files such as credentials into specific locations on the worker file-system. Options include scp, Kubernetes secrets and many other methods.
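For illustration only, here is a minimal sketch of one way to push the key to a fixed path on every worker with client.run (the scheduler address and target path are assumptions, and this still sends the key bytes over the Dask communication channel, so the same security caveat applies):

from dask.distributed import Client

client = Client('scheduler-address:8786')         # placeholder scheduler address
key_bytes = open('hdsj1ptc001.pem', 'rb').read()  # read the key locally

def write_key(key_bytes, path='/tmp/hdsj1ptc001.pem'):
    # Runs on each worker: write the certificate to a fixed location
    # so that execute_query can reference that path in connect_args.
    with open(path, 'wb') as f:
        f.write(key_bytes)
    return path

client.run(write_key, key_bytes)  # execute write_key once on every worker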
If you are certain of the security of your cluster, you could include the key file in the arguments to your function and either write it to a file inside the function (see below) or, if the call allows it, pass the bytes directly.
def execute_query(q, key):
    keyfile = 'keyfile.pem'  # path on the worker where the key will be written
    if not os.path.exists(keyfile):  # if the data needs to be in a file
        with open(keyfile, 'wb') as f:
            f.write(key)
    conn = create_engine(f'presto://{user}:{password}@{host}:{port}/{catalog}/{schema}',
                         connect_args={'protocol': 'https',
                                       'requests_kwargs': {'verify': keyfile}})
    return pd.read_sql(q, conn)
key = dask.delayed(open('keyfile.pem', 'rb').read())

df = dd.from_delayed([
    delayed(execute_query)(q, key) for q in queries])
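Wrapping the key bytes in dask.delayed means they enter the graph as a single shared task rather than being copied into every execute_query task. As a hedged usage sketch (assuming a running dask.distributed.Client and that queries and the connection variables are already defined), the result is then computed as usual:

from dask.distributed import Client

client = Client('scheduler-address:8786')  # placeholder scheduler address
result = df.compute()                      # each partition runs execute_query on a worker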