# ROBOTICS.DEV App Builder
Run P2P AI Robotics.dev apps anywhere!
````
pip install robotics-dev
````
Usage:
````
await robotics.connect({
'server': 'ws://192.168.0.47:3001',
'robot': 'eeeaa722-...-9a53-c945a5822b60',
'token': '5a66b323-...-9169-77a95014f339'
}, handle_message)
robotics.speak('eeeaa722-...-9a53-c945a5822b60', 'this is a test')
robotics.twist('eeeaa722-...-9a53-c945a5822b60', forward_twist)
````
Here's an example python script:
````
import asyncio
import signal
import base64
import sys
from pathlib import Path
# Add the src directory to Python path for local testing
src_path = Path(__file__).parent.parent / 'src'
sys.path.append(str(src_path))
# Import for local testing
from robotics_dev.robotics import robotics
# For production, use:
# from robotics_dev import robotics
# Create twist message for moving forward at 20% speed
forward_twist = {
"linear": {
"x": 0.2, # 20% forward velocity
"y": 0.0,
"z": 0.0
},
"angular": {
"x": 0.0,
"y": 0.0,
"z": 0.0
}
}
# Create stop twist message
stop_twist = {
"linear": {
"x": 0.0,
"y": 0.0,
"z": 0.0
},
"angular": {
"x": 0.0,
"y": 0.0,
"z": 0.0
}
}
one_time = True
async def handle_message(ros_message):
global one_time
# ros_message is already parsed by the SDK
print('Received p2p data:', ros_message)
if one_time:
one_time = False
await robotics.speak('eeeaa722-...-9a53-c945a5822b60', 'this is a test')
print('Moving robot forward at 20% speed...')
await robotics.twist('eeeaa722-...-9a53-c945a5822b60', forward_twist)
# Stop after 5 seconds
await asyncio.sleep(5)
print('Stopping robot...')
await robotics.twist('eeeaa722-...-9a53-c945a5822b60', stop_twist)
async def main():
# Set up signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(cleanup()))
try:
# Connect to robotics.dev
await robotics.connect({
'server': 'ws://192.168.0.47:3001',
'robot': 'eeeaa722-...-9a53-c945a5822b60',
'token': '5a66b323-...-9169-77a95014f339'
}, handle_message)
# Keep running until interrupted
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Shutdown requested...")
except Exception as e:
print(f"Error: {e}")
async def cleanup():
print("Disconnecting...")
await robotics.disconnect()
# Stop the event loop
loop = asyncio.get_running_loop()
loop.stop()
if __name__ == "__main__":
asyncio.run(main())
````
Raw data
{
"_id": null,
"home_page": null,
"name": "robotics-dev",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "AI, ROS, ROS2, motors, robotics, robots, streaming, video",
"author": null,
"author_email": "Chris Matthieu <chris@matthieu.us>",
"download_url": "https://files.pythonhosted.org/packages/53/a1/b5f75fbfd1bf659ed374ae43882b16e4803cb832a477c8f560de82289c85/robotics_dev-0.0.9.tar.gz",
"platform": null,
"description": "# ROBOTICS.DEV App Builder\n\nRun P2P AI Robotics.dev apps anywhere!\n\n````\npip install robotics-dev\n````\n\nUsage:\n\n````\nawait robotics.connect({\n 'server': 'ws://192.168.0.47:3001',\n 'robot': 'eeeaa722-...-9a53-c945a5822b60',\n 'token': '5a66b323-...-9169-77a95014f339'\n}, handle_message)\n\nrobotics.speak('eeeaa722-...-9a53-c945a5822b60', 'this is a test')\n\nrobotics.twist('eeeaa722-...-9a53-c945a5822b60', forward_twist)\n````\n\nHere's an example python script: \n````\nimport asyncio\nimport signal\nimport base64\nimport sys\nfrom pathlib import Path\n\n# Add the src directory to Python path for local testing\nsrc_path = Path(__file__).parent.parent / 'src'\nsys.path.append(str(src_path))\n\n# Import for local testing\nfrom robotics_dev.robotics import robotics\n# For production, use:\n# from robotics_dev import robotics\n\n# Create twist message for moving forward at 20% speed\nforward_twist = {\n \"linear\": {\n \"x\": 0.2, # 20% forward velocity\n \"y\": 0.0,\n \"z\": 0.0\n },\n \"angular\": {\n \"x\": 0.0,\n \"y\": 0.0,\n \"z\": 0.0\n }\n}\n\n# Create stop twist message\nstop_twist = {\n \"linear\": {\n \"x\": 0.0,\n \"y\": 0.0,\n \"z\": 0.0\n },\n \"angular\": {\n \"x\": 0.0,\n \"y\": 0.0,\n \"z\": 0.0\n }\n}\n\none_time = True\n\nasync def handle_message(ros_message):\n global one_time\n # ros_message is already parsed by the SDK\n print('Received p2p data:', ros_message)\n\n if one_time:\n one_time = False\n await robotics.speak('eeeaa722-...-9a53-c945a5822b60', 'this is a test')\n\n print('Moving robot forward at 20% speed...')\n await robotics.twist('eeeaa722-...-9a53-c945a5822b60', forward_twist)\n\n # Stop after 5 seconds\n await asyncio.sleep(5)\n print('Stopping robot...')\n await robotics.twist('eeeaa722-...-9a53-c945a5822b60', stop_twist)\n\nasync def main():\n # Set up signal handlers for graceful shutdown\n loop = asyncio.get_running_loop()\n for sig in (signal.SIGINT, signal.SIGTERM):\n loop.add_signal_handler(sig, lambda: asyncio.create_task(cleanup()))\n\n try:\n # Connect to robotics.dev\n await robotics.connect({\n 'server': 'ws://192.168.0.47:3001',\n 'robot': 'eeeaa722-...-9a53-c945a5822b60',\n 'token': '5a66b323-...-9169-77a95014f339'\n }, handle_message)\n\n # Keep running until interrupted\n while True:\n await asyncio.sleep(1)\n\n except asyncio.CancelledError:\n print(\"Shutdown requested...\")\n except Exception as e:\n print(f\"Error: {e}\")\n\nasync def cleanup():\n print(\"Disconnecting...\")\n await robotics.disconnect()\n # Stop the event loop\n loop = asyncio.get_running_loop()\n loop.stop()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n````\n",
"bugtrack_url": null,
"license": null,
"summary": "Robotics.dev AI robotics app builder",
"version": "0.0.9",
"project_urls": {
"Homepage": "https://robotics.dev",
"Issues": "https://github.com/chrismatthieu/robotics-pip/issues",
"Repository": "https://github.com/chrismatthieu/robotics-pip.git"
},
"split_keywords": [
"ai",
" ros",
" ros2",
" motors",
" robotics",
" robots",
" streaming",
" video"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "63d74334fdb1b907d85315f0537da5f6bc783268b4896c2a28b1b0a7d5d52eef",
"md5": "1e1d61435d13fcc08fa2e05891646dde",
"sha256": "97450c8031c5517364ae1c6b7f8228c0366fe36498f7c632e178aa382c8e1570"
},
"downloads": -1,
"filename": "robotics_dev-0.0.9-py3-none-any.whl",
"has_sig": false,
"md5_digest": "1e1d61435d13fcc08fa2e05891646dde",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 6885,
"upload_time": "2025-02-28T21:06:30",
"upload_time_iso_8601": "2025-02-28T21:06:30.779064Z",
"url": "https://files.pythonhosted.org/packages/63/d7/4334fdb1b907d85315f0537da5f6bc783268b4896c2a28b1b0a7d5d52eef/robotics_dev-0.0.9-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "53a1b5f75fbfd1bf659ed374ae43882b16e4803cb832a477c8f560de82289c85",
"md5": "a8eb5c853821a565aa98a7d5f0a12704",
"sha256": "8f2533f9e0d8882bc295a86177cb7229cf7f2d3b76f995129a19f63df0c9880f"
},
"downloads": -1,
"filename": "robotics_dev-0.0.9.tar.gz",
"has_sig": false,
"md5_digest": "a8eb5c853821a565aa98a7d5f0a12704",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 6503,
"upload_time": "2025-02-28T21:06:32",
"upload_time_iso_8601": "2025-02-28T21:06:32.194313Z",
"url": "https://files.pythonhosted.org/packages/53/a1/b5f75fbfd1bf659ed374ae43882b16e4803cb832a477c8f560de82289c85/robotics_dev-0.0.9.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-02-28 21:06:32",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "chrismatthieu",
"github_project": "robotics-pip",
"github_not_found": true,
"lcname": "robotics-dev"
}