taskiq-faststream


- Name: taskiq-faststream
- Version: 0.3.2
- Summary: FastStream - taskiq integration to schedule FastStream tasks
- Upload time: 2025-10-19 07:46:36
- Author: Taskiq team, Nikita Pastukhov
- Requires Python: >=3.10
- Keywords: taskiq, tasks, distributed, async, faststream

# Taskiq - FastStream

<p align="center">
    <a href="https://github.com/taskiq-python/taskiq-faststream/actions/workflows/test.yml" target="_blank">
        <img src="https://github.com/taskiq-python/taskiq-faststream/actions/workflows/test.yml/badge.svg" alt="Tests status"/>
    </a>
    <a href="https://pypi.org/project/taskiq-faststream/" target="_blank">
        <img src="https://img.shields.io/pypi/v/taskiq-faststream?label=pypi%20package" alt="Package version">
    </a>
    <a href="https://pepy.tech/project/taskiq-faststream" target="_blank">
        <img src="https://static.pepy.tech/personalized-badge/taskiq-faststream?period=month&units=international_system&left_color=grey&right_color=blue" alt="downloads"/>
    </a>
    <a href="https://pypi.org/project/taskiq-faststream" target="_blank">
        <img src="https://img.shields.io/pypi/pyversions/taskiq-faststream.svg" alt="Supported Python versions">
    </a>
    <a href="https://github.com/taskiq-python/taskiq-faststream/blob/master/LICENSE" target="_blank">
        <img alt="GitHub" src="https://img.shields.io/github/license/taskiq-python/taskiq-faststream?color=%23007ec6">
    </a>
</p>

---

This package is a thin wrapper that makes [**FastStream**](https://faststream.airt.ai/0.2/?utm_source=github&utm_medium=acquisition&utm_campaign=measure) objects compatible with the [**Taskiq**](https://taskiq-python.github.io/) library.

Its main goal is to give **FastStream** access to **Taskiq**'s task [scheduling](https://taskiq-python.github.io/guide/scheduling-tasks.html) feature.

## Installation

If you already have a **FastStream** project that talks to your message broker, you can add scheduling to it by installing **taskiq-faststream** alone:

```bash
pip install taskiq-faststream
```

If you are starting a new project, you can install **taskiq-faststream** together with a specific broker by using one of the following extras:

```bash
pip install taskiq-faststream[rabbit]
# or
pip install taskiq-faststream[kafka]
# or
pip install taskiq-faststream[confluent]
# or
pip install taskiq-faststream[nats]
# or
pip install taskiq-faststream[redis]
```

## Usage

The package provides two classes: `AppWrapper` and `BrokerWrapper`.

These are containers for the corresponding **FastStream** objects that make them **taskiq**-compatible.

To create scheduled tasks for your broker, wrap it in `BrokerWrapper` and use it like a regular **taskiq** broker.

```python
# regular FastStream code
from faststream.nats import NatsBroker

broker = NatsBroker()

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# taskiq-faststream scheduling
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler

# wrap FastStream object
taskiq_broker = BrokerWrapper(broker)

# create periodic task
taskiq_broker.task(
    message="Hi!",
    # If you are using RabbitBroker, then you need to replace subject with queue.
    # If you are using KafkaBroker, then you need to replace subject with topic.
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)

# create scheduler object
scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)
```
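The `schedule` argument above is a list of dictionaries, each carrying a `cron` expression. As a small illustration, the helper below builds such a list for an "every N minutes" interval using the standard `*/N` step syntax of cron. The name `every_n_minutes` is hypothetical and not part of **taskiq-faststream**; it only produces the same data shape that the example passes to `taskiq_broker.task`.

```python
def every_n_minutes(n: int) -> list[dict[str, str]]:
    """Hypothetical helper: schedule that fires on every n-th minute of the hour."""
    if not 1 <= n <= 59:
        raise ValueError("n must be between 1 and 59")
    # A cron expression has five fields:
    # minute, hour, day of month, month, day of week.
    # Note: a step that does not divide 60 restarts at the top of each hour.
    minute = "*" if n == 1 else f"*/{n}"
    return [{"cron": f"{minute} * * * *"}]


print(every_n_minutes(1))
print(every_n_minutes(5))
```

The result can be passed directly as the `schedule` argument, for example `schedule=every_n_minutes(5)`.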

To run the scheduler, use the following command, where `module` is the Python module that defines `scheduler`:

```bash
taskiq scheduler module:scheduler
```

You can also wrap your **FastStream** application the same way, which lets you use lifespan events and AsyncAPI documentation:

```python
# regular FastStream code
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test-subject")
async def handler(msg: str):
    print(msg)

# wrap FastStream object
from taskiq_faststream import AppWrapper
taskiq_broker = AppWrapper(app)

# Code below omitted 👇
```

One more feature: instead of passing a fixed `message` value, you can pass a callback that builds the message right before it is sent:

```python
async def collect_information_to_send():
    return "Message to send"

taskiq_broker.task(
    message=collect_information_to_send,
    # ... remaining arguments omitted
)
```
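To see why a callback is useful, compare it with a fixed string: the string is decided once when the task is declared, while the callback is evaluated on each call. The sketch below uses only the standard library and awaits the callback by hand, standing in for whatever the scheduler does at send time; the timestamped message body is an assumption made for illustration.

```python
import asyncio
from datetime import datetime, timezone


async def collect_information_to_send() -> str:
    # Runs on every call, so each message reflects the moment it was built.
    return f"Report generated at {datetime.now(timezone.utc).isoformat()}"


async def main() -> tuple[str, str]:
    # Simulate two consecutive task runs by awaiting the callback twice.
    first = await collect_information_to_send()
    await asyncio.sleep(0.01)
    second = await collect_information_to_send()
    return first, second


first, second = asyncio.run(main())
print(first)
print(second)
```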

You can also send multiple messages from a single task call by using a generator callback that produces them with `yield`:

```python
async def collect_information_to_send():
    """Sends 10 messages per task call."""
    for i in range(10):
        yield i

taskiq_broker.task(
    message=collect_information_to_send,
    # ... remaining arguments omitted
)
```
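The two callback styles differ in how they have to be consumed: a coroutine function is awaited once and yields one message, while an async generator function is iterated and yields many. The following is a minimal standard-library sketch of that distinction. `resolve_messages` is a hypothetical helper written for this illustration and is not taken from the **taskiq-faststream** source, which may handle callbacks differently.

```python
import asyncio
import inspect


async def single_message():
    return "Message to send"


async def many_messages():
    for i in range(10):
        yield i


async def resolve_messages(callback) -> list:
    """Hypothetical helper: collect every message a callback produces."""
    if inspect.isasyncgenfunction(callback):
        # Async generator: iterate to gather each yielded message.
        return [item async for item in callback()]
    # Plain coroutine function: await it once for a single message.
    return [await callback()]


print(asyncio.run(resolve_messages(single_message)))
print(asyncio.run(resolve_messages(many_messages)))
```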

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "taskiq-faststream",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "taskiq, tasks, distributed, async, FastStream",
    "author": "Taskiq team, Nikita Pastukhov",
    "author_email": "Taskiq team <taskiq@no-reply.com>, Nikita Pastukhov <nikita@pastukhov-dev.com>",
    "download_url": "https://files.pythonhosted.org/packages/8f/5d/8c6857a9b50dd68df9f99f857880351aa10711fe35f661d70d2d1f30f299/taskiq_faststream-0.3.2.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "FastStream - taskiq integration to schedule FastStream tasks",
    "version": "0.3.2",
    "project_urls": {
        "Homepage": "https://github.com/taskiq-python/taskiq-faststream",
        "Source": "https://github.com/taskiq-python/taskiq-faststream",
        "Tracker": "https://github.com/taskiq-python/taskiq-faststream/issues"
    },
    "split_keywords": [
        "taskiq",
        " tasks",
        " distributed",
        " async",
        " faststream"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "7c8afe7bcaa22b8ee222c0c7a170e6b4b43ceecaa09796106cb56e2284fba835",
                "md5": "34ff3c7a95ebb62e113c0cd5b4569f22",
                "sha256": "c45d9852c127561a165412e57f5dfb68651348a0ae95eb5145c69a8c4cdde109"
            },
            "downloads": -1,
            "filename": "taskiq_faststream-0.3.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "34ff3c7a95ebb62e113c0cd5b4569f22",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 8930,
            "upload_time": "2025-10-19T07:46:35",
            "upload_time_iso_8601": "2025-10-19T07:46:35.395685Z",
            "url": "https://files.pythonhosted.org/packages/7c/8a/fe7bcaa22b8ee222c0c7a170e6b4b43ceecaa09796106cb56e2284fba835/taskiq_faststream-0.3.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "8f5d8c6857a9b50dd68df9f99f857880351aa10711fe35f661d70d2d1f30f299",
                "md5": "9f486fc52fc33f553852f84cc53a1e1e",
                "sha256": "9fad72678338ea7892019be99aec5e42714e4f259e3b6c27c242bed1083e3889"
            },
            "downloads": -1,
            "filename": "taskiq_faststream-0.3.2.tar.gz",
            "has_sig": false,
            "md5_digest": "9f486fc52fc33f553852f84cc53a1e1e",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 7549,
            "upload_time": "2025-10-19T07:46:36",
            "upload_time_iso_8601": "2025-10-19T07:46:36.717769Z",
            "url": "https://files.pythonhosted.org/packages/8f/5d/8c6857a9b50dd68df9f99f857880351aa10711fe35f661d70d2d1f30f299/taskiq_faststream-0.3.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-19 07:46:36",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "taskiq-python",
    "github_project": "taskiq-faststream",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "taskiq-faststream"
}
        