# Taskiq - FastStream
<p align="center">
<a href="https://github.com/taskiq-python/taskiq-faststream/actions/workflows/test.yml" target="_blank">
<img src="https://github.com/taskiq-python/taskiq-faststream/actions/workflows/test.yml/badge.svg" alt="Tests status"/>
</a>
<a href="https://pypi.org/project/taskiq-faststream/" target="_blank">
<img src="https://img.shields.io/pypi/v/taskiq-faststream?label=pypi%20package" alt="Package version">
</a>
<a href="https://pepy.tech/project/taskiq-faststream" target="_blank">
<img src="https://static.pepy.tech/personalized-badge/taskiq-faststream?period=month&units=international_system&left_color=grey&right_color=blue" alt="downloads"/>
</a>
<a href="https://pypi.org/project/taskiq-faststream" target="_blank">
<img src="https://img.shields.io/pypi/pyversions/taskiq-faststream.svg" alt="Supported Python versions">
</a>
<a href="https://github.com/taskiq-python/taskiq-faststream/blob/master/LICENSE" target="_blank">
<img alt="GitHub" src="https://img.shields.io/github/license/taskiq-python/taskiq-faststream?color=%23007ec6">
</a>
</p>
---
This package is a wrapper that makes [**FastStream**](https://faststream.airt.ai/0.2/?utm_source=github&utm_medium=acquisition&utm_campaign=measure) objects compatible with the [**Taskiq**](https://taskiq-python.github.io/) library.

Its main goal is to give **FastStream** access to the **Taskiq** task [scheduling](https://taskiq-python.github.io/guide/scheduling-tasks.html) feature.
## Installation
If you already have a **FastStream** project that talks to your message broker, you can add scheduling to it by installing just **taskiq-faststream**:
```bash
pip install taskiq-faststream
```
If you are starting a new project, you can select the broker for **taskiq-faststream** by installing one of the following extras:
```bash
pip install taskiq-faststream[rabbit]
# or
pip install taskiq-faststream[kafka]
# or
pip install taskiq-faststream[nats]
```
## Usage
The package gives you two classes: `AppWrapper` and `BrokerWrapper`.

These are containers for the corresponding **FastStream** objects that make them **taskiq**-compatible.

To create scheduled tasks for your broker, wrap it in `BrokerWrapper` and use it like a regular **taskiq** broker.
```python
# regular FastStream code
from faststream.nats import NatsBroker

broker = NatsBroker()

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# taskiq-faststream scheduling
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler

# wrap FastStream object
taskiq_broker = BrokerWrapper(broker)

# create periodic task
taskiq_broker.task(
    message="Hi!",
    # If you are using RabbitBroker, then you need to replace subject with queue.
    # If you are using KafkaBroker, then you need to replace subject with topic.
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)

# create scheduler object
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)
```
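The `cron` value in the schedule uses the standard five-field cron format, and `* * * * *` matches every minute. As a reading aid, the stdlib-only sketch below labels each field. The `describe_cron` helper and the `CRON_FIELDS` names are hypothetical and are not part of taskiq or taskiq-faststream.

```python
# Hypothetical helper for illustration only; not part of taskiq or taskiq-faststream.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict:
    """Map each of the five standard cron fields to its raw value."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 cron fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


# "* * * * *" leaves every field unrestricted, so the task fires every minute.
every_minute = describe_cron("* * * * *")

# "0 9 * * 1" restricts minute, hour and weekday: 09:00 every Monday.
monday_morning = describe_cron("0 9 * * 1")
```

Any valid cron expression can be used in place of `"* * * * *"` in the `schedule` list.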
To run the scheduler, use the following command:
```bash
taskiq scheduler module:scheduler
```
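The `module:scheduler` argument is an import path and a variable name separated by a colon, so `module` should be replaced with the module that defines your `scheduler` object. The sketch below shows how such a string is conventionally resolved in Python. It is an assumption-based illustration: `resolve_object` is a hypothetical helper, not taskiq's actual loader.

```python
import importlib


def resolve_object(path: str):
    """Resolve a "package.module:attribute" string to the object it names."""
    module_name, _, attribute = path.partition(":")
    if not module_name or not attribute:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


# Demonstrated with a stdlib object; for the README example the string
# would be "module:scheduler".
dumps = resolve_object("json:dumps")
```

Because the module is imported by name, it has to be importable from the directory where you run the command.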
You can also wrap your **FastStream** application the same way, which lets you use lifespan events and AsyncAPI documentation:
```python
# regular FastStream code
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# wrap FastStream object
from taskiq_faststream import AppWrapper
taskiq_broker = AppWrapper(app)

# Code below omitted 👇
```
One more feature: instead of passing a fixed value as the `message` argument, you can pass a message callback that collects the information right before sending:
```python
async def collect_information_to_send():
    return "Message to send"

taskiq_broker.task(
    message=collect_information_to_send,
    # ... remaining arguments omitted
)
```
You can also send multiple messages in one task call by using a generator message callback with `yield`:
```python
async def collect_information_to_send():
    """Sends 10 messages per task call."""
    for i in range(10):
        yield i

taskiq_broker.task(
    message=collect_information_to_send,
    # ... remaining arguments omitted
)
```
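To make the three `message` forms concrete (a fixed value, an async callback, and an async generator callback), here is a stdlib-only sketch of how a single task call could turn each form into the list of payloads to publish. It illustrates the behaviour described above and is not the library's implementation; `resolve_messages` and the two sample callbacks are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, List


async def resolve_messages(message: Any) -> List[Any]:
    """Return the payloads one task call would send for the given `message`."""
    if inspect.isasyncgenfunction(message):
        # Generator callback: every yielded item becomes its own message.
        return [item async for item in message()]
    if inspect.iscoroutinefunction(message):
        # Plain async callback: the returned value is the single message.
        return [await message()]
    # Anything else is treated as a fixed value and sent as-is.
    return [message]


async def single_callback():
    return "Message to send"


async def generator_callback():
    for i in range(3):
        yield i


fixed = asyncio.run(resolve_messages("Hi!"))
single = asyncio.run(resolve_messages(single_callback))
many = asyncio.run(resolve_messages(generator_callback))
```

The callback runs on every scheduled call, so each run can publish fresh data.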