[](https://pypi.org/project/taskiq-postgres/)
[](https://pypi.org/project/taskiq-postgres/)
[&style=for-the-badge)](https://github.com/danfimov/taskiq-postgres)
<div align="center">
<a href="https://github.com/danfimov/taskiq-postgres/"><img src="https://raw.githubusercontent.com/danfimov/taskiq-postgres/main/assets/logo.png" width=400></a>
<hr/>
</div>
PostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.
See more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/) or [examples directory](https://github.com/danfimov/taskiq-postgres/examples).
## Installation
Depending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:
```bash
# with asyncpg
pip install taskiq-postgres[asyncpg]
# with psqlpy
pip install taskiq-postgres[psqlpy]
# with aiopg
pip install taskiq-postgres[aiopg]
```
## Quick start
### Basic task processing
1. Define your broker with [asyncpg](https://github.com/MagicStack/asyncpg):
```python
# broker_example.py
import asyncio
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))
@broker.task("solve_all_problems")
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")
async def main():
await broker.startup()
task = await best_task_ever.kiq()
print(await task.wait_result())
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
```
2. Start a worker to process tasks (by default taskiq runs two instances of worker):
```bash
taskiq worker broker_example:broker
```
3. Run `broker_example.py` file to send a task to the worker:
```bash
python broker_example.py
```
Your experience with other drivers will be pretty similar. Just change the import statement and that's it.
### Task scheduling
1. Define your broker and schedule source:
```python
# scheduler_example.py
import asyncio
from taskiq import TaskiqScheduler
from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource
dsn = "postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres"
broker = AsyncpgBroker(dsn)
scheduler = TaskiqScheduler(
broker=broker,
sources=[AsyncpgScheduleSource(
dsn=dsn,
broker=broker,
)],
)
@broker.task(
task_name="solve_all_problems",
schedule=[
{
"cron": "*/1 * * * *", # type: str, either cron or time should be specified.
"cron_offset": None, # type: str | timedelta | None, can be omitted.
"time": None, # type: datetime | None, either cron or time should be specified.
"args": [], # type list[Any] | None, can be omitted.
"kwargs": {}, # type: dict[str, Any] | None, can be omitted.
"labels": {}, # type: dict[str, Any] | None, can be omitted.
},
],
)
async def best_task_ever() -> None:
"""Solve all problems in the world."""
await asyncio.sleep(2)
print("All problems are solved!")
```
2. Start worker processes:
```bash
taskiq worker scheduler_example:broker
```
3. Run scheduler process:
```bash
taskiq scheduler scheduler_example:scheduler
```
## Motivation
There are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality.
To address this issue I created this library with a common interface for most popular PostgreSQL drivers that handle similarity across functionality of result backends, brokers and schedule sources.
Raw data
{
"_id": null,
"home_page": null,
"name": "taskiq-postgres",
"maintainer": null,
"docs_url": null,
"requires_python": "<3.14,>=3.10",
"maintainer_email": null,
"keywords": "taskiq, tasks, distributed, async, postgresql",
"author": "Anfimov Dima",
"author_email": "Anfimov Dima <lovesolaristics@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/27/34/f3c960ee669794090ea6b2d31338b02774b5fa677a6f697736e08c609338/taskiq_postgres-0.3.0.tar.gz",
"platform": null,
"description": "[](https://pypi.org/project/taskiq-postgres/)\n[](https://pypi.org/project/taskiq-postgres/)\n[&style=for-the-badge)](https://github.com/danfimov/taskiq-postgres)\n\n<div align=\"center\">\n<a href=\"https://github.com/danfimov/taskiq-postgres/\"><img src=\"https://raw.githubusercontent.com/danfimov/taskiq-postgres/main/assets/logo.png\" width=400></a>\n<hr/>\n</div>\n\nPostgreSQL integration for Taskiq with support for asyncpg, psqlpy and aiopg drivers.\n\nSee more example of usage in [the documentation](https://danfimov.github.io/taskiq-postgres/) or [examples directory](https://github.com/danfimov/taskiq-postgres/examples).\n\n## Installation\n\nDepending on your preferred PostgreSQL driver, you can install this library with the corresponding extra:\n\n```bash\n# with asyncpg\npip install taskiq-postgres[asyncpg]\n\n# with psqlpy\npip install taskiq-postgres[psqlpy]\n\n# with aiopg\npip install taskiq-postgres[aiopg]\n```\n\n## Quick start\n\n### Basic task processing\n\n1. Define your broker with [asyncpg](https://github.com/MagicStack/asyncpg):\n\n ```python\n # broker_example.py\n import asyncio\n from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgResultBackend\n\n\n dsn = \"postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres\"\n broker = AsyncpgBroker(dsn).with_result_backend(AsyncpgResultBackend(dsn))\n\n\n @broker.task(\"solve_all_problems\")\n async def best_task_ever() -> None:\n \"\"\"Solve all problems in the world.\"\"\"\n await asyncio.sleep(2)\n print(\"All problems are solved!\")\n\n\n async def main():\n await broker.startup()\n task = await best_task_ever.kiq()\n print(await task.wait_result())\n await broker.shutdown()\n\n\n if __name__ == \"__main__\":\n asyncio.run(main())\n ```\n\n2. Start a worker to process tasks (by default taskiq runs two instances of worker):\n\n ```bash\n taskiq worker broker_example:broker\n ```\n\n3. Run `broker_example.py` file to send a task to the worker:\n\n ```bash\n python broker_example.py\n ```\n\nYour experience with other drivers will be pretty similar. Just change the import statement and that's it.\n\n### Task scheduling\n\n1. Define your broker and schedule source:\n\n ```python\n # scheduler_example.py\n import asyncio\n from taskiq import TaskiqScheduler\n from taskiq_pg.asyncpg import AsyncpgBroker, AsyncpgScheduleSource\n\n\n dsn = \"postgres://taskiq_postgres:look_in_vault@localhost:5432/taskiq_postgres\"\n broker = AsyncpgBroker(dsn)\n scheduler = TaskiqScheduler(\n broker=broker,\n sources=[AsyncpgScheduleSource(\n dsn=dsn,\n broker=broker,\n )],\n )\n\n\n @broker.task(\n task_name=\"solve_all_problems\",\n schedule=[\n {\n \"cron\": \"*/1 * * * *\", # type: str, either cron or time should be specified.\n \"cron_offset\": None, # type: str | timedelta | None, can be omitted.\n \"time\": None, # type: datetime | None, either cron or time should be specified.\n \"args\": [], # type list[Any] | None, can be omitted.\n \"kwargs\": {}, # type: dict[str, Any] | None, can be omitted.\n \"labels\": {}, # type: dict[str, Any] | None, can be omitted.\n },\n ],\n )\n async def best_task_ever() -> None:\n \"\"\"Solve all problems in the world.\"\"\"\n await asyncio.sleep(2)\n print(\"All problems are solved!\")\n\n ```\n\n2. Start worker processes:\n\n ```bash\n taskiq worker scheduler_example:broker\n ```\n\n3. Run scheduler process:\n\n ```bash\n taskiq scheduler scheduler_example:scheduler\n ```\n\n## Motivation\n\nThere are too many libraries for PostgreSQL and Taskiq integration. Although they have different view on interface and different functionality.\nTo address this issue I created this library with a common interface for most popular PostgreSQL drivers that handle similarity across functionality of result backends, brokers and schedule sources.\n",
"bugtrack_url": null,
"license": null,
"summary": "PostgreSQL integration for taskiq",
"version": "0.3.0",
"project_urls": {
"Bug Tracker": "https://github.com/danfimov/taskiq-postgres/issues",
"Repository": "https://github.com/danfimov/taskiq-postgres/"
},
"split_keywords": [
"taskiq",
" tasks",
" distributed",
" async",
" postgresql"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "bb4e0378317174a64d94b81b68d54e502ecc8bb3d23bf9a5d8b16d0928fbecc1",
"md5": "5ab21a96580b03131969de3fe215e332",
"sha256": "86a28dcf3c1d56abbb89d188d14b5c50b4da8365d6d6e864c11a8c89ed7e546c"
},
"downloads": -1,
"filename": "taskiq_postgres-0.3.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "5ab21a96580b03131969de3fe215e332",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<3.14,>=3.10",
"size": 18506,
"upload_time": "2025-10-09T20:43:51",
"upload_time_iso_8601": "2025-10-09T20:43:51.046801Z",
"url": "https://files.pythonhosted.org/packages/bb/4e/0378317174a64d94b81b68d54e502ecc8bb3d23bf9a5d8b16d0928fbecc1/taskiq_postgres-0.3.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "2734f3c960ee669794090ea6b2d31338b02774b5fa677a6f697736e08c609338",
"md5": "704ee622e923f6d8f80fece217125909",
"sha256": "579601c66f3acc07a7ffe500a541f14de4329a27b319bfac281c1af6531dd47b"
},
"downloads": -1,
"filename": "taskiq_postgres-0.3.0.tar.gz",
"has_sig": false,
"md5_digest": "704ee622e923f6d8f80fece217125909",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<3.14,>=3.10",
"size": 10606,
"upload_time": "2025-10-09T20:43:52",
"upload_time_iso_8601": "2025-10-09T20:43:52.208008Z",
"url": "https://files.pythonhosted.org/packages/27/34/f3c960ee669794090ea6b2d31338b02774b5fa677a6f697736e08c609338/taskiq_postgres-0.3.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-09 20:43:52",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "danfimov",
"github_project": "taskiq-postgres",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "taskiq-postgres"
}