Name | cuteagent |
Version | 0.2.23 |
home_page | None |
Summary | Computer Use Task Execution Agent |
upload_time | 2025-09-05 07:37:15 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.8 |
license | MIT License |
keywords | cuteagent, openai |
requirements | No requirements were recorded. |
# 🐾 CuteAgent
**Computer Use Task Execution Agent**
*A Python library for building, orchestrating, and integrating computer-use AI agents in agentic workflows.*
---
# CuteAgent - Complete Agent Suite for LangGraph Workflows
**CuteAgent** provides four agents for building comprehensive LangGraph workflows:
- **🤖 StationAgent**: Shared state management and workflow coordination
- **🖥️ WindowsAgent**: Computer use automation on Windows servers
- **🧭 VisionAgent**: Vision model integration for GUI element grounding and coordinate extraction
- **👥 HumanAgent**: Human-in-the-loop (HITL) task management
Together, these agents enable complete automation workflows where AI performs computer tasks, humans provide oversight and decisions, and shared state coordinates everything seamlessly.
## 🧭 VisionAgent - Vision Grounding and GUI Coordinate Extraction
**VisionAgent** integrates with vision models to locate UI elements in screenshots and return click coordinates.
### 🚀 Key Features
- Claude Integration: Send a screenshot and element description to Claude and get grounded coordinates
- Hugging Face GUI Model: Use an OpenAI-compatible endpoint to parse the GUI and extract coordinates via `find_element`
- Screen Scaling: Converts model coordinates to your actual screen resolution
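The screen-scaling feature can be pictured as a proportional mapping followed by a clamp. The sketch below is only an illustration under stated assumptions: `scale_to_screen` and the `model_size` reference resolution are hypothetical names, not part of the cuteagent API, and the library's own conversion may differ in rounding or bounds.

```python
from typing import Tuple


def scale_to_screen(
    point: Tuple[float, float],
    model_size: Tuple[int, int],
    screen_size: Tuple[int, int],
) -> Tuple[int, int]:
    """Map a point from the model's reference resolution to the real screen.

    Hypothetical helper for illustration; not a cuteagent function.
    """
    (x, y), (mw, mh), (sw, sh) = point, model_size, screen_size
    # Proportional rescale on each axis.
    sx = round(x * sw / mw)
    sy = round(y * sh / mh)
    # Clamp so the result is always a valid pixel index on the screen.
    return (min(max(sx, 0), sw - 1), min(max(sy, 0), sh - 1))
```

For example, a point reported at the centre of a 1024x768 reference image lands at the centre of a 1366x768 screen, and out-of-range points are pulled back to the nearest edge.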
### 🔧 Quick Start
```python
from cuteagent import VisionAgent
# Initialize with screen size and optional Anthropic API key via env
# export ANTHROPIC_API_KEY=... in your environment or .env
vision = VisionAgent(screen_size=(1366, 768))
# Claude grounding: description + image URL → (x,y)
image_url = "https://datacollectionfintor.s3.amazonaws.com/screenshot_20250517_180131.png"
description = {
"name": "Reports",
"description": "Reports tab on the top navigation bar",
"element_type": "tab"
}
coords = vision.claude(image_url, description)
print("Claude coordinates:", coords) # e.g. "(339,66)" or "NOT FOUND" or "ERROR: ..."
# Hugging Face GUI model: element name + screenshot URL
vision_hf = VisionAgent(model_selected="FINTOR_GUI")
coords_hf = vision_hf.find_element(
screenshot_url=image_url,
element_name="click on Reports tab on navigation bar"
)
print("HF model coordinates:", coords_hf)
```
### ⚙️ Configuration
```bash
# Anthropic (Claude) - required for VisionAgent.claude
export ANTHROPIC_API_KEY="your-anthropic-key"
# Hugging Face GUI endpoint (optional; defaults are provided)
export HF_TOKEN="your-hf-token"
```
### 📝 Notes
- `VisionAgent.claude(...)` returns:
- "(x,y)" coordinate string on success
- "NOT FOUND" if element cannot be grounded
- "ERROR: ..." for explicit error messaging (e.g., missing key, invalid media type)
- Coordinates are clamped to your `screen_size` and converted if the model returns a different reference size.
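Because `VisionAgent.claude(...)` returns a string in all three cases, callers usually need to convert it before clicking. The following is a minimal sketch of one way to do that; `parse_coords` is a hypothetical helper, not part of cuteagent, and it assumes the success format is exactly `"(x,y)"` with integer values as described in the notes above.

```python
import re
from typing import Optional, Tuple

# Matches "(339,66)" and tolerates optional whitespace, e.g. "( 339 , 66 )".
_COORD_RE = re.compile(r"^\(\s*(-?\d+)\s*,\s*(-?\d+)\s*\)$")


def parse_coords(result: str) -> Optional[Tuple[int, int]]:
    """Convert a VisionAgent result string into an (x, y) tuple.

    Returns None for "NOT FOUND" (or any string that is not a coordinate
    pair) and raises RuntimeError for "ERROR: ..." results.
    """
    text = result.strip()
    if text.startswith("ERROR:"):
        raise RuntimeError(text)
    match = _COORD_RE.match(text)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))
```

Raising on `"ERROR: ..."` while returning `None` for `"NOT FOUND"` keeps configuration problems (such as a missing key) distinct from an element that is simply absent from the screenshot.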
## 📦 Installation
```bash
pip install cuteagent
```
---
# 🤖 StationAgent - Shared State Management
**StationAgent** provides shared state management and server coordination for LangGraph workflows. It integrates with a SharedState API to enable multiple workflow instances to coordinate, share data, and manage server resources efficiently.
## 🚀 Key Features
- **Shared State Management**: Sync variables between multiple LangGraph workflow instances
- **Server Coordination**: Prevent conflicts with "busy"/"idle" server status management
- **Workflow Resumption**: Handle interrupted workflows with thread ID tracking
- **Reserved Variable Protection**: Secure server management variables from user modification (`server`, `serverThread`, `serverCheckpoint`, `serverTaskType`)
- **Multi-Server Management**: Manages state for multiple servers using array-based variables.
- **LangGraph Integration**: Seamless integration with LangGraph state objects
- **Error Handling**: Robust retry logic and comprehensive error handling
## 🔧 Quick Start
### 1. Add Shared State to Your LangGraph State Class
```python
from dataclasses import dataclass, field
from typing import Dict, Any, Optional

@dataclass
class State:
    # Your existing fields...
    current_node: float = 0
    user_input: str = ""
    stationThreadId: str = ""
    # Add this field for SharedState integration
    sharedState: Optional[Dict[str, Any]] = field(default_factory=dict)
```
### 2. Initialize StationAgent in Your LangGraph Nodes
⚠️ **IMPORTANT**: All StationAgent operations use synchronous HTTP calls internally and must be wrapped in `asyncio.to_thread()` to prevent blocking the ASGI event loop.
```python
import asyncio
from cuteagent import StationAgent
from langchain_core.runnables import RunnableConfig

async def your_langgraph_node(state: State, config: RunnableConfig) -> State:
    # LangGraph passes per-run values under config["configurable"]
    configuration = config.get("configurable", {})
    # Initialize StationAgent - MUST use asyncio.to_thread()
    agent = await asyncio.to_thread(
        StationAgent,
        station_thread_id=state.stationThreadId,
        graph_thread_id=configuration.get("thread_id"),
        token=configuration.get("shared_state_token", "your-api-token"),
        langgraph_token=configuration.get("langgraph_token")  # Required for pause/unpause functionality
    )
    # 🔄 Agent now has agent.initial_state with any existing variables
    # Sync shared state variables to LangGraph state - MUST use asyncio.to_thread()
    state = await asyncio.to_thread(agent.state.sync_all, state)
    # Check what initial state was loaded (optional)
    if agent.initial_state:
        print(f"Loaded {len(agent.initial_state)} existing variables")
        # initial_state now contains arrays for server management
        print(f"Server states: {agent.initial_state['server']}")
    # Your node logic here...
    # Update shared state - MUST use asyncio.to_thread()
    await asyncio.to_thread(agent.state.set, "currentNode", "processing")
    await asyncio.to_thread(agent.state.set, "timestamp", "2024-01-01T12:00:00Z")
    return state
```
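Note that `asyncio.to_thread()` was added in Python 3.9, while the package metadata lists `requires_python >=3.8`; on 3.8 the closest equivalent is `loop.run_in_executor(None, ...)`. The sketch below shows why the wrapper matters. `blocking_call` is a hypothetical stand-in for any synchronous HTTP request (it is not a cuteagent function), and `heartbeat` represents other work on the same event loop.

```python
import asyncio
import time
from typing import List, Tuple


def blocking_call(delay: float) -> str:
    """Stand-in for a synchronous HTTP request (hypothetical)."""
    time.sleep(delay)
    return "done"


async def heartbeat(ticks: List[float], interval: float, count: int) -> None:
    """Record a timestamp every `interval` seconds; stalls if the loop is blocked."""
    for _ in range(count):
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def demo() -> Tuple[str, int]:
    ticks: List[float] = []
    beat = asyncio.create_task(heartbeat(ticks, 0.02, 3))
    # The blocking call runs in a worker thread, so heartbeat keeps ticking.
    result = await asyncio.to_thread(blocking_call, 0.3)
    ticks_during_call = len(ticks)
    await beat
    return result, ticks_during_call
```

Running `asyncio.run(demo())` returns the call's result together with the number of heartbeat ticks recorded while it was in flight. Calling `blocking_call(0.3)` directly inside `demo` would instead leave that count at zero, because nothing else on the loop can run until the call returns.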
## 📊 Sync Patterns
StationAgent provides three sync patterns that update your LangGraph state and return the updated state object. **All sync operations must use `asyncio.to_thread()`**:
### Pattern 1: Sync Single Variable
```python
state = await asyncio.to_thread(agent.state.sync, "variableName", state)
```
### Pattern 2: Sync Multiple Variables
```python
state = await asyncio.to_thread(agent.state.sync_multiple, ["var1", "var2", "var3"], state)
```
### Pattern 3: Sync All Variables
```python
state = await asyncio.to_thread(agent.state.sync_all, state)
```
---
# 🖥️ WindowsAgent - Computer Use Automation
**WindowsAgent** enables computer use automation on Windows servers maintained by Fintor. It provides methods for clicking, taking screenshots, and performing other computer tasks remotely.
## 🚀 Key Features
- **Remote Computer Control**: Click, pause, and interact with Windows servers
- **Screenshot Capabilities**: Full and cropped screenshots with URL responses
- **Async Integration**: Thread-safe operations for LangGraph workflows
- **Error Resilience**: Graceful handling of server issues
- **Coordinate-based Actions**: Precise control with x,y coordinates
## 🔧 Quick Start
### 1. Initialize WindowsAgent
⚠️ **IMPORTANT**: All WindowsAgent operations use synchronous HTTP calls internally and must be wrapped in `asyncio.to_thread()` to prevent blocking the ASGI event loop.
```python
import asyncio
from cuteagent import WindowsAgent
from langchain_core.runnables import RunnableConfig

async def windows_automation_node(state: State, config: RunnableConfig) -> State:
    configuration = config["configurable"]
    # Initialize WindowsAgent with server URL
    os_url = configuration.get("os_url", "https://your-windows-server.ngrok.app")
    agent = WindowsAgent(os_url=os_url)
    try:
        # Click at specific coordinates - MUST use asyncio.to_thread()
        await asyncio.to_thread(agent.click_element, 100, 200)
        # Wait/pause - MUST use asyncio.to_thread()
        await asyncio.to_thread(agent.pause, 3)
        # Take a full screenshot - MUST use asyncio.to_thread()
        screenshot_result = await asyncio.to_thread(agent.screenshot)
        if isinstance(screenshot_result, dict) and "url" in screenshot_result:
            state.screenshot_url = screenshot_result["url"]
        else:
            state.screenshot_url = screenshot_result
        # Take a cropped screenshot [x, y, width, height] - MUST use asyncio.to_thread()
        cropped_result = await asyncio.to_thread(
            agent.screenshot_cropped,
            [10, 200, 1000, 450]
        )
        print(f"Screenshot URL: {state.screenshot_url}")
    except Exception as e:
        print(f"Windows automation error: {e}")
        # Continue workflow even on errors
    state.current_node = 8
    return state
```
## 📖 WindowsAgent API Reference
### Constructor
```python
WindowsAgent(os_url: str)
```
**Parameters:**
- `os_url` (str): URL of the Windows server (e.g., "https://server.ngrok.app")
### Methods
⚠️ **All methods must be wrapped in `asyncio.to_thread()` when called from async LangGraph nodes.**
#### `agent.click_element(x: int, y: int)`
Click at specific screen coordinates.
```python
await asyncio.to_thread(agent.click_element, 150, 300)
```
#### `agent.pause(seconds: int)`
Pause execution for specified seconds.
```python
await asyncio.to_thread(agent.pause, 5)
```
#### `agent.screenshot()`
Take a full screenshot of the desktop.
```python
result = await asyncio.to_thread(agent.screenshot)
# Returns: {"url": "https://..."} or URL string
```
#### `agent.screenshot_cropped(coordinates: List[int])`
Take a cropped screenshot with [x, y, width, height] coordinates.
```python
result = await asyncio.to_thread(agent.screenshot_cropped, [10, 50, 800, 600])
# Returns: {"url": "https://..."} or URL string
```
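Since both screenshot methods are documented as returning either a `{"url": ...}` dict or a bare URL string, it can help to normalise the result in one place. This is a minimal sketch; `screenshot_url_from` is a hypothetical helper, not part of the WindowsAgent API.

```python
from typing import Any, Optional


def screenshot_url_from(result: Any) -> Optional[str]:
    """Return the URL from a screenshot result, or None if none is present.

    Accepts the two documented shapes: {"url": "https://..."} or a URL string.
    """
    if isinstance(result, dict):
        url = result.get("url")
        return url if isinstance(url, str) and url else None
    if isinstance(result, str) and result:
        return result
    return None
```

A node could then write `state.screenshot_url = screenshot_url_from(result)` and treat `None` as "no screenshot available" instead of repeating the `isinstance` check at each call site.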
---
# 👥 HumanAgent - Human-in-the-Loop Task Management
**HumanAgent** integrates with Fintor's HITL service to bring humans into LangGraph workflows for approvals, decisions, and oversight. Responses are processed manually outside of CuteAgent and update shared state via StationAgent.
## 🚀 Key Features
- **Task Submission**: Send tasks with questions and images to humans
- **Choice-based Responses**: Multiple choice questions with predefined options
- **Image Support**: Include screenshots and visual content for human review
- **External State Updates**: Human responses processed outside the system
- **Task Type Management**: Categorize tasks with custom task types
- **Reporting**: Report workflow results back to human operators
## 🔧 Quick Start
### 1. Send Task for Human Review
⚠️ **IMPORTANT**: All HumanAgent operations use synchronous HTTP calls internally and must be wrapped in `asyncio.to_thread()` to prevent blocking the ASGI event loop.
```python
import asyncio
import os
import uuid
from cuteagent import HumanAgent
from langchain_core.runnables import RunnableConfig

async def send_human_task_node(state: State, config: RunnableConfig) -> State:
    """Send a task to humans for review with image and questions."""
    configuration = config["configurable"]
    hitl_token = configuration.get("hitl_token", os.getenv("HITL_TOKEN"))
    agent = HumanAgent(
        HITL_token=hitl_token,
        HITL_url="https://d5x1qrpuf7.execute-api.us-west-1.amazonaws.com/prod/"
    )
    # Prepare the human review task
    image_urls = [state.screenshot_url] if state.screenshot_url else []
    question_text = f"Agent found {len(state.borrower_names)} borrowers with Document Date.\n"
    question_text += "Please review and approve:\n"
    for borrower in state.borrower_names:
        question_text += f"- {borrower}\n"
    question_text += "\nDo you approve this decision?"
    questions = [{
        "Question": question_text,
        "Choices": ["TRUE", "FALSE"]
    }]
    thread_id = configuration.get("thread_id", str(uuid.uuid4()))
    # Create state data for HITL system
    state_dict = {
        "user_input": state.user_input,
        "current_node": state.current_node,
        "borrower_names": state.borrower_names,
        "stationThreadId": state.stationThreadId
    }
    try:
        # Send task to human agent - MUST use asyncio.to_thread()
        result = await asyncio.to_thread(
            agent.task,
            image_urls=image_urls,
            thread_id=thread_id,
            questions=questions,
            task_type="S1-T1",  # Your task type
            thread_state=state_dict
        )
        print(f"Human task sent successfully for thread: {thread_id}")
        # Store pending review info for interrupt
        state.pending_review_info = {
            "screenshot_url": state.screenshot_url,
            "borrower_names": state.borrower_names,
            "instructions": "Review extracted borrower names and respond via HITL system",
            "thread_id_of_task": thread_id
        }
    except Exception as e:
        print(f"Error sending human task: {e}")
        # Continue workflow or handle error appropriately
    state.current_node = 10.5
    return state
```
### 2. Report Results to Humans
```python
async def report_to_human_node(state: State, config: RunnableConfig) -> State:
    """Report final workflow results to human operators."""
    configuration = config["configurable"]
    hitl_token = configuration.get("hitl_token", os.getenv("HITL_TOKEN"))
    agent = HumanAgent(
        HITL_token=hitl_token,
        HITL_url="https://d5x1qrpuf7.execute-api.us-west-1.amazonaws.com/prod/"
    )
    thread_id = configuration.get("thread_id")
    # Prepare final state report
    state_dict = {
        "user_input": state.user_input,
        "current_node": state.current_node,
        "screenshot_url": state.screenshot_url,
        "borrower_names": state.borrower_names,
        "human_review_decision": state.human_review_decision,
        "status": state.status,
        "stationThreadId": state.stationThreadId
    }
    try:
        # Report final results - MUST use asyncio.to_thread()
        result = await asyncio.to_thread(
            agent.reporting,
            thread_id=thread_id,
            report_type="S1-R1",  # Your report type
            thread_state=state_dict
        )
        print(f"Results reported to human agent: {result}")
    except Exception as e:
        print(f"Error reporting to human agent: {e}")
    state.current_node = 12
    return state
```
## 📖 HumanAgent API Reference
### Constructor
```python
HumanAgent(HITL_token: str, HITL_url: str)
```
**Parameters:**
- `HITL_token` (str): Authentication token for HITL service
- `HITL_url` (str): URL of the HITL service API
### Methods
⚠️ **All methods must be wrapped in `asyncio.to_thread()` when called from async LangGraph nodes.**
#### `agent.task(image_urls, thread_id, questions, task_type, thread_state)`
Send a task to humans for review and decision.
**Usage:**
```python
result = await asyncio.to_thread(
agent.task,
image_urls=image_urls,
thread_id=thread_id,
questions=questions,
task_type=task_type,
thread_state=thread_state
)
```
**Parameters:**
- `image_urls` (List[str]): URLs of images (e.g., screenshots) for human review
- `thread_id` (str): Unique thread identifier for the task
- `questions` (List[Dict]): Questions with choices for humans to answer
- `task_type` (str): Category/type of the task (e.g., "S1-T1", "S2-T3")
- `thread_state` (Dict): Current workflow state data
**Questions Format:**
```python
questions = [{
"Question": "Do you approve these borrower names?",
"Choices": ["TRUE", "FALSE"]
}]
```
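When questions are assembled dynamically, a small builder can catch malformed entries before they reach a human reviewer. The sketch below is an illustration only: `make_question` is a hypothetical helper, and its validation rules (non-empty text, at least two distinct choices) are assumptions rather than documented requirements of the HITL service.

```python
from typing import Dict, List, Sequence, Union

Question = Dict[str, Union[str, List[str]]]


def make_question(text: str, choices: Sequence[str] = ("TRUE", "FALSE")) -> Question:
    """Build one entry for the `questions` list in the documented format.

    The checks below are assumed sanity rules, not HITL service requirements.
    """
    if not text.strip():
        raise ValueError("question text must not be empty")
    if len(choices) < 2:
        raise ValueError("a question needs at least two choices")
    if len(set(choices)) != len(choices):
        raise ValueError("choices must be distinct")
    return {"Question": text, "Choices": list(choices)}
```

Usage: `questions = [make_question("Do you approve these borrower names?")]` produces the same structure as the literal shown above.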
#### `agent.reporting(thread_id, report_type, thread_state)`
Report workflow results and final state to human operators.
**Usage:**
```python
result = await asyncio.to_thread(
agent.reporting,
thread_id=thread_id,
report_type=report_type,
thread_state=thread_state
)
```
**Parameters:**
- `thread_id` (str): Thread identifier for the report
- `report_type` (str): Type of report (e.g., "S1-R1", "FINAL")
- `thread_state` (Dict): Final workflow state and results
---
# 🔄 Complete Multi-Agent Workflow Example
Here's a complete example showing StationAgent, WindowsAgent, and HumanAgent working together with proper async handling:
```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, List
import asyncio
import uuid
from cuteagent import StationAgent, WindowsAgent, HumanAgent

@dataclass
class WorkflowState:
    current_node: float = 0
    user_input: str = ""
    stationThreadId: str = ""
    borrower_names: List[str] = field(default_factory=list)
    # Optional[...] rather than "str | None" keeps this valid on Python 3.8/3.9
    screenshot_url: Optional[str] = None
    status: str = "Ongoing"
    human_review_decision: Optional[str] = None
    pending_review_info: Optional[Dict[str, Any]] = None
    # Required for StationAgent integration
    sharedState: Optional[Dict[str, Any]] = field(default_factory=dict)

async def complete_workflow_node(state: WorkflowState, config) -> WorkflowState:
    """Complete workflow using all three agents with proper async handling."""
    configuration = config["configurable"]
    # 1. Initialize StationAgent for coordination with initial workflow state
    initial_workflow_state = {
        "workflowType": "complete_multi_agent",
        "startTime": "2024-01-01T12:00:00Z",
        "workflowStatus": "active"
    }
    station_agent = await asyncio.to_thread(
        StationAgent,
        station_thread_id=state.stationThreadId or "main-workflow",
        graph_thread_id=configuration.get("thread_id"),
        token=configuration.get("shared_state_token"),
        initial_state=initial_workflow_state,
        langgraph_token=configuration.get("langgraph_token")
    )
    # 2. Sync shared state to get latest workflow data
    state = await asyncio.to_thread(station_agent.state.sync_all, state)
    # 3. Check server availability and load for computer use
    server_status = await asyncio.to_thread(station_agent.server.avail)
    if server_status.get("server") == "idle":
        load_result = await asyncio.to_thread(station_agent.server.load, "screenshot_task")
        if load_result["status"] == "loaded":
            # 4. Use WindowsAgent for computer automation
            os_url = configuration.get("os_url")
            windows_agent = WindowsAgent(os_url=os_url)
            try:
                # Perform computer tasks
                await asyncio.to_thread(windows_agent.click_element, 294, 98)
                await asyncio.to_thread(windows_agent.pause, 2)
                # Take screenshot for human review
                screenshot_result = await asyncio.to_thread(
                    windows_agent.screenshot_cropped,
                    [10, 200, 1000, 450]
                )
                if isinstance(screenshot_result, dict):
                    state.screenshot_url = screenshot_result["url"]
                else:
                    state.screenshot_url = screenshot_result
            except Exception as e:
                print(f"Windows automation error: {e}")
            # 5. Send task to HumanAgent for review
            hitl_token = configuration.get("hitl_token")
            human_agent = HumanAgent(
                HITL_token=hitl_token,
                HITL_url="https://d5x1qrpuf7.execute-api.us-west-1.amazonaws.com/prod/"
            )
            questions = [{
                "Question": "Screenshot taken successfully. Proceed with processing?",
                "Choices": ["APPROVE", "REJECT"]
            }]
            thread_id = configuration.get("thread_id")
            state_dict = {
                "screenshot_url": state.screenshot_url,
                "current_node": state.current_node,
                "stationThreadId": state.stationThreadId
            }
            try:
                await asyncio.to_thread(
                    human_agent.task,
                    image_urls=[state.screenshot_url] if state.screenshot_url else [],
                    thread_id=thread_id,
                    questions=questions,
                    task_type="S1-T1",
                    thread_state=state_dict
                )
                print("Human review task sent successfully")
            except Exception as e:
                print(f"Human task error: {e}")
            # 6. Update shared state with workflow progress
            await asyncio.to_thread(station_agent.state.push, {
                "lastCompletedNode": state.current_node,
                "screenshotTaken": True,
                "humanTaskSent": True,
                "workflowStatus": "awaiting_human_review"
            })
            # 7. Unload server when done
            await asyncio.to_thread(station_agent.server.unload)
    else:
        print("Server is busy, waiting...")
    # 8. Sync final state back to LangGraph
    state = await asyncio.to_thread(station_agent.state.sync_all, state)
    state.current_node += 1
    return state
```
This example demonstrates how all three agents work together with proper async handling:
- **StationAgent** coordinates shared state and server access for multiple servers
- **WindowsAgent** performs computer automation tasks
- **HumanAgent** provides human oversight and decision-making
---
# 📋 StationAgent Detailed API Reference
## Constructor and Initialization
### `StationAgent(station_thread_id, graph_thread_id, token, initial_state=None, langgraph_token=None)`
Create a new StationAgent instance with initial state push capability.
⚠️ **IMPORTANT**: Constructor must be wrapped in `asyncio.to_thread()` in async contexts.
```python
# Correct async usage
agent = await asyncio.to_thread(
StationAgent,
station_thread_id="workflow-123",
graph_thread_id="thread-456",
token="your-shared-state-token",
initial_state=initial_state, # optional
langgraph_token="your-langgraph-token" # required for pause/unpause functionality
)
```
**Parameters:**
- `station_thread_id` (str): Identifier for the station/workflow instance
- `graph_thread_id` (str): LangGraph thread identifier
- `token` (str): Authentication token for SharedState API
- `initial_state` (dict, optional): Initial state object to push to SharedState API
- `langgraph_token` (str, optional): Authentication token for LangGraph API. Required for `pause()` and `unpause()` functionality.
**Automatic Initialization:**
- Automatically pushes initial_state to SharedState API during initialization (if provided)
- Automatically adds `server`, `serverThread`, `serverCheckpoint`, and `serverTaskType` as arrays to `initial_state` to manage 4 servers by default.
- Stores enhanced initial_state in `agent.initial_state` attribute for easy access
- Provides console feedback about pushed variables
**Attributes:**
- `agent.initial_state` (dict): Dictionary of initial state with server variables automatically added
**Example:**
```python
# Initialize agent with initial state
initial_workflow_state = {
"workflowId": "wf-123",
"currentStep": "start",
"userInput": "process this data"
}
agent = await asyncio.to_thread(
StationAgent,
"workflow-123",
"thread-456",
"token",
initial_state=initial_workflow_state
)
# Check what was automatically enhanced (server variables added)
print(f"Initial variables: {list(agent.initial_state.keys())}")
# Output: ['workflowId', 'currentStep', 'userInput', 'server', 'serverThread', 'serverCheckpoint', 'serverTaskType']
print(f"Workflow ID: {agent.initial_state['workflowId']}")
print(f"Server status: {agent.initial_state['server']}") # ['idle', 'idle', 'idle', 'idle']
print(f"Server thread: {agent.initial_state['serverThread']}") # ['idle', 'idle', 'idle', 'idle']
# Initialize without initial state
agent_empty = await asyncio.to_thread(StationAgent, "workflow-456", "thread-789", "token")
print(f"No initial state: {agent_empty.initial_state}") # None
```
## State Management Methods
⚠️ **All state methods must be wrapped in `asyncio.to_thread()` when called from async LangGraph nodes.**
### `agent.state.sync(variable_name, langgraph_state=None)`
Sync single variable from SharedState API to LangGraph state.
```python
# Returns updated state object
state = await asyncio.to_thread(agent.state.sync, "currentStep", state)
# Returns just the variable value (backward compatibility)
value = await asyncio.to_thread(agent.state.sync, "currentStep")
```
### `agent.state.sync_multiple(variable_names, langgraph_state=None)`
Sync multiple variables from SharedState API to LangGraph state.
```python
state = await asyncio.to_thread(agent.state.sync_multiple, ["var1", "var2", "var3"], state)
```
### `agent.state.sync_all(langgraph_state)`
Sync all variables from SharedState API to LangGraph state.
```python
state = await asyncio.to_thread(agent.state.sync_all, state)
```
### `agent.state.set(variable_name, value)`
Create or update a single variable in SharedState API.
```python
await asyncio.to_thread(agent.state.set, "currentStep", "processing")
await asyncio.to_thread(agent.state.set, "userPrefs", {"theme": "dark"})
```
### `agent.state.get(variable_name)`
Get a single variable from SharedState API.
```python
current_step = await asyncio.to_thread(agent.state.get, "currentStep") # Returns value or None
```
### `agent.state.push(variables_dict)`
Bulk create/update multiple variables in SharedState API.
```python
await asyncio.to_thread(agent.state.push, {
"workflowId": "wf-123",
"status": "processing",
"data": {"key": "value"}
})
```
### `agent.state.pull()`
Get all variables from SharedState API.
```python
all_vars = await asyncio.to_thread(agent.state.pull) # Returns dict of all variables
```
### `agent.state.delete(variable_name)`
Delete a variable from SharedState API.
```python
await asyncio.to_thread(agent.state.delete, "temporary_data")
```
### `agent.state.exists(variable_name)`
Check if a variable exists in SharedState API.
```python
exists = await asyncio.to_thread(agent.state.exists, "userPreferences")
if exists:
    prefs = await asyncio.to_thread(agent.state.get, "userPreferences")
```
### `agent.state.list_variables()`
Get list of all variable names.
```python
var_names = await asyncio.to_thread(agent.state.list_variables) # Returns list of strings
```
## Server Management Methods
⚠️ **All server methods must be wrapped in `asyncio.to_thread()` when called from async LangGraph nodes. They now operate on a specific server via an index.**
### `agent.server.load(serverThreadId, serverCheckpoint="setup", serverIndex=0, serverTaskType="taskPlaceholder")`
Load a specific server for a task. The server must be in "idle" status and have the expected checkpoint.
**Parameters:**
- `serverThreadId` (str): The thread ID to assign to the server when loaded
- `serverCheckpoint` (str, optional): The checkpoint to verify before loading. Defaults to "setup"
- `serverIndex` (int, optional): The index of the server to load (0-3). Defaults to 0
- `serverTaskType` (str, optional): The task type to assign. Defaults to "taskPlaceholder"
```python
result = await asyncio.to_thread(
agent.server.load,
serverThreadId="GetNames",
serverCheckpoint="setup",
serverIndex=0,
serverTaskType="data_processing"
)
# Returns: {"status": "loaded", "serverThread": "GetNames"}
# or {"status": "busy", "error": "Server is busy"}
# or {"status": "wrongCheckpoint", "error": "Incorrect checkpoint. Expected setup, got running"}
# or {"status": "error", "error": "serverIndex 0 is out of bounds."}
```
### `agent.server.unload(checkpoint="setup", index=0)`
Unload a server and set it to idle with a new checkpoint. The server must be in "busy" status.
**Parameters:**
- `checkpoint` (str, optional): The checkpoint to set after unloading. Defaults to "setup"
- `index` (int, optional): The index of the server to unload (0-3). Defaults to 0
```python
result = await asyncio.to_thread(agent.server.unload, checkpoint="completed", index=0)
# With default checkpoint:
result = await asyncio.to_thread(agent.server.unload, index=0) # Uses "setup" as default
# Returns: {"status": "unloaded"}
# or {"status": "idle", "error": "Server is already idle"}
# or {"status": "error", "error": "serverIndex 0 is out of bounds."}
```
### `agent.server.avail(index=0)`
Get availability status for a specific server.
**Parameters:**
- `index` (int, optional): The index of the server to check (0-3). Defaults to 0
```python
status = await asyncio.to_thread(agent.server.avail, index=0)
# Returns: {
# "server": "busy|idle",
# "serverThread": "GetNames|idle",
# "serverCheckpoint": "setup|running|completed",
# "serverTaskType": "data_processing|taskPlaceholder"
# }
# or {"status": "error", "error": "Server state is not initialized correctly as arrays."}
```
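With four servers managed by default, a common need is to claim whichever one is free. The sketch below combines the documented `avail` and `load` calls into a single synchronous helper. `acquire_idle_server` is a hypothetical name, not part of StationAgent, and the default of four servers is taken from the index range (0-3) described above.

```python
from typing import Any, Optional


def acquire_idle_server(
    server: Any,
    thread_id: str,
    task_type: str,
    checkpoint: str = "setup",
    count: int = 4,
) -> Optional[int]:
    """Try servers 0..count-1 and return the index that was loaded, or None.

    `server` is expected to behave like `agent.server` (avail/load methods).
    """
    for index in range(count):
        status = server.avail(index=index)
        # Error responses carry no "server" key, so they are skipped as well.
        if status.get("server") != "idle":
            continue
        result = server.load(
            serverThreadId=thread_id,
            serverCheckpoint=checkpoint,
            serverIndex=index,
            serverTaskType=task_type,
        )
        if result.get("status") == "loaded":
            return index
    return None
```

Because the helper is synchronous, the whole scan can be moved off the event loop in one step: `index = await asyncio.to_thread(acquire_idle_server, agent.server, "GetNames", "data_processing")`. Keep the returned index so the matching `agent.server.unload(index=index)` can release the same server later.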
## 🔒 Reserved Variables
StationAgent protects these variables from user modification:
* **`server`**: Array of server statuses ("busy" or "idle" only)
* **`serverThread`**: Array of current task threads when server is busy
* **`serverCheckpoint`**: Array of server checkpoints
* **`serverTaskType`**: Array of server task types
These can only be modified through server management methods:
* `agent.server.load(...)` - Sets a server to "busy"
* `agent.server.unload(...)` - Sets a server to "idle"
```python
# ❌ This will raise ValueError
await asyncio.to_thread(agent.state.set, "server", "custom_status")
# ✅ This is the correct way
await asyncio.to_thread(agent.server.load, serverThreadId="my_task_thread") # Sets server 0 to "busy"
```
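When state comes from user input or another system, it may be worth filtering out the reserved names before writing, so that a stray `server` key does not abort the call. This is a minimal sketch; `drop_reserved` is a hypothetical helper, and whether `agent.state.push` rejects reserved keys in the same way as `agent.state.set` is an assumption not confirmed here.

```python
from typing import Any, Dict, List, Tuple

# The four reserved names listed in the section above.
RESERVED_VARIABLES = frozenset(
    {"server", "serverThread", "serverCheckpoint", "serverTaskType"}
)


def drop_reserved(variables: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
    """Split a dict into (safe variables, sorted list of rejected reserved names)."""
    safe = {k: v for k, v in variables.items() if k not in RESERVED_VARIABLES}
    rejected = sorted(k for k in variables if k in RESERVED_VARIABLES)
    return safe, rejected
```

Usage: `safe, rejected = drop_reserved(incoming)` followed by `await asyncio.to_thread(agent.state.push, safe)`, logging `rejected` if it is non-empty.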
---
## ⚙️ Configuration
### Environment Variables
```bash
# StationAgent
export SHARED_STATE_URL="https://your-api.amazonaws.com/prod"
export SHARED_STATE_TOKEN="your-shared-state-api-token"
export LANGGRAPH_TOKEN="your-langgraph-api-token"
# HumanAgent
export HITL_TOKEN="your-hitl-token"
# WindowsAgent (configured per workflow)
# os_url provided in LangGraph configuration
```
### LangGraph Configuration
```python
config = {
"configurable": {
"shared_state_token": "your-shared-state-api-token",
"langgraph_token": "your-langgraph-api-token", # Required for pause/unpause functionality
"hitl_token": "your-hitl-token",
"os_url": "https://your-windows-server.ngrok.app",
"thread_id": "your-langgraph-thread-id"
}
}
```
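The two configuration sources above can be combined so that a value in the LangGraph config wins and the environment variable acts as a fallback. The sketch below is one possible way to do that; `resolve_setting` is a hypothetical helper, not part of cuteagent.

```python
import os
from typing import Any, Mapping, Optional


def resolve_setting(config: Mapping[str, Any], key: str, env_var: str) -> Optional[str]:
    """Prefer config["configurable"][key]; fall back to the environment variable."""
    value = (config.get("configurable") or {}).get(key)
    return value if value else os.getenv(env_var)
```

For example, `resolve_setting(config, "hitl_token", "HITL_TOKEN")` returns the token from the config when present and otherwise reads `HITL_TOKEN` from the environment, returning `None` if neither is set.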
## 🚨 Error Handling
### StationAgent
- **Network Retries**: 3 attempts with exponential backoff
- **Authentication Errors**: Clear messages for invalid tokens
- **Reserved Variable Protection**: ValueError for protected variables
- **Blocking Call Prevention**: All operations must use `asyncio.to_thread()` in async contexts
### WindowsAgent
- **Connection Issues**: Graceful failure with workflow continuation
- **Server Errors**: Exception handling with logging
- **Timeout Handling**: Async operations with proper error propagation
- **Blocking Call Prevention**: All operations must use `asyncio.to_thread()` in async contexts
### HumanAgent
- **Service Issues**: Contact support_eng@fintor.com
- **Task Failures**: Manual processing required outside the system
- **Response Processing**: Done manually outside CuteAgent
- **Blocking Call Prevention**: All operations must use `asyncio.to_thread()` in async contexts
```python
try:
    state = await asyncio.to_thread(agent.state.sync_all, state)
except ValueError as e:
    # Handle reserved variable violations
    print(f"Configuration error: {e}")
except Exception as e:
    # Handle network/API errors
    print(f"Network error: {e}")
    # Continue with workflow using existing state
```
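The "3 attempts with exponential backoff" policy listed for StationAgent can be illustrated with a generic wrapper, which may also be useful around WindowsAgent or HumanAgent calls. This is a sketch under stated assumptions, not the library's internal implementation: `call_with_retries`, the 0.5 second base delay, and the choice to retry only on `OSError` subclasses are all hypothetical.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on `retry_on` with delays base_delay, 2x, 4x, ...

    Other exceptions (for example ValueError) propagate immediately.
    """
    for attempt in range(attempts):
        try:
            return func()
        except retry_on:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error.
            sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

In an async node the whole retry loop belongs in a worker thread, for example `await asyncio.to_thread(call_with_retries, lambda: windows_agent.screenshot())`, so that the backoff sleeps do not block the event loop either.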
## 📚 Best Practices for Multi-Agent Workflows
1. **Always use `asyncio.to_thread()`** for all CuteAgent operations in async LangGraph nodes
2. **Initialize StationAgent first** in each node for state coordination
3. **Check server availability** before WindowsAgent operations
4. **Use HumanAgent for critical decisions** and quality assurance
5. **Include screenshots** in human tasks for better context
6. **Handle errors gracefully** - workflows should be resilient
7. **Update shared state regularly** for workflow coordination
8. **Use meaningful task types** for HumanAgent categorization
9. **Clean up resources** - unload servers when done
10. **Test blocking call prevention** - ensure no "Blocking call to socket.socket.connect" errors
## 🚨 Critical Async Requirements
**ALL CuteAgent operations use synchronous HTTP calls internally and MUST be wrapped in `asyncio.to_thread()` when used in async LangGraph nodes to prevent blocking the ASGI event loop.**
### ✅ Correct Usage:
```python
# StationAgent
agent = await asyncio.to_thread(StationAgent, station_id, graph_id, token)
state = await asyncio.to_thread(agent.state.sync_all, state)
await asyncio.to_thread(agent.state.set, "key", "value")
# HumanAgent
await asyncio.to_thread(agent.task, images, thread_id, questions, task_type, state)
# WindowsAgent
await asyncio.to_thread(agent.click_element, x, y)
await asyncio.to_thread(agent.screenshot)
```
### ❌ Incorrect Usage (will cause blocking errors):
```python
# These will cause "Blocking call to socket.socket.connect" errors
agent = StationAgent(station_id, graph_id, token) # ❌
state = agent.state.sync_all(state) # ❌
agent.task(images, thread_id, questions, task_type, state) # ❌
```
## 📖 Additional Documentation
- **[API Reference](docs/api_reference.md)** - Complete API documentation
- **[LangGraph Integration](docs/langgraph_integration.md)** - Detailed integration guide
- **[Deployment Guide](DEPLOYMENT.md)** - Automated deployment instructions
## 🤝 Contributing
CuteAgent is part of a comprehensive agent suite. For issues, feature requests, or contributions, please contact the development team.
## 📄 License
This project is licensed under the MIT License.
---
**Ready to build complete AI workflows with computer use, human oversight, and shared coordination? Start using CuteAgent today!** 🚀
Raw data
{
"_id": null,
"home_page": null,
"name": "cuteagent",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "cuteagent, openai",
"author": null,
"author_email": "Masoud Jb <eng@fintor.com>",
"download_url": "https://files.pythonhosted.org/packages/0d/fd/9b73fdf7d1055457a588a174eba036d481703fea09c39ff99a1ce608248b/cuteagent-0.2.23.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT License",
"summary": "Computer Use Task Execution Agent",
"version": "0.2.23",
"project_urls": {
"Homepage": "https://github.com/MasoudJB/cuteagent"
},
"split_keywords": [
"cuteagent",
" openai"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "9c1860aca75dd383c4ff5ce92544526babd7170d4d0429f033ff5a21bc8d4fac",
"md5": "e26bf05461dd9adcea340abfd5b57477",
"sha256": "dbbb9ccf07c5898509cd51e415e314b64bdea25fed7005b55684c4cfb04f5a61"
},
"downloads": -1,
"filename": "cuteagent-0.2.23-py2.py3-none-any.whl",
"has_sig": false,
"md5_digest": "e26bf05461dd9adcea340abfd5b57477",
"packagetype": "bdist_wheel",
"python_version": "py2.py3",
"requires_python": ">=3.8",
"size": 44517,
"upload_time": "2025-09-05T07:37:14",
"upload_time_iso_8601": "2025-09-05T07:37:14.299284Z",
"url": "https://files.pythonhosted.org/packages/9c/18/60aca75dd383c4ff5ce92544526babd7170d4d0429f033ff5a21bc8d4fac/cuteagent-0.2.23-py2.py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "0dfd9b73fdf7d1055457a588a174eba036d481703fea09c39ff99a1ce608248b",
"md5": "c7fd5c770e55a70be2ec3da63f2aad11",
"sha256": "bfa52f930c9588fcb3a9caa700bb28ce2eb5fa46758fd9c2c40f99ac00be8458"
},
"downloads": -1,
"filename": "cuteagent-0.2.23.tar.gz",
"has_sig": false,
"md5_digest": "c7fd5c770e55a70be2ec3da63f2aad11",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 130058,
"upload_time": "2025-09-05T07:37:15",
"upload_time_iso_8601": "2025-09-05T07:37:15.341166Z",
"url": "https://files.pythonhosted.org/packages/0d/fd/9b73fdf7d1055457a588a174eba036d481703fea09c39ff99a1ce608248b/cuteagent-0.2.23.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-09-05 07:37:15",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "MasoudJB",
"github_project": "cuteagent",
"github_not_found": true,
"lcname": "cuteagent"
}