Name | nanograph-sdk JSON |
Version |
0.1.12
JSON |
| download |
home_page | None |
Summary | Official Python SDK for Nanograph |
upload_time | 2025-07-11 22:19:02 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.8 |
license | None |
keywords |
nanograph
python
sdk
|
VCS |
 |
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Nano SDK for Python
This package provides the Python implementation of the Nano SDK, allowing you to create node servers that communicate with the Nano orchestrator.
## Installation
The package is available on PyPI and can be installed using pip:
```bash
pip install nanograph-sdk
```
## Using with NanoCore
[NanoCore](https://github.com/nanograph/NanoCore) is the orchestrator for Nanograph servers. It manages both JavaScript and Python servers. To use your Python server with NanoCore:
1. Create a `nanoserver.json` in your project root (see [Configuration](#configuration) section for all options)
2. Register your server with [NanoCore](https://github.com/nanograph/NanoCore):
```bash
nanocore register /path/to/your/server
```
This will:
- Validate your `nanoserver.json`
- Create a Python virtual environment
- Install dependencies if you have a `requirements.txt`
3. Start all registered servers:
```bash
nanocore start
```
[NanoCore](https://github.com/nanograph/NanoCore) will:
- Start an asset server for file management
- Assign a port to your server
- Set up required environment variables (NANOCORE_ENDPOINT, NANOCORE_TOKEN)
- Start and monitor your server process
- Restart it if configuration changes
For additional commands and features, please refer to the [NanoCore documentation](https://github.com/nanograph/NanoCore).
## Usage
### Creating a Server
```python
from nanograph_sdk import NanoSDK
import asyncio
# Initialize SDK (configuration is loaded from nanoserver.json)
sdk = NanoSDK()
# Start the server
async def main():
await sdk.start()
print('Python Server started')
# Handle shutdown
async def shutdown_handler():
print('Python Server is shutting down')
# Add any cleanup logic here
sdk.on_shutdown(shutdown_handler)
# Graceful shutdown
async def run():
try:
await main()
except KeyboardInterrupt:
print('Interrupted, stopping server...')
finally:
await sdk.stop()
if __name__ == '__main__':
asyncio.run(run())
```
### Configuration
The SDK requires a `nanoserver.json` file in your project root. Here's a complete example with all available options:
```json
{
"domain": "local-python.nanograph", // Required: Domain to group servers
"serverName": "My Python Server", // Required: Name of your server
"serverUid": "my-python-server", // Required: Unique server identifier
"language": "python", // Required: Must be 'python'
"port": 3017, // Optional: HTTP port (default: 3017)
"nodesPath": "nodes", // Optional: Path to nodes directory
"autoWatch": true, // Optional: Auto-reload on changes
"watchDebounceTime": 500 // Optional: Debounce time for reload
}
```
| Key | Type | Default | Description |
|-------------------|-----------|-----------|--------------------------------------------------------------------|
| `domain` | `str` | — | Domain to group servers (required) |
| `serverName` | `str` | — | Name of your server (required) |
| `serverUid` | `str` | — | Unique server identifier (required) |
| `language` | `str` | — | Must be 'python' for Python servers (required) |
| `port` | `int` | `3017` | HTTP port to listen on |
| `nodesPath` | `str` | `'nodes'` | Path to the directory containing node files |
| `autoWatch` | `bool` | `True` | If true, automatically reload nodes on file changes |
| `watchDebounceTime`| `int` | `500` | Debounce time in milliseconds for file watcher reloads |
Note: The `port` can be overridden by setting the `PORT` environment variable.
### Asset Handling
The SDK provides built-in support for handling assets through the following methods:
```python
# Instance methods
await sdk.resolve_asset(ref, options) # Resolve an asset reference to data
sdk.get_asset_download_url(ref) # Get direct download URL
await sdk.get_asset_presigned_url(ref) # Get a presigned URL
await sdk.upload_asset(file, options) # Upload an asset
# Static methods (can be used without SDK instance)
await NanoSDK.resolve_asset_static(ref, options)
NanoSDK.get_asset_download_url_static(ref)
await NanoSDK.get_asset_presigned_url_static(ref)
await NanoSDK.upload_asset_static(file, options)
```
To use asset handling capabilities, the following environment variables must be set:
- `NANOCORE_ENDPOINT`: The endpoint URL for the Nanocore asset server
- `NANOCORE_TOKEN`: Authentication token for accessing the asset server
### Node Initialization
Nodes can have an optional async initialization function that will be called when the node is loaded:
```python
from nanograph_sdk import NanoSDK, NodeDefinition
# Define the node
definition = {
'uid': 'my-node',
'name': 'My Node',
# ... other definition fields ...
}
# Create node instance
node = NanoSDK.register_node(definition)
# Optional async initialization function
async def init(node_instance):
# Perform any async initialization here
# This will be called when the node is loaded
pass
# Export both the node and init function
export = node
```
### Creating Nodes
```python
from nanograph_sdk import NanoSDK, NodeDefinition, NodeInstance, ExecutionContext
# Define the node
definition = {
'uid': 'my-unique-python-node-id',
'name': 'My Python Node',
'category': 'Processing',
'version': '1.0.0',
'description': 'Description of my python node',
'inputs': [
{'name': 'input1', 'type': 'string', 'description': 'First input'}
],
'outputs': [
{'name': 'output1', 'type': 'string', 'description': 'First output'}
],
'parameters': [
{
'name': 'param1',
'type': 'boolean',
'value': True,
'default': True,
'label': 'Parameter 1',
'description': 'Description of parameter 1'
}
]
}
# Register the node
my_node = NanoSDK.register_node(definition)
# Implement the execution logic
async def execute_node(ctx: ExecutionContext):
# Get input values
input1 = ctx.inputs.get('input1', '')
# Send status update
await ctx.context['send_status']({'type': 'running', 'message': 'Processing...'})
# Check for abort
if ctx.context['is_aborted']():
raise Exception('Execution aborted')
# Process the inputs
output1 = f'Processed by Python: {input1}'
# Return the outputs
return {'output1': output1}
my_node['execute'] = execute_node
# To export the node if it's in its own file:
# export = my_node
Nodes are defined in `node.py` files. You can organize your nodes by placing each `node.py`
file (along with any helper modules it might need) into its own subdirectory within the
main `nodes` directory (or the path specified in `nodes_path` in the SDK configuration).
The SDK will scan these directories for `node.py` files to load the definitions.
---
## ExecutionContext Reference
When you implement a node's `execute` function, it receives a single argument: `ctx` (the execution context). This object provides everything your node needs to process inputs, parameters, and interact with the workflow engine.
**The `ExecutionContext` object has the following structure:**
| Field | Type | Description |
|---------------|---------------------|-----------------------------------------------------------------------------|
| `inputs` | `dict` | Input values for this node, keyed by input name. |
| `parameters` | `list` | List of parameter dicts for this node (see your node definition). |
| `context` | `dict` | Runtime context utilities and metadata (see below). |
### `ctx.context` fields
| Key | Type | Description |
|----------------|-------------|-----------------------------------------------------------------------------|
| `send_status` | `callable` | `await ctx.context['send_status']({...})` to send a status/progress update. |
| `is_aborted` | `callable` | `ctx.context['is_aborted']()` returns `True` if execution was aborted. |
| `graph_node` | `dict` | The full graph node definition (with position, etc). |
| `instance_id` | `str` | The workflow instance ID for this execution. |
**Example usage in a node:**
```python
async def execute_node(ctx):
# Access input
value = ctx.inputs.get('input1')
# Access parameter
param = next((p for p in ctx.parameters if p['name'] == 'param1'), None)
# Send a running status
await ctx.context['send_status']({'type': 'running', 'message': 'Working...'})
# Check for abort
if ctx.context['is_aborted']():
raise Exception('Aborted!')
# ...
```
---
## NodeStatus Reference
The `NodeStatus` object is used to communicate the current status, progress, or result of a node execution back to the orchestrator. You send it using `await ctx.context['send_status'](status)` from within your node's `execute` function.
**NodeStatus fields:**
| Field | Type | Description |
|------------|---------------------|----------------------------------------------------------------------|
| `type` | `str` | One of: `'idle'`, `'running'`, `'complete'`, `'error'`, `'missing'` |
| `message` | `str` (optional) | Human-readable status or error message |
| `progress` | `dict` (optional) | Progress info, e.g. `{ 'step': 2, 'total': 5 }` |
| `outputs` | `dict` (optional) | Output values (only for `'complete'` status) |
**Example: Sending progress updates from a node**
```python
async def execute_node(ctx):
total_steps = 5
for step in range(1, total_steps + 1):
# Abort fast if needed
if ctx.context['is_aborted']():
raise Exception('Aborted!')
# Simulate work
await asyncio.sleep(1)
# Send progress update
await ctx.context['send_status']({
'type': 'running',
'message': f'Processing step {step}/{total_steps}',
'progress': {'step': step, 'total': total_steps}
})
# Just return the outputs; the SDK will send the 'complete' status automatically
return {'result': 'done'}
```
> **Note:** You do **not** need to manually send a `'complete'` status at the end. The SDK will automatically send a `'complete'` status with the outputs you return from your `execute` function.
---
## Folder Structure
Recommended project structure for a Python NanoServer:
```
my-python-nodeserver/
├── main.py # Entry point
├── nanoserver.json # Server configuration (required)
├── nodes/ # Nodes directory (scans for node.py files in subdirectories)
│ ├── processing/ # Category directory (optional organization)
│ │ ├── simple_text_node/ # Directory for a single node
│ │ │ └── node.py # Node definition for simple_text_node
│ │ └── complex_math_node/ # Directory for a more complex node
│ │ ├── __init__.py # Optional, makes 'complex_math_node' a Python package
│ │ ├── node.py # Main node definition for complex_math_node
│ │ └── math_utils.py # Helper functions specific to this node
│ └── another_category/ # Another category directory
│ └── another_node/ # Directory for another_node
│ └── node.py # Node definition for another_node
├── pyproject.toml # Dependencies and package info
└── README.md
```
## License
MIT
Raw data
{
"_id": null,
"home_page": null,
"name": "nanograph-sdk",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "nanograph, python, sdk",
"author": null,
"author_email": "Nanograph <anthony@volted.ai>",
"download_url": "https://files.pythonhosted.org/packages/29/90/7397b7374e22f550858d25f7f1cd7a1222c32c78311a4f9ae3c67652765a/nanograph_sdk-0.1.12.tar.gz",
"platform": null,
"description": "# Nano SDK for Python\n\nThis package provides the Python implementation of the Nano SDK, allowing you to create node servers that communicate with the Nano orchestrator.\n\n## Installation\n\nThe package is available on PyPI and can be installed using pip:\n\n```bash\npip install nanograph-sdk\n```\n\n## Using with NanoCore\n\n[NanoCore](https://github.com/nanograph/NanoCore) is the orchestrator for Nanograph servers. It manages both JavaScript and Python servers. To use your Python server with NanoCore:\n\n1. Create a `nanoserver.json` in your project root (see [Configuration](#configuration) section for all options)\n2. Register your server with [NanoCore](https://github.com/nanograph/NanoCore):\n```bash\nnanocore register /path/to/your/server\n```\nThis will:\n- Validate your `nanoserver.json`\n- Create a Python virtual environment\n- Install dependencies if you have a `requirements.txt`\n\n3. Start all registered servers:\n```bash\nnanocore start\n```\n\n[NanoCore](https://github.com/nanograph/NanoCore) will:\n- Start an asset server for file management\n- Assign a port to your server\n- Set up required environment variables (NANOCORE_ENDPOINT, NANOCORE_TOKEN)\n- Start and monitor your server process\n- Restart it if configuration changes\n\nFor additional commands and features, please refer to the [NanoCore documentation](https://github.com/nanograph/NanoCore).\n\n## Usage\n\n### Creating a Server\n\n```python\nfrom nanograph_sdk import NanoSDK\nimport asyncio\n\n# Initialize SDK (configuration is loaded from nanoserver.json)\nsdk = NanoSDK()\n\n# Start the server\nasync def main():\n await sdk.start()\n print('Python Server started')\n\n# Handle shutdown\nasync def shutdown_handler():\n print('Python Server is shutting down')\n # Add any cleanup logic here\n\nsdk.on_shutdown(shutdown_handler)\n\n# Graceful shutdown\nasync def run():\n try:\n await main()\n except KeyboardInterrupt:\n print('Interrupted, stopping server...')\n finally:\n await sdk.stop()\n\nif __name__ == '__main__':\n asyncio.run(run())\n```\n\n### Configuration\n\nThe SDK requires a `nanoserver.json` file in your project root. Here's a complete example with all available options:\n\n```json\n{\n \"domain\": \"local-python.nanograph\", // Required: Domain to group servers\n \"serverName\": \"My Python Server\", // Required: Name of your server\n \"serverUid\": \"my-python-server\", // Required: Unique server identifier\n \"language\": \"python\", // Required: Must be 'python'\n \"port\": 3017, // Optional: HTTP port (default: 3017)\n \"nodesPath\": \"nodes\", // Optional: Path to nodes directory\n \"autoWatch\": true, // Optional: Auto-reload on changes\n \"watchDebounceTime\": 500 // Optional: Debounce time for reload\n}\n```\n\n| Key | Type | Default | Description |\n|-------------------|-----------|-----------|--------------------------------------------------------------------|\n| `domain` | `str` | \u2014 | Domain to group servers (required) |\n| `serverName` | `str` | \u2014 | Name of your server (required) |\n| `serverUid` | `str` | \u2014 | Unique server identifier (required) |\n| `language` | `str` | \u2014 | Must be 'python' for Python servers (required) |\n| `port` | `int` | `3017` | HTTP port to listen on |\n| `nodesPath` | `str` | `'nodes'` | Path to the directory containing node files |\n| `autoWatch` | `bool` | `True` | If true, automatically reload nodes on file changes |\n| `watchDebounceTime`| `int` | `500` | Debounce time in milliseconds for file watcher reloads |\n\nNote: The `port` can be overridden by setting the `PORT` environment variable.\n\n### Asset Handling\n\nThe SDK provides built-in support for handling assets through the following methods:\n\n```python\n# Instance methods\nawait sdk.resolve_asset(ref, options) # Resolve an asset reference to data\nsdk.get_asset_download_url(ref) # Get direct download URL\nawait sdk.get_asset_presigned_url(ref) # Get a presigned URL\nawait sdk.upload_asset(file, options) # Upload an asset\n\n# Static methods (can be used without SDK instance)\nawait NanoSDK.resolve_asset_static(ref, options)\nNanoSDK.get_asset_download_url_static(ref)\nawait NanoSDK.get_asset_presigned_url_static(ref)\nawait NanoSDK.upload_asset_static(file, options)\n```\n\nTo use asset handling capabilities, the following environment variables must be set:\n- `NANOCORE_ENDPOINT`: The endpoint URL for the Nanocore asset server\n- `NANOCORE_TOKEN`: Authentication token for accessing the asset server\n\n### Node Initialization\n\nNodes can have an optional async initialization function that will be called when the node is loaded:\n\n```python\nfrom nanograph_sdk import NanoSDK, NodeDefinition\n\n# Define the node\ndefinition = {\n 'uid': 'my-node',\n 'name': 'My Node',\n # ... other definition fields ...\n}\n\n# Create node instance\nnode = NanoSDK.register_node(definition)\n\n# Optional async initialization function\nasync def init(node_instance):\n # Perform any async initialization here\n # This will be called when the node is loaded\n pass\n\n# Export both the node and init function\nexport = node\n```\n\n### Creating Nodes\n\n```python\nfrom nanograph_sdk import NanoSDK, NodeDefinition, NodeInstance, ExecutionContext\n\n# Define the node\ndefinition = {\n 'uid': 'my-unique-python-node-id',\n 'name': 'My Python Node',\n 'category': 'Processing',\n 'version': '1.0.0',\n 'description': 'Description of my python node',\n 'inputs': [\n {'name': 'input1', 'type': 'string', 'description': 'First input'}\n ],\n 'outputs': [\n {'name': 'output1', 'type': 'string', 'description': 'First output'}\n ],\n 'parameters': [\n {\n 'name': 'param1',\n 'type': 'boolean',\n 'value': True,\n 'default': True,\n 'label': 'Parameter 1',\n 'description': 'Description of parameter 1'\n }\n ]\n}\n\n# Register the node\nmy_node = NanoSDK.register_node(definition)\n\n# Implement the execution logic\nasync def execute_node(ctx: ExecutionContext):\n # Get input values\n input1 = ctx.inputs.get('input1', '')\n \n # Send status update\n await ctx.context['send_status']({'type': 'running', 'message': 'Processing...'})\n \n # Check for abort\n if ctx.context['is_aborted']():\n raise Exception('Execution aborted')\n \n # Process the inputs\n output1 = f'Processed by Python: {input1}'\n \n # Return the outputs\n return {'output1': output1}\n\nmy_node['execute'] = execute_node\n\n# To export the node if it's in its own file:\n# export = my_node \n\nNodes are defined in `node.py` files. You can organize your nodes by placing each `node.py`\nfile (along with any helper modules it might need) into its own subdirectory within the\nmain `nodes` directory (or the path specified in `nodes_path` in the SDK configuration).\nThe SDK will scan these directories for `node.py` files to load the definitions.\n\n---\n\n## ExecutionContext Reference\n\nWhen you implement a node's `execute` function, it receives a single argument: `ctx` (the execution context). This object provides everything your node needs to process inputs, parameters, and interact with the workflow engine.\n\n**The `ExecutionContext` object has the following structure:**\n\n| Field | Type | Description |\n|---------------|---------------------|-----------------------------------------------------------------------------|\n| `inputs` | `dict` | Input values for this node, keyed by input name. |\n| `parameters` | `list` | List of parameter dicts for this node (see your node definition). |\n| `context` | `dict` | Runtime context utilities and metadata (see below). |\n\n### `ctx.context` fields\n\n| Key | Type | Description |\n|----------------|-------------|-----------------------------------------------------------------------------|\n| `send_status` | `callable` | `await ctx.context['send_status']({...})` to send a status/progress update. |\n| `is_aborted` | `callable` | `ctx.context['is_aborted']()` returns `True` if execution was aborted. |\n| `graph_node` | `dict` | The full graph node definition (with position, etc). |\n| `instance_id` | `str` | The workflow instance ID for this execution. |\n\n**Example usage in a node:**\n\n```python\nasync def execute_node(ctx):\n # Access input\n value = ctx.inputs.get('input1')\n # Access parameter\n param = next((p for p in ctx.parameters if p['name'] == 'param1'), None)\n # Send a running status\n await ctx.context['send_status']({'type': 'running', 'message': 'Working...'})\n # Check for abort\n if ctx.context['is_aborted']():\n raise Exception('Aborted!')\n # ...\n```\n\n---\n\n## NodeStatus Reference\n\nThe `NodeStatus` object is used to communicate the current status, progress, or result of a node execution back to the orchestrator. You send it using `await ctx.context['send_status'](status)` from within your node's `execute` function.\n\n**NodeStatus fields:**\n\n| Field | Type | Description |\n|------------|---------------------|----------------------------------------------------------------------|\n| `type` | `str` | One of: `'idle'`, `'running'`, `'complete'`, `'error'`, `'missing'` |\n| `message` | `str` (optional) | Human-readable status or error message |\n| `progress` | `dict` (optional) | Progress info, e.g. `{ 'step': 2, 'total': 5 }` |\n| `outputs` | `dict` (optional) | Output values (only for `'complete'` status) |\n\n**Example: Sending progress updates from a node**\n\n```python\nasync def execute_node(ctx):\n total_steps = 5\n for step in range(1, total_steps + 1):\n # Abort fast if needed\n if ctx.context['is_aborted']():\n raise Exception('Aborted!')\n # Simulate work\n await asyncio.sleep(1)\n # Send progress update\n await ctx.context['send_status']({\n 'type': 'running',\n 'message': f'Processing step {step}/{total_steps}',\n 'progress': {'step': step, 'total': total_steps}\n })\n # Just return the outputs; the SDK will send the 'complete' status automatically\n return {'result': 'done'}\n```\n\n> **Note:** You do **not** need to manually send a `'complete'` status at the end. The SDK will automatically send a `'complete'` status with the outputs you return from your `execute` function.\n\n---\n\n## Folder Structure\n\nRecommended project structure for a Python NanoServer:\n\n```\nmy-python-nodeserver/\n\u251c\u2500\u2500 main.py # Entry point\n\u251c\u2500\u2500 nanoserver.json # Server configuration (required)\n\u251c\u2500\u2500 nodes/ # Nodes directory (scans for node.py files in subdirectories)\n\u2502 \u251c\u2500\u2500 processing/ # Category directory (optional organization)\n\u2502 \u2502 \u251c\u2500\u2500 simple_text_node/ # Directory for a single node\n\u2502 \u2502 \u2502 \u2514\u2500\u2500 node.py # Node definition for simple_text_node\n\u2502 \u2502 \u2514\u2500\u2500 complex_math_node/ # Directory for a more complex node\n\u2502 \u2502 \u251c\u2500\u2500 __init__.py # Optional, makes 'complex_math_node' a Python package\n\u2502 \u2502 \u251c\u2500\u2500 node.py # Main node definition for complex_math_node\n\u2502 \u2502 \u2514\u2500\u2500 math_utils.py # Helper functions specific to this node\n\u2502 \u2514\u2500\u2500 another_category/ # Another category directory\n\u2502 \u2514\u2500\u2500 another_node/ # Directory for another_node\n\u2502 \u2514\u2500\u2500 node.py # Node definition for another_node\n\u251c\u2500\u2500 pyproject.toml # Dependencies and package info\n\u2514\u2500\u2500 README.md\n```\n\n## License\n\nMIT\n",
"bugtrack_url": null,
"license": null,
"summary": "Official Python SDK for Nanograph",
"version": "0.1.12",
"project_urls": {
"Bug Tracker": "https://github.com/nanograph/sdk-py/issues",
"Documentation": "https://github.com/nanograph/sdk-py",
"Homepage": "https://github.com/nanograph/sdk-py",
"Repository": "https://github.com/nanograph/sdk-py.git"
},
"split_keywords": [
"nanograph",
" python",
" sdk"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "b7555cd9c074eca3e4a97a7d3bdb46c7615f795b4854663f80801d46f968e24e",
"md5": "37ecd8b344206dab767322990ee434f5",
"sha256": "31ca052d61ee6b6a5b59f98ee9024f278c7dfa561006a89e342fe0170942c4b2"
},
"downloads": -1,
"filename": "nanograph_sdk-0.1.12-py3-none-any.whl",
"has_sig": false,
"md5_digest": "37ecd8b344206dab767322990ee434f5",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 28591,
"upload_time": "2025-07-11T22:19:00",
"upload_time_iso_8601": "2025-07-11T22:19:00.886025Z",
"url": "https://files.pythonhosted.org/packages/b7/55/5cd9c074eca3e4a97a7d3bdb46c7615f795b4854663f80801d46f968e24e/nanograph_sdk-0.1.12-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "29907397b7374e22f550858d25f7f1cd7a1222c32c78311a4f9ae3c67652765a",
"md5": "1a96872c9ae6aa6677b29894604e1058",
"sha256": "3ec8c543cf7699427ef69deb0e2b32f4e443fefa5134eed9e1d1b509ec3c008d"
},
"downloads": -1,
"filename": "nanograph_sdk-0.1.12.tar.gz",
"has_sig": false,
"md5_digest": "1a96872c9ae6aa6677b29894604e1058",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 21488,
"upload_time": "2025-07-11T22:19:02",
"upload_time_iso_8601": "2025-07-11T22:19:02.039989Z",
"url": "https://files.pythonhosted.org/packages/29/90/7397b7374e22f550858d25f7f1cd7a1222c32c78311a4f9ae3c67652765a/nanograph_sdk-0.1.12.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-11 22:19:02",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "nanograph",
"github_project": "sdk-py",
"github_not_found": true,
"lcname": "nanograph-sdk"
}