I am having an issue with Celery groups: when the number of tasks gets large, some threads get stuck and never return.
@celery.task(bind=True, ignore_result=False, soft_time_limit=TASK_TIME_OUT*15)
def schedule(self, data):
    try:
        job = group(process.s(i[0], i[1]) for i in data)
        result = job.apply_async()
        result_data = None
        with allow_join_result():
            result_data = result.get()
        callback.delay(result_data)
    except SoftTimeLimitExceeded as ex:
        LOGGER.error("Timeout schedule Failed. " + str(ex))
Then I have another task, "process", which takes ~1 minute to process some data (there are some I/O calls too) and then returns the result.
@celery.task(bind=True, ignore_result=False, soft_time_limit=TASK_TIME_OUT)
def process(self, param1, param2):
    .......
    .......
    return result
Celery worker command, using eventlet:
celery -A proj worker --loglevel=debug -P eventlet --concurrency=100 -n worker@%h
I am running two workers on Kubernetes, with each pod having CPU=1 and MEM=512MB.
I'll explain the flow:
There is an API call which triggers the Celery task "schedule". In the "schedule" method I create a group (I tried a chord too, but it didn't work) and create tasks based on the input. If there are fewer than ~400 tasks, it works perfectly.
If there are more than ~500 tasks, some threads get stuck and never return, and because of that the next steps in the "schedule" method never run.
Here is the output of celery inspect stats for the two workers:
-> worker@service-worker-lbv46: OK
{
"broker": {
"alternates": [],
"connect_timeout": 4,
"failover_strategy": "round-robin",
"heartbeat": 120.0,
"hostname": "redis-service-stg",
"insist": false,
"login_method": null,
"port": 6379,
"ssl": false,
"transport": "redis",
"transport_options": {},
"uri_prefix": null,
"userid": null,
"virtual_host": "/"
},
"clock": "1243",
"pid": 7,
"pool": {
"free-threads": 98,
"max-concurrency": 100,
"running-threads": 2
},
"prefetch_count": 400,
"rusage": {
"idrss": 0,
"inblock": 0,
"isrss": 0,
"ixrss": 0,
"majflt": 0,
"maxrss": 376584,
"minflt": 4313949,
"msgrcv": 0,
"msgsnd": 0,
"nivcsw": 12441,
"nsignals": 0,
"nswap": 0,
"nvcsw": 1214,
"oublock": 1840,
"stime": 18.567744,
"utime": 585.98883
},
"total": {
"worker.tasks.process": 402,
"worker.tasks.schedule": 1
}
}
-> worker@service-worker-g9kh7: OK
{
"broker": {
"alternates": [],
"connect_timeout": 4,
"failover_strategy": "round-robin",
"heartbeat": 120.0,
"hostname": "redis-service-stg",
"insist": false,
"login_method": null,
"port": 6379,
"ssl": false,
"transport": "redis",
"transport_options": {},
"uri_prefix": null,
"userid": null,
"virtual_host": "/"
},
"clock": "1243",
"pid": 7,
"pool": {
"free-threads": 99,
"max-concurrency": 100,
"running-threads": 1
},
"prefetch_count": 400,
"rusage": {
"idrss": 0,
"inblock": 0,
"isrss": 0,
"ixrss": 0,
"majflt": 0,
"maxrss": 348324,
"minflt": 3903269,
"msgrcv": 0,
"msgsnd": 0,
"nivcsw": 14085,
"nsignals": 0,
"nswap": 0,
"nvcsw": 28288,
"oublock": 1840,
"stime": 11.887338,
"utime": 291.123721
},
"total": {
"worker.tasks.process": 382
}
}
Here you can see 2 running threads in worker1 and 1 in worker2; they have been stuck like this for a very long time.
I know a task should not wait for another task, so please suggest a better way of doing this, even if it requires a change in architecture.
==========================EDIT================================
I tried using a chord too, but a thread still gets stuck executing the "process" task: celery inspect stats shows 1 running thread and it's not doing anything. And the problem is that the callback will not execute until this thread returns.
Another issue is that it doesn't even hit the timeout.
eventlet==0.23.0 kombu==4.1.0 billiard==3.5.0.2 celery==4.0.2 redis==2.10.5
Thanks in advance.
You are launching tasks and waiting for their results from within another task. This is known to cause deadlocks: once your worker pool is exhausted, your first task sits waiting for other tasks that cannot execute for lack of available workers.
The solution is to use callbacks or more complex workflows instead. In your case a chord is indeed the proper solution, so give it another try, and if you still have problems with it, post about that specific problem.
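Here is a minimal sketch of what the chord version could look like, reusing the process and callback names from your question. Celery itself invokes callback with the list of all process results once the header has finished, so schedule never has to block on result.get():

from celery import chord

@celery.task(bind=True, ignore_result=False, soft_time_limit=TASK_TIME_OUT*15)
def schedule(self, data):
    # Header tasks run in parallel; the body (callback) is triggered by Celery
    # once every header task has completed, so nothing waits inside this task.
    header = [process.s(i[0], i[1]) for i in data]
    chord(header)(callback.s())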