# cjm-fasthtml-workers
<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->
## Install
``` bash
pip install cjm_fasthtml_workers
```
## Project Structure
nbs/
├── core/ (4)
│ ├── adapters.ipynb # Adapter utilities for making plugin managers compatible with the worker system.
│ ├── config.ipynb # Configuration for worker processes including restart policies, timeouts, and queue sizes.
│ ├── protocol.ipynb # Protocol definitions for worker communication and plugin manager integration.
│ └── worker.ipynb # Generic worker process for executing plugin-based jobs in isolated subprocesses.
├── extensions/ (2)
│ ├── adapters.ipynb # Common adapters for integrating popular FastHTML libraries with the worker system.
│ └── protocols.ipynb # Optional integration protocols for plugin registries, resource management, and event broadcasting.
└── managers/ (1)
└── base.ipynb # Abstract base class for managing background jobs with worker processes.
Total: 7 notebooks across 3 directories
## Module Dependencies
``` mermaid
graph LR
core_adapters[core.adapters<br/>adapters]
core_config[core.config<br/>config]
core_protocol[core.protocol<br/>protocol]
core_worker[core.worker<br/>worker]
extensions_adapters[extensions.adapters<br/>adapters]
extensions_protocols[extensions.protocols<br/>protocols]
managers_base[managers.base<br/>base]
core_adapters --> core_protocol
core_worker --> core_protocol
extensions_adapters --> extensions_protocols
managers_base --> extensions_protocols
managers_base --> core_protocol
managers_base --> core_config
```
*6 cross-module dependencies detected*
## CLI Reference
No CLI commands found in this project.
## Module Overview
Detailed documentation for each module in the project:
### adapters (`core/adapters.ipynb`)
> Adapter utilities for making plugin managers compatible with the
> worker system.
#### Import
``` python
from cjm_fasthtml_workers.core.adapters import (
create_simple_adapter,
default_result_adapter
)
```
#### Functions
``` python
def create_simple_adapter(
plugin_manager:Any, # The plugin manager instance to adapt
result_adapter:Optional[callable]=None # Optional function to convert plugin results to dict
) -> PluginManagerAdapter: # Adapter that satisfies PluginManagerAdapter protocol
"Create a simple adapter for a plugin manager."
```
``` python
def default_result_adapter(
result:Any # Plugin execution result
) -> Dict[str, Any]: # Dictionary with text and metadata
"Default adapter for converting plugin results to dictionaries."
```
### adapters (`extensions/adapters.ipynb`)
> Common adapters for integrating popular FastHTML libraries with the
> worker system.
#### Import
``` python
from cjm_fasthtml_workers.extensions.adapters import (
UnifiedPluginRegistryAdapter,
ResourceManagerAdapter,
SSEBroadcasterAdapter,
create_standard_adapters
)
```
#### Functions
```` python
def create_standard_adapters(
plugin_registry=None, # UnifiedPluginRegistry instance (optional)
resource_manager=None, # ResourceManager instance (optional)
sse_manager=None # SSEBroadcastManager instance (optional)
) -> tuple: # (plugin_adapter, resource_adapter, sse_adapter)
"""
Create standard adapters for common FastHTML libraries.
This factory function creates adapters for:
- cjm-fasthtml-plugins UnifiedPluginRegistry
- cjm-fasthtml-resources ResourceManager
- cjm-fasthtml-sse SSEBroadcastManager
All parameters are optional. Pass None for services you don't use.
Example:
```python
from cjm_fasthtml_workers.extensions.adapters import create_standard_adapters
from cjm_fasthtml_plugins.core.registry import UnifiedPluginRegistry
from cjm_fasthtml_resources.core.manager import ResourceManager
# Create services
plugin_registry = UnifiedPluginRegistry()
resource_manager = ResourceManager()
# Create adapters
plugin_adapter, resource_adapter, sse_adapter = create_standard_adapters(
plugin_registry=plugin_registry,
resource_manager=resource_manager
)
# Pass to job manager
manager = MyJobManager(
plugin_registry=plugin_adapter,
resource_manager=resource_adapter
)
```
Returns:
Tuple of (plugin_adapter, resource_adapter, sse_adapter)
Each adapter will be None if the corresponding service was not provided.
"""
````
#### Classes
``` python
class UnifiedPluginRegistryAdapter:
    "Adapter for cjm-fasthtml-plugins UnifiedPluginRegistry."
    def __init__(self, registry):
        """
        Initialize adapter with UnifiedPluginRegistry instance.
        Args:
            registry: UnifiedPluginRegistry from cjm-fasthtml-plugins
        """
    def get_plugins_by_category(self, category) -> list:
        "Get all plugins in a specific category."
    def get_plugin(self, plugin_id: str):
        "Get a specific plugin by ID."
    def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]:
        "Load configuration for a plugin."
```
``` python
class ResourceManagerAdapter:
    "Adapter for cjm-fasthtml-resources ResourceManager."
    def __init__(self, resource_manager):
        """
        Initialize adapter with ResourceManager instance.
        Args:
            resource_manager: ResourceManager from cjm-fasthtml-resources
        """
    def register_worker(self, pid: int, worker_type: str) -> None:
        "Register a new worker process."
    def unregister_worker(self, pid: int) -> None:
        "Unregister a worker process."
    def update_worker_state(
        self,
        pid: int,
        status: Optional[str] = None,
        job_id: Optional[str] = None,
        plugin_name: Optional[str] = None,
        plugin_id: Optional[str] = None,
        loaded_plugin_resource: Optional[str] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        "Update worker state information."
    def check_gpu_availability(self):
        "Check GPU availability and identify conflicts."
    def get_worker_by_pid(self, pid: int):
        "Get worker state by PID."
```
``` python
class SSEBroadcasterAdapter:
    "Adapter for cjm-fasthtml-sse SSEBroadcastManager."
    def __init__(self, sse_manager):
        """
        Initialize adapter with SSE broadcast manager.
        Args:
            sse_manager: SSEBroadcastManager from cjm-fasthtml-sse
        """
    async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None:
        """
        Broadcast an event to all connected SSE clients.
        Args:
            event_type: Event type identifier (e.g., 'transcription:started')
            data: Event data payload
        """
```
### base (`base.ipynb`)
> Abstract base class for managing background jobs with worker
> processes.
#### Import
``` python
from cjm_fasthtml_workers.managers.base import (
JobType,
BaseJob,
BaseJobManager
)
```
#### Functions
``` python
@patch
@abstractmethod
def create_job(
self:BaseJobManager,
plugin_id:str, # Plugin unique identifier
**kwargs # Domain-specific job parameters
) -> JobType: # Created job instance
"Factory method for creating domain-specific jobs."
```
``` python
@patch
@abstractmethod
def get_worker_entry_point(
self:BaseJobManager
) -> Callable: # Worker process entry point function
"Return the worker process function for this manager."
```
``` python
@patch
@abstractmethod
def prepare_execute_request(
self:BaseJobManager,
job:JobType # The job to prepare for execution
) -> Dict[str, Any]: # Dictionary of parameters for the worker execute request
"Convert job to worker execute request parameters."
```
``` python
@patch
@abstractmethod
def extract_job_result(
self:BaseJobManager,
job:JobType, # The job that was executed
result_data:Dict[str, Any] # Raw result data from worker
) -> Dict[str, Any]: # Formatted result for storage
"Extract and format job result from worker response."
```
``` python
@patch
def _extract_plugin_resource_identifier(
self:BaseJobManager,
config:Dict[str, Any] # Plugin configuration dictionary
) -> str: # Plugin resource identifier string
"Extract plugin resource identifier from plugin configuration."
```
``` python
@patch
async def _validate_resources(
self:BaseJobManager,
plugin_id:str, # Plugin unique identifier
plugin_config:Dict[str, Any] # Plugin configuration
) -> Optional[str]: # Error message if validation fails, None if successful
"Validate resources before starting a job."
```
``` python
@patch
def _on_job_completed(
self:BaseJobManager,
job_id:str # ID of the completed job
) -> None
"Hook called when a job completes successfully."
```
``` python
@patch
def _start_worker(self:BaseJobManager):
    "Start the worker process and result monitor."
```
``` python
@patch
def _init_worker(self:BaseJobManager):
    "Send initialization message to worker with plugin configurations."
```
``` python
@patch
def _restart_worker(self:BaseJobManager):
    "Restart the worker process after an error or cancellation."
```
``` python
@patch
def _monitor_results(self:BaseJobManager):
    "Monitor the result queue in a background thread."
```
``` python
@patch
def _handle_job_result(
self:BaseJobManager,
result:Dict[str, Any] # Result data from worker
)
"Handle a job result from the worker."
```
``` python
@patch
def _handle_stream_chunk(
self:BaseJobManager,
chunk_data:Dict[str, Any] # Chunk data from worker
)
"Handle a streaming chunk from the worker."
```
``` python
@patch
def _handle_worker_error(self:BaseJobManager):
    "Handle worker fatal error based on restart policy."
```
``` python
@patch
def get_plugin_name(
self:BaseJobManager,
plugin_id:str # Plugin unique identifier
) -> Optional[str]: # Plugin name or None
"Get plugin name from plugin ID (requires plugin registry)."
```
``` python
@patch
async def unload_plugin(
self:BaseJobManager,
plugin_name:str # Name of the plugin to unload
) -> bool: # True if successful, False otherwise
"Unload a plugin from the worker to free resources."
```
``` python
@patch
async def reload_plugin(
self:BaseJobManager,
plugin_name:str, # Name of the plugin to reload
config:Dict[str, Any] # New configuration
) -> bool: # True if successful, False otherwise
"Reload a plugin with new configuration."
```
``` python
@patch
async def start_job(
self:BaseJobManager,
plugin_id:str, # Plugin unique identifier
**kwargs # Domain-specific job parameters
) -> JobType: # Created and started job
"Start a new job."
```
``` python
@patch
async def cancel_job(
self:BaseJobManager,
job_id:str # ID of the job to cancel
) -> bool: # True if cancellation successful
"Cancel a running job by terminating the worker process."
```
``` python
@patch
def get_job(
self:BaseJobManager,
job_id:str # Unique job identifier
) -> Optional[JobType]: # Job object or None
"Get a job by ID."
```
``` python
@patch
def get_all_jobs(
self:BaseJobManager
) -> List[JobType]: # List of all jobs
"Get all jobs."
```
``` python
@patch
def get_job_result(
self:BaseJobManager,
job_id:str # Unique job identifier
) -> Optional[Dict[str, Any]]: # Job result or None
"Get job result."
```
``` python
@patch
def clear_completed_jobs(
self:BaseJobManager
) -> int: # Number of jobs cleared
"Clear completed, failed, and cancelled jobs."
```
``` python
@patch
async def broadcast_event(
self:BaseJobManager,
event_type:str, # Event type identifier
data:Dict[str, Any] # Event data payload
)
"Broadcast an event to all connected SSE clients (requires event broadcaster)."
```
``` python
@patch
def check_streaming_support(
self:BaseJobManager,
plugin_id:str # Plugin unique identifier
) -> bool: # True if streaming supported
"Check if a plugin supports streaming."
```
``` python
@patch
def shutdown(self:BaseJobManager):
    "Shutdown the manager and cleanup resources."
```
#### Classes
``` python
@dataclass
class BaseJob:
"Base class for all job types."
id: str # Unique job identifier
plugin_id: str # Plugin identifier for this job
status: str = 'pending' # Job status: pending, running, completed, failed, cancelled
created_at: str = field(...) # ISO format timestamp
started_at: Optional[str] # When job started executing
completed_at: Optional[str] # When job finished
result: Optional[Dict[str, Any]] # Job result data
error: Optional[str] # Error message if failed
metadata: Dict[str, Any] = field(...) # Additional job metadata
worker_pid: Optional[int] # Process ID of worker handling this job
```
``` python
class BaseJobManager:
    """
    Abstract base class for managing jobs using worker processes.
    Features:
    - Jobs processed sequentially in subprocess
    - Plugin resources loaded once and reused
    - True cancellation via subprocess termination
    - Automatic worker restart based on policy
    - Isolated worker process avoids duplicating web app initialization
    - Optional streaming support for incremental results
    - Optional dependency injection for plugin registry, resource manager, and event broadcaster
    """
    def __init__(
        self,
        worker_type:str, # Type identifier (e.g., "transcription", "llm", "image-gen")
        category:Any, # Plugin category this manager handles
        supports_streaming:bool=False, # Whether this manager supports streaming jobs
        worker_config:Optional[WorkerConfig]=None, # Worker configuration (uses defaults if None)
        plugin_registry:Optional[PluginRegistryProtocol]=None, # Optional plugin registry integration
        resource_manager:Optional[ResourceManagerProtocol]=None, # Optional resource manager integration
        event_broadcaster:Optional[EventBroadcasterProtocol]=None, # Optional SSE event broadcaster
    )
        "Initialize the job manager."
```
### config (`config.ipynb`)
> Configuration for worker processes including restart policies,
> timeouts, and queue sizes.
#### Import
``` python
from cjm_fasthtml_workers.core.config import (
RestartPolicy,
WorkerConfig
)
```
#### Classes
``` python
class RestartPolicy(Enum):
"Policy for restarting worker processes after failures."
```
``` python
@dataclass
class WorkerConfig:
"Configuration for worker process behavior."
request_queue_size: int = 0 # 0 = unlimited
result_queue_size: int = 100 # Larger for streaming results
response_queue_size: int = 10 # For synchronous command responses
restart_policy: RestartPolicy = RestartPolicy.ON_CANCELLATION
max_restart_attempts: int = 3
restart_backoff_base_seconds: float = 1.0 # Base delay for exponential backoff
restart_backoff_max_seconds: float = 60.0 # Max delay for backoff
max_workers: int = 1 # Currently only 1 is supported
worker_start_timeout_seconds: float = 30.0
reload_timeout_seconds: float = 30.0
unload_timeout_seconds: float = 10.0
shutdown_timeout_seconds: float = 5.0
result_monitor_poll_interval_seconds: float = 0.5
```
### protocol (`protocol.ipynb`)
> Protocol definitions for worker communication and plugin manager
> integration.
#### Import
``` python
from cjm_fasthtml_workers.core.protocol import (
WorkerRequestType,
WorkerResponseType,
WorkerRequest,
WorkerResponse,
WorkerStreamChunk,
WorkerResult,
PluginManagerAdapter
)
```
#### Classes
``` python
class WorkerRequestType(Enum):
"Types of requests sent to worker process."
```
``` python
class WorkerResponseType(Enum):
"Types of responses from worker process."
```
``` python
@dataclass
class WorkerRequest:
    "Base structure for worker requests."
    type: WorkerRequestType
    data: Dict[str, Any]
    def to_dict(self) -> Dict[str, Any]:
        "Convert to dictionary for queue serialization."
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerRequest':
        "Create from dictionary received from queue."
```
``` python
@dataclass
class WorkerResponse:
    "Base structure for worker responses."
    type: WorkerResponseType
    data: Dict[str, Any]
    def to_dict(self) -> Dict[str, Any]:
        "Convert to dictionary for queue serialization."
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerResponse':
        "Create from dictionary received from queue."
```
``` python
@dataclass
class WorkerStreamChunk:
    "Structure for streaming job results."
    job_id: str # Unique identifier for the job
    chunk: str # Text chunk from streaming output
    is_final: bool = False # Whether this is the final chunk
    metadata: Optional[Dict[str, Any]] # Optional metadata
    def to_dict(self) -> Dict[str, Any]:
        "Convert to dictionary for queue serialization."
```
``` python
@dataclass
class WorkerResult:
    "Structure for job execution results."
    job_id: str # Unique identifier for the job
    status: str # 'success' or 'error'
    data: Optional[Dict[str, Any]] # Result data on success
    error: Optional[str] # Error message on failure
    def to_dict(self) -> Dict[str, Any]:
        "Convert to dictionary for queue serialization."
```
``` python
class PluginManagerAdapter(Protocol):
    """
    Protocol that plugin managers must satisfy for worker integration.
    Uses structural subtyping (duck typing) - plugin managers don't need to
    explicitly inherit from this, they just need to implement these methods.
    """
    def discover_plugins(self) -> list: # List of plugin metadata/data objects
        "Discover available plugins."
    def load_plugin(
        self,
        plugin_data:Any, # Plugin metadata/data from discovery
        config:Dict[str, Any] # Plugin configuration dictionary
    ) -> None
        "Load a plugin with configuration."
    def execute_plugin(
        self,
        plugin_name:str, # Name of the plugin to execute
        **params # Plugin-specific parameters
    ) -> Any: # Plugin execution result
        "Execute a plugin with given parameters."
    def execute_plugin_stream(
        self,
        plugin_name:str, # Name of the plugin to execute
        **params # Plugin-specific parameters
    ) -> Iterator[str]: # String chunks from plugin execution
        "Execute a plugin with streaming output."
    def reload_plugin(
        self,
        plugin_name:str, # Name of the plugin to reload
        config:Optional[Dict[str, Any]]=None # New configuration (None to unload)
    ) -> None
        "Reload a plugin with new configuration."
    def unload_plugin(
        self,
        plugin_name:str # Name of the plugin to unload
    ) -> None
        "Unload a plugin to free resources."
    def check_streaming_support(
        self,
        plugin_name:str # Name of the plugin to check
    ) -> bool: # True if plugin supports streaming
        "Check if a plugin supports streaming execution."
```
### protocols (`protocols.ipynb`)
> Optional integration protocols for plugin registries, resource
> management, and event broadcasting.
#### Import
``` python
from cjm_fasthtml_workers.extensions.protocols import (
PluginRegistryProtocol,
ResourceManagerProtocol,
EventBroadcasterProtocol
)
```
#### Classes
``` python
class PluginRegistryProtocol(Protocol):
"Protocol for plugin registry integration."
def get_plugins_by_category(
self,
category:Any # Plugin category (can be enum, string, etc.)
) -> list: # List of plugin metadata objects
"Get all plugins in a specific category."
def get_plugin(
self,
plugin_id:str # Unique plugin identifier
) -> Any: # Plugin metadata object or None
"Get a specific plugin by ID."
def load_plugin_config(
self,
plugin_id:str # Unique plugin identifier
) -> Dict[str, Any]: # Plugin configuration dictionary
"Load configuration for a plugin."
```
``` python
class ResourceManagerProtocol(Protocol):
    "Protocol for resource management integration."
    def register_worker(
        self,
        pid:int, # Worker process ID
        worker_type:str # Type of worker (e.g., 'transcription', 'llm')
    ) -> None
        "Register a new worker process."
    def unregister_worker(
        self,
        pid:int # Process ID of the worker to unregister
    ) -> None
        "Unregister a worker process."
    def update_worker_state(
        self,
        pid:int, # Worker process ID
        status:Optional[str]=None, # Worker status: 'idle', 'running', etc.
        job_id:Optional[str]=None, # Current job ID (None if idle)
        plugin_name:Optional[str]=None, # Currently loaded plugin name
        plugin_id:Optional[str]=None, # Currently loaded plugin ID
        loaded_plugin_resource:Optional[str]=None, # Currently loaded plugin resource identifier
        config:Optional[Dict[str, Any]]=None, # Current plugin configuration
    ) -> None
        "Update worker state information."
    def check_gpu_availability(self) -> Any: # Returns ResourceConflict object
        """
        Check GPU availability and identify conflicts.
        Returns an object with:
        - status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)
        - app_pids: List of application PIDs using GPU
        - external_pids: List of external PIDs using GPU
        - app_processes: Detailed info about app processes
        - external_processes: Detailed info about external processes
        """
    def get_worker_by_pid(
        self,
        pid:int # Worker process ID
    ) -> Optional[Any]: # Returns WorkerState object or None
        """
        Get worker state by PID.
        Returns worker state object with attributes:
        - pid: Worker process ID
        - worker_type: Type of worker
        - status: Worker status ('idle', 'running', 'busy')
        - job_id: Current job ID (None if idle)
        - plugin_name: Currently loaded plugin name
        - plugin_id: Currently loaded plugin ID
        - loaded_plugin_resource: Currently loaded resource identifier
        - config: Current plugin configuration
        """
```
``` python
class EventBroadcasterProtocol(Protocol):
"Protocol for SSE event broadcasting."
async def broadcast(
self,
event_type:str, # Event type identifier
data:Dict[str, Any] # Event data payload
) -> None
"Broadcast an event to all connected clients."
```
### worker (`worker.ipynb`)
> Generic worker process for executing plugin-based jobs in isolated
> subprocesses.
#### Import
``` python
from cjm_fasthtml_workers.core.worker import (
base_worker_process
)
```
#### Functions
``` python
def base_worker_process(
request_queue:multiprocessing.Queue, # Queue for receiving job requests from parent
result_queue:multiprocessing.Queue, # Queue for sending job results back to parent
response_queue:multiprocessing.Queue, # Queue for sending command responses back to parent
plugin_manager_factory:Callable[[], PluginManagerAdapter], # Factory function that creates a plugin manager instance
result_adapter:Optional[Callable[[Any], Dict[str, Any]]]=None, # Optional function to adapt plugin results to dict format
supports_streaming:bool=False # Whether this worker supports streaming execution
)
"Generic long-lived worker process that handles job execution."
```
Raw data
{
"_id": null,
"home_page": "https://github.com/cj-mills/cjm-fasthtml-workers",
"name": "cjm-fasthtml-workers",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": null,
"keywords": "nbdev jupyter notebook python",
"author": "Christian J. Mills",
"author_email": "9126128+cj-mills@users.noreply.github.com",
"download_url": "https://files.pythonhosted.org/packages/b9/bb/53fe7cb65a32d8646cfdaa6e91ddad71ad15604b7f244c5d2a8e3fabdbcd/cjm_fasthtml_workers-0.0.7.tar.gz",
"platform": null,
"description": "# cjm-fasthtml-workers\n\n\n<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->\n\n## Install\n\n``` bash\npip install cjm_fasthtml_workers\n```\n\n## Project Structure\n\n nbs/\n \u251c\u2500\u2500 core/ (4)\n \u2502 \u251c\u2500\u2500 adapters.ipynb # Adapter utilities for making plugin managers compatible with the worker system.\n \u2502 \u251c\u2500\u2500 config.ipynb # Configuration for worker processes including restart policies, timeouts, and queue sizes.\n \u2502 \u251c\u2500\u2500 protocol.ipynb # Protocol definitions for worker communication and plugin manager integration.\n \u2502 \u2514\u2500\u2500 worker.ipynb # Generic worker process for executing plugin-based jobs in isolated subprocesses.\n \u251c\u2500\u2500 extensions/ (2)\n \u2502 \u251c\u2500\u2500 adapters.ipynb # Common adapters for integrating popular FastHTML libraries with the worker system.\n \u2502 \u2514\u2500\u2500 protocols.ipynb # Optional integration protocols for plugin registries, resource management, and event broadcasting.\n \u2514\u2500\u2500 managers/ (1)\n \u2514\u2500\u2500 base.ipynb # Abstract base class for managing background jobs with worker processes.\n\nTotal: 7 notebooks across 3 directories\n\n## Module Dependencies\n\n``` mermaid\ngraph LR\n core_adapters[core.adapters<br/>adapters]\n core_config[core.config<br/>config]\n core_protocol[core.protocol<br/>protocol]\n core_worker[core.worker<br/>worker]\n extensions_adapters[extensions.adapters<br/>adapters]\n extensions_protocols[extensions.protocols<br/>protocols]\n managers_base[managers.base<br/>base]\n\n core_adapters --> core_protocol\n core_worker --> core_protocol\n extensions_adapters --> extensions_protocols\n managers_base --> extensions_protocols\n managers_base --> core_protocol\n managers_base --> core_config\n```\n\n*6 cross-module dependencies detected*\n\n## CLI Reference\n\nNo CLI commands found in this project.\n\n## Module Overview\n\nDetailed documentation for each module in 
the project:\n\n### adapters (`adapters.ipynb`)\n\n> Adapter utilities for making plugin managers compatible with the\n> worker system.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.core.adapters import (\n create_simple_adapter,\n default_result_adapter\n)\n```\n\n#### Functions\n\n``` python\ndef create_simple_adapter(\n plugin_manager:Any, # The plugin manager instance to adapt\n result_adapter:Optional[callable]=None # Optional function to convert plugin results to dict\n) -> PluginManagerAdapter: # Adapter that satisfies PluginManagerAdapter protocol\n \"Create a simple adapter for a plugin manager.\"\n```\n\n``` python\ndef default_result_adapter(\n result:Any # Plugin execution result\n) -> Dict[str, Any]: # Dictionary with text and metadata\n \"Default adapter for converting plugin results to dictionaries.\"\n```\n\n### adapters (`adapters.ipynb`)\n\n> Common adapters for integrating popular FastHTML libraries with the\n> worker system.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.extensions.adapters import (\n UnifiedPluginRegistryAdapter,\n ResourceManagerAdapter,\n SSEBroadcasterAdapter,\n create_standard_adapters\n)\n```\n\n#### Functions\n\n```` python\ndef create_standard_adapters(\n plugin_registry=None, # UnifiedPluginRegistry instance (optional)\n resource_manager=None, # ResourceManager instance (optional)\n sse_manager=None # SSEBroadcastManager instance (optional)\n) -> tuple: # (plugin_adapter, resource_adapter, sse_adapter)\n \"\"\"\n Create standard adapters for common FastHTML libraries.\n \n This factory function creates adapters for:\n - cjm-fasthtml-plugins UnifiedPluginRegistry\n - cjm-fasthtml-resources ResourceManager\n - cjm-fasthtml-sse SSEBroadcastManager\n \n All parameters are optional. 
Pass None for services you don't use.\n \n Example:\n ```python\n from cjm_fasthtml_workers.extensions.adapters import create_standard_adapters\n from cjm_fasthtml_plugins.core.registry import UnifiedPluginRegistry\n from cjm_fasthtml_resources.core.manager import ResourceManager\n \n # Create services\n plugin_registry = UnifiedPluginRegistry()\n resource_manager = ResourceManager()\n \n # Create adapters\n plugin_adapter, resource_adapter, sse_adapter = create_standard_adapters(\n plugin_registry=plugin_registry,\n resource_manager=resource_manager\n )\n \n # Pass to job manager\n manager = MyJobManager(\n plugin_registry=plugin_adapter,\n resource_manager=resource_adapter\n )\n ```\n \n Returns:\n Tuple of (plugin_adapter, resource_adapter, sse_adapter)\n Each adapter will be None if the corresponding service was not provided.\n \"\"\"\n````\n\n#### Classes\n\n``` python\nclass UnifiedPluginRegistryAdapter:\n def __init__(self, registry):\n \"\"\"\n Initialize adapter with UnifiedPluginRegistry instance.\n\n Args:\n registry: UnifiedPluginRegistry from cjm-fasthtml-plugins\n \"\"\"\n self._registry = registry\n\n def get_plugins_by_category(self, category) -> list\n \"Adapter for cjm-fasthtml-plugins UnifiedPluginRegistry.\"\n \n def __init__(self, registry):\n \"\"\"\n Initialize adapter with UnifiedPluginRegistry instance.\n \n Args:\n registry: UnifiedPluginRegistry from cjm-fasthtml-plugins\n \"\"\"\n self._registry = registry\n \n def get_plugins_by_category(self, category) -> list\n \"Initialize adapter with UnifiedPluginRegistry instance.\n\nArgs:\n registry: UnifiedPluginRegistry from cjm-fasthtml-plugins\"\n \n def get_plugins_by_category(self, category) -> list:\n \"\"\"Get all plugins in a specific category.\"\"\"\n return self._registry.get_plugins_by_category(category)\n \n def get_plugin(self, plugin_id: str)\n \"Get all plugins in a specific category.\"\n \n def get_plugin(self, plugin_id: str):\n \"\"\"Get a specific plugin by ID.\"\"\"\n return 
self._registry.get_plugin(plugin_id)\n \n def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]\n \"Get a specific plugin by ID.\"\n \n def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]\n \"Load configuration for a plugin.\"\n```\n\n``` python\nclass ResourceManagerAdapter:\n def __init__(self, resource_manager):\n \"\"\"\n Initialize adapter with ResourceManager instance.\n\n Args:\n resource_manager: ResourceManager from cjm-fasthtml-resources\n \"\"\"\n self._resource_manager = resource_manager\n\n def register_worker(self, pid: int, worker_type: str) -> None\n \"Adapter for cjm-fasthtml-resources ResourceManager.\"\n \n def __init__(self, resource_manager):\n \"\"\"\n Initialize adapter with ResourceManager instance.\n \n Args:\n resource_manager: ResourceManager from cjm-fasthtml-resources\n \"\"\"\n self._resource_manager = resource_manager\n \n def register_worker(self, pid: int, worker_type: str) -> None\n \"Initialize adapter with ResourceManager instance.\n\nArgs:\n resource_manager: ResourceManager from cjm-fasthtml-resources\"\n \n def register_worker(self, pid: int, worker_type: str) -> None:\n \"\"\"Register a new worker process.\"\"\"\n self._resource_manager.register_worker(pid, worker_type)\n \n def unregister_worker(self, pid: int) -> None\n \"Register a new worker process.\"\n \n def unregister_worker(self, pid: int) -> None:\n \"\"\"Unregister a worker process.\"\"\"\n self._resource_manager.unregister_worker(pid)\n \n def update_worker_state(\n self,\n pid: int,\n status: Optional[str] = None,\n job_id: Optional[str] = None,\n plugin_name: Optional[str] = None,\n plugin_id: Optional[str] = None,\n loaded_plugin_resource: Optional[str] = None,\n config: Optional[Dict[str, Any]] = None,\n ) -> None\n \"Unregister a worker process.\"\n \n def update_worker_state(\n self,\n pid: int,\n status: Optional[str] = None,\n job_id: Optional[str] = None,\n plugin_name: Optional[str] = None,\n plugin_id: Optional[str] = None,\n 
loaded_plugin_resource: Optional[str] = None,\n config: Optional[Dict[str, Any]] = None,\n ) -> None\n \"Update worker state information.\"\n \n def check_gpu_availability(self):\n \"\"\"Check GPU availability and identify conflicts.\"\"\"\n return self._resource_manager.check_gpu_availability()\n \n def get_worker_by_pid(self, pid: int)\n \"Check GPU availability and identify conflicts.\"\n \n def get_worker_by_pid(self, pid: int)\n \"Get worker state by PID.\"\n```\n\n``` python\nclass SSEBroadcasterAdapter:\n def __init__(self, sse_manager):\n \"\"\"\n Initialize adapter with SSE broadcast manager.\n\n Args:\n sse_manager: SSEBroadcastManager from cjm-fasthtml-sse\n \"\"\"\n self._sse_manager = sse_manager\n\n async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None\n \"Adapter for cjm-fasthtml-sse SSEBroadcastManager.\"\n \n def __init__(self, sse_manager):\n \"\"\"\n Initialize adapter with SSE broadcast manager.\n \n Args:\n sse_manager: SSEBroadcastManager from cjm-fasthtml-sse\n \"\"\"\n self._sse_manager = sse_manager\n \n async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None\n \"Initialize adapter with SSE broadcast manager.\n\nArgs:\n sse_manager: SSEBroadcastManager from cjm-fasthtml-sse\"\n \n async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None:\n \"\"\"\n Broadcast an event to all connected SSE clients.\n \n Args:\n event_type: Event type identifier (e.g., 'transcription:started')\n data: Event data payload\n \"\"\"\n # Broadcast using the SSE manager\n # Note: SSEBroadcastManager.broadcast expects event_type and data\n \"Broadcast an event to all connected SSE clients.\n\nArgs:\n event_type: Event type identifier (e.g., 'transcription:started')\n data: Event data payload\"\n```\n\n### base (`base.ipynb`)\n\n> Abstract base class for managing background jobs with worker\n> processes.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.managers.base import (\n JobType,\n BaseJob,\n 
BaseJobManager\n)\n```\n\n#### Functions\n\n``` python\n@patch\n@abstractmethod\ndef create_job(\n self:BaseJobManager,\n plugin_id:str, # Plugin unique identifier\n **kwargs # Domain-specific job parameters\n) -> JobType: # Created job instance\n \"Factory method for creating domain-specific jobs.\"\n```\n\n``` python\n@patch\n@abstractmethod\ndef get_worker_entry_point(\n self:BaseJobManager\n) -> Callable: # Worker process entry point function\n \"Return the worker process function for this manager.\"\n```\n\n``` python\n@patch\n@abstractmethod\ndef prepare_execute_request(\n self:BaseJobManager,\n job:JobType # The job to prepare for execution\n) -> Dict[str, Any]: # Dictionary of parameters for the worker execute request\n \"Convert job to worker execute request parameters.\"\n```\n\n``` python\n@patch\n@abstractmethod\ndef extract_job_result(\n self:BaseJobManager,\n job:JobType, # The job that was executed\n result_data:Dict[str, Any] # Raw result data from worker\n) -> Dict[str, Any]: # Formatted result for storage\n \"Extract and format job result from worker response.\"\n```\n\n``` python\n@patch\ndef _extract_plugin_resource_identifier(\n self:BaseJobManager,\n config:Dict[str, Any] # Plugin configuration dictionary\n) -> str: # Plugin resource identifier string\n \"Extract plugin resource identifier from plugin configuration.\"\n```\n\n``` python\n@patch\nasync def _validate_resources(\n self:BaseJobManager,\n plugin_id:str, # Plugin unique identifier\n plugin_config:Dict[str, Any] # Plugin configuration\n) -> Optional[str]: # Error message if validation fails, None if successful\n \"Validate resources before starting a job.\"\n```\n\n``` python\n@patch\ndef _on_job_completed(\n self:BaseJobManager,\n job_id:str # ID of the completed job\n) -> None\n \"Hook called when a job completes successfully.\"\n```\n\n``` python\n@patch\ndef _start_worker(self:BaseJobManager):\n \"\"\"Start the worker process and result monitor.\"\"\"\n if self.worker_process and 
self.worker_process.is_alive()\n \"Start the worker process and result monitor.\"\n```\n\n``` python\n@patch\ndef _init_worker(self:BaseJobManager):\n \"\"\"Send initialization message to worker with plugin configurations.\"\"\"\n # Only send plugin configs if plugin registry is available\n plugin_configs = {}\n \n if self.plugin_registry\n \"Send initialization message to worker with plugin configurations.\"\n```\n\n``` python\n@patch\ndef _restart_worker(self:BaseJobManager):\n \"\"\"Restart the worker process after an error or cancellation.\"\"\"\n # Track restart\n self.restart_count += 1\n self.last_restart_time = time.time()\n\n # Unregister old worker from resource manager if available\n if self.resource_manager and self.worker_process\n \"Restart the worker process after an error or cancellation.\"\n```\n\n``` python\n@patch\ndef _monitor_results(self:BaseJobManager):\n \"\"\"Monitor the result queue in a background thread.\"\"\"\n while self.monitor_running\n \"Monitor the result queue in a background thread.\"\n```\n\n``` python\n@patch\ndef _handle_job_result(\n self:BaseJobManager, \n result:Dict[str, Any] # Result data from worker\n)\n \"Handle a job result from the worker.\"\n```\n\n``` python\n@patch\ndef _handle_stream_chunk(\n self:BaseJobManager, \n chunk_data:Dict[str, Any] # Chunk data from worker\n)\n \"Handle a streaming chunk from the worker.\"\n```\n\n``` python\n@patch\ndef _handle_worker_error(self:BaseJobManager):\n \"\"\"Handle worker fatal error based on restart policy.\"\"\"\n policy = self.worker_config.restart_policy\n\n if policy == RestartPolicy.NEVER\n \"Handle worker fatal error based on restart policy.\"\n```\n\n``` python\n@patch\ndef get_plugin_name(\n self:BaseJobManager,\n plugin_id:str # Plugin unique identifier\n) -> Optional[str]: # Plugin name or None\n \"Get plugin name from plugin ID (requires plugin registry).\"\n```\n\n``` python\n@patch\nasync def unload_plugin(\n self:BaseJobManager,\n plugin_name:str # Name of the 
plugin to unload\n) -> bool: # True if successful, False otherwise\n \"Unload a plugin from the worker to free resources.\"\n```\n\n``` python\n@patch\nasync def reload_plugin(\n self:BaseJobManager,\n plugin_name:str, # Name of the plugin to reload\n config:Dict[str, Any] # New configuration\n) -> bool: # True if successful, False otherwise\n \"Reload a plugin with new configuration.\"\n```\n\n``` python\n@patch\nasync def start_job(\n self:BaseJobManager,\n plugin_id:str, # Plugin unique identifier\n **kwargs # Domain-specific job parameters\n) -> JobType: # Created and started job\n \"Start a new job.\"\n```\n\n``` python\n@patch\nasync def cancel_job(\n self:BaseJobManager,\n job_id:str # ID of the job to cancel\n) -> bool: # True if cancellation successful\n \"Cancel a running job by terminating the worker process.\"\n```\n\n``` python\n@patch\ndef get_job(\n self:BaseJobManager,\n job_id:str # Unique job identifier\n) -> Optional[JobType]: # Job object or None\n \"Get a job by ID.\"\n```\n\n``` python\n@patch\ndef get_all_jobs(\n self:BaseJobManager\n) -> List[JobType]: # List of all jobs\n \"Get all jobs.\"\n```\n\n``` python\n@patch\ndef get_job_result(\n self:BaseJobManager,\n job_id:str # Unique job identifier\n) -> Optional[Dict[str, Any]]: # Job result or None\n \"Get job result.\"\n```\n\n``` python\n@patch\ndef clear_completed_jobs(\n self:BaseJobManager\n) -> int: # Number of jobs cleared\n \"Clear completed, failed, and cancelled jobs.\"\n```\n\n``` python\n@patch\nasync def broadcast_event(\n self:BaseJobManager,\n event_type:str, # Event type identifier\n data:Dict[str, Any] # Event data payload\n)\n \"Broadcast an event to all connected SSE clients (requires event broadcaster).\"\n```\n\n``` python\n@patch\ndef check_streaming_support(\n self:BaseJobManager,\n plugin_id:str # Plugin unique identifier\n) -> bool: # True if streaming supported\n \"Check if a plugin supports streaming.\"\n```\n\n``` python\n@patch\ndef 
shutdown(self:BaseJobManager):\n \"\"\"Shutdown the manager and cleanup resources.\"\"\"\n # Stop result monitor\n self.monitor_running = False\n if self.result_monitor_thread\n \"Shutdown the manager and cleanup resources.\"\n```\n\n#### Classes\n\n``` python\n@dataclass\nclass BaseJob:\n \"Base class for all job types.\"\n \n id: str # Unique job identifier\n plugin_id: str # Plugin identifier for this job\n status: str = 'pending' # Job status: pending, running, completed, failed, cancelled\n created_at: str = field(...) # ISO format timestamp\n started_at: Optional[str] # When job started executing\n completed_at: Optional[str] # When job finished\n result: Optional[Dict[str, Any]] # Job result data\n error: Optional[str] # Error message if failed\n metadata: Dict[str, Any] = field(...) # Additional job metadata\n worker_pid: Optional[int] # Process ID of worker handling this job\n```\n\n``` python\nclass BaseJobManager:\n def __init__(\n self,\n worker_type:str, # Type identifier (e.g., \"transcription\", \"llm\", \"image-gen\")\n category:Any, # Plugin category this manager handles\n supports_streaming:bool=False, # Whether this manager supports streaming jobs\n worker_config:Optional[WorkerConfig]=None, # Worker configuration (uses defaults if None)\n plugin_registry:Optional[PluginRegistryProtocol]=None, # Optional plugin registry integration\n resource_manager:Optional[ResourceManagerProtocol]=None, # Optional resource manager integration\n event_broadcaster:Optional[EventBroadcasterProtocol]=None, # Optional SSE event broadcaster\n )\n \"\"\"\n Abstract base class for managing jobs using worker processes.\n \n Features:\n - Jobs processed sequentially in subprocess\n - Plugin resources loaded once and reused\n - True cancellation via subprocess termination\n - Automatic worker restart based on policy\n - Isolated worker process avoids duplicating web app initialization\n - Optional streaming support for incremental results\n - Optional dependency 
injection for plugin registry, resource manager, and event broadcaster\n \"\"\"\n \n def __init__(\n self,\n worker_type:str, # Type identifier (e.g., \"transcription\", \"llm\", \"image-gen\")\n category:Any, # Plugin category this manager handles\n supports_streaming:bool=False, # Whether this manager supports streaming jobs\n worker_config:Optional[WorkerConfig]=None, # Worker configuration (uses defaults if None)\n plugin_registry:Optional[PluginRegistryProtocol]=None, # Optional plugin registry integration\n resource_manager:Optional[ResourceManagerProtocol]=None, # Optional resource manager integration\n event_broadcaster:Optional[EventBroadcasterProtocol]=None, # Optional SSE event broadcaster\n )\n \"Initialize the job manager.\"\n```\n\n### config (`config.ipynb`)\n\n> Configuration for worker processes including restart policies,\n> timeouts, and queue sizes.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.core.config import (\n RestartPolicy,\n WorkerConfig\n)\n```\n\n#### Classes\n\n``` python\nclass RestartPolicy(Enum):\n \"Policy for restarting worker processes after failures.\"\n```\n\n``` python\n@dataclass\nclass WorkerConfig:\n \"Configuration for worker process behavior.\"\n \n request_queue_size: int = 0 # 0 = unlimited\n result_queue_size: int = 100 # Larger for streaming results\n response_queue_size: int = 10 # For synchronous command responses\n restart_policy: RestartPolicy = RestartPolicy.ON_CANCELLATION\n max_restart_attempts: int = 3\n restart_backoff_base_seconds: float = 1.0 # Base delay for exponential backoff\n restart_backoff_max_seconds: float = 60.0 # Max delay for backoff\n max_workers: int = 1 # Currently only 1 is supported\n worker_start_timeout_seconds: float = 30.0\n reload_timeout_seconds: float = 30.0\n unload_timeout_seconds: float = 10.0\n shutdown_timeout_seconds: float = 5.0\n result_monitor_poll_interval_seconds: float = 0.5\n \n```\n\n### protocol (`protocol.ipynb`)\n\n> Protocol definitions for worker 
communication and plugin manager\n> integration.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.core.protocol import (\n WorkerRequestType,\n WorkerResponseType,\n WorkerRequest,\n WorkerResponse,\n WorkerStreamChunk,\n WorkerResult,\n PluginManagerAdapter\n)\n```\n\n#### Classes\n\n``` python\nclass WorkerRequestType(Enum):\n \"Types of requests sent to worker process.\"\n```\n\n``` python\nclass WorkerResponseType(Enum):\n \"Types of responses from worker process.\"\n```\n\n``` python\n@dataclass\nclass WorkerRequest:\n \"Base structure for worker requests.\"\n \n type: WorkerRequestType\n data: Dict[str, Any]\n \n def to_dict(self) -> Dict[str, Any]:\n \"\"\"Convert to dictionary for queue serialization.\"\"\"\n return {\n 'type': self.type.value,\n \"Convert to dictionary for queue serialization.\"\n \n def from_dict(cls, data: Dict[str, Any]) -> 'WorkerRequest':\n \"\"\"Create from dictionary received from queue.\"\"\"\n req_type = WorkerRequestType(data['type'])\n request_data = {k: v for k, v in data.items() if k != 'type'}\n \"Create from dictionary received from queue.\"\n```\n\n``` python\n@dataclass\nclass WorkerResponse:\n \"Base structure for worker responses.\"\n \n type: WorkerResponseType\n data: Dict[str, Any]\n \n def to_dict(self) -> Dict[str, Any]:\n \"\"\"Convert to dictionary for queue serialization.\"\"\"\n return {\n 'type': self.type.value,\n \"Convert to dictionary for queue serialization.\"\n \n def from_dict(cls, data: Dict[str, Any]) -> 'WorkerResponse':\n \"\"\"Create from dictionary received from queue.\"\"\"\n resp_type = WorkerResponseType(data['type'])\n response_data = {k: v for k, v in data.items() if k != 'type'}\n \"Create from dictionary received from queue.\"\n```\n\n``` python\n@dataclass\nclass WorkerStreamChunk:\n \"Structure for streaming job results.\"\n \n job_id: str # Unique identifier for the job\n chunk: str # Text chunk from streaming output\n is_final: bool = False # Whether this is the final chunk\n 
metadata: Optional[Dict[str, Any]] # Optional metadata\n \n def to_dict(self) -> Dict[str, Any]:\n \"\"\"Convert to dictionary for queue serialization.\"\"\"\n return {\n 'type': WorkerResponseType.STREAM_CHUNK.value,\n \"Convert to dictionary for queue serialization.\"\n```\n\n``` python\n@dataclass\nclass WorkerResult:\n \"Structure for job execution results.\"\n \n job_id: str # Unique identifier for the job\n status: str # 'success' or 'error'\n data: Optional[Dict[str, Any]] # Result data on success\n error: Optional[str] # Error message on failure\n \n def to_dict(self) -> Dict[str, Any]:\n \"\"\"Convert to dictionary for queue serialization.\"\"\"\n result = {\n 'type': WorkerResponseType.RESULT.value,\n \"Convert to dictionary for queue serialization.\"\n```\n\n``` python\nclass PluginManagerAdapter(Protocol):\n \"\"\"\n Protocol that plugin managers must satisfy for worker integration.\n \n Uses structural subtyping (duck typing) - plugin managers don't need to\n explicitly inherit from this, they just need to implement these methods.\n \"\"\"\n \n def discover_plugins(self) -> list: # List of plugin metadata/data objects\n \"\"\"Discover available plugins.\"\"\"\n ...\n \n def load_plugin(\n self, \n plugin_data:Any, # Plugin metadata/data from discovery\n config:Dict[str, Any] # Plugin configuration dictionary\n ) -> None\n \"Discover available plugins.\"\n \n def load_plugin(\n self, \n plugin_data:Any, # Plugin metadata/data from discovery\n config:Dict[str, Any] # Plugin configuration dictionary\n ) -> None\n \"Load a plugin with configuration.\"\n \n def execute_plugin(\n self, \n plugin_name:str, # Name of the plugin to execute\n **params # Plugin-specific parameters\n ) -> Any: # Plugin execution result\n \"Execute a plugin with given parameters.\"\n \n def execute_plugin_stream(\n self, \n plugin_name:str, # Name of the plugin to execute\n **params # Plugin-specific parameters\n ) -> Iterator[str]: # String chunks from plugin execution\n \"Execute 
a plugin with streaming output.\"\n \n def reload_plugin(\n self, \n plugin_name:str, # Name of the plugin to reload\n config:Optional[Dict[str, Any]]=None # New configuration (None to unload)\n ) -> None\n \"Reload a plugin with new configuration.\"\n \n def unload_plugin(\n self, \n plugin_name:str # Name of the plugin to unload\n ) -> None\n \"Unload a plugin to free resources.\"\n \n def check_streaming_support(\n self, \n plugin_name:str # Name of the plugin to check\n ) -> bool: # True if plugin supports streaming\n \"Check if a plugin supports streaming execution.\"\n```\n\n### protocols (`protocols.ipynb`)\n\n> Optional integration protocols for plugin registries, resource\n> management, and event broadcasting.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.extensions.protocols import (\n PluginRegistryProtocol,\n ResourceManagerProtocol,\n EventBroadcasterProtocol\n)\n```\n\n#### Classes\n\n``` python\nclass PluginRegistryProtocol(Protocol):\n \"Protocol for plugin registry integration.\"\n \n def get_plugins_by_category(\n self, \n category:Any # Plugin category (can be enum, string, etc.)\n ) -> list: # List of plugin metadata objects\n \"Get all plugins in a specific category.\"\n \n def get_plugin(\n self, \n plugin_id:str # Unique plugin identifier\n ) -> Any: # Plugin metadata object or None\n \"Get a specific plugin by ID.\"\n \n def load_plugin_config(\n self, \n plugin_id:str # Unique plugin identifier\n ) -> Dict[str, Any]: # Plugin configuration dictionary\n \"Load configuration for a plugin.\"\n```\n\n``` python\nclass ResourceManagerProtocol(Protocol):\n \"Protocol for resource management integration.\"\n \n def register_worker(\n self,\n pid:int, # Worker process ID\n worker_type:str # Type of worker (e.g., 'transcription', 'llm')\n ) -> None\n \"Register a new worker process.\"\n \n def unregister_worker(\n self, \n pid:int # Process ID of the worker to unregister\n ) -> None\n \"Unregister a worker process.\"\n \n def 
update_worker_state(\n self,\n pid:int, # Worker process ID\n status:Optional[str]=None, # Worker status: 'idle', 'running', etc.\n job_id:Optional[str]=None, # Current job ID (None if idle)\n plugin_name:Optional[str]=None, # Currently loaded plugin name\n plugin_id:Optional[str]=None, # Currently loaded plugin ID\n loaded_plugin_resource:Optional[str]=None, # Currently loaded plugin resource identifier\n config:Optional[Dict[str, Any]]=None, # Current plugin configuration\n ) -> None\n \"Update worker state information.\"\n \n def check_gpu_availability(self) -> Any: # Returns ResourceConflict object\n \"\"\"Check GPU availability and identify conflicts.\n \n Returns an object with:\n - status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)\n - app_pids: List of application PIDs using GPU\n - external_pids: List of external PIDs using GPU\n - app_processes: Detailed info about app processes\n - external_processes: Detailed info about external processes\n \"\"\"\n ...\n \n def get_worker_by_pid(\n self, \n pid:int # Worker process ID\n ) -> Optional[Any]: # Returns WorkerState object or None\n \"Check GPU availability and identify conflicts.\n\nReturns an object with:\n- status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)\n- app_pids: List of application PIDs using GPU\n- external_pids: List of external PIDs using GPU\n- app_processes: Detailed info about app processes\n- external_processes: Detailed info about external processes\"\n \n def get_worker_by_pid(\n self, \n pid:int # Worker process ID\n ) -> Optional[Any]: # Returns WorkerState object or None\n \"Get worker state by PID.\n\nReturns worker state object with attributes:\n- pid: Worker process ID\n- worker_type: Type of worker\n- status: Worker status ('idle', 'running', 'busy')\n- job_id: Current job ID (None if idle)\n- plugin_name: Currently loaded plugin name\n- plugin_id: Currently loaded plugin ID\n- loaded_plugin_resource: Currently loaded resource identifier\n- config: 
Current plugin configuration\"\n```\n\n``` python\nclass EventBroadcasterProtocol(Protocol):\n \"Protocol for SSE event broadcasting.\"\n \n async def broadcast(\n self,\n event_type:str, # Event type identifier\n data:Dict[str, Any] # Event data payload\n ) -> None\n \"Broadcast an event to all connected clients.\"\n```\n\n### worker (`worker.ipynb`)\n\n> Generic worker process for executing plugin-based jobs in isolated\n> subprocesses.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.core.worker import (\n base_worker_process\n)\n```\n\n#### Functions\n\n``` python\ndef base_worker_process(\n request_queue:multiprocessing.Queue, # Queue for receiving job requests from parent\n result_queue:multiprocessing.Queue, # Queue for sending job results back to parent\n response_queue:multiprocessing.Queue, # Queue for sending command responses back to parent\n plugin_manager_factory:Callable[[], PluginManagerAdapter], # Factory function that creates a plugin manager instance\n result_adapter:Optional[Callable[[Any], Dict[str, Any]]]=None, # Optional function to adapt plugin results to dict format\n supports_streaming:bool=False # Whether this worker supports streaming execution\n)\n \"Generic long-lived worker process that handles job execution.\"\n```\n",
"bugtrack_url": null,
"license": "Apache Software License 2.0",
"summary": "Background worker system for FastHTML with multiprocess job execution, cancellation support, and streaming capabilities.",
"version": "0.0.7",
"project_urls": {
"Homepage": "https://github.com/cj-mills/cjm-fasthtml-workers"
},
"split_keywords": [
"nbdev",
"jupyter",
"notebook",
"python"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "4d4e43a948ba10cba6c1d58567b7da72516e06835d6eb331c5e31a837de3fab9",
"md5": "555c82c97b2ec448a65fc7d14a0817c0",
"sha256": "5ddd26d6ef9859870688a84c6ab567076e9c3414f97a765db349033688dec4db"
},
"downloads": -1,
"filename": "cjm_fasthtml_workers-0.0.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "555c82c97b2ec448a65fc7d14a0817c0",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 31841,
"upload_time": "2025-10-27T22:28:52",
"upload_time_iso_8601": "2025-10-27T22:28:52.304497Z",
"url": "https://files.pythonhosted.org/packages/4d/4e/43a948ba10cba6c1d58567b7da72516e06835d6eb331c5e31a837de3fab9/cjm_fasthtml_workers-0.0.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "b9bb53fe7cb65a32d8646cfdaa6e91ddad71ad15604b7f244c5d2a8e3fabdbcd",
"md5": "8f553564dc8fef57f5b2570c8cc81491",
"sha256": "14596717d7e743416b757d478da51d404a26b05a3fa3ef75127298ae077864eb"
},
"downloads": -1,
"filename": "cjm_fasthtml_workers-0.0.7.tar.gz",
"has_sig": false,
"md5_digest": "8f553564dc8fef57f5b2570c8cc81491",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 39220,
"upload_time": "2025-10-27T22:28:53",
"upload_time_iso_8601": "2025-10-27T22:28:53.642262Z",
"url": "https://files.pythonhosted.org/packages/b9/bb/53fe7cb65a32d8646cfdaa6e91ddad71ad15604b7f244c5d2a8e3fabdbcd/cjm_fasthtml_workers-0.0.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-27 22:28:53",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "cj-mills",
"github_project": "cjm-fasthtml-workers",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "cjm-fasthtml-workers"
}