I've got a Flask app in which I hope to accomplish the following things:
My below implementation has several issues:
I'm hoping for some understanding of what I'm doing wrong with threading and Queues because this seems like a hacky way of doing this. And also, how can I make the results of these queries available to all instances of Kubernetes running?
Thanks!
from flask import Flask, render_template, request, jsonify, g
from Queue import Queue
from threading import Thread
from time import sleep
app = Flask(__name__, template_folder='Templates')
@app.route('/')
def index():
return render_template('index.html')
@app.before_first_request
def before_first_request():
g.output = Queue()
g.data_results = {}
return ""
@app.route('/data')
def data():
"""
Endpoint hit to fire of a request for data from a given user (uuid)
"""
params = request.args.to_dict()
uuid = params['uuid']
# Create a list for this user, to store their results
g.data_results[uuid] = []
list_of_queries = ["SELECT * FROM tbl1;",
"SELECT * FROM tbl2;",
"SELECT * FROM tbl3;"]
for query in list_of_queries:
t = Thread(target=worker, args=(query, uuid, g.output))
t.daemon = True
t.start()
return jsonify({'msg':'Queries started'})
def worker(*args):
query, uuid, output = args
# Will actually be something like `result = run_query(query)`
result = {'uuid':uuid}
sleep(10)
output.put(result)
@app.route('/poll')
def poll():
"""
Endpoint hit ever x seconds from frontend
to see if the data is ready
"""
params = request.args.to_dict()
uuid_from_client = params['uuid']
# If client polls for result, but server has no record of this uuid
# This can happen in kubernetes with multiple instances running
if g.data_results.get(uuid_from_client) is None:
return jsonify({'msg':'pong', 'data':None, 'freshdata':None})
try:
output = g.output
# This line throws an error if there is nothing to get
results = output.get(False)
output.task_done()
# What is the uuid associated with the most recently returned data
# More than 1 chunk of data can be in here
uuid_from_data = results['uuid']
g.data_results[uuid_from_data].append(results)
except:
uuid_from_data = None
results = None
results_for_client_uuid = g.data_results[uuid_from_client]
if len(results_for_client_uuid) > 0:
res = results_for_client_uuid.pop(0)
else:
res = None
return jsonify({'msg':'pong', 'data':res})
if __name__ == "__main__":
with app.app_context():
app.run(host='0.0.0.0')
Setup your app architecture to use queuing softwares so that there is separation of concerns in terms of what job it does.
Here is a great article that can help you give some insight http://blog.gorgias.io/deploying-flask-celery-with-docker-and-kubernetes/ and one more https://endocode.com/blog/2015/03/24/using-googles-kubernetes-to-build-a-distributed-task-management-cluster/