# Frequenz channels
[![Build Status](https://github.com/frequenz-floss/frequenz-channels-python/actions/workflows/ci.yaml/badge.svg)](https://github.com/frequenz-floss/frequenz-channels-python/actions/workflows/ci.yaml)
[![PyPI Package](https://img.shields.io/pypi/v/frequenz-channels)](https://pypi.org/project/frequenz-channels/)
[![Docs](https://img.shields.io/badge/docs-latest-informational)](https://frequenz-floss.github.io/frequenz-channels-python/)
## Introduction
<!-- introduction -->
Frequenz Channels is a *channels* implementation for Python.
According to [Wikipedia](https://en.wikipedia.org/wiki/Channel_(programming)):
> A channel is a model for interprocess communication and synchronization via
> message passing. A message may be sent over a channel, and another process or
> thread is able to receive messages sent over a channel it has a reference to,
> as a stream. Different implementations of channels may be buffered or not,
> and either synchronous or asynchronous.
Frequenz Channels are mostly designed after [Go
channels](https://tour.golang.org/concurrency/2) but it also borrows ideas from
[Rust channels](https://doc.rust-lang.org/book/ch16-02-message-passing.html).
<!-- /introduction -->
## Supported Platforms
<!-- supported-platforms -->
The following platforms are officially supported (tested):
- **Python:** 3.11
- **Operating System:** Ubuntu Linux 20.04
- **Architectures:** amd64, arm64
> [!NOTE]
> Newer Python versions and other operating systems and architectures might
> work too, but they are not automatically tested, so we cannot guarantee it.
<!-- /supported-platforms -->
## Quick Start
### Installing
<!-- quick-start-installing -->
Assuming a [supported](#supported-platforms) working Python environment:
```sh
python3 -m pip install frequenz-channels
```
> [!TIP]
> For more details please read the [Installation
> Guide](docs/user-guide/installation.md).
<!-- /quick-start-installing -->
### Examples
#### Hello World
<!-- quick-start-hello-world -->
```python
import asyncio
from frequenz.channels import Anycast
async def main() -> None:
hello_channel = Anycast[str](name="hello-world-channel")
sender = hello_channel.new_sender()
receiver = hello_channel.new_receiver()
await sender.send("Hello World!")
message = await receiver.receive()
print(message)
asyncio.run(main())
```
<!-- /quick-start-hello-world -->
#### Showcase
<!-- quick-start-showcase -->
This is a comprehensive example that shows most of the main features of the
library:
```python
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from enum import Enum, auto
from typing import assert_never
from frequenz.channels import (
Anycast,
Broadcast,
Receiver,
Sender,
merge,
select,
selected_from,
)
from frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed
class Command(Enum):
PING = auto()
STOP_SENDER = auto()
class ReplyCommand(Enum):
PONG = auto()
@dataclass(frozen=True)
class Reply:
reply: ReplyCommand
source: str
async def send(
sender: Sender[str],
control_command: Receiver[Command],
control_reply: Sender[Reply],
) -> None:
"""Send a counter value every second, until a stop command is received."""
print(f"{sender}: Starting")
timer = Timer(timedelta(seconds=1.0), TriggerAllMissed())
counter = 0
async for selected in select(timer, control_command):
if selected_from(selected, timer):
print(f"{sender}: Sending {counter}")
await sender.send(f"{sender}: {counter}")
counter += 1
elif selected_from(selected, control_command):
print(f"{sender}: Received command: {selected.message.name}")
match selected.message:
case Command.STOP_SENDER:
print(f"{sender}: Stopping")
break
case Command.PING:
print(f"{sender}: Ping received, reply with pong")
await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))
case _ as unknown:
assert_never(unknown)
print(f"{sender}: Finished")
async def receive(
receivers: list[Receiver[str]],
control_command: Receiver[Command],
control_reply: Sender[Reply],
) -> None:
"""Receive data from multiple channels, until no more data is received for 2 seconds."""
print("receive: Starting")
timer = Timer(timedelta(seconds=2.0), SkipMissedAndDrift())
print(f"{timer=}")
merged = merge(*receivers)
async for selected in select(merged, timer, control_command):
if selected_from(selected, merged):
message = selected.message
print(f"receive: Received {message=}")
timer.reset()
print(f"{timer=}")
elif selected_from(selected, control_command):
print(f"receive: received command: {selected.message.name}")
match selected.message:
case Command.PING:
print("receive: Ping received, reply with pong")
await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
case Command.STOP_SENDER:
pass # Ignore
case _ as unknown:
assert_never(unknown)
elif selected_from(selected, timer):
drift = selected.message
print(
f"receive: No data received for {timer.interval + drift} seconds, "
"giving up"
)
break
print("receive: Finished")
async def main() -> None:
data_channel_1 = Anycast[str](name="data-channel-1")
data_channel_2 = Anycast[str](name="data-channel-2")
command_channel = Broadcast[Command](name="control-channel") # (1)!
reply_channel = Anycast[Reply](name="reply-channel")
async with asyncio.TaskGroup() as tasks:
tasks.create_task(
send(
data_channel_1.new_sender(),
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="send-channel-1",
)
tasks.create_task(
send(
data_channel_2.new_sender(),
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="send-channel-2",
)
tasks.create_task(
receive(
[data_channel_1.new_receiver(), data_channel_2.new_receiver()],
command_channel.new_receiver(),
reply_channel.new_sender(),
),
name="receive",
)
control_sender = command_channel.new_sender()
reply_receiver = reply_channel.new_receiver()
# Send a ping command to all tasks and wait for the replies
await control_sender.send(Command.PING)
print(f"main: {await reply_receiver.receive()}")
print(f"main: {await reply_receiver.receive()}")
print(f"main: {await reply_receiver.receive()}")
await asyncio.sleep(5.0)
# Stop senders, after 2 seconds not receiving any data,
# the receiver will stop too
await control_sender.send(Command.STOP_SENDER)
asyncio.run(main())
```
<!-- /quick-start-showcase -->
## Documentation
For more information, please read the [documentation
website](https://frequenz-floss.github.io/frequenz-channels-python/).
## Contributing
If you want to know how to build this project and contribute to it, please
check out the [Contributing Guide](docs/CONTRIBUTING.md).
Raw data
{
"_id": null,
"home_page": null,
"name": "frequenz-channels",
"maintainer": null,
"docs_url": null,
"requires_python": "<4,>=3.11",
"maintainer_email": null,
"keywords": "frequenz, python, lib, library, channels, channel",
"author": null,
"author_email": "Frequenz Energy-as-a-Service GmbH <floss@frequenz.com>",
"download_url": "https://files.pythonhosted.org/packages/0c/bc/cf53e8eba012ca24164a0e7698e6ad0e096f2e2d60a0a28bdfcedbe3cd22/frequenz-channels-1.3.0.tar.gz",
"platform": null,
"description": "# Frequenz channels\n\n[![Build Status](https://github.com/frequenz-floss/frequenz-channels-python/actions/workflows/ci.yaml/badge.svg)](https://github.com/frequenz-floss/frequenz-channels-python/actions/workflows/ci.yaml)\n[![PyPI Package](https://img.shields.io/pypi/v/frequenz-channels)](https://pypi.org/project/frequenz-channels/)\n[![Docs](https://img.shields.io/badge/docs-latest-informational)](https://frequenz-floss.github.io/frequenz-channels-python/)\n\n## Introduction\n\n<!-- introduction -->\n\nFrequenz Channels is a *channels* implementation for Python.\n\nAccording to [Wikipedia](https://en.wikipedia.org/wiki/Channel_(programming)):\n\n> A channel is a model for interprocess communication and synchronization via\n> message passing. A message may be sent over a channel, and another process or\n> thread is able to receive messages sent over a channel it has a reference to,\n> as a stream. Different implementations of channels may be buffered or not,\n> and either synchronous or asynchronous.\n\nFrequenz Channels are mostly designed after [Go\nchannels](https://tour.golang.org/concurrency/2) but it also borrows ideas from\n[Rust channels](https://doc.rust-lang.org/book/ch16-02-message-passing.html).\n\n<!-- /introduction -->\n\n## Supported Platforms\n\n<!-- supported-platforms -->\n\nThe following platforms are officially supported (tested):\n\n- **Python:** 3.11\n- **Operating System:** Ubuntu Linux 20.04\n- **Architectures:** amd64, arm64\n\n> [!NOTE]\n> Newer Python versions and other operating systems and architectures might\n> work too, but they are not automatically tested, so we cannot guarantee it.\n\n<!-- /supported-platforms -->\n\n## Quick Start\n\n### Installing\n\n<!-- quick-start-installing -->\n\nAssuming a [supported](#supported-platforms) working Python environment:\n\n```sh\npython3 -m pip install frequenz-channels\n```\n\n> [!TIP]\n> For more details please read the [Installation\n> Guide](docs/user-guide/installation.md).\n\n<!-- /quick-start-installing -->\n\n### Examples\n\n#### Hello World\n\n<!-- quick-start-hello-world -->\n\n```python\nimport asyncio\n\nfrom frequenz.channels import Anycast\n\n\nasync def main() -> None:\n hello_channel = Anycast[str](name=\"hello-world-channel\")\n sender = hello_channel.new_sender()\n receiver = hello_channel.new_receiver()\n\n await sender.send(\"Hello World!\")\n message = await receiver.receive()\n print(message)\n\n\nasyncio.run(main())\n```\n\n<!-- /quick-start-hello-world -->\n\n#### Showcase\n\n<!-- quick-start-showcase -->\n\nThis is a comprehensive example that shows most of the main features of the\nlibrary:\n\n```python\nimport asyncio\nfrom dataclasses import dataclass\nfrom datetime import timedelta\nfrom enum import Enum, auto\nfrom typing import assert_never\n\nfrom frequenz.channels import (\n Anycast,\n Broadcast,\n Receiver,\n Sender,\n merge,\n select,\n selected_from,\n)\nfrom frequenz.channels.timer import SkipMissedAndDrift, Timer, TriggerAllMissed\n\n\nclass Command(Enum):\n PING = auto()\n STOP_SENDER = auto()\n\n\nclass ReplyCommand(Enum):\n PONG = auto()\n\n\n@dataclass(frozen=True)\nclass Reply:\n reply: ReplyCommand\n source: str\n\n\nasync def send(\n sender: Sender[str],\n control_command: Receiver[Command],\n control_reply: Sender[Reply],\n) -> None:\n \"\"\"Send a counter value every second, until a stop command is received.\"\"\"\n print(f\"{sender}: Starting\")\n timer = Timer(timedelta(seconds=1.0), TriggerAllMissed())\n counter = 0\n async for selected in select(timer, control_command):\n if selected_from(selected, timer):\n print(f\"{sender}: Sending {counter}\")\n await sender.send(f\"{sender}: {counter}\")\n counter += 1\n elif selected_from(selected, control_command):\n print(f\"{sender}: Received command: {selected.message.name}\")\n match selected.message:\n case Command.STOP_SENDER:\n print(f\"{sender}: Stopping\")\n break\n case Command.PING:\n print(f\"{sender}: Ping received, reply with pong\")\n await control_reply.send(Reply(ReplyCommand.PONG, str(sender)))\n case _ as unknown:\n assert_never(unknown)\n print(f\"{sender}: Finished\")\n\n\nasync def receive(\n receivers: list[Receiver[str]],\n control_command: Receiver[Command],\n control_reply: Sender[Reply],\n) -> None:\n \"\"\"Receive data from multiple channels, until no more data is received for 2 seconds.\"\"\"\n print(\"receive: Starting\")\n timer = Timer(timedelta(seconds=2.0), SkipMissedAndDrift())\n print(f\"{timer=}\")\n merged = merge(*receivers)\n async for selected in select(merged, timer, control_command):\n if selected_from(selected, merged):\n message = selected.message\n print(f\"receive: Received {message=}\")\n timer.reset()\n print(f\"{timer=}\")\n elif selected_from(selected, control_command):\n print(f\"receive: received command: {selected.message.name}\")\n match selected.message:\n case Command.PING:\n print(\"receive: Ping received, reply with pong\")\n await control_reply.send(Reply(ReplyCommand.PONG, \"receive\"))\n case Command.STOP_SENDER:\n pass # Ignore\n case _ as unknown:\n assert_never(unknown)\n elif selected_from(selected, timer):\n drift = selected.message\n print(\n f\"receive: No data received for {timer.interval + drift} seconds, \"\n \"giving up\"\n )\n break\n print(\"receive: Finished\")\n\n\nasync def main() -> None:\n data_channel_1 = Anycast[str](name=\"data-channel-1\")\n data_channel_2 = Anycast[str](name=\"data-channel-2\")\n command_channel = Broadcast[Command](name=\"control-channel\") # (1)!\n reply_channel = Anycast[Reply](name=\"reply-channel\")\n\n async with asyncio.TaskGroup() as tasks:\n tasks.create_task(\n send(\n data_channel_1.new_sender(),\n command_channel.new_receiver(),\n reply_channel.new_sender(),\n ),\n name=\"send-channel-1\",\n )\n tasks.create_task(\n send(\n data_channel_2.new_sender(),\n command_channel.new_receiver(),\n reply_channel.new_sender(),\n ),\n name=\"send-channel-2\",\n )\n tasks.create_task(\n receive(\n [data_channel_1.new_receiver(), data_channel_2.new_receiver()],\n command_channel.new_receiver(),\n reply_channel.new_sender(),\n ),\n name=\"receive\",\n )\n\n control_sender = command_channel.new_sender()\n reply_receiver = reply_channel.new_receiver()\n\n # Send a ping command to all tasks and wait for the replies\n await control_sender.send(Command.PING)\n print(f\"main: {await reply_receiver.receive()}\")\n print(f\"main: {await reply_receiver.receive()}\")\n print(f\"main: {await reply_receiver.receive()}\")\n\n await asyncio.sleep(5.0)\n\n # Stop senders, after 2 seconds not receiving any data,\n # the receiver will stop too\n await control_sender.send(Command.STOP_SENDER)\n\n\nasyncio.run(main())\n```\n\n<!-- /quick-start-showcase -->\n\n## Documentation\n\nFor more information, please read the [documentation\nwebsite](https://frequenz-floss.github.io/frequenz-channels-python/).\n\n## Contributing\n\nIf you want to know how to build this project and contribute to it, please\ncheck out the [Contributing Guide](docs/CONTRIBUTING.md).\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Channel implementations for Python",
"version": "1.3.0",
"project_urls": {
"Changelog": "https://github.com/frequenz-floss/frequenz-channels-python/releases",
"Documentation": "https://frequenz-floss.github.io/frequenz-channels-python/",
"Issues": "https://github.com/frequenz-floss/frequenz-channels-python/issues",
"Repository": "https://github.com/frequenz-floss/frequenz-channels-python",
"Support": "https://github.com/frequenz-floss/frequenz-channels-python/discussions/categories/support"
},
"split_keywords": [
"frequenz",
" python",
" lib",
" library",
" channels",
" channel"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "3327b913bd0e16f7eab6a029233116e3b936c6dd7a11d6dbe5fd397635e9bfde",
"md5": "9d4420397a6d1e85965d33dc4746f0fc",
"sha256": "ff33dd460c5794f030924074b6782d8f22ec2c0fb8e75f5c4dfc0dcc006dc3af"
},
"downloads": -1,
"filename": "frequenz_channels-1.3.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "9d4420397a6d1e85965d33dc4746f0fc",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4,>=3.11",
"size": 47816,
"upload_time": "2024-11-19T13:36:53",
"upload_time_iso_8601": "2024-11-19T13:36:53.212873Z",
"url": "https://files.pythonhosted.org/packages/33/27/b913bd0e16f7eab6a029233116e3b936c6dd7a11d6dbe5fd397635e9bfde/frequenz_channels-1.3.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "0cbccf53e8eba012ca24164a0e7698e6ad0e096f2e2d60a0a28bdfcedbe3cd22",
"md5": "9559bea72ef2c01e440f3f3e4eed4c4b",
"sha256": "29bfc2935a7db546dbed62fd8bc27f276a3164a3a426d16e6f850d75d1c4c9fa"
},
"downloads": -1,
"filename": "frequenz-channels-1.3.0.tar.gz",
"has_sig": false,
"md5_digest": "9559bea72ef2c01e440f3f3e4eed4c4b",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4,>=3.11",
"size": 40066,
"upload_time": "2024-11-19T13:36:54",
"upload_time_iso_8601": "2024-11-19T13:36:54.500459Z",
"url": "https://files.pythonhosted.org/packages/0c/bc/cf53e8eba012ca24164a0e7698e6ad0e096f2e2d60a0a28bdfcedbe3cd22/frequenz-channels-1.3.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-11-19 13:36:54",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "frequenz-floss",
"github_project": "frequenz-channels-python",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "frequenz-channels"
}