How to run a group of celery tasks? Apps aren't loaded yet

For example, I have two tasks, a and b, and I need to run them in parallel. I create a group of the tasks and try to run it, but I get an error.

proj/app/tasks.py

from proj.celery import app  # assuming the app instance from proj/proj/celery.py

@app.task
def a():
    pass

@app.task
def b():
    pass

django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

The application app is registered in INSTALLED_APPS and all migrations have completed.

proj/proj/__init__.py

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ('celery_app',)

proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))


from celery import group
from app.tasks import a, b

run_group = group(a.s(), b.s())
run_group()

Traceback

File "/home/m0nte-cr1st0/test_projects/proj/proj/__init__.py", line 5, in <module>
    from .celery import app as celery_app
File "/home/m0nte-cr1st0/test_projects/proj/proj/celery.py", line 26, in <module>
    from app.tasks import a, b
File "/home/m0nte-cr1st0/test_projects/proj/app/tasks.py", line 14, in <module>
    from .models import Link, Prediction, PredictionBK
File "/home/m0nte-cr1st0/test_projects/proj/app/models.py", line 2, in <module>
    from django.contrib.auth.models import AbstractUser

There are two problems here.

Firstly, you should not be importing the tasks in celery.py; that is what app.autodiscover_tasks() is for. Remove the line from app.tasks import a, b.

Secondly, you should not be calling run_group() at module level. Define the group there if you like, but only call it when you actually want the tasks in the group to run, for example from a view, as in the sketch below.
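
Here is a minimal sketch of dispatching the group from a Django view instead, where the app registry is guaranteed to be ready (the view and its module are hypothetical, not part of the question's project):

# proj/app/views.py (hypothetical)
from celery import group
from django.http import JsonResponse

from .tasks import a, b


def start_tasks(request):
    # Build and dispatch the group at request time; the workers then
    # execute a() and b() in parallel.
    result = group(a.s(), b.s()).apply_async()
    return JsonResponse({'group_id': result.id})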

proj/proj/celery.py

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

proj/proj/settings.py

REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
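
Note that this answer drops the namespace='CELERY' argument, so old-style setting names like BROKER_URL are what Celery looks up. If you keep namespace='CELERY' as in the question's celery.py, the settings must carry that prefix instead; otherwise Celery silently falls back to its default amqp:// broker, which would explain the RabbitMQ-flavoured ACCESS_REFUSED error mentioned in the comments below. A sketch of the prefixed variant:

# proj/proj/settings.py, if namespace='CELERY' is kept
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'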

proj/proj/__init__.py

from .celery import app as celery_app

proj/app/tasks.py

from celery import group

from proj.celery import app  # assuming the app instance from proj/proj/celery.py


@app.task
def a():
    pass


@app.task
def b():
    pass


# Build the group here, but only call run_group() (e.g. from a view or
# the shell) when you actually want to dispatch the tasks.
run_group = group(a.s(), b.s())
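
With a worker running and the Redis result backend above configured, the group can then be dispatched and its results collected through the returned GroupResult; a minimal sketch from python manage.py shell:

from app.tasks import run_group

result = run_group.apply_async()  # runs a() and b() in parallel
result.ready()                    # True once both tasks have finished
result.get(timeout=10)            # list of return values, here [None, None]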

Change your proj/proj/celery.py to this:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

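# django.setup() populates the app registry so the imports below can
# load task modules that touch Django models without AppRegistryNotReady.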
import django
django.setup()

from celery import group
from app.tasks import a, b

run_group = group(a.s(), b.s())
run_group()  # note: this dispatches the group every time celery.py is imported
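
Alternatively, as one of the comments below points out, the from .celery import app as celery_app line in proj/__init__.py exists so that shared_task can find the app. Declaring the tasks with shared_task avoids importing the app object into tasks.py at all; a minimal sketch of that variant:

# proj/app/tasks.py, hypothetical shared_task variant
from celery import shared_task


@shared_task
def a():
    pass


@shared_task
def b():
    pass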

Comments
  • Why do you have that from .celery import app as celery_app in your __init__.py?
  • docs.celeryproject.org/en/latest/django/…
  • It's app.task, not app.tasks.
  • @DanielRoseman It allows shared_task to detect the app that it should use.
  • Looks like your settings are improperly configured somehow. See this question.
  • @ДмитрийДмитрук Added an update. See if that works.
  • @ДмитрийДмитрук I posted a related question in the comments. Can you show your settings.py file?
  • But I want to run the tasks in a group.
  • Yes. Define run_group there, as you are doing. But don't call it there. Call it when you want to run the group.
  • OK, thank you for the help. Now I get the error amqp.exceptions.AccessRefused: (0, 0): (403) ACCESS_REFUSED - Login was refused using authentication mechanism AMQPLAIN. For details see the broker logfile.
  • That's strange. I use Redis, but the error is about RabbitMQ?