* Name: WakaQ
* Version: 2.1.23
* Home page: https://github.com/wakatime/wakaq
* Summary: Background task queue for Python backed by Redis, a minimal Celery.
* Upload time: 2023-12-16 10:41:20
* Author: Alan Hamlett
* Requires Python: >= 3.7
* License: BSD

# ![logo](https://raw.githubusercontent.com/wakatime/wakaq/main/wakatime-logo.png "WakaQ") WakaQ

[![wakatime](https://wakatime.com/badge/github/wakatime/wakaq.svg)](https://wakatime.com/badge/github/wakatime/wakaq)

Background task queue for Python backed by Redis, a super minimal Celery.
Read about the motivation behind this project in [this blog post][blog launch] and the accompanying [Hacker News discussion][hacker news].
WakaQ is currently used in production at [WakaTime.com][wakatime].
WakaQ is also available in [TypeScript][wakaq-ts].

## Features

* Queue priority
* Delayed tasks (run tasks after a timedelta eta)
* Scheduled periodic tasks
* [Broadcast][broadcast] a task to all workers
* Task [soft][soft timeout] and [hard][hard timeout] timeout limits
* Optionally retry tasks on soft timeout
* Combat memory leaks with `max_mem_percent` or `max_tasks_per_worker`
* Super minimal

Want more features like rate limiting, task deduplication, etc? Too bad, feature PRs are not accepted. Maximal features belong in your app’s worker tasks.
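
As one illustration of keeping such features in your own task code, here is a minimal deduplication sketch. It is not part of WakaQ. The helper names `dedup_key` and `run_once` are hypothetical, and the only external behavior assumed is a redis-py style `set(name, value, nx=True, ex=...)` call, which returns a truthy value only for the first caller. An in-memory stand-in replaces the Redis client so the sketch runs without a server.

```python
import hashlib
import json


def dedup_key(task_name, args, kwargs):
    """Build a stable key from the task name and its arguments (hypothetical helper)."""
    payload = json.dumps([task_name, list(args), kwargs], sort_keys=True)
    return "dedup:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_once(client, ttl_seconds, fn, *args, **kwargs):
    """Run fn only if no identical call has claimed the key yet.

    `client` is any object with a redis-py style `set(name, value, nx=..., ex=...)`.
    """
    key = dedup_key(fn.__name__, args, kwargs)
    # SET with NX succeeds only for the first caller; EX lets the claim expire.
    if not client.set(key, "1", nx=True, ex=ttl_seconds):
        return False
    fn(*args, **kwargs)
    return True


class InMemoryClient:
    """Stand-in for a Redis client, for demonstration only.

    It ignores expiry; a real Redis server drops the key after `ex` seconds.
    """

    def __init__(self):
        self._keys = {}

    def set(self, name, value, nx=False, ex=None):
        if nx and name in self._keys:
            return None
        self._keys[name] = value
        return True


sent = []


def send_email(user_id):
    sent.append(user_id)


client = InMemoryClient()
first = run_once(client, 60, send_email, 42)   # runs
second = run_once(client, 60, send_email, 42)  # skipped, same arguments
third = run_once(client, 60, send_email, 7)    # runs, different arguments
```

In a real app the same check would sit at the top of a task body, with a shared Redis client in place of `InMemoryClient`, so that every worker process sees the same keys.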

## Installing

    pip install wakaq

## Using

```python
import logging
from datetime import timedelta
from wakaq import WakaQ, Queue, CronTask


# use constants to prevent misspelling queue names
Q_HIGH = 'a-high-priority-queue'
Q_MED = 'a-medium-priority-queue'
Q_LOW = 'a-low-priority-queue'
Q_OTHER = 'another-queue'
Q_DEFAULT = 'default-lowest-priority-queue'


wakaq = WakaQ(

    # List your queues and their priorities.
    # Queues can be defined as Queue instances, tuples, or just a str.
    queues=[
        (0, Q_HIGH),
        (1, Q_MED),
        (2, Q_LOW),
        Queue(Q_OTHER, priority=3, max_retries=5, soft_timeout=300, hard_timeout=360),
        Q_DEFAULT,
    ],

    # Number of worker processes. Must be an int or str which evaluates to an
    # int. The variable "cores" is replaced with the number of processors on
    # the current machine.
    concurrency="cores*4",

    # Raise SoftTimeout in a task if it runs longer than 30 seconds. Can also be set per
    # task or queue. If no soft timeout is set, tasks can run forever.
    soft_timeout=30,  # seconds

    # SIGKILL a task if it runs longer than 1 minute. Can be set per task or queue.
    hard_timeout=timedelta(minutes=1),

    # If a task hits its soft timeout, retry it up to 3 times. Max retries comes
    # first from the task decorator if set, next from the Queue's max_retries,
    # lastly from the option below. If no max_retries is found, the task
    # is not retried on a soft timeout.
    max_retries=3,

    # Combat memory leaks by reloading a worker (the one using the most RAM),
    # when the total machine RAM usage is at or greater than 98%.
    max_mem_percent=98,

    # Combat memory leaks by reloading a worker after it's processed 5000 tasks.
    max_tasks_per_worker=5000,

    # Schedule three tasks: the first runs every minute, the second every five minutes,
    # and the third every ten minutes. Scheduled tasks can be passed as CronTask instances
    # or tuples. To run scheduled tasks you must keep a wakaq scheduler running as a daemon.
    schedules=[

        # Runs mytask on the queue with priority 1.
        CronTask('* * * * *', 'mytask', queue=Q_MED, args=[2, 2], kwargs={}),

        # Runs mytask once every 5 minutes.
        ('*/5 * * * *', 'mytask', [1, 1], {}),

        # Runs anothertask on the default lowest priority queue.
        ('*/10 * * * *', 'anothertask'),
    ],
)


# timeouts can be customized per task with a timedelta or integer seconds
@wakaq.task(queue=Q_MED, max_retries=7, soft_timeout=420, hard_timeout=480)
def mytask(x, y):
    print(x + y)


@wakaq.task
def anothertask():
    print("hello world")


@wakaq.wrap_tasks_with
def custom_task_decorator(fn):
    def inner(*args, **kwargs):
        # do something before each task runs
        fn(*args, **kwargs)
        # do something after each task runs
    return inner


if __name__ == '__main__':

    # add 1 plus 1 on a worker somewhere
    mytask.delay(1, 1)

    # add 1 plus 1 on a worker somewhere, overriding the task's queue from medium to high
    mytask.delay(1, 1, queue=Q_HIGH)

    # print hello world on a worker somewhere, running on the default lowest priority queue
    anothertask.delay()

    # print hello world on a worker somewhere, 10 minutes from now
    anothertask.delay(eta=timedelta(minutes=10))
```
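
The comments above describe a lookup order for `max_retries` (task decorator, then queue, then the `WakaQ` option) and say that timeouts accept either a `timedelta` or integer seconds. The following is a minimal sketch of those two rules, not WakaQ's actual implementation; `resolve_max_retries` and `to_seconds` are hypothetical helper names.

```python
from datetime import timedelta


def resolve_max_retries(task_value=None, queue_value=None, app_value=None):
    """Return the first max_retries that is set, checking task, queue, then app.

    Returns 0 when none is set, meaning no retry on a soft timeout.
    """
    for value in (task_value, queue_value, app_value):
        if value is not None:
            return value
    return 0


def to_seconds(value):
    """Normalize a timeout given as a timedelta or as a number of seconds."""
    if isinstance(value, timedelta):
        return value.total_seconds()
    return value


APP_MAX_RETRIES = 3  # the max_retries=3 passed to WakaQ(...) above

# mytask sets max_retries=7 in its decorator, so the task value wins.
mytask_retries = resolve_max_retries(task_value=7, app_value=APP_MAX_RETRIES)

# A task with no setting of its own on Q_OTHER inherits the queue's value of 5.
other_retries = resolve_max_retries(queue_value=5, app_value=APP_MAX_RETRIES)

# anothertask sets nothing and runs on a queue without max_retries.
another_retries = resolve_max_retries(app_value=APP_MAX_RETRIES)

hard_timeout_seconds = to_seconds(timedelta(minutes=1))
soft_timeout_seconds = to_seconds(30)
```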

## Deploying

#### Optimizing

See the [WakaQ init params][wakaq init] for a full list of options, like Redis host and Redis socket timeout values.

When using WakaQ in production, make sure to [increase the max open ports][max open ports] allowed for your Redis server process.
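
Open-file limits apply per process, so it can also help to check the limit a worker process itself runs under. The sketch below uses Python's standard `resource` module (Unix only) to read the limit of the current process; it does not inspect the Redis server. The helper names and the figure of 5000 needed descriptors are illustrative assumptions.

```python
import resource


def nofile_limits():
    """Return the (soft, hard) open-file limits of the current process."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


def has_headroom(needed, soft_limit):
    """Check whether a soft limit covers the number of descriptors needed."""
    if soft_limit == resource.RLIM_INFINITY:
        return True
    return soft_limit >= needed


soft, hard = nofile_limits()
if not has_headroom(5000, soft):  # 5000 is an arbitrary example figure
    print(f"open-file soft limit is {soft}; consider raising it")
```

For a worker started by systemd, the limit comes from the unit's `LimitNOFILE` setting, as in the example config in the next section.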

Eta tasks are stored in a Redis sorted set, so they are automatically deduped based on task name, args, and kwargs.
If you want multiple pending eta tasks with the same arguments, add a throwaway random string to the task’s kwargs, for example `str(uuid.uuid1())`.
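
The deduplication follows from sorted-set members being unique: adding the same member again does not create a second entry. The sketch below imitates that one property with a dict instead of Redis. The `TinySortedSet` class and the JSON payload format are assumptions for illustration; WakaQ's actual serialization may differ.

```python
import json
import uuid


class TinySortedSet:
    """Imitates one sorted-set property: members are unique, each with a score."""

    def __init__(self):
        self._scores = {}

    def zadd(self, member, score):
        # Re-adding an existing member never creates a second entry.
        self._scores[member] = score

    def __len__(self):
        return len(self._scores)


def serialize(name, args, kwargs):
    """Hypothetical payload: identical name, args, and kwargs give identical strings."""
    return json.dumps({"name": name, "args": args, "kwargs": kwargs}, sort_keys=True)


pending = TinySortedSet()

# Two eta tasks with the same name and arguments collapse into one member.
pending.zadd(serialize("mytask", [1, 1], {}), score=100)
pending.zadd(serialize("mytask", [1, 1], {}), score=200)
count_after_duplicates = len(pending)

# A throwaway random kwarg makes each payload distinct, so both stay pending.
pending.zadd(serialize("mytask", [1, 1], {"nonce": str(uuid.uuid1())}), score=300)
pending.zadd(serialize("mytask", [1, 1], {"nonce": str(uuid.uuid1())}), score=300)
count_after_nonces = len(pending)
```

Because the throwaway value travels in the task's kwargs, the task function presumably needs to accept it, for example with a signature like `def mytask(x, y, nonce=None)`.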

#### Running as a Daemon

Here’s an example systemd config to run `wakaq-worker` as a daemon:

```systemd
[Unit]
Description=WakaQ Worker Service

[Service]
WorkingDirectory=/opt/yourapp
ExecStart=/opt/yourapp/venv/bin/python /opt/yourapp/venv/bin/wakaq-worker --app=yourapp.wakaq
RemainAfterExit=no
Restart=always
RestartSec=30s
KillSignal=SIGINT
LimitNOFILE=99999

[Install]
WantedBy=multi-user.target
```

Create a file at `/etc/systemd/system/wakaqworker.service` with the above contents, then run:

    systemctl daemon-reload && systemctl enable wakaqworker



[wakatime]: https://wakatime.com
[broadcast]: https://github.com/wakatime/wakaq/blob/2300ed220d1d9e65e3f9bf328e3059a124f6b529/wakaq/task.py#L47
[soft timeout]: https://github.com/wakatime/wakaq/blob/2300ed220d1d9e65e3f9bf328e3059a124f6b529/wakaq/exceptions.py#L8
[hard timeout]: https://github.com/wakatime/wakaq/blob/2300ed220d1d9e65e3f9bf328e3059a124f6b529/wakaq/worker.py#L400
[wakaq init]: https://github.com/wakatime/wakaq/blob/2300ed220d1d9e65e3f9bf328e3059a124f6b529/wakaq/__init__.py#L49
[max open ports]: https://wakatime.com/blog/47-maximize-your-concurrent-web-server-connections
[blog launch]: https://wakatime.com/blog/56-building-a-distributed-task-queue-in-python
[hacker news]: https://news.ycombinator.com/item?id=32730038
[wakaq-ts]: https://github.com/wakatime/wakaq-ts

            
