3

Currently i am running task in celery which take 10 to 15 minutes to complete but problem is How do i restart task which currently running in worker and also one which is currently not running but waiting for task to run while i forcefully stopped worker or in case my server got crash or stopped. What happens right now is if i start celery again it's not starting the last running task or remaining task.

2 Answers 2

2

one thing you can do is enable acks_late on the task. Additionally, it's probably worthwhile to read their FAQ section on acks late and retry.

@app.task(acks_late=True) def task(*args, **kwargs): ... 
Sign up to request clarification or add additional context in comments.

2 Comments

I tried acks_late, but when I set acks_late=True for a long running task, the task is requeued after 2 mins. I am assuming this is because the task was not acknowledged so that is why it is being requeued. This doesn't happen if I set acks_late=False. Can you please help how I can use asks_late for long running tasks
It'd be best to open a new question.
1

In my case I have to set acks_late and reject_on_worker_lost:

@app.task(bind=True, base=AbortableTask, acks_late=True, reject_on_worker_lost=True) def my_task(self, experiment_pk: int): # Your code... 

And in the celery.py file I made an adaptation of this gist which retrieves all the pending tasks considering the visibility_timeout parameter (which makes the task to be waiting without being processed after a worker restart):

import os from celery import Celery from celery.signals import worker_init os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_proj.settings") app = Celery("my_proj") app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks() # Gets the max between all the parameters of timeout in the tasks max_timeout = a_vale # This value must be bigger than the maximum soft timeout set for a task to prevent an infinity loop app.conf.broker_transport_options = {'visibility_timeout': max_timeout + 60} # 60 seconds of margin def restore_all_unacknowledged_messages(): """ Restores all the unacknowledged messages in the queue. Taken from https://gist.github.com/mlavin/6671079 """ conn = app.connection(transport_options={'visibility_timeout': 0}) qos = conn.channel().qos qos.restore_visible() print('Unacknowledged messages restored') @worker_init.connect def configure(sender=None, conf=None, **kwargs): restore_all_unacknowledged_messages() 

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.