frequenz-channels


Namefrequenz-channels JSON
Version 1.3.0 PyPI version JSON
download
home_pageNone
SummaryChannel implementations for Python
upload_time2024-11-19 13:36:54
maintainerNone
docs_urlNone
authorNone
requires_python<4,>=3.11
licenseMIT
keywords frequenz python lib library channels channel
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Frequenz channels

[![Build Status](https://github.com/frequenz-floss/frequenz-channels-python/actions/workflows/ci.yaml/badge.svg)](https://github.com/frequenz-floss/frequenz-channels-python/actions/workflows/ci.yaml)
[![PyPI Package](https://img.shields.io/pypi/v/frequenz-channels)](https://pypi.org/project/frequenz-channels/)
[![Docs](https://img.shields.io/badge/docs-latest-informational)](https://frequenz-floss.github.io/frequenz-channels-python/)

## Introduction

<!-- introduction -->

Frequenz Channels is a *channels* implementation for Python.

According to [Wikipedia](https://en.wikipedia.org/wiki/Channel_(programming)):

> A channel is a model for interprocess communication and synchronization via
> message passing. A message may be sent over a channel, and another process or
> thread is able to receive messages sent over a channel it has a reference to,
> as a stream. Different implementations of channels may be buffered or not,
> and either synchronous or asynchronous.

Frequenz Channels are mostly designed after [Go
channels](https://tour.golang.org/concurrency/2) but it also borrows ideas from
[Rust channels](https://doc.rust-lang.org/book/ch16-02-message-passing.html).

<!-- /introduction -->

## Supported Platforms

<!-- supported-platforms -->

The following platforms are officially supported (tested):

- **Python:** 3.11
- **Operating System:** Ubuntu Linux 20.04
- **Architectures:** amd64, arm64

> [!NOTE]
> Newer Python versions and other operating systems and architectures might
> work too, but they are not automatically tested, so we cannot guarantee it.

<!-- /supported-platforms -->

## Quick Start

### Installing

<!-- quick-start-installing -->

Assuming a [supported](#supported-platforms) working Python environment:

```sh
python3 -m pip install frequenz-channels
```

> [!TIP]
> For more details please read the [Installation
> Guide](docs/user-guide/installation.md).

<!-- /quick-start-installing -->

### Examples

#### Hello World

<!-- quick-start-hello-world -->

```python
import asyncio

from frequenz.channels import Anycast


async def main() -> None:
    hello_channel = Anycast[str](name="hello-world-channel")
    sender = hello_channel.new_sender()
    receiver = hello_channel.new_receiver()

    await sender.send("Hello World!")
    message = await receiver.receive()
    print(message)


asyncio.run(main())
```

<!-- /quick-start-hello-world -->

#### Showcase

<!-- quick-start-showcase -->

This is a comprehensive example that shows most of the main features of the
library:

```python
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum, auto
from typing import assert_never

from frequenz.channels import (
    Anycast,
    Broadcast,
    Receiver,
    Sender,
    merge,
    select,
    selected_from,
)
from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed


class Command(Enum):
    PING = auto()
    STOP_SENDER = auto()


class ReplyCommand(Enum):
    PONG = auto()


@dataclass(frozen=True)
class Reply:
    reply: ReplyCommand
    source: str


async def send(
    sender: Sender[str],
    control_command: Receiver[Command],
    control_reply: Sender[Reply],
) -> None:
    """Send a counter value every second, until a stop command is received."""
    print(f"{sender}: Starting")
    timer = Timer(timedelta(seconds=1.0), TriggerAllMissed())
    counter = 0
    async for selected in select(timer, control_command):
        if selected_from(selected, timer):
            print(f"{sender}: Sending {counter}")
            await sender.send(f"{sender}: {counter}")
            counter += 1
        elif selected_from(selected, control_command):
            print(f"{sender}: Received command: {selected.message.name}")
            match selected.message:
                case Command.STOP_SENDER:
                    print(f"{sender}: Stopping")
                    break
                case Command.PING:
                    print(f"{sender}: Ping received, reply with pong")
                    await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))
                case _ as unknown:
                    assert_never(unknown)
    print(f"{sender}: Finished")


async def receive(
    receivers: list[Receiver[str]],
    control_command: Receiver[Command],
    control_reply: Sender[Reply],
) -> None:
    """Receive data from multiple channels, until no more data is received for 2 seconds."""
    print("receive: Starting")
    timer = Timer(timedelta(seconds=2.0), SkipMissedAndDrift())
    print(f"{timer=}")
    merged = merge(*receivers)
    async for selected in select(merged, timer, control_command):
        if selected_from(selected, merged):
            message = selected.message
            print(f"receive: Received {message=}")
            timer.reset()
            print(f"{timer=}")
        elif selected_from(selected, control_command):
            print(f"receive: received command: {selected.message.name}")
            match selected.message:
                case Command.PING:
                    print("receive: Ping received, reply with pong")
                    await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
                case Command.STOP_SENDER:
                    pass  # Ignore
                case _ as unknown:
                    assert_never(unknown)
        elif selected_from(selected, timer):
            drift = selected.message
            print(
                f"receive: No data received for {timer.interval + drift} seconds, "
                "giving up"
            )
            break
    print("receive: Finished")


async def main() -> None:
    data_channel_1 = Anycast[str](name="data-channel-1")
    data_channel_2 = Anycast[str](name="data-channel-2")
    command_channel = Broadcast[Command](name="control-channel")  # (1)!
    reply_channel = Anycast[Reply](name="reply-channel")

    async with asyncio.TaskGroup() as tasks:
        tasks.create_task(
            send(
                data_channel_1.new_sender(),
                command_channel.new_receiver(),
                reply_channel.new_sender(),
            ),
            name="send-channel-1",
        )
        tasks.create_task(
            send(
                data_channel_2.new_sender(),
                command_channel.new_receiver(),
                reply_channel.new_sender(),
            ),
            name="send-channel-2",
        )
        tasks.create_task(
            receive(
                [data_channel_1.new_receiver(), data_channel_2.new_receiver()],
                command_channel.new_receiver(),
                reply_channel.new_sender(),
            ),
            name="receive",
        )

        control_sender = command_channel.new_sender()
        reply_receiver = reply_channel.new_receiver()

        # Send a ping command to all tasks and wait for the replies
        await control_sender.send(Command.PING)
        print(f"main: {await reply_receiver.receive()}")
        print(f"main: {await reply_receiver.receive()}")
        print(f"main: {await reply_receiver.receive()}")

        await asyncio.sleep(5.0)

        # Stop senders, after 2 seconds not receiving any data,
        # the receiver will stop too
        await control_sender.send(Command.STOP_SENDER)


asyncio.run(main())
```

<!-- /quick-start-showcase -->

## Documentation

For more information, please read the [documentation
website](https://frequenz-floss.github.io/frequenz-channels-python/).

## Contributing

If you want to know how to build this project and contribute to it, please
check out the [Contributing Guide](docs/CONTRIBUTING.md).

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "frequenz-channels",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4,>=3.11",
    "maintainer_email": null,
    "keywords": "frequenz, python, lib, library, channels, channel",
    "author": null,
    "author_email": "Frequenz Energy-as-a-Service GmbH <floss@frequenz.com>",
    "download_url": "https://files.pythonhosted.org/packages/0c/bc/cf53e8eba012ca24164a0e7698e6ad0e096f2e2d60a0a28bdfcedbe3cd22/frequenz-channels-1.3.0.tar.gz",
    "platform": null,
    "description": "# Frequenz channels\n\n[![Build Status](https://github.com/frequenz-floss/frequenz-channels-python/actions/workflows/ci.yaml/badge.svg)](https://github.com/frequenz-floss/frequenz-channels-python/actions/workflows/ci.yaml)\n[![PyPI Package](https://img.shields.io/pypi/v/frequenz-channels)](https://pypi.org/project/frequenz-channels/)\n[![Docs](https://img.shields.io/badge/docs-latest-informational)](https://frequenz-floss.github.io/frequenz-channels-python/)\n\n## Introduction\n\n<!-- introduction -->\n\nFrequenz Channels is a *channels* implementation for Python.\n\nAccording to [Wikipedia](https://en.wikipedia.org/wiki/Channel_(programming)):\n\n> A channel is a model for interprocess communication and synchronization via\n> message passing. A message may be sent over a channel, and another process or\n> thread is able to receive messages sent over a channel it has a reference to,\n> as a stream. Different implementations of channels may be buffered or not,\n> and either synchronous or asynchronous.\n\nFrequenz Channels are mostly designed after [Go\nchannels](https://tour.golang.org/concurrency/2) but it also borrows ideas from\n[Rust channels](https://doc.rust-lang.org/book/ch16-02-message-passing.html).\n\n<!-- /introduction -->\n\n## Supported Platforms\n\n<!-- supported-platforms -->\n\nThe following platforms are officially supported (tested):\n\n- **Python:** 3.11\n- **Operating System:** Ubuntu Linux 20.04\n- **Architectures:** amd64, arm64\n\n> [!NOTE]\n> Newer Python versions and other operating systems and architectures might\n> work too, but they are not automatically tested, so we cannot guarantee it.\n\n<!-- /supported-platforms -->\n\n## Quick Start\n\n### Installing\n\n<!-- quick-start-installing -->\n\nAssuming a [supported](#supported-platforms) working Python environment:\n\n```sh\npython3 -m pip install frequenz-channels\n```\n\n> [!TIP]\n> For more details please read the [Installation\n> Guide](docs/user-guide/installation.md).\n\n<!-- /quick-start-installing -->\n\n### Examples\n\n#### Hello World\n\n<!-- quick-start-hello-world -->\n\n```python\nimport asyncio\n\nfrom frequenz.channels import Anycast\n\n\nasync def main() -> None:\n    hello_channel = Anycast[str](name=\"hello-world-channel\")\n    sender = hello_channel.new_sender()\n    receiver = hello_channel.new_receiver()\n\n    await sender.send(\"Hello World!\")\n    message = await receiver.receive()\n    print(message)\n\n\nasyncio.run(main())\n```\n\n<!-- /quick-start-hello-world -->\n\n#### Showcase\n\n<!-- quick-start-showcase -->\n\nThis is a comprehensive example that shows most of the main features of the\nlibrary:\n\n```python\nimport asyncio\nfrom dataclasses import dataclass\nfrom datetime import timedelta\nfrom enum import Enum, auto\nfrom typing import assert_never\n\nfrom frequenz.channels import (\n    Anycast,\n    Broadcast,\n    Receiver,\n    Sender,\n    merge,\n    select,\n    selected_from,\n)\nfrom frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed\n\n\nclass Command(Enum):\n    PING = auto()\n    STOP_SENDER = auto()\n\n\nclass ReplyCommand(Enum):\n    PONG = auto()\n\n\n@dataclass(frozen=True)\nclass Reply:\n    reply: ReplyCommand\n    source: str\n\n\nasync def send(\n    sender: Sender[str],\n    control_command: Receiver[Command],\n    control_reply: Sender[Reply],\n) -> None:\n    \"\"\"Send a counter value every second, until a stop command is received.\"\"\"\n    print(f\"{sender}: Starting\")\n    timer = Timer(timedelta(seconds=1.0), TriggerAllMissed())\n    counter = 0\n    async for selected in select(timer, control_command):\n        if selected_from(selected, timer):\n            print(f\"{sender}: Sending {counter}\")\n            await sender.send(f\"{sender}: {counter}\")\n            counter += 1\n        elif selected_from(selected, control_command):\n            print(f\"{sender}: Received command: {selected.message.name}\")\n            match selected.message:\n                case Command.STOP_SENDER:\n                    print(f\"{sender}: Stopping\")\n                    break\n                case Command.PING:\n                    print(f\"{sender}: Ping received, reply with pong\")\n                    await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))\n                case _ as unknown:\n                    assert_never(unknown)\n    print(f\"{sender}: Finished\")\n\n\nasync def receive(\n    receivers: list[Receiver[str]],\n    control_command: Receiver[Command],\n    control_reply: Sender[Reply],\n) -> None:\n    \"\"\"Receive data from multiple channels, until no more data is received for 2 seconds.\"\"\"\n    print(\"receive: Starting\")\n    timer = Timer(timedelta(seconds=2.0), SkipMissedAndDrift())\n    print(f\"{timer=}\")\n    merged = merge(*receivers)\n    async for selected in select(merged, timer, control_command):\n        if selected_from(selected, merged):\n            message = selected.message\n            print(f\"receive: Received {message=}\")\n            timer.reset()\n            print(f\"{timer=}\")\n        elif selected_from(selected, control_command):\n            print(f\"receive: received command: {selected.message.name}\")\n            match selected.message:\n                case Command.PING:\n                    print(\"receive: Ping received, reply with pong\")\n                    await control_reply.send(Reply(ReplyCommand.PONG, \"receive\"))\n                case Command.STOP_SENDER:\n                    pass  # Ignore\n                case _ as unknown:\n                    assert_never(unknown)\n        elif selected_from(selected, timer):\n            drift = selected.message\n            print(\n                f\"receive: No data received for {timer.interval + drift} seconds, \"\n                \"giving up\"\n            )\n            break\n    print(\"receive: Finished\")\n\n\nasync def main() -> None:\n    data_channel_1 = Anycast[str](name=\"data-channel-1\")\n    data_channel_2 = Anycast[str](name=\"data-channel-2\")\n    command_channel = Broadcast[Command](name=\"control-channel\")  # (1)!\n    reply_channel = Anycast[Reply](name=\"reply-channel\")\n\n    async with asyncio.TaskGroup() as tasks:\n        tasks.create_task(\n            send(\n                data_channel_1.new_sender(),\n                command_channel.new_receiver(),\n                reply_channel.new_sender(),\n            ),\n            name=\"send-channel-1\",\n        )\n        tasks.create_task(\n            send(\n                data_channel_2.new_sender(),\n                command_channel.new_receiver(),\n                reply_channel.new_sender(),\n            ),\n            name=\"send-channel-2\",\n        )\n        tasks.create_task(\n            receive(\n                [data_channel_1.new_receiver(), data_channel_2.new_receiver()],\n                command_channel.new_receiver(),\n                reply_channel.new_sender(),\n            ),\n            name=\"receive\",\n        )\n\n        control_sender = command_channel.new_sender()\n        reply_receiver = reply_channel.new_receiver()\n\n        # Send a ping command to all tasks and wait for the replies\n        await control_sender.send(Command.PING)\n        print(f\"main: {await reply_receiver.receive()}\")\n        print(f\"main: {await reply_receiver.receive()}\")\n        print(f\"main: {await reply_receiver.receive()}\")\n\n        await asyncio.sleep(5.0)\n\n        # Stop senders, after 2 seconds not receiving any data,\n        # the receiver will stop too\n        await control_sender.send(Command.STOP_SENDER)\n\n\nasyncio.run(main())\n```\n\n<!-- /quick-start-showcase -->\n\n## Documentation\n\nFor more information, please read the [documentation\nwebsite](https://frequenz-floss.github.io/frequenz-channels-python/).\n\n## Contributing\n\nIf you want to know how to build this project and contribute to it, please\ncheck out the [Contributing Guide](docs/CONTRIBUTING.md).\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Channel implementations for Python",
    "version": "1.3.0",
    "project_urls": {
        "Changelog": "https://github.com/frequenz-floss/frequenz-channels-python/releases",
        "Documentation": "https://frequenz-floss.github.io/frequenz-channels-python/",
        "Issues": "https://github.com/frequenz-floss/frequenz-channels-python/issues",
        "Repository": "https://github.com/frequenz-floss/frequenz-channels-python",
        "Support": "https://github.com/frequenz-floss/frequenz-channels-python/discussions/categories/support"
    },
    "split_keywords": [
        "frequenz",
        " python",
        " lib",
        " library",
        " channels",
        " channel"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "3327b913bd0e16f7eab6a029233116e3b936c6dd7a11d6dbe5fd397635e9bfde",
                "md5": "9d4420397a6d1e85965d33dc4746f0fc",
                "sha256": "ff33dd460c5794f030924074b6782d8f22ec2c0fb8e75f5c4dfc0dcc006dc3af"
            },
            "downloads": -1,
            "filename": "frequenz_channels-1.3.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "9d4420397a6d1e85965d33dc4746f0fc",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4,>=3.11",
            "size": 47816,
            "upload_time": "2024-11-19T13:36:53",
            "upload_time_iso_8601": "2024-11-19T13:36:53.212873Z",
            "url": "https://files.pythonhosted.org/packages/33/27/b913bd0e16f7eab6a029233116e3b936c6dd7a11d6dbe5fd397635e9bfde/frequenz_channels-1.3.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "0cbccf53e8eba012ca24164a0e7698e6ad0e096f2e2d60a0a28bdfcedbe3cd22",
                "md5": "9559bea72ef2c01e440f3f3e4eed4c4b",
                "sha256": "29bfc2935a7db546dbed62fd8bc27f276a3164a3a426d16e6f850d75d1c4c9fa"
            },
            "downloads": -1,
            "filename": "frequenz-channels-1.3.0.tar.gz",
            "has_sig": false,
            "md5_digest": "9559bea72ef2c01e440f3f3e4eed4c4b",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4,>=3.11",
            "size": 40066,
            "upload_time": "2024-11-19T13:36:54",
            "upload_time_iso_8601": "2024-11-19T13:36:54.500459Z",
            "url": "https://files.pythonhosted.org/packages/0c/bc/cf53e8eba012ca24164a0e7698e6ad0e096f2e2d60a0a28bdfcedbe3cd22/frequenz-channels-1.3.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-11-19 13:36:54",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "frequenz-floss",
    "github_project": "frequenz-channels-python",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "frequenz-channels"
}
        
Elapsed time: 0.51953s