* Name: delayed
* Version: 1.2.0b2
* Home page: https://github.com/keakon/delayed
* Summary: a simple but robust task queue
* Upload time: 2024-04-04 04:40:31
* Author: keakon
* Requires Python: >=2.7
* Requirements: hiredis, msgpack, redis (>=3.0)

# delayed
[![Build Status](https://github.com/keakon/delayed/actions/workflows/python.yml/badge.svg)](https://github.com/keakon/delayed/actions)
[![Coverage](https://codecov.io/gh/keakon/delayed/branch/master/graph/badge.svg)](https://codecov.io/gh/keakon/delayed)

Delayed is a simple but robust task queue inspired by [rq](https://python-rq.org/).

## Features

* Robust: every enqueued task runs exactly once, even if a worker is killed at any time.
* Clean: finished tasks (including failed ones) take up no space in your Redis.
* Distributed: as many workers as needed can run at the same time without further configuration.
* Portable: its [Go](https://github.com/keakon/go-delayed) and [Python](https://github.com/keakon/delayed) versions can call each other.

## Requirements

1. Python 3.7 or later, tested on CPython 3.7 - 3.12. Versions before 1.0 have also been tested on CPython 2.7, PyPy and PyPy3.
2. Gracefully stopping the workers requires a Unix-like system (with Unix signals). Tested on Ubuntu 22.04 and macOS Monterey 12.
3. Redis 2.6.0 or later (with Lua scripts).
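
The signal requirement above can be illustrated with a minimal sketch of the general technique: a handler only sets a flag, and the loop checks that flag between tasks so that the current task is never interrupted. `StopFlag`, `install_handlers` and `run_until_stopped` are hypothetical names used for illustration; this is not delayed's own worker code, which may handle signals differently.

```python
import signal


class StopFlag:
    """Records that a stop signal arrived so a loop can exit between tasks."""

    def __init__(self):
        self.stopped = False

    def handle(self, signum, frame):
        # Keep the handler tiny: it only sets a flag.
        self.stopped = True


def install_handlers(flag, signums=(signal.SIGINT, signal.SIGTERM)):
    """Route the given signals to flag.handle (call this from the main thread)."""
    for signum in signums:
        signal.signal(signum, flag.handle)


def run_until_stopped(flag, tasks):
    """Run callables one at a time, checking the flag before each one."""
    results = []
    for task in tasks:
        if flag.stopped:
            break  # stop cleanly instead of dying in the middle of a task
        results.append(task())
    return results
```

A process would call `install_handlers(flag)` once at startup and then pass the same `flag` to its processing loop; a task that is already running is allowed to finish before the loop exits.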

## Getting started

1. Run a Redis server:

    ```bash
    $ redis-server
    ```

2. Install delayed:

    ```bash
    $ pip install delayed
    ```

3. Create a task queue:

    ```python
    import redis
    from delayed.queue import Queue

    conn = redis.Redis()
    queue = Queue(name='default', conn=conn)
    ```


4. Enqueue tasks:
    * Four ways to enqueue Python tasks:
        1. Define a task function and enqueue it:

            ```python
            from delayed.delay import delayed

            delayed = delayed(queue)
            i = 0

            @delayed
            def delayed_add(a, b):
                return a + b

            @delayed(retry=3)
            def retry_div(x):
                global i
                i += 1
                return x / (i - 1)

            delayed_add.delay(1, 2)  # enqueue delayed_add
            delayed_add.delay(1, b=2)  # same as above
            delayed_add(1, 2)  # call it immediately
            
            retry_div.delay(1)  # enqueue retry_div
            ```
        2. Directly enqueue a function:

            ```python
            from delayed.delay import delayed

            delayed = delayed(queue)

            def add(a, b):
                return a + b

            delayed(add).delay(1, 2)
            delayed(add).delay(1, b=2)  # same as above
            delayed(retry=3)(add).delay(1, b=2)
            delayed(add, retry=3).delay(1, b=2)  # same as above
            ```
        3. Create a task and enqueue it:

            ```python
            from delayed.task import PyTask

            def add(a, b):
                return a + b

            task = PyTask(func=add, args=(1,), kwargs={'b': 2}, retry=1)
            queue.enqueue(task)
            ```
        4. Enqueue a predefined task function without importing it (the fastest and lightest way):

            ```python
            from delayed.task import PyTask

            task = PyTask(func='test:add', args=(1,), kwargs={'b': 2}, retry=1)
            queue.enqueue(task)
            ```
    * Enqueue Go tasks:

        ```python
        from delayed.task import GoTask

        task = GoTask(func_path='syscall.Kill', args=(0, 1))
        queue.enqueue(task)

        task = GoTask(func_path='fmt.Printf', args=('%d %s\n', [1, 'test']))  # the variadic argument needs to be a list or tuple
        queue.enqueue(task)

        task = GoTask('fmt.Println', (1, 'test'))  # if the variadic argument is the only argument, it's not required to wrap it with a list or tuple
        queue.enqueue(task)
        ```

5. Run a task worker (or more) in a separate process:

    ```python
    import redis
    from delayed.queue import Queue
    from delayed.worker import Worker

    conn = redis.Redis()
    queue = Queue(name='default', conn=conn)
    worker = Worker(queue=queue)
    worker.run()
    ```

6. Run a task sweeper in a separate process to recover lost tasks (mainly caused by workers being killed):

    ```python
    import redis
    from delayed.queue import Queue
    from delayed.sweeper import Sweeper

    conn = redis.Redis()
    queue = Queue(name='default', conn=conn)
    sweeper = Sweeper(queues=[queue])
    sweeper.run()
    ```
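
The fourth way of enqueueing a Python task in step 4 passes the function as a string such as `'test:add'`, with the module path and the function name separated by `:`. Below is a minimal sketch of how such a path can be turned back into a callable on the receiving side. `resolve_func_path` is a hypothetical helper written for illustration; it is not part of delayed's API, and the library's actual lookup may differ.

```python
import importlib


def resolve_func_path(func_path):
    """Import `module_path` and return its attribute `func_name`."""
    module_path, sep, func_name = func_path.partition(':')
    if not sep or not module_path or not func_name:
        raise ValueError('expected "module_path:func_name", got %r' % (func_path,))
    module = importlib.import_module(module_path)
    return getattr(module, func_name)


# Resolve a standard-library function by its path and call it.
sqrt = resolve_func_path('math:sqrt')
print(sqrt(9.0))
```

A lookup of this kind only works for functions that the worker process can import by module path, which is consistent with the requirement that task functions live at module level.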

## Examples

See [examples](examples).

```bash
$ redis-server &
$ pip install delayed
$ python -m examples.sweeper &
$ python -m examples.worker &
$ python -m examples.caller
```

## QA

1. **Q: What are the limitations on a task function?**  
A: A Python task function should be defined at module level (but not in the `__main__` module).
Its `args` and `kwargs` should be serializable by [MessagePack](https://msgpack.org/).
After deserializing, the types of the `args` and `kwargs` passed to the task function might have changed (tuple -> list), so the function should handle this change.

2. **Q: What's the `name` param of a queue?**  
A: It's the key used to store the tasks of the queue. A queue named "default" uses these keys:
    * default: list, enqueued tasks.
    * default_noti: list, the same length as enqueued tasks.
    * default_processing: hash, the processing task of workers.

3. **Q: What are lost tasks?**  
A: A task can get lost in two situations:
    * a worker popped a task notification, then was killed before dequeueing the task.
    * a worker dequeued a task, then was killed before releasing the task.

4. **Q: How to recover lost tasks?**  
A: Run a sweeper. It does two things:
    * it keeps the task notification list the same length as the task queue.
    * it checks the processing tasks; if a worker is dead, it moves that worker's processing task back to the task queue.

5. **Q: How to turn on the debug logs?**  
A: Add a `logging.DEBUG` level handler to `delayed.logger.logger`. The simplest way is to call `delayed.logger.setup_logger()`:
    ```python
    from delayed.logger import setup_logger

    setup_logger()
    ```
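
The first answer above notes that tuples may arrive as lists after deserialization. Here is a minimal sketch of a task function written to tolerate that. `scale_point` is a made-up example function, and `simulate_round_trip` is a hypothetical stand-in that only mimics the tuple -> list change; it does not call MessagePack.

```python
def scale_point(point, factor):
    """Task function that accepts `point` as either a tuple or a list."""
    x, y = point  # unpacking works for both sequence types
    return (x * factor, y * factor)


def simulate_round_trip(args):
    """Stand-in for serialization: turn every tuple into a list, recursively."""
    if isinstance(args, (tuple, list)):
        return [simulate_round_trip(item) for item in args]
    return args


sent = ((1, 2), 3)
received = simulate_round_trip(sent)  # nested tuples become nested lists
assert received == [[1, 2], 3]
assert scale_point(*received) == scale_point(*sent) == (3, 6)
```

Code that checks `isinstance(point, tuple)` or uses an argument as a dictionary key would behave differently once the tuple has become a list, so unpacking or an explicit `tuple(point)` conversion is the safer choice.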

## Release notes

* 1.2:
    1. Adds `retry` param to functions wrapped by `delayed.delay()`.
    2. Adds `retry` param to `Task()`.
    3. Adds `release` param to `Queue.enqueue()`.
    4. The `Worker` won't retry a failed task infinitely by default now. You can set `retry=-1` to `Task()` instead. (BREAKING CHANGE)

* 1.1:
    1. Adds `log_level` param to `delayed.logger.setup_logger()`.
    2. Prevents different online workers from having the same id.

* 1.0:
    1. Python 2.7 is not supported anymore. (BREAKING CHANGE)
    2. Supports Go, adds `GoTask`.
    3. Uses MessagePack instead of pickle to serialize / deserialize tasks. (BREAKING CHANGE)
    4. Removes `ForkedWorker` and `PreforkedWorker`. You can use `Worker` instead. (BREAKING CHANGE)
    5. Changes params of `Queue()`, removes `default_timeout`, `requeue_timeout` and `busy_len`, adds `dequeue_timeout` and `keep_alive_timeout`. (BREAKING CHANGE)
    6. Renames `Task` to `PyTask`. (BREAKING CHANGE)
    7. Removes those properties of `PyTask`: `id`, `func_path`, `args` and `kwargs`. (BREAKING CHANGE)
    8. Removes those params of `PyTask()`: `id`, `timeout`, `prior` and `error_handler_path`. (BREAKING CHANGE)
    9. Removes `PyTask.create()`. You can use `PyTask()` instead. (BREAKING CHANGE)
    10. Renames the `func_path` param of `PyTask()` to `func`, which accepts both `callable` and `str`. (BREAKING CHANGE)
    11. Removes `delayed.delay()`. Removes params of `delayed.delayed()`. (BREAKING CHANGE)

* 0.11:
    1. Sleeps for a random time when a `Worker` fails to pop a `task` before retrying.

* 0.10:
    1. The `Sweeper` can handle multiple queues now. Its `queue` param has been changed to `queues`. (BREAKING CHANGE)
    2. Changes the separator between `module_path` and `func_name` from `.` to `:`. (BREAKING CHANGE)

* 0.9:
    1. Adds `prior` and `error_handler` params to `delayed.delayed()`, removes its `timeout()` method. (BREAKING CHANGE)
    2. Adds [examples](examples).

* 0.8:
    1. The `Task` struct has been changed and is not compatible with older versions. (BREAKING CHANGE)
        * Removes `module_name` and `func_name` from `Task`, adds `func_path` instead.
        * Adds `error_handler_path` to `Task`.
    2. Removes `success_handler` and `error_handler` from `Worker`. (BREAKING CHANGE)

* 0.7:
    1. Implements prior task.

* 0.6:
    1. Adds `dequeued_len()` and `index` to `Queue`.

* 0.5:
    1. Adds `delayed.task.set_pickle_protocol_version()`.

* 0.4:
    1. Refactors and fixes bugs.

* 0.3:
    1. Changes param `second` to `timeout` for `delayed.delayed()`. (BREAKING CHANGE)
    2. Adds debug log.

* 0.2:
    1. Adds `timeout()` to `delayed.delayed()`.

* 0.1:
    1. Initial version.

            
