# Taskiq + FastAPI
This repository contains code to integrate FastAPI with taskiq easily.
Taskiq and FastAPI both have dependency-injection systems, and this library makes it possible to depend on
`fastapi.Request` or `starlette.requests.HTTPConnection` in taskiq tasks.
With this library you can easily re-use your fastapi dependencies in taskiq functions.
## How does it work?
It adds startup functions to the broker so that it imports your FastAPI application
and creates single worker-wide `Request` and `HTTPConnection` objects that you can depend on.
THIS REQUEST IS NOT RELATED TO THE ACTUAL REQUESTS IN FASTAPI!
This request won't have actual data about the request you were handling while sending the task.
## Usage
Here is an example of a function that is used by both a taskiq task and
a FastAPI handler function.
I have a script called `test_script.py` so my app can be found at `test_script:app`.
We use a string to resolve the application in order to bypass circular imports.
Also, as you can see, we use `TaskiqDepends` for `Request`. That's because the
taskiq dependency resolver must know that this type must be injected. FastAPI disallows
`Depends` for the `Request` type. That's why we use `TaskiqDepends`.
```python
from fastapi import FastAPI, Request
from pydantic import BaseModel
from redis.asyncio import ConnectionPool, Redis
from fastapi import Depends as FastAPIDepends
from taskiq import TaskiqDepends
import taskiq_fastapi
from taskiq import ZeroMQBroker
broker = ZeroMQBroker()
app = FastAPI()
@app.on_event("startup")
async def app_startup():
#####################
# IMPORTANT NOTE #
#####################
    # If you don't check that this is not
    # a worker process, you'll
    # create infinite recursion, because in worker processes
    # the fastapi startup will be called.
if not broker.is_worker_process:
print("Starting broker")
await broker.startup()
print("Creating redis pool")
app.state.redis_pool = ConnectionPool.from_url("redis://localhost")
@app.on_event("shutdown")
async def app_shutdown():
#####################
# IMPORTANT NOTE #
#####################
    # If you don't check that this is not
    # a worker process, you'll
    # create infinite recursion, because in worker processes
    # the fastapi shutdown will be called.
if not broker.is_worker_process:
print("Shutting down broker")
await broker.shutdown()
print("Stopping redis pool")
await app.state.redis_pool.disconnect()
# Here we call our magic function.
taskiq_fastapi.init(broker, "test_script:app")
# We use TaskiqDepends here, because if we use FastAPIDepends fastapi
# initialization will fail.
def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:
return request.app.state.redis_pool
@broker.task
async def my_redis_task(
key: str,
val: str,
# Here we depend using TaskiqDepends.
# Please use TaskiqDepends for all tasks to be resolved correctly.
# Or dependencies won't be injected.
pool: ConnectionPool = TaskiqDepends(get_redis_pool),
):
async with Redis(connection_pool=pool) as redis:
await redis.set(key, val)
print("Value set.")
class MyVal(BaseModel):
key: str
val: str
@app.post("/val")
async def setval_endpoint(val: MyVal) -> None:
await my_redis_task.kiq(
key=val.key,
val=val.val,
)
print("Task sent")
@app.get("/val")
async def getval_endpoint(
key: str,
pool: ConnectionPool = FastAPIDepends(get_redis_pool),
) -> str:
async with Redis(connection_pool=pool, decode_responses=True) as redis:
return await redis.get(key)
```
## Manually update dependency context
When using `InMemoryBroker` it may be required to update the dependency context manually. This may also be useful when setting up tests.
```py
import taskiq_fastapi
from taskiq import InMemoryBroker
broker = InMemoryBroker()
app = FastAPI()
taskiq_fastapi.init(broker, "test_script:app")
taskiq_fastapi.populate_dependency_context(broker, app)
```
Raw data
{
"_id": null,
"home_page": null,
"name": "taskiq-fastapi",
"maintainer": "Taskiq team",
"docs_url": null,
"requires_python": "<4.0.0,>=3.8.1",
"maintainer_email": "taskiq@no-reply.com",
"keywords": "taskiq, tasks, distributed, async, fastapi",
"author": "Taskiq team",
"author_email": "taskiq@no-reply.com",
"download_url": "https://files.pythonhosted.org/packages/e2/52/9461f3ae4640c5357eb21317e6d1b06a24b758c8f2f6c8c023f2e144138a/taskiq_fastapi-0.3.3.tar.gz",
"platform": null,
"description": "# Taskiq + FastAPI\n\nThis repository has a code to integrate FastAPI with taskiq easily.\n\nTaskiq and FastAPI both have dependencies and this library makes it possible to depend on\n`fastapi.Request` or `starlette.requests.HTTPConnection` in taskiq tasks.\n\nWith this library you can easily re-use your fastapi dependencies in taskiq functions.\n\n## How does it work?\n\nIt adds startup functions to broker so it imports your fastapi application\nand creates a single worker-wide Request and HTTPConnection objects that you depend on.\n\nTHIS REQUEST IS NOT RELATED TO THE ACTUAL REQUESTS IN FASTAPI!\nThis request won't have actual data about the request you were handling while sending task.\n\n## Usage\n\nHere we have an example of function that is being used by both taskiq's task and\nfastapi's handler function.\n\nI have a script called `test_script.py` so my app can be found at `test_script:app`.\nWe use strings to resolve application to bypass circular imports.\n\nAlso, as you can see, we use `TaskiqDepends` for Request. That's because\ntaskiq dependency resolver must know that this type must be injected. FastAPI disallow\nDepends for Request type. That's why we use `TaskiqDepends`.\n\n```python\nfrom fastapi import FastAPI, Request\nfrom pydantic import BaseModel\nfrom redis.asyncio import ConnectionPool, Redis\nfrom fastapi import Depends as FastAPIDepends\nfrom taskiq import TaskiqDepends\nimport taskiq_fastapi\nfrom taskiq import ZeroMQBroker\n\nbroker = ZeroMQBroker()\n\napp = FastAPI()\n\n\n@app.on_event(\"startup\")\nasync def app_startup():\n #####################\n # IMPORTANT NOTE #\n #####################\n # If you won't check that this is not\n # a worker process, you'll\n # create an infinite recursion. 
Because in worker processes\n # fastapi startup will be called.\n if not broker.is_worker_process:\n print(\"Starting broker\")\n await broker.startup()\n print(\"Creating redis pool\")\n app.state.redis_pool = ConnectionPool.from_url(\"redis://localhost\")\n\n\n@app.on_event(\"shutdown\")\nasync def app_shutdown():\n #####################\n # IMPORTANT NOTE #\n #####################\n # If you won't check that this is not\n # a worker process, you'll\n # create an infinite recursion. Because in worker processes\n # fastapi startup will be called.\n if not broker.is_worker_process:\n print(\"Shutting down broker\")\n await broker.shutdown()\n print(\"Stopping redis pool\")\n await app.state.redis_pool.disconnect()\n\n\n# Here we call our magic function.\ntaskiq_fastapi.init(broker, \"test_script:app\")\n\n\n# We use TaskiqDepends here, because if we use FastAPIDepends fastapi\n# initialization will fail.\ndef get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:\n return request.app.state.redis_pool\n\n\n@broker.task\nasync def my_redis_task(\n key: str,\n val: str,\n # Here we depend using TaskiqDepends.\n # Please use TaskiqDepends for all tasks to be resolved correctly.\n # Or dependencies won't be injected.\n pool: ConnectionPool = TaskiqDepends(get_redis_pool),\n):\n async with Redis(connection_pool=pool) as redis:\n await redis.set(key, val)\n print(\"Value set.\")\n\n\nclass MyVal(BaseModel):\n key: str\n val: str\n\n\n@app.post(\"/val\")\nasync def setval_endpoint(val: MyVal) -> None:\n await my_redis_task.kiq(\n key=val.key,\n val=val.val,\n )\n print(\"Task sent\")\n\n\n@app.get(\"/val\")\nasync def getval_endpoint(\n key: str,\n pool: ConnectionPool = FastAPIDepends(get_redis_pool),\n) -> str:\n async with Redis(connection_pool=pool, decode_responses=True) as redis:\n return await redis.get(key)\n\n```\n\n## Manually update dependency context\n\nWhen using `InMemoryBroker` it may be required to update the dependency context manually. 
This may also be useful when setting up tests.\n\n```py\nimport taskiq_fastapi\nfrom taskiq import InMemoryBroker\n\nbroker = InMemoryBroker()\n\napp = FastAPI()\n\ntaskiq_fastapi.init(broker, \"test_script:app\")\ntaskiq_fastapi.populate_dependency_context(broker, app)\n```\n",
"bugtrack_url": null,
"license": "LICENSE",
"summary": "FastAPI integration for taskiq",
"version": "0.3.3",
"project_urls": null,
"split_keywords": [
"taskiq",
" tasks",
" distributed",
" async",
" fastapi"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "0cac0528ff3c45f01f5ab4bb9bef218bfed821bc4009d151c6779ce6c9453b74",
"md5": "f1aabfda6f29e03603d4ccb97444903c",
"sha256": "b08df2f923ead3acaddae4e0afe9f07a29b9fac879300ba964c894288c91e90a"
},
"downloads": -1,
"filename": "taskiq_fastapi-0.3.3-py3-none-any.whl",
"has_sig": false,
"md5_digest": "f1aabfda6f29e03603d4ccb97444903c",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0.0,>=3.8.1",
"size": 4847,
"upload_time": "2024-12-14T10:43:45",
"upload_time_iso_8601": "2024-12-14T10:43:45.577170Z",
"url": "https://files.pythonhosted.org/packages/0c/ac/0528ff3c45f01f5ab4bb9bef218bfed821bc4009d151c6779ce6c9453b74/taskiq_fastapi-0.3.3-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "e2529461f3ae4640c5357eb21317e6d1b06a24b758c8f2f6c8c023f2e144138a",
"md5": "9b7150f3c16a1041c857497020756928",
"sha256": "c72fd3f1b602b7ab611ce7a5131c9b12167c5fc1b11f3c938b0c7a3f0e507b83"
},
"downloads": -1,
"filename": "taskiq_fastapi-0.3.3.tar.gz",
"has_sig": false,
"md5_digest": "9b7150f3c16a1041c857497020756928",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0.0,>=3.8.1",
"size": 5136,
"upload_time": "2024-12-14T10:43:46",
"upload_time_iso_8601": "2024-12-14T10:43:46.519319Z",
"url": "https://files.pythonhosted.org/packages/e2/52/9461f3ae4640c5357eb21317e6d1b06a24b758c8f2f6c8c023f2e144138a/taskiq_fastapi-0.3.3.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-12-14 10:43:46",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "taskiq-fastapi"
}