Name | celery-pulse |
Version | 0.0.3 |
home_page | None |
Summary | A simple, framework-agnostic Celery health check mechanism using a heartbeat file. |
upload_time | 2025-07-16 10:46:18 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.9 |
license | MIT License
Copyright (c) 2025 Kulbarakov Bakdoolot
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
|
keywords | celery, healthcheck, monitoring, heartbeat, django, flask, fastapi |
requirements |
No requirements were recorded.
|
# Celery Pulse

A simple, framework-agnostic heartbeat mechanism for Celery.
`celery-pulse` helps you monitor the true health of your Celery workers and beat scheduler. A common problem with Celery monitoring is that a worker process can be running but "stuck" or frozen, unable to process tasks. Health checks that only verify that the process exists will report such a worker as healthy, which is a false positive.
This library solves the problem by running a lightweight background thread (or greenlet) inside each Celery worker process and the beat process. This thread's only job is to periodically "touch" a healthcheck file. If the file's modification time is stale, the process is treated as unresponsive and your monitoring system can take action.
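The heartbeat idea described above can be sketched in a few lines of plain Python. This is only an illustration of the technique, not the library's actual code: the function name `start_heartbeat` and the demo file name are assumptions made for the example.

```python
import os
import tempfile
import threading
import time
from pathlib import Path


def start_heartbeat(path: str, interval: float) -> threading.Event:
    """Touch `path` every `interval` seconds until the returned event is set.

    Hypothetical helper for illustration; celery-pulse's internals may differ.
    """
    stop = threading.Event()
    target = Path(path)

    def beat() -> None:
        while True:
            target.touch()  # creates the file or refreshes its mtime
            # Event.wait() acts as an interruptible sleep: it returns True
            # as soon as stop.set() is called, which ends the loop.
            if stop.wait(interval):
                break

    # daemon=True so the heartbeat never keeps the process alive on exit
    threading.Thread(target=beat, name="heartbeat", daemon=True).start()
    return stop


if __name__ == "__main__":
    demo_file = os.path.join(tempfile.gettempdir(), "example_heartbeat")
    stop = start_heartbeat(demo_file, interval=0.2)
    time.sleep(0.5)
    stop.set()
    age = time.time() - os.path.getmtime(demo_file)
    print(f"last beat {age:.2f} seconds ago")
```

Using `Event.wait()` instead of `time.sleep()` lets the loop stop promptly on shutdown rather than waiting out a full interval.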
## Key Features
- **Framework-Agnostic:** Works with Django, Flask, FastAPI, or any other Python application.
- **Smart Pool Detection:** Automatically uses the correct concurrency primitive:
  - `gevent`: A non-blocking greenlet via a Celery bootstep.
  - `prefork`/`solo`: A standard background thread via Celery signals.
- **Beat Support:** Monitors the `celery beat` scheduler process in addition to workers.
- **Lightweight & Safe:** The heartbeat runs in a separate thread/greenlet and won't block your tasks.
- **Simple Integration:** Requires only a one-line function call to set up.
- **Easy to Monitor:** Compatible with any monitoring system that can check a file's timestamp (Docker `HEALTHCHECK`, Kubernetes `livenessProbe`, etc.).
## Installation
```bash
pip install celery-pulse
```
If you are using the `gevent` worker pool, you can install the required dependency as an extra:
```bash
pip install celery-pulse[gevent]
```
## Quick Start
The only step is to import and call `init_celery_pulse()` with your Celery app instance after it has been configured.
### Example: Standalone Celery (Flask, FastAPI, etc.)
In your Celery app file (e.g., `tasks.py`):
```python
# tasks.py
from celery import Celery
from celery_pulse import init_celery_pulse
app = Celery('my_tasks', broker='redis://localhost:6379/0')
# Configure celery-pulse settings directly in Celery's config
app.conf.update(
    pulse_heartbeat_interval=30,  # Heartbeat every 30 seconds
    pulse_healthcheck_file="/var/run/celery_health.txt",  # Path to the heartbeat file
)
# Initialize the heartbeat mechanism
init_celery_pulse(app)
# --- Define your tasks as usual ---
@app.task
def add(x, y):
    return x + y
```
### Example: Django Integration
In your project's `celery.py` file:
```python
# myproject/celery.py
import os
from celery import Celery
from celery_pulse import init_celery_pulse
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Use a namespace for all Celery-related settings in django's settings.py
app.config_from_object('django.conf:settings', namespace='CELERY')
# Initialize the heartbeat mechanism
# It will read its configuration from your Django settings
init_celery_pulse(app)
app.autodiscover_tasks()
```
Then, configure the settings in your `myproject/settings.py`:
```python
# myproject/settings.py
# ... your other Django settings
# Celery settings
CELERY_BROKER_URL = "redis://localhost:6379/0"
# ... other celery settings
# Celery Pulse settings (must use the CELERY_ namespace)
CELERY_PULSE_HEARTBEAT_INTERVAL = 30
CELERY_PULSE_HEALTHCHECK_FILE = "/var/run/myproject/celery_health.txt"
```
## Configuration
`celery-pulse` is configured through your Celery application's configuration.
| Parameter | `app.conf` key | Django `settings.py` key | Description | Default |
| :--- | :--- | :--- | :--- | :--- |
| **Interval** | `pulse_heartbeat_interval` | `CELERY_PULSE_HEARTBEAT_INTERVAL` | The interval in seconds at which the heartbeat file is touched. | `60` |
| **File Path** | `pulse_healthcheck_file` | `CELERY_PULSE_HEALTHCHECK_FILE` | The absolute path to the healthcheck file that will be created and updated. | `"/tmp/celery_healthcheck"` |
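The relationship between the two key columns can be illustrated with a small sketch. It assumes the usual Celery convention for `namespace='CELERY'` (uppercase the key and add the `CELERY_` prefix) and applies the defaults from the table; `django_key`, `resolve_settings`, and `DEFAULTS` are hypothetical names for this example, not part of celery-pulse.

```python
from typing import Any, Dict

# Defaults copied from the configuration table above.
DEFAULTS: Dict[str, Any] = {
    "pulse_heartbeat_interval": 60,
    "pulse_healthcheck_file": "/tmp/celery_healthcheck",
}


def django_key(conf_key: str, namespace: str = "CELERY") -> str:
    """Map an `app.conf` key to the name used in a namespaced settings.py."""
    return f"{namespace}_{conf_key.upper()}"


def resolve_settings(conf: Dict[str, Any]) -> Dict[str, Any]:
    """Return the pulse settings from `conf`, falling back to the defaults."""
    return {key: conf.get(key, default) for key, default in DEFAULTS.items()}


if __name__ == "__main__":
    for key in DEFAULTS:
        print(key, "<->", django_key(key))
    print(resolve_settings({"pulse_heartbeat_interval": 30}))
```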
## How to Use the Healthcheck File
Once `celery-pulse` is running, it will update the specified file every `pulse_heartbeat_interval` seconds. Your monitoring system should check if the file's last modification time is recent.
A good rule of thumb is to fail the health check if the file hasn't been updated in `2 * pulse_heartbeat_interval` seconds. This provides a safe margin for delays.
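Where a shell is not convenient, the same rule of thumb can be checked from Python. The following is a minimal sketch: the function name `is_heartbeat_fresh` and the `factor` parameter are assumptions for this example, and the path and interval in the usage section are the ones from the Quick Start.

```python
import os
import sys
import time


def is_heartbeat_fresh(path: str, interval: float, factor: float = 2.0) -> bool:
    """Return True if `path` was modified less than `factor * interval` seconds ago."""
    try:
        age = time.time() - os.path.getmtime(path)
    except OSError:
        # A missing or unreadable file is treated as unhealthy.
        return False
    return age < factor * interval


if __name__ == "__main__":
    # Exit code 0 means healthy, 1 means unhealthy, as most probes expect.
    healthy = is_heartbeat_fresh("/var/run/celery_health.txt", interval=30)
    sys.exit(0 if healthy else 1)
```

Treating a missing file as unhealthy matches the `[ -f ... ]` guard used in the shell examples in this section.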
### Example: Docker `HEALTHCHECK`
In your `Dockerfile`, you can add a health check that verifies the file was modified within the last 65 seconds (two 30-second intervals plus a 5-second margin).
```dockerfile
# Assuming pulse_heartbeat_interval = 30
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD [ -f /var/run/celery_health.txt ] && [ $(($(date +%s) - $(stat -c %Y /var/run/celery_health.txt))) -lt 65 ] || exit 1
```
* `[ -f ... ]`: Checks if the file exists.
* `$(($(date +%s) - $(stat -c %Y ...)))`: Calculates the age of the file in seconds.
* `-lt 65`: Checks if the age is less than 65 seconds.
### Example: Kubernetes `livenessProbe`
In your Kubernetes deployment YAML, you can add a `livenessProbe` so that the worker container is restarted automatically if the worker becomes unresponsive.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker
spec:
  # ...
  template:
    # ...
    spec:
      containers:
        - name: worker
          image: my-celery-app:latest
          command: ["celery", "-A", "myproject", "worker", "-l", "info"]
          livenessProbe:
            exec:
              command:
                - /bin/sh
                - -c
                - "[ -f /var/run/celery_health.txt ] && [ $(($(date +%s) - $(stat -c %Y /var/run/celery_health.txt))) -lt 65 ]"
            initialDelaySeconds: 60 # Give time for the worker to start
            periodSeconds: 30 # Check every 30 seconds
            failureThreshold: 2 # Fail after 2 consecutive failures
```
## How It Works
`celery-pulse` intelligently adapts to the Celery execution environment:
1. **Gevent Pool**: If `gevent` is detected as the worker pool, `celery-pulse` registers a Celery `bootstep`. This starts a dedicated, non-blocking **greenlet** that runs the heartbeat loop. This is the most efficient method for `gevent`.
2. **Prefork/Solo Pool & Beat**: For the default `prefork` pool, the `solo` pool, and the `celery beat` scheduler, the library connects to Celery's startup signals (`worker_process_init`, `beat_init`). When a process starts, it launches a standard Python **thread** to manage the heartbeat. The thread is gracefully stopped on shutdown.
3. **Pool Detection**: The library uses the `on_after_configure` signal to inspect the configured `worker_pool` and sets an environment variable. This variable is inherited by forked worker processes, allowing them to know which heartbeat strategy to use.
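The environment-variable handoff in step 3 can be illustrated without Celery. In this sketch the variable name `EXAMPLE_PULSE_POOL` and all function names are hypothetical (the README does not name the real variable), and a child Python interpreter started with `subprocess` stands in for a forked worker process.

```python
import os
import subprocess
import sys
from typing import Mapping

# Hypothetical variable name, chosen only for this example.
POOL_ENV_VAR = "EXAMPLE_PULSE_POOL"


def record_pool(pool_name: str) -> None:
    """Parent side: remember the configured pool for child processes."""
    os.environ[POOL_ENV_VAR] = pool_name


def choose_strategy(environ: Mapping[str, str] = os.environ) -> str:
    """Child side: pick a greenlet for gevent, a thread for everything else."""
    return "greenlet" if environ.get(POOL_ENV_VAR) == "gevent" else "thread"


def pool_seen_by_child() -> str:
    """Start a child interpreter and report the pool name it inherited."""
    code = f"import os; print(os.environ.get({POOL_ENV_VAR!r}, ''))"
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    record_pool("gevent")
    print("child sees:", pool_seen_by_child())
    print("strategy:", choose_strategy())
```

The point of the design is that the decision is made once in the parent, where the Celery configuration is available, while each child only needs to read its inherited environment.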
## Contributing
Contributions are welcome! If you find a bug or have an idea for an improvement, please open an issue or submit a pull request on our [GitHub repository](https://github.com/yourusername/celery-pulse).
## License
This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.
Raw data
{
"_id": null,
"home_page": null,
"name": "celery-pulse",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "celery, healthcheck, monitoring, heartbeat, django, flask, fastapi",
"author": null,
"author_email": "Kulbarakov Bakdoolot <kulbarakovbh@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/44/cd/00176ac2628e9b09661c5bfd9126cb59d8254024fc7e16fe748ac0fdc80f/celery_pulse-0.0.3.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT License\n \n Copyright (c) 2025 Kulbarakov Bakdoolot\n \n Permission is hereby granted, free of charge, to any person obtaining a copy\n of this software and associated documentation files (the \"Software\"), to deal\n in the Software without restriction, including without limitation the rights\n to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n copies of the Software, and to permit persons to whom the Software is\n furnished to do so, subject to the following conditions:\n \n The above copyright notice and this permission notice shall be included in all\n copies or substantial portions of the Software.\n \n THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE\n SOFTWARE.\n ",
"summary": "A simple, framework-agnostic Celery health check mechanism using a heartbeat file.",
"version": "0.0.3",
"project_urls": null,
"split_keywords": [
"celery",
" healthcheck",
" monitoring",
" heartbeat",
" django",
" flask",
" fastapi"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "29a316e22dff5f5bb4a809b36890e0b54660c5be36a5d7c10ed69d3f7d656d76",
"md5": "15a1701cb434e80efadbc47efa818a1d",
"sha256": "a2838d2125474aa90cfb5d2ef92247fd798031a6ff7703d302eec02d579f3c34"
},
"downloads": -1,
"filename": "celery_pulse-0.0.3-py3-none-any.whl",
"has_sig": false,
"md5_digest": "15a1701cb434e80efadbc47efa818a1d",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 9425,
"upload_time": "2025-07-16T10:46:16",
"upload_time_iso_8601": "2025-07-16T10:46:16.636921Z",
"url": "https://files.pythonhosted.org/packages/29/a3/16e22dff5f5bb4a809b36890e0b54660c5be36a5d7c10ed69d3f7d656d76/celery_pulse-0.0.3-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "44cd00176ac2628e9b09661c5bfd9126cb59d8254024fc7e16fe748ac0fdc80f",
"md5": "66095b931b50671fabcc627b133a861f",
"sha256": "e7030fe93cdc2c4d711cd21d6f962397dc12b6793b615ffa02089487e5c3669a"
},
"downloads": -1,
"filename": "celery_pulse-0.0.3.tar.gz",
"has_sig": false,
"md5_digest": "66095b931b50671fabcc627b133a861f",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 8672,
"upload_time": "2025-07-16T10:46:18",
"upload_time_iso_8601": "2025-07-16T10:46:18.047665Z",
"url": "https://files.pythonhosted.org/packages/44/cd/00176ac2628e9b09661c5bfd9126cb59d8254024fc7e16fe748ac0fdc80f/celery_pulse-0.0.3.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-16 10:46:18",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "celery-pulse"
}