# delayed
[![Build Status](https://github.com/keakon/delayed/actions/workflows/python.yml/badge.svg)](https://github.com/keakon/delayed/actions)
[![Coverage](https://codecov.io/gh/keakon/delayed/branch/master/graph/badge.svg)](https://codecov.io/gh/keakon/delayed)
Delayed is a simple but robust task queue inspired by [rq](https://python-rq.org/).
## Features
* Robust: every enqueued task runs exactly once, even if a worker gets killed at any time.
* Clean: finished tasks (including failed ones) take up no space in your Redis.
* Distributed: as many workers as needed can run at the same time without extra configuration.
* Portable: its [Go](https://github.com/keakon/go-delayed) and [Python](https://github.com/keakon/delayed) versions can call each other.
## Requirements
1. Python 3.7 or later, tested on CPython 3.7 - 3.12. Versions before 1.0 have also been tested on CPython 2.7, PyPy and PyPy3.
2. Gracefully stopping workers requires a Unix-like system (with Unix signals); tested on Ubuntu 22.04 and macOS Monterey 12.
3. Redis 2.6.0 or later (with Lua scripts).
## Getting started
1. Run a Redis server:
```bash
$ redis-server
```
2. Install delayed:
```bash
$ pip install delayed
```
3. Create a task queue:
```python
import redis
from delayed.queue import Queue
conn = redis.Redis()
queue = Queue(name='default', conn=conn)
```
4. Enqueue tasks:
* Four ways to enqueue Python tasks:
1. Define a task function and enqueue it:
```python
from delayed.delay import delayed
delayed = delayed(queue)
i = 0
@delayed
def delayed_add(a, b):
return a + b
@delayed(retry=3)
def retry_div(x):
global i
i += 1
return x / (i - 1)
delayed_add.delay(1, 2) # enqueue delayed_add
delayed_add.delay(1, b=2) # same as above
delayed_add(1, 2) # call it immediately
retry_div.delay(1) # enqueue retry_div
```
2. Directly enqueue a function:
```python
from delayed.delay import delayed
delayed = delayed(queue)
def add(a, b):
return a + b
delayed(add).delay(1, 2)
delayed(add).delay(1, b=2) # same as above
delayed(retry=3)(add).delay(1, b=2)
delayed(add, retry=3).delay(1, b=2) # same as above
```
3. Create a task and enqueue it:
```python
from delayed.task import PyTask
def add(a, b):
return a + b
task = PyTask(func=add, args=(1,), kwargs={'b': 2}, retry=1)
queue.enqueue(task)
```
4. Enqueue a predefined task function without importing it (the fastest and lightest way):
```python
from delayed.task import PyTask
task = PyTask(func='test:add', args=(1,), kwargs={'b': 2}, retry=1)
queue.enqueue(task)
```
* Enqueue Go tasks:
```python
from delayed.task import GoTask
task = GoTask(func_path='syscall.Kill', args=(0, 1))
queue.enqueue(task)
task = GoTask(func_path='fmt.Printf', args=('%d %s\n', [1, 'test'])) # the variadic argument needs to be a list or tuple
queue.enqueue(task)
task = GoTask('fmt.Println', (1, 'test')) # if the variadic argument is the only argument, it's not required to wrap it with a list or tuple
queue.enqueue(task)
```
5. Run one or more task workers, each in a separate process:
```python
import redis
from delayed.queue import Queue
from delayed.worker import Worker
conn = redis.Redis()
queue = Queue(name='default', conn=conn)
worker = Worker(queue=queue)
worker.run()
```
6. Run a task sweeper in a separate process to recover lost tasks (mostly tasks whose worker got killed):
```python
import redis
from delayed.queue import Queue
from delayed.sweeper import Sweeper
conn = redis.Redis()
queue = Queue(name='default', conn=conn)
sweeper = Sweeper(queues=[queue])
sweeper.run()
```
## Examples
See [examples](examples).
```bash
$ redis-server &
$ pip install delayed
$ python -m examples.sweeper &
$ python -m examples.worker &
$ python -m examples.caller
```
## QA
1. **Q: What are the limitations on a task function?**
A: A Python task function should be defined at module level (not in the `__main__` module).
Its `args` and `kwargs` should be serializable by [MessagePack](https://msgpack.org/).
After deserialization, the types inside `args` and `kwargs` passed to the task function may change (e.g. tuple -> list), so the function should tolerate this.
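The tuple-to-list conversion can be seen with a quick round trip through the `msgpack` package (shown here purely as an illustration; delayed performs the equivalent serialization internally):

```python
import msgpack

# MessagePack has no tuple type, so tuples come back as lists
# after a serialize/deserialize round trip.
args = (1, 2)
restored = msgpack.unpackb(msgpack.packb(args))
print(restored)  # [1, 2]
```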
2. **Q: What's the `name` param of a queue?**
A: It's the key prefix used to store the queue's tasks. A queue named "default" uses these keys:
 * `default`: a list of enqueued tasks.
 * `default_noti`: a notification list with the same length as the task queue.
 * `default_processing`: a hash of the tasks each worker is currently processing.
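A hypothetical helper that mirrors this naming convention (the real key layout lives inside delayed's `Queue` implementation; `queue_keys` is illustrative, not part of the library):

```python
# Build the Redis key names used by a queue with the given name,
# following the convention described above.
def queue_keys(name):
    return {
        'tasks': name,                       # list of enqueued tasks
        'noti': name + '_noti',              # notification list, same length as the task queue
        'processing': name + '_processing',  # hash of per-worker processing tasks
    }

print(queue_keys('default'))
```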
3. **Q: What are lost tasks?**
A: There are two situations in which a task may get lost:
* a worker popped a task notification, then got killed before dequeueing the task.
* a worker dequeued a task, then got killed before releasing the task.
4. **Q: How to recover lost tasks?**
A: Run a sweeper. It does two things:
 * it keeps the notification list the same length as the task queue.
 * it checks the processing list; if a worker is dead, it moves that worker's processing task back to the task queue.
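A minimal in-memory sketch of those two steps, using plain lists and a dict in place of the Redis structures (the real `Sweeper` works against Redis with Lua scripts; the names `sweep` and `alive_workers` are illustrative assumptions, not delayed's API):

```python
def sweep(tasks, noti, processing, alive_workers):
    # Move tasks held by dead workers back to the task queue.
    for worker_id in list(processing):
        if worker_id not in alive_workers:
            tasks.append(processing.pop(worker_id))
    # Top up the notification list so it matches the task queue length.
    noti.extend('1' for _ in range(len(tasks) - len(noti)))

tasks, noti = ['t1', 't2'], ['1']
processing = {'w1': 't0'}          # worker w1 died while holding task t0
sweep(tasks, noti, processing, alive_workers=set())
print(tasks, noti, processing)     # ['t1', 't2', 't0'] ['1', '1', '1'] {}
```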
5. **Q: How to turn on the debug logs?**
A: Add a `logging.DEBUG` level handler to `delayed.logger.logger`. The simplest way is to call `delayed.logger.setup_logger()`:
```python
from delayed.logger import setup_logger
setup_logger()
```
## Release notes
* 1.2:
1. Adds `retry` param to functions wrapped by `delayed.delay()`.
2. Adds `retry` param to `Task()`.
3. Adds `release` param to `Queue.enqueue()`.
    4. The `Worker` no longer retries a failed task infinitely by default. You can pass `retry=-1` to `Task()` to restore that behavior. (BREAKING CHANGE)
* 1.1:
1. Adds `log_level` param to `delayed.logger.setup_logger()`.
    2. Prevents different online workers from having the same ID.
* 1.0:
1. Python 2.7 is not supported anymore. (BREAKING CHANGE)
2. Supports Go, adds `GoTask`.
    3. Uses MessagePack instead of pickle to serialize / deserialize tasks. (BREAKING CHANGE)
4. Removes `ForkedWorker` and `PreforkedWorker`. You can use `Worker` instead. (BREAKING CHANGE)
5. Changes params of `Queue()`, removes `default_timeout`, `requeue_timeout` and `busy_len`, adds `dequeue_timeout` and `keep_alive_timeout`. (BREAKING CHANGE)
    6. Renames `Task` to `PyTask`. (BREAKING CHANGE)
7. Removes those properties of `PyTask`: `id`, `func_path`, `args` and `kwargs`. (BREAKING CHANGE)
8. Removes those params of `PyTask()`: `id`, `timeout`, `prior` and `error_handler_path`. (BREAKING CHANGE)
9. Removes `PyTask.create()`. You can use `PyTask()` instead. (BREAKING CHANGE)
    10. Renames the `func_path` param of `PyTask()` to `func`; it accepts both `callable` and `str`. (BREAKING CHANGE)
11. Removes `delayed.delay()`. Removes params of `delayed.delayed()`. (BREAKING CHANGE)
* 0.11:
    1. Sleeps for a random time before retrying when a `Worker` fails to pop a task.
* 0.10:
1. The `Sweeper` can handle multiple queues now. Its `queue` param has been changed to `queues`. (BREAKING CHANGE)
2. Changes the separator between `module_path` and `func_name` from `.` to `:`. (BREAKING CHANGE)
* 0.9:
    1. Adds `prior` and `error_handler` params to `delayed.delayed()`, removes its `timeout()` method. (BREAKING CHANGE)
2. Adds [examples](examples).
* 0.8:
1. The `Task` struct has been changed, it's not compatible with older versions. (BREAKING CHANGE)
* Removes `module_name` and `func_name` from `Task`, adds `func_path` instead.
* Adds `error_handler_path` to `Task`.
2. Removes `success_handler` and `error_handler` from `Worker`. (BREAKING CHANGE)
* 0.7:
1. Implements prior task.
* 0.6:
1. Adds `dequeued_len()` and `index` to `Queue`.
* 0.5:
1. Adds `delayed.task.set_pickle_protocol_version()`.
* 0.4:
    1. Refactors and fixes bugs.
* 0.3:
1. Changes param `second` to `timeout` for `delayed.delayed()`. (BREAKING CHANGE)
2. Adds debug log.
* 0.2:
1. Adds `timeout()` to `delayed.delayed()`.
* 0.1:
1. Init version.