# sqlalchemy-celery-beat
A SQLAlchemy-based scheduler for Celery.

> NOTE: This project was originally developed by [AngelLiang](https://github.com/AngelLiang/celery-sqlalchemy-scheduler) to use SQLAlchemy as the database backend for the Celery beat scheduler in Flask or FastAPI projects, like [django-celery-beat](https://github.com/celery/django-celery-beat) does for Django. I am trying to continue his work and maintain a working solution.
### Prerequisites
- Python 3
- celery >= 5.0
- sqlalchemy >= 1.4

First, install `celery` (>= 5.0) and `sqlalchemy` (>= 1.4):
```
$ pip install sqlalchemy celery
```
### Installing
Install from PyPI:
```
$ pip install sqlalchemy-celery-beat
```
Install from source by cloning this repository:
```
$ git clone git@github.com:farahats9/sqlalchemy-celery-beat.git
$ cd sqlalchemy-celery-beat
$ python setup.py install
```
## Usage
Once `sqlalchemy_celery_beat` is installed, you can get started with the following steps. This is a demo; the full code is in the `examples` directory.

1. Start a Celery worker:

   ```
   $ celery -A tasks worker -l info
   ```

2. Start Celery beat with `DatabaseScheduler` as the scheduler:

   ```
   $ celery -A tasks beat -S sqlalchemy_celery_beat.schedulers:DatabaseScheduler -l info
   ```

   You can also use the shorthand argument `-S sqlalchemy`.

## Description
By default, once Celery beat starts, it creates a SQLite database (`schedule.db`) in the current directory. You can inspect it with a tool such as SQLiteStudio.

A sample from the `PeriodicTask` model's table:

![sqlite](screenshot/sqlite.png)
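
If you prefer not to install a GUI tool, Python's built-in `sqlite3` module is enough for a quick look. A minimal sketch; the helper names `list_tables` and `preview_table` are hypothetical and not part of `sqlalchemy_celery_beat`:

```python
from __future__ import annotations

import sqlite3


def list_tables(conn: sqlite3.Connection) -> list[str]:
    """Return the names of all user tables in a SQLite database, sorted."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
    ).fetchall()
    return [name for (name,) in rows]


def preview_table(conn: sqlite3.Connection, table: str, limit: int = 5) -> list[dict]:
    """Return up to `limit` rows of `table` as {column: value} dicts."""
    # Table names cannot be bound as parameters, so quote the identifier
    # (doubling any embedded double quotes) instead of formatting it raw.
    quoted = '"' + table.replace('"', '""') + '"'
    cur = conn.execute(f"SELECT * FROM {quoted} LIMIT ?", (limit,))
    columns = [desc[0] for desc in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]
```

For example, `conn = sqlite3.connect("file:schedule.db?mode=ro", uri=True)` opens the file read-only, so a mistyped path raises an error instead of silently creating an empty database; `list_tables(conn)` then shows the scheduler tables and `preview_table(conn, "celery_periodic_task_changed")` shows the change-tracking row.
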

To change the schedule, you can edit the data in `schedule.db` directly. However, `sqlalchemy_celery_beat` does not pick up such edits immediately: you must also set the `last_update` field of the first row in the `celery_periodic_task_changed` table to the current datetime. Celery beat will then reload the schedule at its next wake-up.
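
Why does editing rows alone not trigger a reload? The sketch below models the idea, assuming the scheduler compares the stored `last_update` value with the one it saw when it last loaded the schedule and reloads only when the stored value is newer. `ChangeWatcher` is a hypothetical illustration, not a class provided by the library:

```python
from datetime import datetime
from typing import Optional


class ChangeWatcher:
    """Hypothetical model of change detection in a database-backed beat scheduler."""

    def __init__(self, last_seen: Optional[datetime] = None) -> None:
        # The `last_update` value observed when the schedule was last loaded.
        self.last_seen = last_seen

    def should_reload(self, last_update: Optional[datetime]) -> bool:
        # No marker stored yet: nothing has been flagged as changed.
        if last_update is None:
            return False
        # Reload only when the marker has moved forward since the last check.
        if self.last_seen is None or last_update > self.last_seen:
            self.last_seen = last_update
            return True
        return False
```

Under this model, an edit to `celery_periodic_task_changed` rows that leaves `last_update` unchanged looks exactly like "nothing happened", which is why the timestamp has to be bumped.
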
### Database Configuration
You can set the SQLAlchemy database URI when you configure Celery, for example:
```Python
from celery import Celery

celery = Celery('tasks')

beat_dburi = 'sqlite:///schedule.db'

celery.conf.update(
    {
        'beat_dburi': beat_dburi,
        # You can create the scheduler tables under a different schema
        # (tested with PostgreSQL; not available in SQLite).
        'beat_schema': None,
    }
)
```
You can also use MySQL or PostgreSQL:
```Python
# MySQL: `pip install mysql-connector-python`
beat_dburi = 'mysql+mysqlconnector://root:root@127.0.0.1:3306/celery-schedule'
# PostgreSQL: `pip install psycopg2`
beat_dburi = 'postgresql+psycopg2://postgres:postgres@127.0.0.1:5432/celery-schedule'
```
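
If a database password contains characters such as `@`, `:` or `/`, it must be percent-encoded before it goes into the URI, otherwise the URL is mis-parsed. A minimal sketch using the standard library's `urllib.parse.quote_plus`; the helper `build_dburi` and the credentials are hypothetical:

```python
from urllib.parse import quote_plus


def build_dburi(driver: str, user: str, password: str,
                host: str, port: int, database: str) -> str:
    """Assemble a SQLAlchemy database URI with percent-encoded credentials."""
    # quote_plus escapes every reserved character (its default `safe` is empty),
    # so '@' becomes %40 and '/' becomes %2F.
    return f"{driver}://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"


# Hypothetical credentials, for illustration only.
beat_dburi = build_dburi(
    "postgresql+psycopg2", "postgres", "p@ss/word", "127.0.0.1", 5432, "celery-schedule"
)
```

The resulting string is what you pass as `'beat_dburi'` in `celery.conf.update(...)`.
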
## Passing arguments to SQLAlchemy engine creation
You can pass arguments to the SQLAlchemy engine using the `beat_engine_options` key in the config dictionary. For example, to make the engine use `echo=True` and log verbose output:
```python
celery.conf.update(
    {
        'beat_dburi': beat_dburi,
        'beat_engine_options': {
            'echo': True
        },
        # ... other settings
    }
)
```
You can use this to pass any keyword argument accepted by SQLAlchemy's `create_engine()`, including driver-specific options via `connect_args`. See the SQLAlchemy documentation for the full list.
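
With a MySQL or PostgreSQL server, connection-pool settings are a common use of `beat_engine_options`, since a long-running beat process can hold connections that the server later closes. `pool_pre_ping` and `pool_recycle` are standard `create_engine()` arguments; the values below are illustrative assumptions, not recommendations from this project:

```python
# Illustrative engine options for a long-running beat process (values are assumptions).
beat_engine_options = {
    'pool_pre_ping': True,  # test each pooled connection before use, replacing stale ones
    'pool_recycle': 3600,   # replace connections older than one hour (in seconds)
}
```

Pass it as `'beat_engine_options': beat_engine_options` in `celery.conf.update(...)`.
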
## Example Code 1
View `examples/base/tasks.py` for details.

Run the worker in console 1:

    $ cd examples/base

    # Celery < 5.0
    $ celery worker -A tasks:celery -l info

    # Celery >= 5.0
    $ celery -A tasks:celery worker -l info

Run beat in console 2:

    $ cd examples/base

    # Celery < 5.0
    $ celery beat -A tasks:celery -S tasks:DatabaseScheduler -l info

    # Celery >= 5.0
    $ celery -A tasks:celery beat -S tasks:DatabaseScheduler -l info

## Example Code 2
### Example creating interval-based periodic task
To create a periodic task executing at an interval you must first
create the interval object:
```python
>>> from sqlalchemy_celery_beat.models import PeriodicTask, IntervalSchedule, Period
>>> from sqlalchemy_celery_beat.session import SessionManager
>>> from celeryconfig import beat_dburi
>>> session_manager = SessionManager()
>>> session = session_manager.session_factory(beat_dburi)
# executes every 10 seconds.
>>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=Period.SECONDS).first()
>>> if not schedule:
...     schedule = IntervalSchedule(every=10, period=Period.SECONDS)
...     session.add(schedule)
...     session.commit()
```
That's all the fields you need: a period type and the frequency.
You can choose between a specific set of periods:
- `Period.DAYS`
- `Period.HOURS`
- `Period.MINUTES`
- `Period.SECONDS`
- `Period.MICROSECONDS`
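
Each period name matches a keyword argument of Python's `datetime.timedelta`, so an `(every, period)` pair can be read as the time span between two runs. The sketch below illustrates that reading with plain strings; it assumes nothing about how `IntervalSchedule` converts the pair internally, and the helper `interval_to_timedelta` is hypothetical:

```python
from datetime import timedelta

# Plain-string stand-ins for Period.DAYS ... Period.MICROSECONDS
# (assumption: the real enum may store different values).
PERIODS = ("days", "hours", "minutes", "seconds", "microseconds")


def interval_to_timedelta(every: int, period: str) -> timedelta:
    """Turn an (every, period) pair into the time span between two runs."""
    if period not in PERIODS:
        raise ValueError(f"unknown period: {period!r}")
    if every <= 0:
        raise ValueError("every must be a positive integer")
    return timedelta(**{period: every})
```

For example, `interval_to_timedelta(90, "minutes")` is an hour and a half, so `every=90, period=Period.MINUTES` and `every=5400, period=Period.SECONDS` describe the same spacing.
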

> NOTE: If you have multiple periodic tasks executing every 10 seconds,
> they should all point to the same schedule object.

Now that we have defined the schedule object, we can create the periodic task
entry:
```python
>>> task = PeriodicTask(
... schedule_model=schedule, # we created this above.
... name='Importing contacts', # simply describes this periodic task.
... task='proj.tasks.import_contacts', # name of task.
... )
>>> session.add(task)
>>> session.commit()
```
Note that this is a very basic example: you can also specify the
arguments and keyword arguments used to execute the task, the `queue` to
send it to, and an expiry time.

Here's an example specifying the arguments; note that they must be
serialized as JSON:

    >>> import json
    >>> from datetime import datetime, timedelta

    >>> periodic_task = PeriodicTask(
    ...     schedule_model=schedule,  # we created this above.
    ...     name='Importing contacts',  # simply describes this periodic task.
    ...     task='proj.tasks.import_contacts',  # name of task.
    ...     args=json.dumps(['arg1', 'arg2']),
    ...     kwargs=json.dumps({
    ...         'be_careful': True,
    ...     }),
    ...     expires=datetime.utcnow() + timedelta(seconds=30)
    ... )
    >>> session.add(periodic_task)
    >>> session.commit()

### Example creating crontab-based periodic task
A crontab schedule has the fields `minute`, `hour`, `day_of_week`,
`day_of_month` and `month_of_year`, so if you want the equivalent of a
`30 * * * *` crontab entry (run at minute 30 of every hour) you specify:

    >>> from sqlalchemy_celery_beat.models import PeriodicTask, CrontabSchedule
    >>> schedule = CrontabSchedule(
    ...     minute='30',
    ...     hour='*',
    ...     day_of_week='*',
    ...     day_of_month='*',
    ...     month_of_year='*',
    ...     timezone='UTC',
    ... )
    >>> session.add(schedule)
    >>> session.commit()

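A standard five-field crontab line lists its fields in the order minute, hour, day of month, month, day of week, which differs from the keyword order above. A small hypothetical helper (`crontab_kwargs`, not part of the library) can split such a line into `CrontabSchedule` keyword arguments:

```python
def crontab_kwargs(expr: str, timezone: str = "UTC") -> dict:
    """Split a five-field crontab expression into CrontabSchedule keyword arguments."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}: {expr!r}")
    # Standard crontab order: minute, hour, day of month, month, day of week.
    minute, hour, day_of_month, month_of_year, day_of_week = fields
    return {
        "minute": minute,
        "hour": hour,
        "day_of_week": day_of_week,
        "day_of_month": day_of_month,
        "month_of_year": month_of_year,
        "timezone": timezone,
    }
```

With it, `CrontabSchedule(**crontab_kwargs("30 * * * *"))` builds the same schedule as the example above, and `crontab_kwargs("0 9 * * 1-5")` yields `minute='0'`, `hour='9'` and `day_of_week='1-5'`.
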

The crontab schedule is linked to a specific timezone using the
`timezone` parameter.

To create a periodic task using this schedule, use the same approach as
for the interval-based periodic task earlier in this document. Because
`schedule_model` is a generic foreign key, it accepts any schedule type:

    >>> periodic_task = PeriodicTask(
    ...     schedule_model=schedule,
    ...     name='Importing contacts',
    ...     task='proj.tasks.import_contacts',
    ... )

Under the hood, the previous code is equivalent to:

    >>> periodic_task = PeriodicTask(
    ...     schedule_id=schedule.id,
    ...     discriminator=schedule.discriminator,
    ...     name='Importing contacts',
    ...     task='proj.tasks.import_contacts',
    ... )

So you can either set `discriminator` and `schedule_id` yourself, or use the convenience property `schedule_model`, which populates them for you behind the scenes.
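
To see why the pair `(discriminator, schedule_id)` is enough to locate a schedule, here is a stdlib-only sketch of a generic foreign key: the discriminator selects the table (modeled as a dict per schedule type) and the id selects the row. The row classes and discriminator strings are hypothetical, not the library's actual values:

```python
from dataclasses import dataclass


@dataclass
class IntervalRow:
    id: int
    every: int
    period: str


@dataclass
class CrontabRow:
    id: int
    minute: str
    hour: str


# Hypothetical "tables", keyed first by discriminator, then by primary key.
# Note that both tables contain an id of 1: the id alone is ambiguous.
TABLES = {
    "interval": {1: IntervalRow(id=1, every=10, period="seconds")},
    "crontab": {1: CrontabRow(id=1, minute="30", hour="*")},
}


def resolve_schedule(discriminator: str, schedule_id: int):
    """Follow a generic foreign key: pick the table, then the row."""
    try:
        return TABLES[discriminator][schedule_id]
    except KeyError:
        raise LookupError(f"no {discriminator!r} schedule with id {schedule_id}") from None
```

A typical trade-off of this design is that the database cannot enforce a real foreign-key constraint on `schedule_id`, since it may point into different tables.
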
### Temporarily disable a periodic task
You can use the `enabled` flag to temporarily disable a periodic task:

    >>> periodic_task.enabled = False
    >>> session.add(periodic_task)
    >>> session.commit()

If you use a bulk operation to update or delete multiple tasks at once, the scheduler won't notice the changes until you call `PeriodicTaskChanged.update_changed()` or `PeriodicTaskChanged.update_from_session()`.

For example:
```python
from sqlalchemy import update

from celeryconfig import beat_dburi
from sqlalchemy_celery_beat.models import PeriodicTask, PeriodicTaskChanged
from sqlalchemy_celery_beat.session import SessionManager, session_cleanup

session_manager = SessionManager()
session = session_manager.session_factory(beat_dburi)

with session_cleanup(session):
    stmt = update(PeriodicTask).where(PeriodicTask.name == 'task-123').values(enabled=False)

    # A bulk UPDATE bypasses the per-object ORM events: the change reaches
    # the database, but the scheduler has no idea it happened.
    session.execute(stmt)
    session.commit()

    # Bump the change marker so the scheduler reloads the tasks.
    PeriodicTaskChanged.update_from_session(session)
```
This is not needed when you change individual objects through the ORM (for example with `session.add(task)` or `session.delete(task)` followed by `session.commit()`), because that triggers the `after_insert`, `after_update` or `after_delete` events.
### Example running periodic tasks
The periodic tasks still need workers to execute them, so make sure
the **Celery** package is installed. (If it is not, follow the
installation instructions at <https://github.com/celery/celery>.)

Both the worker and beat services need to be running at the same time.

1. Start a Celery worker service (specify your project name):

       $ celery -A [project-name] worker --loglevel=info

2. As a separate process, start the beat service (specify the
   scheduler):

       $ celery -A [project-name] beat -l info --scheduler sqlalchemy_celery_beat.schedulers:DatabaseScheduler

## Working on adding the following features
- ✅ Add `ClockedSchedule` model
- ✅ Implement a generic foreign key
- ✅ More robust attribute validation on models
- ✅ Add Tests
- Add more examples
- Support for Async drivers like asyncpg and psycopg3 async mode
- Use Alembic migrations

Any help with the tasks above or feedback is appreciated 🙂

## Acknowledgments
- [django-celery-beat](https://github.com/celery/django-celery-beat)
- [celerybeatredis](https://github.com/liuliqiang/celerybeatredis)
- [celery](https://github.com/celery/celery)
- [SQLAlchemy](https://www.sqlalchemy.org)