Using a Celery worker to interact with a SQLAlchemy DB, including knowing the user from the request
I have done plenty of research on this, including trying answers like this one. It appears Celery has no access to my Flask app's context.

I know full well that my celery object, which decorates my tasks, must have access to my Flask app's context, and I believe it should, as I followed this guide to create the celery object. I am unsure whether the confusion lies in the fact that I am also using Flask-HTTPAuth.
Here is some of what I have.
def make_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config["CELERY_RESULT_BACKEND"],
        broker=app.config["CELERY_BROKER_URL"],
    )
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"

celery = make_celery(app)
db = SQLAlchemy(app)


@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
    items = g.user.items
    for item in items:
        print(item)
Running this task using Flask is a no-go, though. I try to start this function by hitting the server (while authorized!).
@app.route("/item_loop")
@auth.login_required
def item_loop():
    result = loop.delay()
    return "It's running."
But the Celery worker tells me the task raised unexpected: AttributeError("'_AppCtxGlobals' object has no attribute 'user'"), which I believe implies, as mentioned, that my celery object does not have the app context, even though I used the recommended factory pattern.
While the recommendations in Dave's and Greg's answers are valid, what they fail to highlight is the misunderstanding that you have regarding the use of an application context in a Celery task.
You have a Flask application, in which you are using Flask-HTTPAuth. You probably have a verify_password handler that sets g.user to the authenticated user. This means that while you are handling a request you can access the user as g.user. This is all good.
You also have one or more Celery workers, which are separate processes that have no direct connection to the Flask server. The only communication between the Flask server and the Celery worker processes happens over the message broker that you are using (typically Redis or RabbitMQ).
Depending on your needs, the Celery workers may need to have access to the Flask application. This is very common when using Flask extensions that store their configuration in the app.config dictionary. Two common extensions that require this are Flask-SQLAlchemy and Flask-Mail. Without access to app.config, the Celery task would have no way to open a connection to the database or to send an email, since it would not know the details of the database and/or email server.

To give the Celery workers access to the configuration, the accepted practice is to create duplicate Flask applications in each worker. These are secondary applications that are in no way connected to the actual application object used by the main Flask server. Their only purpose is to hold a copy of the original app.config dictionary so that it can be accessed by your task and by any Flask extensions your task is using.
So it is invalid to expect that a g.user set in the Flask server will also be accessible as g.user in the Celery task, simply because these are different g objects, from different application instances.
If you need to use the authenticated user in the Celery task, what you should do is pass the user's id (g.user.id) as an argument to your task. Then, in your task, you can load the user from the database using this id. Hope this helps!
To retrieve the user from within a task execution, you can try passing the User object (if celery can pickle it), or pass along enough information that the task can retrieve the User object (e.g. the User's id). In this latter case, your task would look something like
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self, user_id):
    user = User.query.get(user_id)
    items = user.items
    for item in items:
        print(item)
and you'd kick it off (assuming you're using flask_login) via
result = loop.delay(current_user.id)
As noted by @Dave W. Smith, rather than relying on g to retrieve the user, passing the user info as an argument to the Celery task is the better approach. According to the Flask documentation on the application context, the lifetime of g is a single request. Since the Celery task is executed asynchronously, it runs within a different app context than the one of the request in which you defined the user.
- I do think I follow. Loading the user in the task makes perfect sense to me, and I have confirmed it works. Actually writing back to the database seems to be the problem. I am sorry, I just cannot quite grasp this. Is that where the duplicate Flask app thing you mentioned comes in?
- If you create a secondary app as I indicated in the answer, you should have no problem reading or writing from the database. If you get a specific error, then you need to expand on that in your question.
- While I am not sure what you mean by "create a secondary app", I guess I did it, because I got it working just by carefully studying the pattern provided. Thanks.
- All I mean by "secondary" is that it isn't the Flask instance that is running the server.
- Hi, this makes sense and works, but how would I be able to write back to the SQLAlchemy DB? db.session.commit() will not work: name 'db' is not defined.
- Then you'll need a db that has the same database settings that the Flask app has. See stackoverflow.com/questions/12044776/… for one approach.