saq

Name: saq
Version: 0.25.2
Summary: Distributed Python job queue with asyncio and redis
Home page: https://github.com/tobymao/saq
Author: Toby Mao
License: MIT
Upload time: 2025-07-10 21:08:22
# SAQ
SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis or postgres. It can be used for processing background jobs with workers. For example, you could use SAQ to schedule emails, execute long queries, or do expensive data analysis.

[Documentation](https://saq-py.readthedocs.io/)

It uses [redis-py](https://github.com/redis/redis-py) >= 4.2.

It is similar to [RQ](https://github.com/rq/rq) and heavily inspired by [ARQ](https://github.com/samuelcolvin/arq). Unlike RQ, it is async and thus [significantly faster](benchmarks) if your jobs are async. Even if they are not, SAQ is still considerably faster due to lower overhead.

SAQ optionally comes with a simple UI for monitoring workers and jobs.

<img src="docs/web.png" alt="SAQ Web UI" style="width:100%;"/>

## Install
```
# minimal install for redis
pip install "saq[redis]"

# minimal install for postgres
pip install "saq[postgres]"

# web + hiredis
pip install "saq[web,hiredis]"
```

## Usage
```
usage: saq [-h] [--workers WORKERS] [--verbose] [--web]
           [--extra-web-settings EXTRA_WEB_SETTINGS]
           [--port PORT] [--check]
           settings

Start Simple Async Queue Worker

positional arguments:
  settings              Namespaced variable containing
                        worker settings, e.g.
                        module_a.settings

options:
  -h, --help            show this help message and exit
  --workers WORKERS     Number of worker processes
  --verbose, -v         Logging level: 0: ERROR, 1: INFO,
                        2: DEBUG
  --web                 Start web app. By default, this
                        only monitors the current
                        worker's queue. To monitor
                        multiple queues, see '--extra-
                        web-settings'
  --extra-web-settings EXTRA_WEB_SETTINGS, -e EXTRA_WEB_SETTINGS
                        Additional worker settings to
                        monitor in the web app
  --port PORT           Web app port, defaults to 8080
  --check               Perform a health check

environment variables:
  AUTH_USER     basic auth user, defaults to admin
  AUTH_PASSWORD basic auth password, if not specified, no auth will be used
```

## Example

```python
import asyncio

from saq import CronJob, Queue


class DBHelper:
    """Helper class for demo purposes"""

    async def disconnect(self):
        print("Disconnecting from the database")

    async def connect(self):
        print("Connecting...")

    def __str__(self):
        return "Your DBHelper at work"

# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=,load=)
    return {"x": a}

async def cron(ctx):
    print("i am a cron job")

async def startup(ctx):
    helper = DBHelper()
    await helper.connect()
    ctx["db"] = helper

async def shutdown(ctx):
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test],
    "concurrency": 10,
    "cron_jobs": [CronJob(cron, cron="* * * * * */5")], # run every 5 seconds
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
}
```
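The comment in `test` above notes that custom serializers and deserializers can be supplied via `Queue(dump=..., load=...)`. A minimal sketch of such a pair, assuming JSON payloads; `default=str` is an illustrative choice to stringify values JSON can't encode natively (e.g. datetimes), not something saq requires:

```python
import json
from datetime import datetime, timezone

# Hypothetical dump/load pair; pass these as Queue(dump=dump, load=load).
# default=str stringifies values json can't encode natively (e.g. datetimes).
def dump(obj):
    return json.dumps(obj, default=str)

def load(data):
    return json.loads(data)

payload = {"x": 1, "when": datetime(2025, 7, 10, tzinfo=timezone.utc)}
restored = load(dump(payload))
```

Swapping in `orjson` or a pickle-based pair follows the same shape.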

To start the worker, assuming the settings module above is importable on the Python path:

```
saq module.file.settings
```

> **_Note:_** `module.file.settings` can also be a callable returning the settings dictionary.

To enqueue jobs

```python
import time

# schedule a job normally
job = await queue.enqueue("test", a=1)

# wait 1 second for the job to complete
await job.refresh(1)
print(job.results)

# run a job and return the result
print(await queue.apply("test", a=2))

# run a job with custom polling interval to check status more frequently
print(await queue.apply("test", a=2, poll_interval=0.1))

# Run multiple jobs concurrently and collect the results into a list
print(await queue.map("test", [{"a": 3}, {"a": 4}]))

# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)
```
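Since `scheduled` is a Unix timestamp in seconds (as in `time.time() + 10` above), a `datetime` can be converted with `.timestamp()` before enqueueing. A small sketch; `run_at` is illustrative:

```python
import time
from datetime import datetime, timedelta, timezone

# scheduled= expects epoch seconds, so convert an aware datetime explicitly.
run_at = (datetime.now(timezone.utc) + timedelta(minutes=5)).timestamp()

# await queue.enqueue("test", a=1, scheduled=run_at)
```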

## Demo

Start the worker

```
python -m saq examples.simple.settings --web
```

Navigate to the [web UI](http://localhost:8080)

Enqueue jobs
```
python examples/simple.py
```

## Comparison to ARQ
SAQ is heavily inspired by [ARQ](https://github.com/samuelcolvin/arq) but has several enhancements.

1. Avoids polling by leveraging [BLMOVE](https://redis.io/commands/blmove) or [RPOPLPUSH](https://redis.io/commands/rpoplpush) and NOTIFY
    1. SAQ has much lower latency than ARQ, with delays of < 5 ms; ARQ's default polling frequency is 0.5 seconds
    2. SAQ is up to [8x faster](benchmarks) than ARQ
2. Web interface for monitoring queues and workers
3. Heartbeat monitor for abandoned jobs
4. More robust failure handling
    1. Storage of stack traces
    2. Sweeping stuck jobs
    3. Cancelled jobs are handled differently from failed jobs (e.g. machine redeployments)
5. Before and after job hooks
6. Easily run multiple workers to leverage more cores

## Development
```
python -m venv env
source env/bin/activate
pip install -e ".[dev,web]"
docker run -d -p 6379:6379 redis
docker run -d -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust postgres
make style test
```

            
