# Asyncio For Robotics
| Requirements | Compatibility | Tests |
|---|---|---|
| [](https://pypi.org/project/asyncio_for_robotics/)<br>[](https://opensource.org/license/mit) | [](https://github.com/ros2)<br>[](https://zenoh.io/) | [](https://github.com/2lian/asyncio-for-robotics/actions/workflows/python-pytest.yml)<br>[](https://github.com/2lian/asyncio-for-robotics/actions/workflows/ros-pytest.yml) |
The Asyncio For Robotics (`afor`) library makes `asyncio` usable with ROS 2, Zenoh and more, letting you write linear, testable, and non-blocking Python code.
- Better syntax.
- Only native python: Better docs and support.
- Simplifies testing.
*Will this make my code slower?* [No.](https://github.com/2lian/asyncio-for-robotics/tree/main/README.md#about-speed)
*Will this make my code faster?* No. However `asyncio` will help YOU write
better, faster code.
*Does it replace ROS 2? Is this a wrapper?* No. It is a tool adding async capabilities. It gives you more choices, not less.
## Install
### Barebone
Compatible with ROS 2 (`jazzy`,`humble` and newer) out of the box. This library is pure python (>=3.10), so it installs easily.
```bash
pip install asyncio_for_robotics
```
### For Zenoh
```bash
pip install asyncio_for_robotics[zenoh]
```
## Read more
- [Detailed ROS 2 tutorial](https://github.com/2lian/asyncio-for-robotics/blob/main/using_with_ros.md)
- [Detailed examples](https://github.com/2lian/asyncio-for-robotics/blob/main/asyncio_for_robotics/example)
- [no talking π¦ show me code π¦](https://github.com/2lian/asyncio-for-robotics/blob/main/asyncio_for_robotics/example/ros2_pubsub.py)
- [Usage for software testing](https://github.com/2lian/asyncio-for-robotics/blob/main/tests)
- [Implement your own protocol](https://github.com/2lian/asyncio-for-robotics/blob/main/own_proto_example.md)
- [Cross-Platform deployment even with ROS](https://github.com/2lian/asyncio-for-robotics/blob/main/cross_platform.md) [](https://pixi.sh)
## Code sample
Syntax is identical between ROS 2 and Zenoh.
### Wait for messages one by one
Application:
- Get the latest sensor data
- Get clock value
- Wait for trigger
- Wait for system to be operational
```python
sub = afor.Sub(...)
# get the latest message
latest = await sub.wait_for_value()
# get a new message
new = await sub.wait_for_new()
# get the next message received
next = await sub.wait_for_next()
```
### Continuously listen to a data stream
Application:
- Process a whole data stream
- React to changes in sensor data
```python
# Continuously process the latest messages
async for msg in sub.listen():
status = foo(msg)
if status == DONE:
break
# Continuously process all incoming messages
async for msg in sub.listen_reliable():
status = foo(msg)
if status == DONE:
break
```
### Reliable, non-drifting Rate
Application:
- Periodic updates and actions
```python
# Rate is simply a subscriber triggering on every tick
rate = afor.Rate(frequency=0.01, time_source=time.time_ns)
# Wait for the next tick
await rate.wait_for_new()
# Executes after a tick
async for _ in rate.listen():
foo(...)
# Reliably executes for every tick
async for _ in sub.listen_reliable():
foo(...)
```
### Improved Services / Queryable for ROS 2
Services are needlessly convoluted in ROS 2 and intrinsically not async
(because the server callback function MUST return a response). `afor` overrides
the ROS behavior, allowing for the response to be sent later. Implementing
similar systems for other transport protocol should be very easy: The server is just a
`asyncio_for_robotics.core.BaseSub` generating responder objects.
Application:
- Client request reply from a server.
- Servers can delay their response without blocking (not possible in ROS 2)
```python
# Server is once again a afor subscriber, but generating responder objects
server = afor.Server(...)
# processes all requests.
# listen_reliable method is recommanded as it cannot skip requests
async for responder in server.listen_reliable():
if responder.request == "PING!":
reponder.response = "PONG!"
await asyncio.sleep(...) # reply can be differed
reponder.send()
else:
... # reply is not necessary
```
```python
# the client implements a async call method
client = afor.Client(...)
response = await client.call("PING!")
```
### Process for the right amount of time
Application:
- Test if the system is responding as expected
- Run small tasks with small and local code
```python
# Listen with a timeout
data = await afor.soft_wait_for(sub.wait_for_new(), timeout=1)
if isinstance(data, TimeoutError):
pytest.fail(f"Failed to get new data in under 1 second")
# Process a codeblock with a timeout
async with afor.soft_timeout(1):
sum = 0
total = 0
async for msg in sub.listen_reliable():
number = process(msg)
sum += number
total += 1
last_second_average = sum/total
assert last_second_average == pytest.approx(expected_average)
```
## About Speed
The inevitable question: *βBut isnβt this slower than the ROS 2 executor? ROS 2 is the best!β*
In short: `rclpy`'s executor is the bottleneck.
- Comparing to the best ROS 2 Jazzy can do (`SingleThreadedExecutor`), `afor` increases latency from 110us to 150us.
- Comparing to other execution methods, `afor` is equivalent if not faster.
- If you find it slow, you should use C++ or Zenoh (or contribute to this repo?).
Benchmark code is available in [`./tests/bench/`](https://github.com/2lian/asyncio-for-robotics/blob/main/tests/bench/), it consists in two pairs of pub/sub infinitely echoing a message (using one single node). The messaging rate, thus measures the request to response latency.
| With `afor` | Transport | Executor | | Frequency (kHz) | Latency (ms) |
|:----------:|:----------|:----------------------------------|-|---------:|---------:|
| βοΈ | Zenoh | None | | **95** | **0.01** |
| βοΈ | ROS 2 | [Experimental Asyncio](https://github.com/ros2/rclpy/pull/1399) | | **17** | **0.06** |
| β | ROS 2 | [Experimental Asyncio](https://github.com/ros2/rclpy/pull/1399) | | 13 | 0.08 |
| β | ROS 2 | SingleThreaded | | 9 | 0.11 |
| βοΈ | ROS 2 | SingleThreaded | | **7** | **0.15** |
| βοΈ | ROS 2 | MultiThreaded | | **3** | **0.3** |
| β | ROS 2 | MultiThreaded | | **3** | **0.3** |
| βοΈ | ROS 2 | [`ros_loop` Method](https://github.com/m2-farzan/ros2-asyncio) | | 3 | 0.3 |
Details:
- `uvloop` was used, replacing the asyncio executor (more or less doubles the performances for Zenoh)
- RMW was set to `rmw_zenoh_cpp`
- ROS2 benchmarks uses `afor`'s `ros2.ThreadedSession` (the default in `afor`).
- Only the Benchmark of the [`ros_loop` method](https://github.com/m2-farzan/ros2-asyncio) uses `afor`'s second type of session: `ros2.SynchronousSession`.
- ROS 2 executors can easily be changed in `afor` when creating a session.
- The experimental `AsyncioExecutor` PR on ros rolling by nadavelkabets is incredible [https://github.com/ros2/rclpy/pull/1399](https://github.com/ros2/rclpy/pull/1399). Maybe I will add proper support for it (but only a few will want to use an unmerged experimental PR of ROS 2 rolling).
- If there is interest in those benchmarks I will improve them, so others can run them all easily.
Analysis:
- Zenoh is extremely fast, proving that `afor` is not the bottleneck.
- This `AsyncioExecutor` having better perf when using `afor` is interesting, because `afor` does not bypass code.
- I think this is due to `AsyncioExecutor` having some overhead that affects its own callback.
- Without `afor` the ROS 2 callback executes some code and publishes.
- With `afor` the ROS 2 callback returns immediately, and fully delegates execution to `asyncio`.
- The increase of latency on the `SingleThreaded` executors proves that getting data in and out of the `rclpy` executor and thread is the main bottleneck.
- `AsyncioExecutor` does not have such thread, thus can directly communicate.
- Zenoh has its own thread, however it is built exclusively for multi-thread operations, without any executor. Thus achieves far superior performances.
- `MultiThreadedExecutor` is just famously slow.
- Very surprisingly, the well known `ros_loop` method detailed here [https://github.com/m2-farzan/ros2-asyncio](https://github.com/m2-farzan/ros2-asyncio) is slow.
Raw data
{
"_id": null,
"home_page": null,
"name": "asyncio_for_robotics",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": null,
"keywords": "asyncio, robotics, ros2, zenoh, publisher, subscriber",
"author": null,
"author_email": "Elian <no@no.com>",
"download_url": "https://files.pythonhosted.org/packages/48/ab/da2460f30167b316c33ca70474e5ec28b44f71fe32d36811899df5a6723f/asyncio_for_robotics-1.0.6.tar.gz",
"platform": null,
"description": "# Asyncio For Robotics\n| Requirements | Compatibility | Tests |\n|---|---|---|\n| [](https://pypi.org/project/asyncio_for_robotics/)<br>[](https://opensource.org/license/mit) | [](https://github.com/ros2)<br>[](https://zenoh.io/) | [](https://github.com/2lian/asyncio-for-robotics/actions/workflows/python-pytest.yml)<br>[](https://github.com/2lian/asyncio-for-robotics/actions/workflows/ros-pytest.yml) |\n\nThe Asyncio For Robotics (`afor`) library makes `asyncio` usable with ROS 2, Zenoh and more, letting you write linear, testable, and non-blocking Python code.\n\n- Better syntax.\n- Only native python: Better docs and support.\n- Simplifies testing.\n\n*Will this make my code slower?* [No.](https://github.com/2lian/asyncio-for-robotics/tree/main/README.md#about-speed)\n\n*Will this make my code faster?* No. However `asyncio` will help YOU write\nbetter, faster code.\n\n*Does it replace ROS 2? Is this a wrapper?* No. It is a tool adding async capabilities. It gives you more choices, not less.\n\n## Install\n\n### Barebone\n\nCompatible with ROS 2 (`jazzy`,`humble` and newer) out of the box. This library is pure python (>=3.10), so it installs easily.\n\n```bash\npip install asyncio_for_robotics\n```\n\n### For Zenoh\n\n```bash\npip install asyncio_for_robotics[zenoh]\n```\n\n## Read more\n\n- [Detailed ROS 2 tutorial](https://github.com/2lian/asyncio-for-robotics/blob/main/using_with_ros.md)\n- [Detailed examples](https://github.com/2lian/asyncio-for-robotics/blob/main/asyncio_for_robotics/example)\n - [no talking \ud83e\udd8d show me code \ud83e\udd8d](https://github.com/2lian/asyncio-for-robotics/blob/main/asyncio_for_robotics/example/ros2_pubsub.py)\n- [Usage for software testing](https://github.com/2lian/asyncio-for-robotics/blob/main/tests)\n- [Implement your own protocol](https://github.com/2lian/asyncio-for-robotics/blob/main/own_proto_example.md)\n- [Cross-Platform deployment even with ROS](https://github.com/2lian/asyncio-for-robotics/blob/main/cross_platform.md) [](https://pixi.sh)\n\n## Code sample\n\nSyntax is identical between ROS 2 and Zenoh.\n\n### Wait for messages one by one\n\nApplication:\n- Get the latest sensor data\n- Get clock value\n- Wait for trigger\n- Wait for system to be operational\n\n```python\nsub = afor.Sub(...)\n\n# get the latest message\nlatest = await sub.wait_for_value()\n\n# get a new message\nnew = await sub.wait_for_new()\n\n# get the next message received\nnext = await sub.wait_for_next()\n```\n\n### Continuously listen to a data stream\n\nApplication:\n- Process a whole data stream\n- React to changes in sensor data\n\n```python\n# Continuously process the latest messages\nasync for msg in sub.listen():\n status = foo(msg)\n if status == DONE:\n break\n\n# Continuously process all incoming messages\nasync for msg in sub.listen_reliable():\n status = foo(msg)\n if status == DONE:\n break\n```\n\n### Reliable, non-drifting Rate\n\nApplication:\n- Periodic updates and actions\n\n```python\n# Rate is simply a subscriber triggering on every tick\nrate = afor.Rate(frequency=0.01, time_source=time.time_ns)\n\n# Wait for the next tick\nawait rate.wait_for_new()\n\n# Executes after a tick\nasync for _ in rate.listen():\n foo(...)\n\n# Reliably executes for every tick\nasync for _ in sub.listen_reliable():\n foo(...)\n```\n\n### Improved Services / Queryable for ROS 2\n\nServices are needlessly convoluted in ROS 2 and intrinsically not async\n(because the server callback function MUST return a response). `afor` overrides\nthe ROS behavior, allowing for the response to be sent later. Implementing\nsimilar systems for other transport protocol should be very easy: The server is just a\n`asyncio_for_robotics.core.BaseSub` generating responder objects.\n\nApplication:\n- Client request reply from a server.\n- Servers can delay their response without blocking (not possible in ROS 2)\n\n```python\n# Server is once again a afor subscriber, but generating responder objects\nserver = afor.Server(...)\n\n# processes all requests.\n# listen_reliable method is recommanded as it cannot skip requests\nasync for responder in server.listen_reliable():\n if responder.request == \"PING!\":\n reponder.response = \"PONG!\"\n await asyncio.sleep(...) # reply can be differed\n reponder.send()\n else:\n ... # reply is not necessary\n```\n\n```python\n# the client implements a async call method\nclient = afor.Client(...)\n\nresponse = await client.call(\"PING!\")\n```\n\n### Process for the right amount of time\n\nApplication:\n- Test if the system is responding as expected\n- Run small tasks with small and local code\n\n```python\n# Listen with a timeout\ndata = await afor.soft_wait_for(sub.wait_for_new(), timeout=1)\nif isinstance(data, TimeoutError):\n pytest.fail(f\"Failed to get new data in under 1 second\")\n\n\n# Process a codeblock with a timeout\nasync with afor.soft_timeout(1):\n sum = 0\n total = 0\n async for msg in sub.listen_reliable():\n number = process(msg)\n sum += number\n total += 1\n\nlast_second_average = sum/total\nassert last_second_average == pytest.approx(expected_average)\n```\n\n## About Speed\n\nThe inevitable question: *\u201cBut isn\u2019t this slower than the ROS 2 executor? ROS 2 is the best!\u201d*\n\nIn short: `rclpy`'s executor is the bottleneck. \n- Comparing to the best ROS 2 Jazzy can do (`SingleThreadedExecutor`), `afor` increases latency from 110us to 150us.\n- Comparing to other execution methods, `afor` is equivalent if not faster.\n- If you find it slow, you should use C++ or Zenoh (or contribute to this repo?).\n\nBenchmark code is available in [`./tests/bench/`](https://github.com/2lian/asyncio-for-robotics/blob/main/tests/bench/), it consists in two pairs of pub/sub infinitely echoing a message (using one single node). The messaging rate, thus measures the request to response latency. \n\n| With `afor` | Transport | Executor | | Frequency (kHz) | Latency (ms) |\n|:----------:|:----------|:----------------------------------|-|---------:|---------:|\n| \u2714\ufe0f | Zenoh | None | | **95** | **0.01** |\n| \u2714\ufe0f | ROS 2 | [Experimental Asyncio](https://github.com/ros2/rclpy/pull/1399) | | **17** | **0.06** |\n| \u274c | ROS 2 | [Experimental Asyncio](https://github.com/ros2/rclpy/pull/1399) | | 13 | 0.08 |\n| \u274c | ROS 2 | SingleThreaded | | 9 | 0.11 |\n| \u2714\ufe0f | ROS 2 | SingleThreaded | | **7** | **0.15** |\n| \u2714\ufe0f | ROS 2 | MultiThreaded | | **3** | **0.3** |\n| \u274c | ROS 2 | MultiThreaded | | **3** | **0.3** |\n| \u2714\ufe0f | ROS 2 | [`ros_loop` Method](https://github.com/m2-farzan/ros2-asyncio) | | 3 | 0.3 |\n\n\nDetails:\n- `uvloop` was used, replacing the asyncio executor (more or less doubles the performances for Zenoh)\n- RMW was set to `rmw_zenoh_cpp`\n- ROS2 benchmarks uses `afor`'s `ros2.ThreadedSession` (the default in `afor`). \n- Only the Benchmark of the [`ros_loop` method](https://github.com/m2-farzan/ros2-asyncio) uses `afor`'s second type of session: `ros2.SynchronousSession`.\n- ROS 2 executors can easily be changed in `afor` when creating a session.\n- The experimental `AsyncioExecutor` PR on ros rolling by nadavelkabets is incredible [https://github.com/ros2/rclpy/pull/1399](https://github.com/ros2/rclpy/pull/1399). Maybe I will add proper support for it (but only a few will want to use an unmerged experimental PR of ROS 2 rolling).\n- If there is interest in those benchmarks I will improve them, so others can run them all easily.\n\nAnalysis:\n- Zenoh is extremely fast, proving that `afor` is not the bottleneck.\n- This `AsyncioExecutor` having better perf when using `afor` is interesting, because `afor` does not bypass code.\n - I think this is due to `AsyncioExecutor` having some overhead that affects its own callback.\n - Without `afor` the ROS 2 callback executes some code and publishes.\n - With `afor` the ROS 2 callback returns immediately, and fully delegates execution to `asyncio`.\n- The increase of latency on the `SingleThreaded` executors proves that getting data in and out of the `rclpy` executor and thread is the main bottleneck. \n - `AsyncioExecutor` does not have such thread, thus can directly communicate.\n - Zenoh has its own thread, however it is built exclusively for multi-thread operations, without any executor. Thus achieves far superior performances.\n- `MultiThreadedExecutor` is just famously slow.\n- Very surprisingly, the well known `ros_loop` method detailed here [https://github.com/m2-farzan/ros2-asyncio](https://github.com/m2-farzan/ros2-asyncio) is slow.\n",
"bugtrack_url": null,
"license": null,
"summary": "Asyncio interface for ROS 2, Zenoh, and other robotic middlewares.",
"version": "1.0.6",
"project_urls": {
"Homepage": "https://github.com/2lian/asyncio-for-robotics",
"Issues": "https://github.com/2lian/asyncio-for-robotics/issues",
"Repository": "https://github.com/2lian/asyncio-for-robotics"
},
"split_keywords": [
"asyncio",
" robotics",
" ros2",
" zenoh",
" publisher",
" subscriber"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "95a03878e106e20644ff7c7b534ae8a341b673053b4fa5ae81c37312c7e8580f",
"md5": "c60951d90d2b3b0ec8139043e0e0ae4f",
"sha256": "6b033e736ef4d27c73c94ea8ee012f8311150bdb8eee78273f38aae1b587ddf0"
},
"downloads": -1,
"filename": "asyncio_for_robotics-1.0.6-py3-none-any.whl",
"has_sig": false,
"md5_digest": "c60951d90d2b3b0ec8139043e0e0ae4f",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10",
"size": 34281,
"upload_time": "2025-11-04T16:53:18",
"upload_time_iso_8601": "2025-11-04T16:53:18.018205Z",
"url": "https://files.pythonhosted.org/packages/95/a0/3878e106e20644ff7c7b534ae8a341b673053b4fa5ae81c37312c7e8580f/asyncio_for_robotics-1.0.6-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "48abda2460f30167b316c33ca70474e5ec28b44f71fe32d36811899df5a6723f",
"md5": "8ad205d1f9f0378571093de00fe343f6",
"sha256": "572bcd51df83101d2d3407d943199d605fbb5acc5708ba6fa414f5f047f82732"
},
"downloads": -1,
"filename": "asyncio_for_robotics-1.0.6.tar.gz",
"has_sig": false,
"md5_digest": "8ad205d1f9f0378571093de00fe343f6",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 30712,
"upload_time": "2025-11-04T16:53:19",
"upload_time_iso_8601": "2025-11-04T16:53:19.722102Z",
"url": "https://files.pythonhosted.org/packages/48/ab/da2460f30167b316c33ca70474e5ec28b44f71fe32d36811899df5a6723f/asyncio_for_robotics-1.0.6.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-11-04 16:53:19",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "2lian",
"github_project": "asyncio-for-robotics",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "asyncio_for_robotics"
}