Django Task Queue
=================
A short, simple, boring, and reliable Celery replacement for my Django projects.
* ETA (estimated time of arrival) is a fundamental feature and does not require caching tasks in memory or moving to a different queue.
* Retry support built-in (Celery-style)
* Database is the only backend. Successful tasks are removed from the database, the failed ones are kept for manual inspection. Tasks obey the same transaction rules as the rest of your models. No more ``transaction.on_commit``.
* Django Admin integration to view future and failed tasks and to restart or delete the failed ones.
* Tasks produce no result. When your task produces a valuable result, store it in your models.
Installation
------------
Install the package:

.. code-block:: bash

    python -m pip install django-taskq

Then add ``django_taskq`` to ``INSTALLED_APPS``:

.. code-block:: python

    INSTALLED_APPS = [
        # ...
        "django_taskq",
    ]

Run the migrations to create the database tables and indexes:

.. code-block:: bash

    python manage.py migrate

Celery API
----------
The main API mirrors the Celery API (``shared_task``) with ``delay``, ``apply_async`` and ``s``, just to make switching between implementations easier.

.. code-block:: python

    from django_taskq.celery import shared_task


    class MyException(Exception):
        """Placeholder for an error that should trigger a retry."""


    @shared_task(autoretry_for=(MyException,), retry_kwargs={"max_retries": 10, "countdown": 5})
    def my_task(foo, bar=None):
        ...


.. code-block:: python

    my_task.delay(1, bar=2)
    my_task.apply_async((1, 2))
    my_task.s(1, 2).apply_async()

To start a worker, run the management command:

.. code-block:: bash

    python manage.py taskq
    python manage.py taskq -Q default -l DEBUG

Tasks also return emulations of ``AsyncResult`` and ``EagerResult``. The main motivation is to provide a UUID of the scheduled task and to be able to revoke it before execution.
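
.. code-block:: python

    # Revoke through the returned result object:
    result = my_task.s(1, 2).apply_async(countdown=60)
    ...
    result.revoke()

    # Or persist the task id and revoke later
    # (store_task_id and retrieve_task_id are your own helpers):
    result = my_task.s(1, 2).apply_async(countdown=60)
    store_task_id(result.id)
    ...
    AsyncResult(id=retrieve_task_id()).revoke()
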
It also obeys some of the Celery configuration parameters. Setting ``CELERY_TASK_ALWAYS_EAGER`` in your Django settings causes tasks to be executed immediately, which can be useful in tests:

.. code-block:: python

    CELERY_TASK_ALWAYS_EAGER = True

And ``CELERY_TASK_EAGER_PROPAGATES`` causes exceptions from eagerly executed tasks to be raised, which is another feature often used in tests:

.. code-block:: python

    CELERY_TASK_EAGER_PROPAGATES = True

NOT Celery API
--------------
This task queue is unintrusive: all you get is the execution of a function. How you organize the code after that is up to you.
There are no Celery bound tasks, task inheritance, naming, task requests, special logging, etc. You get the idea.
Retry can't change the args/kwargs. That is not a retry but a new task.
Tasks have no result. If you can wait for the result, you can execute the function directly.
No Redis, Flower, or Django:Celery integrations are needed.
Admin page
----------
The Django admin page shows tasks in the following groups:
- Failed tasks -- Tasks that failed after retries and countdowns. You should inspect them and remove them by hand or with a script. You can execute them again as well.
- Dirty tasks -- Tasks that got started but failed without reaching a final state due to killed processes or crashing machines. Review them and either delete or execute again.
- Active tasks -- Tasks being executed right now. You might catch some longer-running tasks here.
- Pending tasks -- Tasks that should be executed now but are not, due to a lack of available workers. You might start some extra workers to catch up.
- Future tasks -- Tasks scheduled to be executed in the future.
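
As a rough illustration of how these groups relate to each other, here is a minimal sketch that classifies a task from three values. The field names (``state``, ``execute_at``, ``alive_at``) and the 30-second staleness threshold are assumptions made for this example, not the package's actual schema.

.. code-block:: python

    from datetime import datetime, timedelta

    # Hypothetical threshold: a started task with no heartbeat for this long is "dirty".
    STALE_AFTER = timedelta(seconds=30)


    def classify(state, execute_at, alive_at, now):
        """Return the admin group for a task described by hypothetical fields."""
        if state == "failed":
            return "failed"
        if state == "started":
            # A recent heartbeat means a worker is still running the task.
            return "active" if now - alive_at <= STALE_AFTER else "dirty"
        # Not started yet: due tasks are waiting for a free worker.
        return "pending" if execute_at <= now else "future"


    now = datetime(2025, 1, 1, 12, 0, 0)
    print(classify("started", now, now - timedelta(minutes=5), now))  # dirty
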
Internals
---------
Adding a new task to the queue creates a new task model instance. When there is an active transaction, the task creation is atomic with the rest of the model updates: either all of that is persisted or none.
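
The atomicity described above can be sketched without Django, using only the standard library's ``sqlite3``. The table names and the ``place_order`` helper are made up for this example; the point is only that a task row written in the same transaction as the application row is committed or rolled back together with it.

.. code-block:: python

    import sqlite3

    # Hypothetical schema: an application table and a task table in one database.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
    conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, func TEXT, args TEXT)")


    def place_order(conn, order_id, fail=False):
        """Insert an order and enqueue its follow-up task in one transaction."""
        # The connection context manager commits on success and rolls back on error.
        with conn:
            conn.execute("INSERT INTO orders (id, status) VALUES (?, 'new')", (order_id,))
            conn.execute(
                "INSERT INTO tasks (func, args) VALUES (?, ?)",
                ("send_confirmation", str(order_id)),
            )
            if fail:
                raise RuntimeError("simulated failure after enqueueing")


    place_order(conn, 1)
    try:
        place_order(conn, 2, fail=True)
    except RuntimeError:
        pass  # both the order and its task were rolled back

    orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    tasks = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]

After the failed call there is no orphaned task pointing at an order that was never saved, which is the situation ``transaction.on_commit`` is normally used to avoid.
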
Executing a task is a bit more expensive:
1. A task is picked up from a queue and the state is updated to "started" within a single transaction. Think of it as taking a lease.
2. Python code is executed, and a background thread updates the "alive at" field every second ("a liveness probe").
3. Successful tasks are deleted from the table. Failed tasks are marked as such and retried (based on configuration).
This is a bit more expensive than necessary but:
* we can recognize running tasks: the task is "started" and the record was updated within the last few seconds. There is no need to guess the right lease timeout ahead of time.
* we can recognize "dirty" tasks that got killed or lost the database connection in the middle without reaching a final state: the task is "started" and the record has not been updated for a while.
In an ideal world, tasks would be idempotent and it would be safe to retry "dirty" tasks automatically. But things happen, and I prefer to know which tasks crashed and double-check whether some cleanup is necessary.
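
The lease plus liveness probe can be sketched with a plain dictionary standing in for the task record. This is a simplified illustration, not the package's implementation: ``run_with_heartbeat`` and the dictionary keys are hypothetical, and the real queue updates a database row and deletes it on success.

.. code-block:: python

    import threading
    import time


    def run_with_heartbeat(task, func, interval=1.0):
        """Run func while a background thread refreshes task["alive_at"]."""
        stop = threading.Event()

        def beat():
            # Event.wait returns False on timeout and True once stop is set.
            while not stop.wait(interval):
                task["alive_at"] = time.monotonic()

        task["state"] = "started"  # taking the lease
        task["alive_at"] = time.monotonic()
        thread = threading.Thread(target=beat, daemon=True)
        thread.start()
        try:
            func()
            task["state"] = "succeeded"  # the real queue deletes the record instead
        except Exception:
            task["state"] = "failed"
        finally:
            stop.set()
            thread.join()
        return task

A process that is killed mid-task never reaches either final state, so the record stays "started" with an ``alive_at`` value that stops advancing. That is exactly the signature of a "dirty" task.
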
Performance
-----------
A single process can execute around 150 dummy tasks per second, which is more than enough. After years of struggling with Celery, correctness and observability are more important.
On the other hand, to handle more "tasks" you probably want to store many events, not tasks, and have a single task that processes them in batches.
Known issues
------------
Tests checking for a specific query limit might fail because creating new tasks issues queries as well.
Recipes
-------
*Exactly once, at most once, at least once, idempotency:*
Implementing these semantics presents too many design questions to answer *on the task level*. Instead, treat the tasks as function calls that are decoupled in time. We do not enforce these semantics on functions; we write code inside functions to perform the necessary checks.
Within the task do this:
1. Lock the application model
2. Check that all conditions still apply
3. Perform the action
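
The three steps can be sketched with the standard library's ``sqlite3``, where ``BEGIN IMMEDIATE`` plays the role of the lock. In a Django project the lock would typically be ``select_for_update()`` inside ``transaction.atomic()``. The ``invoices`` table, ``send_invoice`` and ``sent_log`` are hypothetical names used only for this example.

.. code-block:: python

    import sqlite3

    conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transactions
    conn.execute("CREATE TABLE invoices (id INTEGER PRIMARY KEY, sent INTEGER DEFAULT 0)")
    conn.execute("INSERT INTO invoices (id) VALUES (1)")

    sent_log = []  # stands in for the real side effect, e.g. sending an email


    def send_invoice(conn, invoice_id):
        """Task body that is safe to run more than once for the same invoice."""
        # 1. Lock: BEGIN IMMEDIATE takes SQLite's write lock up front.
        conn.execute("BEGIN IMMEDIATE")
        try:
            row = conn.execute(
                "SELECT sent FROM invoices WHERE id = ?", (invoice_id,)
            ).fetchone()
            # 2. Check that all conditions still apply.
            if row is None or row[0]:
                conn.execute("ROLLBACK")
                return False
            # 3. Perform the action and record that it happened.
            sent_log.append(invoice_id)
            conn.execute("UPDATE invoices SET sent = 1 WHERE id = ?", (invoice_id,))
            conn.execute("COMMIT")
            return True
        except Exception:
            conn.execute("ROLLBACK")
            raise

Running the task twice performs the action once: the second call sees ``sent = 1`` and returns without doing anything.
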
*Task priorities:*
There are no priorities. If you need priority or slow background tasks, just add them to another queue. Start as many processors for the queues as you want.
Some of them might be idle but it's under your control unlike trying to come up with a proper algorithm that prioritizes tasks and avoids starvation.
*Non-concurrent tasks:*
You have two options:
- Either synchronize on some database record by taking a lock, which enforces it explicitly
- Or keep a dedicated queue with a single worker, which enforces it implicitly
*Storing results:*
Instead of the task storing its result and returning it to the caller or triggering another task to process it, either:
- Store the result directly in the target application model
- Call a function or another task to process the result **explicitly**
*Scheduling tasks:*
Call a Python script from the Unix crontab, or use Kubernetes CronJobs.
Or build a simple Django command using the nice `schedule library <https://pypi.org/project/schedule/>`_.
*Scaling workers:*
Start multiple Docker containers, or run multiple Kubernetes pods by scaling the deployment. Or use something like ``supervisord`` to start multiple processes.
*Boosting performance:*
Instead of executing thousands of tasks (function calls with specific arguments) consider recording thousands of events (domain-specific model) and executing a task once in a while that processes all available events in bulk.
Or do not record any events, just schedule a task that queries models matching certain criteria and does processing for all of them.
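
A minimal sketch of the event-based approach, using an in-memory list instead of a Django model. The ``Event`` class, its fields and ``process_events`` are assumptions made for this example; in a real project the events would be model rows and the function would be the body of one periodically scheduled task.

.. code-block:: python

    from dataclasses import dataclass


    @dataclass
    class Event:
        """Hypothetical domain model row, e.g. one page view."""

        page: str
        processed: bool = False


    def process_events(events, batch_size=1000):
        """One periodic task: aggregate all unprocessed events in batches."""
        totals = {}
        pending = [e for e in events if not e.processed]
        for start in range(0, len(pending), batch_size):
            for event in pending[start:start + batch_size]:
                totals[event.page] = totals.get(event.page, 0) + 1
                event.processed = True
        return totals

Recording an event is a single cheap insert, and the cost of picking up, leasing and finishing a task is paid once per batch instead of once per event.
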