taskiq-pg


- Name: taskiq-pg
- Version: 0.1.3
- Summary: AsyncPG and PostgreSQL integration for taskiq
- Upload time: 2025-01-17 01:10:57
- Requires Python: >=3.8.1
- Keywords: async, asyncpg, distributed, postgresql, result_backend, taskiq, tasks
- Author: Oliver Lambson <ollie@karoo.ca>
- Repository: https://github.com/karoo-ca/taskiq-pg

# TaskIQ - asyncpg

TaskIQ-pg is a plugin for taskiq that adds a new result backend and a new broker based on PostgreSQL and [asyncpg](https://github.com/MagicStack/asyncpg).

The broker uses Postgres' built-in `LISTEN/NOTIFY` functionality.

This is a fork of [taskiq-psqlpy](https://github.com/taskiq-python/taskiq-psqlpy) that adds a broker (because PSQLPy does not currently support `LISTEN/NOTIFY`).
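To make the `LISTEN/NOTIFY` mechanism concrete, here is a minimal sketch of how it looks with plain asyncpg. It is not the broker's actual implementation. The helper names (`make_listener`, `listen_and_notify`) and the channel name `demo_channel` are hypothetical, and the sketch assumes a reachable PostgreSQL server.

```python
import asyncio
from typing import Any, Callable


def make_listener(queue: "asyncio.Queue[str]") -> Callable[..., None]:
    """Build a listener callback that forwards notification payloads to a queue."""

    def on_notify(connection: Any, pid: int, channel: str, payload: str) -> None:
        # asyncpg invokes listeners with (connection, pid, channel, payload).
        queue.put_nowait(payload)

    return on_notify


async def listen_and_notify(dsn: str, channel: str = "demo_channel") -> str:
    """Send one notification and return the payload the listener received."""
    import asyncpg  # imported lazily so make_listener has no third-party dependency

    queue: "asyncio.Queue[str]" = asyncio.Queue()
    listener = await asyncpg.connect(dsn)
    sender = await asyncpg.connect(dsn)
    try:
        # Issues LISTEN on the channel and registers the callback.
        await listener.add_listener(channel, make_listener(queue))
        # pg_notify() is the function form of NOTIFY and accepts bind parameters.
        await sender.execute("SELECT pg_notify($1, $2)", channel, "hello")
        return await asyncio.wait_for(queue.get(), timeout=5)
    finally:
        await sender.close()
        await listener.close()
```

Running `asyncio.run(listen_and_notify("postgres://postgres:postgres@localhost:15432/postgres"))` against a live database should return the payload sent on the channel. Two connections are used here only to show that the notification crosses sessions.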

## Installation

To use this project, you must have the core taskiq library installed:

```bash
pip install taskiq
```

This project can be installed using pip:

```bash
pip install taskiq-pg
```

Or using poetry:

```bash
poetry add taskiq-pg
```

## Usage

An example with the broker and result backend:

```python
# example.py
import asyncio

from taskiq_pg import AsyncpgBroker, AsyncpgResultBackend

asyncpg_result_backend = AsyncpgResultBackend(
    dsn="postgres://postgres:postgres@localhost:15432/postgres",
)

broker = AsyncpgBroker(
    dsn="postgres://postgres:postgres@localhost:15432/postgres",
).with_result_backend(asyncpg_result_backend)


@broker.task()
async def best_task_ever() -> str:
    """Solve all problems in the world."""
    await asyncio.sleep(1.0)
    return "All problems are solved!"


async def main() -> None:
    """Main."""
    await broker.startup()
    task = await best_task_ever.kiq()
    result = await task.wait_result(timeout=2)
    print(result)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

### Run example

**shell 1: start a worker**

```sh
$ taskiq worker example:broker
[2025-01-06 11:48:14,171][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 80434
[2025-01-06 11:48:14,171][taskiq.worker][INFO   ][MainProcess] Starting 2 worker processes.
[2025-01-06 11:48:14,175][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 80436
[2025-01-06 11:48:14,176][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 80437
```

**shell 2: run the example script**

```sh
$ python example.py
is_err=False log=None return_value='All problems are solved!' execution_time=1.0 labels={} error=None
```

### Details

The result backend stores results as raw bytes by default; you can decode them in SQL:

```sql
select convert_from(result, 'UTF8') from taskiq_results;
-- Example results:
-- - success:
--   {
--     "is_err": false,
--     "log": null,
--     "return_value": "All problems are solved!",
--     "execution_time": 1.0,
--     "labels": {},
--     "error": null
--   }
-- - failure:
--   {
--     "is_err": true,
--     "log": null,
--     "return_value": null,
--     "execution_time": 10.0,
--     "labels": {},
--     "error": {
--       "exc_type": "ValueError",
--       "exc_message": ["Borked"],
--       "exc_module": "builtins",
--       "exc_cause": null,
--       "exc_context": null,
--       "exc_suppress_context": false
--     }
--   }
```
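The same decoding can be done on the Python side. The sketch below assumes the stored bytes are UTF-8 encoded JSON, as in the SQL example above, and that the table has a `task_id` column. That column name and the helper names `decode_result` and `fetch_result` are assumptions for illustration, not part of the documented API.

```python
import json
from typing import Any, Dict, Optional


def decode_result(raw: bytes) -> Dict[str, Any]:
    """Decode a stored result, assuming it is UTF-8 encoded JSON."""
    return json.loads(raw.decode("utf-8"))


async def fetch_result(dsn: str, task_id: str) -> Optional[Dict[str, Any]]:
    """Read one raw result row with asyncpg and decode it.

    Assumes the default table name `taskiq_results` and a `task_id`
    column (the column name is an assumption).
    """
    import asyncpg  # imported lazily so decode_result works without asyncpg

    conn = await asyncpg.connect(dsn)
    try:
        raw = await conn.fetchval(
            "SELECT result FROM taskiq_results WHERE task_id = $1",
            task_id,
        )
    finally:
        await conn.close()
    # fetchval returns None when no row matches.
    return None if raw is None else decode_result(raw)
```

If you configure a non-JSON serializer, `decode_result` would need to be replaced with the matching deserialization step.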

## AsyncpgResultBackend configuration

- `dsn`: connection string to PostgreSQL.
- `keep_results`: flag to keep results in PostgreSQL after they are read instead of removing them.
- `table_name`: name of the table in PostgreSQL to store TaskIQ results.
- `field_for_task_id`: column type for `task_id`; change it if your task IDs are longer than 255 characters.
- `**connect_kwargs`: additional connection parameters; see the [asyncpg](https://github.com/MagicStack/asyncpg) repository for details.
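As a rough illustration of the options above, the following sketch collects them in a dictionary and passes them to the constructor. The values are illustrative only. In particular, `"Text"` for `field_for_task_id` is an assumption carried over from the upstream taskiq-psqlpy project, and `command_timeout` is an asyncpg connection parameter assumed to be forwarded through `**connect_kwargs`. The names `result_backend_options` and `make_result_backend` are hypothetical.

```python
from typing import Any, Dict

# Illustrative values; adjust to your environment.
result_backend_options: Dict[str, Any] = {
    "dsn": "postgres://postgres:postgres@localhost:15432/postgres",
    "keep_results": False,  # remove each result once it has been read
    "table_name": "my_task_results",
    "field_for_task_id": "Text",  # assumed value, for task IDs over 255 characters
    "command_timeout": 30,  # assumed to be forwarded to asyncpg via **connect_kwargs
}


def make_result_backend():
    """Build the result backend from the options above."""
    from taskiq_pg import AsyncpgResultBackend  # requires taskiq-pg to be installed

    return AsyncpgResultBackend(**result_backend_options)
```

Keeping the options in one dictionary makes it easy to load them from environment-specific settings before constructing the backend.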

## AsyncpgBroker configuration

- `dsn`: Connection string to PostgreSQL.
- `result_backend`: Custom result backend.
- `task_id_generator`: Custom task_id generator.
- `channel_name`: Name of the channel to listen on.
- `table_name`: Name of the table to store messages.
- `max_retry_attempts`: Maximum number of message processing attempts.
- `connection_kwargs`: Additional arguments for asyncpg connection.
- `pool_kwargs`: Additional arguments for asyncpg pool creation.
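The broker options can be combined in the same way. This is a sketch with illustrative values, not recommended defaults: the channel and table names are made up, and `command_timeout`, `min_size`, and `max_size` are asyncpg parameters assumed to be accepted through `connection_kwargs` and `pool_kwargs`. The names `broker_options` and `make_broker` are hypothetical.

```python
from typing import Any, Dict

# Illustrative values; adjust to your environment.
broker_options: Dict[str, Any] = {
    "dsn": "postgres://postgres:postgres@localhost:15432/postgres",
    "channel_name": "my_tasks",  # channel used for LISTEN/NOTIFY
    "table_name": "my_task_messages",  # table that stores queued messages
    "max_retry_attempts": 3,
    "connection_kwargs": {"command_timeout": 30},  # asyncpg connection option
    "pool_kwargs": {"min_size": 1, "max_size": 5},  # asyncpg pool sizing options
}


def make_broker():
    """Build the broker from the options above."""
    from taskiq_pg import AsyncpgBroker  # requires taskiq-pg to be installed

    return AsyncpgBroker(**broker_options)
```

A result backend can still be attached afterwards with `.with_result_backend(...)`, as in the usage example.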

## Acknowledgements

Builds on work from [pgmq](https://github.com/oliverlambson/pgmq).

            
