| Name | dishka-faststream JSON |
| Version |
0.5.0
JSON |
| download |
| home_page | None |
| Summary | Integration package for Dishka DI and FastStream framework |
| upload_time | 2025-10-11 14:30:59 |
| maintainer | None |
| docs_url | None |
| author | Nikita Pastukhov |
| requires_python | >=3.10 |
| license | None |
| keywords |
|
| VCS |
 |
| bugtrack_url |
|
| requirements |
No requirements were recorded.
|
| Travis-CI |
No Travis.
|
| coveralls test coverage |
|
# FastStream integration for Dishka
[Downloads](https://www.pepy.tech/projects/dishka-faststream)
[Package version](https://pypi.org/project/dishka-faststream)
[Supported Python versions](https://pypi.org/project/dishka-faststream)
[License](https://github.com/faststream-community/dishka-faststream/blob/main/LICENSE)
[FastStream](https://faststream.ag2.ai)
Though it is not required, you can use *dishka-faststream* integration. It features:
* automatic *REQUEST* scope management using middleware
* passing `StreamMessage` and `ContextRepo` objects as context data to providers
* automatic injection of dependencies into message handler.
You can use auto-injection with `FastStream` 0.5.0 and higher. For older versions you need to apply `@inject` manually.
> **Note**
>
> If you are using **FastAPI plugin** of **FastStream** you need to use both dishka integrations, but you can share the same container.
>
> * Call `dishka_faststream.setup_dishka` on faststream broker or router.
> * Call `dishka.integrations.fastapi.setup_dishka` on fastapi app.
## Installation
Install using `pip`
```sh
pip install dishka-faststream
```
Or with `uv`
```sh
uv add dishka-faststream
```
## How to use
1. Import
```python
from dishka_faststream import (
FromDishka,
inject,
setup_dishka,
FastStreamProvider,
)
from dishka import make_async_container, Provider, provide, Scope
```
2. Create a provider. You can use `faststream.types.StreamMessage` and `faststream.ContextRepo` as factory parameters; they are available in the *REQUEST* scope
```python
class YourProvider(Provider):
@provide(scope=Scope.REQUEST)
def create_x(self, event: StreamMessage) -> X:
...
```
3. Mark the handler parameters that should be injected with `FromDishka[]`
```python
@broker.subscriber("test")
async def start(
gateway: FromDishka[Gateway],
):
...
```
3a. *(optional)* Decorate the handlers with `@inject` if you are not using auto-injection
```python
@broker.subscriber("test")
@inject
async def start(
gateway: FromDishka[Gateway],
):
...
```
4. *(optional)* Use `FastStreamProvider()` when creating container if you are going to use `faststream.types.StreamMessage` or `faststream.ContextRepo` in providers
```python
container = make_async_container(YourProvider(), FastStreamProvider())
```
5. Set up the `dishka` integration. `auto_inject=True` is required unless you explicitly use the `@inject` decorator
```python
setup_dishka(container=container, app=app, auto_inject=True)
```
Or pass your own inject decorator
```python
setup_dishka(container=container, broker=broker, auto_inject=my_inject)
```
## FastStream - Litestar/FastAPI - dishka integration
1. Running RabbitMQ
```shell
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
rabbitmq:management
```
2. Example of usage FastStream + Litestar
```python
import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
import dishka_faststream as faststream_integration
from dishka.integrations import litestar as litestar_integration
from dishka.integrations.base import FromDishka
from dishka_faststream import inject as faststream_inject
from dishka.integrations.litestar import inject as litestar_inject
from faststream.rabbit import RabbitBroker, RabbitRouter
from litestar import Litestar, route, HttpMethod
class SomeDependency:
async def do_something(self) -> int:
print("Hello world")
return 42
class SomeProvider(Provider):
@provide(scope=Scope.REQUEST)
def some_dependency(self) -> SomeDependency:
return SomeDependency()
@route(http_method=HttpMethod.GET, path="/", status_code=200)
@litestar_inject
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()
amqp_router = RabbitRouter()
@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()
def create_app() -> Litestar:
container = make_async_container(SomeProvider())
broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
broker.include_router(amqp_router)
faststream_integration.setup_dishka(container, broker=broker)
http = Litestar(
route_handlers=[http_handler],
on_startup=[broker.start],
on_shutdown=[broker.stop],
)
litestar_integration.setup_dishka(container, http)
return http
if __name__ == "__main__":
uvicorn.run(create_app(), host="0.0.0.0", port=8000)
```
### Example of usage FastStream + FastAPI
```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import uvicorn
from fastapi import APIRouter, FastAPI
from faststream.rabbit import RabbitBroker, RabbitRouter
from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations import fastapi as fastapi_integration
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from dishka_faststream import inject as faststream_inject
class SomeDependency:
async def do_something(self) -> int:
print("Hello world")
return 42
class SomeProvider(Provider):
@provide(scope=Scope.REQUEST)
def some_dependency(self) -> SomeDependency:
return SomeDependency()
router = APIRouter(route_class=DishkaRoute)
@router.get("/")
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()
amqp_router = RabbitRouter()
@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()
def create_app() -> FastAPI:
container = make_async_container(SomeProvider())
broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
broker.include_router(amqp_router)
faststream_integration.setup_dishka(container, broker=broker)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
async with broker:
await broker.start()
yield
http = FastAPI(lifespan=lifespan)
http.include_router(router)
fastapi_integration.setup_dishka(container, http)
return http
if __name__ == "__main__":
uvicorn.run(create_app(), host="0.0.0.0", port=8000)
```
## Testing FastStream with dishka
Simple example:
```python
from collections.abc import AsyncIterator
import pytest
from dishka import AsyncContainer, make_async_container
from dishka import Provider, Scope, provide
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker, RabbitRouter
router = RabbitRouter()
@router.subscriber("test-queue")
async def handler(msg: str, some_dependency: Depends[int]) -> int:
print(f"{msg=}")
return some_dependency
@pytest.fixture
async def broker() -> RabbitBroker:
broker = RabbitBroker()
broker.include_router(router)
return broker
@pytest.fixture
def mock_provider() -> Provider:
class MockProvider(Provider):
@provide(scope=Scope.REQUEST)
async def get_some_dependency(self) -> int:
return 42
return MockProvider()
@pytest.fixture
def container(mock_provider: Provider) -> AsyncContainer:
return make_async_container(mock_provider)
@pytest.fixture
async def app(broker: RabbitBroker, container: AsyncContainer) -> FastStream:
app = FastStream(broker)
faststream_integration.setup_dishka(container, app, auto_inject=True)
return app
@pytest.fixture
async def client(app: FastStream) -> AsyncIterator[RabbitBroker]:
async with TestRabbitBroker(app.broker) as br, TestApp(app):
yield br
@pytest.mark.asyncio
async def test_handler(client: RabbitBroker) -> None:
result = await client.request("hello", "test-queue")
assert await result.decode() == 42
```
Raw data
{
"_id": null,
"home_page": null,
"name": "dishka-faststream",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": null,
"keywords": null,
"author": "Nikita Pastukhov",
"author_email": "Nikita Pastukhov <nikita@pastukhov-dev.com>",
"download_url": "https://files.pythonhosted.org/packages/3a/3d/92197bfa75aee9fe692f4630833ec519b774735dc3b270de2f0ae7312115/dishka_faststream-0.5.0.tar.gz",
"platform": null,
"description": "# FastStream integration for Dishka\n\n[](https://www.pepy.tech/projects/dishka-faststream)\n[](https://pypi.org/project/dishka-faststream)\n[](https://pypi.org/project/dishka-faststream)\n[](https://github.com/faststream-community/dishka-faststream/blob/main/LICENSE)\n[](https://faststream.ag2.ai)\n\nThough it is not required, you can use *dishka-faststream* integration. It features:\n\n* automatic *REQUEST* scope management using middleware\n* passing `StreamMessage` and `ContextRepo` object as a context data to providers\n* automatic injection of dependencies into message handler.\n\nYou can use auto-injection for `FastStream` 0.5.0 and higher. For older version you need to specify `@inject` manually.\n\n> **Note**\n> \n> If you are using **FastAPI plugin** of **FastStream** you need to use both dishka integrations, but you can share the same container.\n> \n> * Call `dishka_faststream.setup_dishka` on faststream broker or router.\n> * Call `dishka.integrations.fastapi.setup_dishka` on fastapi app.\n\n## Installation\n\nInstall using `pip`\n\n```sh\npip install dishka-faststream\n```\n\nOr with `uv`\n\n```sh\nuv add dishka-faststream\n```\n\n## How to use\n\n1. Import\n\n```python\nfrom dishka_faststream import (\n FromDishka,\n inject,\n setup_dishka,\n FastStreamProvider,\n)\nfrom dishka import make_async_container, Provider, provide, Scope\n```\n\n2. Create provider. You can use `faststream.types.StreamMessage` and `faststream.ContextRepo` as a factory parameter to access on *REQUEST*-scope\n\n```python\nclass YourProvider(Provider):\n @provide(scope=Scope.REQUEST)\n def create_x(self, event: StreamMessage) -> X:\n ...\n```\n\n3. Mark those of your handlers parameters which are to be injected with `FromDishka[]`\n\n```python\n@broker.subscriber(\"test\")\nasync def start(\n gateway: FromDishka[Gateway],\n):\n ...\n```\n\n3a. 
*(optional)* decorate them using `@inject` if you are not using auto-injection\n\n```python\n@broker.subscriber(\"test\")\n@inject\nasync def start(\n gateway: FromDishka[Gateway],\n):\n ...\n```\n\n4. *(optional)* Use `FastStreamProvider()` when creating container if you are going to use `faststream.types.StreamMessage` or `faststream.ContextRepo` in providers\n\n```python\ncontainer = make_async_container(YourProvider(), FastStreamProvider())\n```\n\n5. Setup `dishka` integration. `auto_inject=True` is required unless you explicitly use `@inject` decorator\n\n```python\nsetup_dishka(container=container, app=app, auto_inject=True)\n```\n\nOr pass your own inject decorator\n\n```python\nsetup_dishka(container=container, broker=broker, auto_inject=my_inject)\n```\n\n## FastStream - Litestar/FastAPI - dishka integration\n\n1. Running RabbitMQ\n\n```shell\ndocker run -d --name rabbitmq \\\n -p 5672:5672 -p 15672:15672 \\\n -e RABBITMQ_DEFAULT_USER=guest \\\n -e RABBITMQ_DEFAULT_PASS=guest \\\n rabbitmq:management\n```\n\n2. 
Example of usage FastStream + Litestar\n\n```python\nimport uvicorn\nfrom dishka import Provider, Scope, provide\nfrom dishka import make_async_container\nimport dishka_faststream as faststream_integration\nfrom dishka.integrations import litestar as litestar_integration\nfrom dishka.integrations.base import FromDishka\nfrom dishka_faststream import inject as faststream_inject\nfrom dishka.integrations.litestar import inject as litestar_inject\nfrom faststream.rabbit import RabbitBroker, RabbitRouter\nfrom litestar import Litestar, route, HttpMethod\n\n\nclass SomeDependency:\n async def do_something(self) -> int:\n print(\"Hello world\")\n return 42\n\n\nclass SomeProvider(Provider):\n @provide(scope=Scope.REQUEST)\n def some_dependency(self) -> SomeDependency:\n return SomeDependency()\n\n\n@route(http_method=HttpMethod.GET, path=\"/\", status_code=200)\n@litestar_inject\nasync def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:\n await some_dependency.do_something()\n\n\namqp_router = RabbitRouter()\n\n\n@amqp_router.subscriber(\"test-queue\")\n@faststream_inject\nasync def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:\n await some_dependency.do_something()\n\n\ndef create_app() -> Litestar:\n container = make_async_container(SomeProvider())\n\n broker = RabbitBroker(url=\"amqp://guest:guest@localhost:5672/\")\n broker.include_router(amqp_router)\n faststream_integration.setup_dishka(container, broker=broker)\n\n http = Litestar(\n route_handlers=[http_handler],\n on_startup=[broker.start],\n on_shutdown=[broker.stop],\n )\n litestar_integration.setup_dishka(container, http)\n return http\n\n\nif __name__ == \"__main__\":\n uvicorn.run(create_app(), host=\"0.0.0.0\", port=8000)\n```\n\n### Example of usage FastStream + FastAPI\n\n```python\nfrom collections.abc import AsyncIterator\nfrom contextlib import asynccontextmanager\n\nimport uvicorn\nfrom fastapi import APIRouter, FastAPI\nfrom faststream.rabbit import 
RabbitBroker, RabbitRouter\nfrom dishka import Provider, Scope, make_async_container, provide\nfrom dishka.integrations import fastapi as fastapi_integration\nimport dishka_faststream as faststream_integration\nfrom dishka.integrations.base import FromDishka\nfrom dishka.integrations.fastapi import DishkaRoute\nfrom dishka_faststream import inject as faststream_inject\n\n\nclass SomeDependency:\n async def do_something(self) -> int:\n print(\"Hello world\")\n return 42\n\n\nclass SomeProvider(Provider):\n @provide(scope=Scope.REQUEST)\n def some_dependency(self) -> SomeDependency:\n return SomeDependency()\n\n\nrouter = APIRouter(route_class=DishkaRoute)\n\n\n@router.get(\"/\")\nasync def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:\n await some_dependency.do_something()\n\n\namqp_router = RabbitRouter()\n\n\n@amqp_router.subscriber(\"test-queue\")\n@faststream_inject\nasync def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:\n await some_dependency.do_something()\n\n\ndef create_app() -> FastAPI:\n container = make_async_container(SomeProvider())\n\n broker = RabbitBroker(url=\"amqp://guest:guest@localhost:5672/\")\n broker.include_router(amqp_router)\n faststream_integration.setup_dishka(container, broker=broker)\n\n @asynccontextmanager\n async def lifespan(app: FastAPI) -> AsyncIterator[None]:\n async with broker:\n await broker.start()\n yield\n\n http = FastAPI(lifespan=lifespan)\n http.include_router(router)\n fastapi_integration.setup_dishka(container, http)\n return http\n\n\nif __name__ == \"__main__\":\n uvicorn.run(create_app(), host=\"0.0.0.0\", port=8000)\n```\n\n## Testing FastStream with dishka\n\nSimple example:\n\n```python\nfrom collections.abc import AsyncIterator\n\nimport pytest\nfrom dishka import AsyncContainer, make_async_container\nfrom dishka import Provider, Scope, provide\nimport dishka_faststream as faststream_integration\nfrom dishka.integrations.base import FromDishka as Depends\nfrom 
faststream import FastStream, TestApp\nfrom faststream.rabbit import RabbitBroker, TestRabbitBroker, RabbitRouter\n\nrouter = RabbitRouter()\n\n\n@router.subscriber(\"test-queue\")\nasync def handler(msg: str, some_dependency: Depends[int]) -> int:\n print(f\"{msg=}\")\n return some_dependency\n\n\n@pytest.fixture\nasync def broker() -> RabbitBroker:\n broker = RabbitBroker()\n broker.include_router(router)\n return broker\n\n\n@pytest.fixture\ndef mock_provider() -> Provider:\n class MockProvider(Provider):\n @provide(scope=Scope.REQUEST)\n async def get_some_dependency(self) -> int:\n return 42\n\n return MockProvider()\n\n\n@pytest.fixture\ndef container(mock_provider: Provider) -> AsyncContainer:\n return make_async_container(mock_provider)\n\n\n@pytest.fixture\nasync def app(broker: RabbitBroker, container: AsyncContainer) -> FastStream:\n app = FastStream(broker)\n faststream_integration.setup_dishka(container, app, auto_inject=True)\n return FastStream(broker)\n\n\n@pytest.fixture\nasync def client(app: FastStream) -> AsyncIterator[RabbitBroker]:\n async with TestRabbitBroker(app.broker) as br, TestApp(app):\n yield br\n\n\n@pytest.mark.asyncio\nasync def test_handler(client: RabbitBroker) -> None:\n result = await client.request(\"hello\", \"test-queue\")\n assert await result.decode() == 42\n```\n",
"bugtrack_url": null,
"license": null,
"summary": "Integration package for Dishka DI and FastStream framework",
"version": "0.5.0",
"project_urls": {
"Homepage": "https://github.com/faststream-community/dishka-faststream",
"Source": "https://github.com/faststream-community/dishka-faststream",
"Tracker": "https://github.com/faststream-community/dishka-faststream/issues"
},
"split_keywords": [],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "59c16b7e7eefaaad83c368af8311142f22869a23a7e5bf2dcf3d3fed1fd76c52",
"md5": "95e289badb9757d4d00a80184b254771",
"sha256": "6accddba99e4bf7f89b75927bbe3c6e7555de7e9347f33437131c7fc28f08aa3"
},
"downloads": -1,
"filename": "dishka_faststream-0.5.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "95e289badb9757d4d00a80184b254771",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10",
"size": 8096,
"upload_time": "2025-10-11T14:30:57",
"upload_time_iso_8601": "2025-10-11T14:30:57.534920Z",
"url": "https://files.pythonhosted.org/packages/59/c1/6b7e7eefaaad83c368af8311142f22869a23a7e5bf2dcf3d3fed1fd76c52/dishka_faststream-0.5.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "3a3d92197bfa75aee9fe692f4630833ec519b774735dc3b270de2f0ae7312115",
"md5": "59195adc0d1869d80300095a66cd042f",
"sha256": "126420d12340ae5962548d0fff0a8e97c6c6ccdeb9ece3096c1ee73cfa38e597"
},
"downloads": -1,
"filename": "dishka_faststream-0.5.0.tar.gz",
"has_sig": false,
"md5_digest": "59195adc0d1869d80300095a66cd042f",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 5736,
"upload_time": "2025-10-11T14:30:59",
"upload_time_iso_8601": "2025-10-11T14:30:59.081809Z",
"url": "https://files.pythonhosted.org/packages/3a/3d/92197bfa75aee9fe692f4630833ec519b774735dc3b270de2f0ae7312115/dishka_faststream-0.5.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-11 14:30:59",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "faststream-community",
"github_project": "dishka-faststream",
"travis_ci": false,
"coveralls": true,
"github_actions": true,
"lcname": "dishka-faststream"
}