cjm-fasthtml-workers


Namecjm-fasthtml-workers JSON
Version 0.0.7 PyPI version JSON
download
home_pagehttps://github.com/cj-mills/cjm-fasthtml-workers
SummaryBackground worker system for FastHTML with multiprocess job execution, cancellation support, and streaming capabilities.
upload_time2025-10-27 22:28:53
maintainerNone
docs_urlNone
authorChristian J. Mills
requires_python>=3.11
licenseApache Software License 2.0
keywords nbdev jupyter notebook python
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # cjm-fasthtml-workers


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_fasthtml_workers
```

## Project Structure

    nbs/
    ├── core/ (4)
    │   ├── adapters.ipynb  # Adapter utilities for making plugin managers compatible with the worker system.
    │   ├── config.ipynb    # Configuration for worker processes including restart policies, timeouts, and queue sizes.
    │   ├── protocol.ipynb  # Protocol definitions for worker communication and plugin manager integration.
    │   └── worker.ipynb    # Generic worker process for executing plugin-based jobs in isolated subprocesses.
    ├── extensions/ (2)
    │   ├── adapters.ipynb   # Common adapters for integrating popular FastHTML libraries with the worker system.
    │   └── protocols.ipynb  # Optional integration protocols for plugin registries, resource management, and event broadcasting.
    └── managers/ (1)
        └── base.ipynb  # Abstract base class for managing background jobs with worker processes.

Total: 7 notebooks across 3 directories

## Module Dependencies

``` mermaid
graph LR
    core_adapters[core.adapters<br/>adapters]
    core_config[core.config<br/>config]
    core_protocol[core.protocol<br/>protocol]
    core_worker[core.worker<br/>worker]
    extensions_adapters[extensions.adapters<br/>adapters]
    extensions_protocols[extensions.protocols<br/>protocols]
    managers_base[managers.base<br/>base]

    core_adapters --> core_protocol
    core_worker --> core_protocol
    extensions_adapters --> extensions_protocols
    managers_base --> extensions_protocols
    managers_base --> core_protocol
    managers_base --> core_config
```

*6 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### adapters (`adapters.ipynb`)

> Adapter utilities for making plugin managers compatible with the
> worker system.

#### Import

``` python
from cjm_fasthtml_workers.core.adapters import (
    create_simple_adapter,
    default_result_adapter
)
```

#### Functions

``` python
def create_simple_adapter(
    plugin_manager:Any,  # The plugin manager instance to adapt
    result_adapter:Optional[callable]=None  # Optional function to convert plugin results to dict
) -> PluginManagerAdapter:  # Adapter that satisfies PluginManagerAdapter protocol
    "Create a simple adapter for a plugin manager."
```

``` python
def default_result_adapter(
    result:Any  # Plugin execution result
) -> Dict[str, Any]:  # Dictionary with text and metadata
    "Default adapter for converting plugin results to dictionaries."
```

### adapters (`adapters.ipynb`)

> Common adapters for integrating popular FastHTML libraries with the
> worker system.

#### Import

``` python
from cjm_fasthtml_workers.extensions.adapters import (
    UnifiedPluginRegistryAdapter,
    ResourceManagerAdapter,
    SSEBroadcasterAdapter,
    create_standard_adapters
)
```

#### Functions

```` python
def create_standard_adapters(
    plugin_registry=None,  # UnifiedPluginRegistry instance (optional)
    resource_manager=None,  # ResourceManager instance (optional)
    sse_manager=None  # SSEBroadcastManager instance (optional)
) -> tuple:  # (plugin_adapter, resource_adapter, sse_adapter)
    """
    Create standard adapters for common FastHTML libraries.
    
    This factory function creates adapters for:
    - cjm-fasthtml-plugins UnifiedPluginRegistry
    - cjm-fasthtml-resources ResourceManager
    - cjm-fasthtml-sse SSEBroadcastManager
    
    All parameters are optional. Pass None for services you don't use.
    
    Example:
        ```python
        from cjm_fasthtml_workers.extensions.adapters import create_standard_adapters
        from cjm_fasthtml_plugins.core.registry import UnifiedPluginRegistry
        from cjm_fasthtml_resources.core.manager import ResourceManager
        
        # Create services
        plugin_registry = UnifiedPluginRegistry()
        resource_manager = ResourceManager()
        
        # Create adapters
        plugin_adapter, resource_adapter, sse_adapter = create_standard_adapters(
            plugin_registry=plugin_registry,
            resource_manager=resource_manager
        )
        
        # Pass to job manager
        manager = MyJobManager(
            plugin_registry=plugin_adapter,
            resource_manager=resource_adapter
        )
        ```
    
    Returns:
        Tuple of (plugin_adapter, resource_adapter, sse_adapter)
        Each adapter will be None if the corresponding service was not provided.
    """
````

#### Classes

``` python
class UnifiedPluginRegistryAdapter:
    def __init__(self, registry):
        """
        Initialize adapter with UnifiedPluginRegistry instance.

        Args:
            registry: UnifiedPluginRegistry from cjm-fasthtml-plugins
        """
        self._registry = registry

    def get_plugins_by_category(self, category) -> list
    "Adapter for cjm-fasthtml-plugins UnifiedPluginRegistry."
    
    def __init__(self, registry):
            """
            Initialize adapter with UnifiedPluginRegistry instance.
    
            Args:
                registry: UnifiedPluginRegistry from cjm-fasthtml-plugins
            """
            self._registry = registry
    
        def get_plugins_by_category(self, category) -> list
        "Initialize adapter with UnifiedPluginRegistry instance.

Args:
    registry: UnifiedPluginRegistry from cjm-fasthtml-plugins"
    
    def get_plugins_by_category(self, category) -> list:
            """Get all plugins in a specific category."""
            return self._registry.get_plugins_by_category(category)
    
        def get_plugin(self, plugin_id: str)
        "Get all plugins in a specific category."
    
    def get_plugin(self, plugin_id: str):
            """Get a specific plugin by ID."""
            return self._registry.get_plugin(plugin_id)
    
        def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]
        "Get a specific plugin by ID."
    
    def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]
        "Load configuration for a plugin."
```

``` python
class ResourceManagerAdapter:
    def __init__(self, resource_manager):
        """
        Initialize adapter with ResourceManager instance.

        Args:
            resource_manager: ResourceManager from cjm-fasthtml-resources
        """
        self._resource_manager = resource_manager

    def register_worker(self, pid: int, worker_type: str) -> None
    "Adapter for cjm-fasthtml-resources ResourceManager."
    
    def __init__(self, resource_manager):
            """
            Initialize adapter with ResourceManager instance.
    
            Args:
                resource_manager: ResourceManager from cjm-fasthtml-resources
            """
            self._resource_manager = resource_manager
    
        def register_worker(self, pid: int, worker_type: str) -> None
        "Initialize adapter with ResourceManager instance.

Args:
    resource_manager: ResourceManager from cjm-fasthtml-resources"
    
    def register_worker(self, pid: int, worker_type: str) -> None:
            """Register a new worker process."""
            self._resource_manager.register_worker(pid, worker_type)
    
        def unregister_worker(self, pid: int) -> None
        "Register a new worker process."
    
    def unregister_worker(self, pid: int) -> None:
            """Unregister a worker process."""
            self._resource_manager.unregister_worker(pid)
    
        def update_worker_state(
            self,
            pid: int,
            status: Optional[str] = None,
            job_id: Optional[str] = None,
            plugin_name: Optional[str] = None,
            plugin_id: Optional[str] = None,
            loaded_plugin_resource: Optional[str] = None,
            config: Optional[Dict[str, Any]] = None,
        ) -> None
        "Unregister a worker process."
    
    def update_worker_state(
            self,
            pid: int,
            status: Optional[str] = None,
            job_id: Optional[str] = None,
            plugin_name: Optional[str] = None,
            plugin_id: Optional[str] = None,
            loaded_plugin_resource: Optional[str] = None,
            config: Optional[Dict[str, Any]] = None,
        ) -> None
        "Update worker state information."
    
    def check_gpu_availability(self):
            """Check GPU availability and identify conflicts."""
            return self._resource_manager.check_gpu_availability()
    
        def get_worker_by_pid(self, pid: int)
        "Check GPU availability and identify conflicts."
    
    def get_worker_by_pid(self, pid: int)
        "Get worker state by PID."
```

``` python
class SSEBroadcasterAdapter:
    def __init__(self, sse_manager):
        """
        Initialize adapter with SSE broadcast manager.

        Args:
            sse_manager: SSEBroadcastManager from cjm-fasthtml-sse
        """
        self._sse_manager = sse_manager

    async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None
    "Adapter for cjm-fasthtml-sse SSEBroadcastManager."
    
    def __init__(self, sse_manager):
            """
            Initialize adapter with SSE broadcast manager.
    
            Args:
                sse_manager: SSEBroadcastManager from cjm-fasthtml-sse
            """
            self._sse_manager = sse_manager
    
        async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None
        "Initialize adapter with SSE broadcast manager.

Args:
    sse_manager: SSEBroadcastManager from cjm-fasthtml-sse"
    
    async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None:
            """
            Broadcast an event to all connected SSE clients.
    
            Args:
                event_type: Event type identifier (e.g., 'transcription:started')
                data: Event data payload
            """
            # Broadcast using the SSE manager
            # Note: SSEBroadcastManager.broadcast expects event_type and data
        "Broadcast an event to all connected SSE clients.

Args:
    event_type: Event type identifier (e.g., 'transcription:started')
    data: Event data payload"
```

### base (`base.ipynb`)

> Abstract base class for managing background jobs with worker
> processes.

#### Import

``` python
from cjm_fasthtml_workers.managers.base import (
    JobType,
    BaseJob,
    BaseJobManager
)
```

#### Functions

``` python
@patch
@abstractmethod
def create_job(
    self:BaseJobManager,
    plugin_id:str,  # Plugin unique identifier
    **kwargs  # Domain-specific job parameters
) -> JobType:  # Created job instance
    "Factory method for creating domain-specific jobs."
```

``` python
@patch
@abstractmethod
def get_worker_entry_point(
    self:BaseJobManager
) -> Callable:  # Worker process entry point function
    "Return the worker process function for this manager."
```

``` python
@patch
@abstractmethod
def prepare_execute_request(
    self:BaseJobManager,
    job:JobType  # The job to prepare for execution
) -> Dict[str, Any]:  # Dictionary of parameters for the worker execute request
    "Convert job to worker execute request parameters."
```

``` python
@patch
@abstractmethod
def extract_job_result(
    self:BaseJobManager,
    job:JobType,  # The job that was executed
    result_data:Dict[str, Any]  # Raw result data from worker
) -> Dict[str, Any]:  # Formatted result for storage
    "Extract and format job result from worker response."
```

``` python
@patch
def _extract_plugin_resource_identifier(
    self:BaseJobManager,
    config:Dict[str, Any]  # Plugin configuration dictionary
) -> str:  # Plugin resource identifier string
    "Extract plugin resource identifier from plugin configuration."
```

``` python
@patch
async def _validate_resources(
    self:BaseJobManager,
    plugin_id:str,  # Plugin unique identifier
    plugin_config:Dict[str, Any]  # Plugin configuration
) -> Optional[str]:  # Error message if validation fails, None if successful
    "Validate resources before starting a job."
```

``` python
@patch
def _on_job_completed(
    self:BaseJobManager,
    job_id:str  # ID of the completed job
) -> None
    "Hook called when a job completes successfully."
```

``` python
@patch
def _start_worker(self:BaseJobManager):
    """Start the worker process and result monitor."""
    if self.worker_process and self.worker_process.is_alive()
    "Start the worker process and result monitor."
```

``` python
@patch
def _init_worker(self:BaseJobManager):
    """Send initialization message to worker with plugin configurations."""
    # Only send plugin configs if plugin registry is available
    plugin_configs = {}
    
    if self.plugin_registry
    "Send initialization message to worker with plugin configurations."
```

``` python
@patch
def _restart_worker(self:BaseJobManager):
    """Restart the worker process after an error or cancellation."""
    # Track restart
    self.restart_count += 1
    self.last_restart_time = time.time()

    # Unregister old worker from resource manager if available
    if self.resource_manager and self.worker_process
    "Restart the worker process after an error or cancellation."
```

``` python
@patch
def _monitor_results(self:BaseJobManager):
    """Monitor the result queue in a background thread."""
    while self.monitor_running
    "Monitor the result queue in a background thread."
```

``` python
@patch
def _handle_job_result(
    self:BaseJobManager, 
    result:Dict[str, Any]  # Result data from worker
)
    "Handle a job result from the worker."
```

``` python
@patch
def _handle_stream_chunk(
    self:BaseJobManager, 
    chunk_data:Dict[str, Any]  # Chunk data from worker
)
    "Handle a streaming chunk from the worker."
```

``` python
@patch
def _handle_worker_error(self:BaseJobManager):
    """Handle worker fatal error based on restart policy."""
    policy = self.worker_config.restart_policy

    if policy == RestartPolicy.NEVER
    "Handle worker fatal error based on restart policy."
```

``` python
@patch
def get_plugin_name(
    self:BaseJobManager,
    plugin_id:str  # Plugin unique identifier
) -> Optional[str]:  # Plugin name or None
    "Get plugin name from plugin ID (requires plugin registry)."
```

``` python
@patch
async def unload_plugin(
    self:BaseJobManager,
    plugin_name:str  # Name of the plugin to unload
) -> bool:  # True if successful, False otherwise
    "Unload a plugin from the worker to free resources."
```

``` python
@patch
async def reload_plugin(
    self:BaseJobManager,
    plugin_name:str,  # Name of the plugin to reload
    config:Dict[str, Any]  # New configuration
) -> bool:  # True if successful, False otherwise
    "Reload a plugin with new configuration."
```

``` python
@patch
async def start_job(
    self:BaseJobManager,
    plugin_id:str,  # Plugin unique identifier
    **kwargs  # Domain-specific job parameters
) -> JobType:  # Created and started job
    "Start a new job."
```

``` python
@patch
async def cancel_job(
    self:BaseJobManager,
    job_id:str  # ID of the job to cancel
) -> bool:  # True if cancellation successful
    "Cancel a running job by terminating the worker process."
```

``` python
@patch
def get_job(
    self:BaseJobManager,
    job_id:str  # Unique job identifier
) -> Optional[JobType]:  # Job object or None
    "Get a job by ID."
```

``` python
@patch
def get_all_jobs(
    self:BaseJobManager
) -> List[JobType]:  # List of all jobs
    "Get all jobs."
```

``` python
@patch
def get_job_result(
    self:BaseJobManager,
    job_id:str  # Unique job identifier
) -> Optional[Dict[str, Any]]:  # Job result or None
    "Get job result."
```

``` python
@patch
def clear_completed_jobs(
    self:BaseJobManager
) -> int:  # Number of jobs cleared
    "Clear completed, failed, and cancelled jobs."
```

``` python
@patch
async def broadcast_event(
    self:BaseJobManager,
    event_type:str,  # Event type identifier
    data:Dict[str, Any]  # Event data payload
)
    "Broadcast an event to all connected SSE clients (requires event broadcaster)."
```

``` python
@patch
def check_streaming_support(
    self:BaseJobManager,
    plugin_id:str  # Plugin unique identifier
) -> bool:  # True if streaming supported
    "Check if a plugin supports streaming."
```

``` python
@patch
def shutdown(self:BaseJobManager):
    """Shutdown the manager and cleanup resources."""
    # Stop result monitor
    self.monitor_running = False
    if self.result_monitor_thread
    "Shutdown the manager and cleanup resources."
```

#### Classes

``` python
@dataclass
class BaseJob:
    "Base class for all job types."
    
    id: str  # Unique job identifier
    plugin_id: str  # Plugin identifier for this job
    status: str = 'pending'  # Job status: pending, running, completed, failed, cancelled
    created_at: str = field(...)  # ISO format timestamp
    started_at: Optional[str]  # When job started executing
    completed_at: Optional[str]  # When job finished
    result: Optional[Dict[str, Any]]  # Job result data
    error: Optional[str]  # Error message if failed
    metadata: Dict[str, Any] = field(...)  # Additional job metadata
    worker_pid: Optional[int]  # Process ID of worker handling this job
```

``` python
class BaseJobManager:
    def __init__(
        self,
        worker_type:str,  # Type identifier (e.g., "transcription", "llm", "image-gen")
        category:Any,  # Plugin category this manager handles
        supports_streaming:bool=False,  # Whether this manager supports streaming jobs
        worker_config:Optional[WorkerConfig]=None,  # Worker configuration (uses defaults if None)
        plugin_registry:Optional[PluginRegistryProtocol]=None,  # Optional plugin registry integration
        resource_manager:Optional[ResourceManagerProtocol]=None,  # Optional resource manager integration
        event_broadcaster:Optional[EventBroadcasterProtocol]=None,  # Optional SSE event broadcaster
    )
    """
    Abstract base class for managing jobs using worker processes.
    
    Features:
    - Jobs processed sequentially in subprocess
    - Plugin resources loaded once and reused
    - True cancellation via subprocess termination
    - Automatic worker restart based on policy
    - Isolated worker process avoids duplicating web app initialization
    - Optional streaming support for incremental results
    - Optional dependency injection for plugin registry, resource manager, and event broadcaster
    """
    
    def __init__(
            self,
            worker_type:str,  # Type identifier (e.g., "transcription", "llm", "image-gen")
            category:Any,  # Plugin category this manager handles
            supports_streaming:bool=False,  # Whether this manager supports streaming jobs
            worker_config:Optional[WorkerConfig]=None,  # Worker configuration (uses defaults if None)
            plugin_registry:Optional[PluginRegistryProtocol]=None,  # Optional plugin registry integration
            resource_manager:Optional[ResourceManagerProtocol]=None,  # Optional resource manager integration
            event_broadcaster:Optional[EventBroadcasterProtocol]=None,  # Optional SSE event broadcaster
        )
        "Initialize the job manager."
```

### config (`config.ipynb`)

> Configuration for worker processes including restart policies,
> timeouts, and queue sizes.

#### Import

``` python
from cjm_fasthtml_workers.core.config import (
    RestartPolicy,
    WorkerConfig
)
```

#### Classes

``` python
class RestartPolicy(Enum):
    "Policy for restarting worker processes after failures."
```

``` python
@dataclass
class WorkerConfig:
    "Configuration for worker process behavior."
    
    request_queue_size: int = 0  # 0 = unlimited
    result_queue_size: int = 100  # Larger for streaming results
    response_queue_size: int = 10  # For synchronous command responses
    restart_policy: RestartPolicy = RestartPolicy.ON_CANCELLATION
    max_restart_attempts: int = 3
    restart_backoff_base_seconds: float = 1.0  # Base delay for exponential backoff
    restart_backoff_max_seconds: float = 60.0  # Max delay for backoff
    max_workers: int = 1  # Currently only 1 is supported
    worker_start_timeout_seconds: float = 30.0
    reload_timeout_seconds: float = 30.0
    unload_timeout_seconds: float = 10.0
    shutdown_timeout_seconds: float = 5.0
    result_monitor_poll_interval_seconds: float = 0.5
    
```

### protocol (`protocol.ipynb`)

> Protocol definitions for worker communication and plugin manager
> integration.

#### Import

``` python
from cjm_fasthtml_workers.core.protocol import (
    WorkerRequestType,
    WorkerResponseType,
    WorkerRequest,
    WorkerResponse,
    WorkerStreamChunk,
    WorkerResult,
    PluginManagerAdapter
)
```

#### Classes

``` python
class WorkerRequestType(Enum):
    "Types of requests sent to worker process."
```

``` python
class WorkerResponseType(Enum):
    "Types of responses from worker process."
```

``` python
@dataclass
class WorkerRequest:
    "Base structure for worker requests."
    
    type: WorkerRequestType
    data: Dict[str, Any]
    
    def to_dict(self) -> Dict[str, Any]:
            """Convert to dictionary for queue serialization."""
            return {
                'type': self.type.value,
        "Convert to dictionary for queue serialization."
    
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerRequest':
            """Create from dictionary received from queue."""
            req_type = WorkerRequestType(data['type'])
            request_data = {k: v for k, v in data.items() if k != 'type'}
        "Create from dictionary received from queue."
```

``` python
@dataclass
class WorkerResponse:
    "Base structure for worker responses."
    
    type: WorkerResponseType
    data: Dict[str, Any]
    
    def to_dict(self) -> Dict[str, Any]:
            """Convert to dictionary for queue serialization."""
            return {
                'type': self.type.value,
        "Convert to dictionary for queue serialization."
    
    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerResponse':
            """Create from dictionary received from queue."""
            resp_type = WorkerResponseType(data['type'])
            response_data = {k: v for k, v in data.items() if k != 'type'}
        "Create from dictionary received from queue."
```

``` python
@dataclass
class WorkerStreamChunk:
    "Structure for streaming job results."
    
    job_id: str  # Unique identifier for the job
    chunk: str  # Text chunk from streaming output
    is_final: bool = False  # Whether this is the final chunk
    metadata: Optional[Dict[str, Any]]  # Optional metadata
    
    def to_dict(self) -> Dict[str, Any]:
            """Convert to dictionary for queue serialization."""
            return {
                'type': WorkerResponseType.STREAM_CHUNK.value,
        "Convert to dictionary for queue serialization."
```

``` python
@dataclass
class WorkerResult:
    "Structure for job execution results."
    
    job_id: str  # Unique identifier for the job
    status: str  # 'success' or 'error'
    data: Optional[Dict[str, Any]]  # Result data on success
    error: Optional[str]  # Error message on failure
    
    def to_dict(self) -> Dict[str, Any]:
            """Convert to dictionary for queue serialization."""
            result = {
                'type': WorkerResponseType.RESULT.value,
        "Convert to dictionary for queue serialization."
```

``` python
class PluginManagerAdapter(Protocol):
    """
    Protocol that plugin managers must satisfy for worker integration.
    
    Uses structural subtyping (duck typing) - plugin managers don't need to
    explicitly inherit from this, they just need to implement these methods.
    """
    
    def discover_plugins(self) -> list:  # List of plugin metadata/data objects
            """Discover available plugins."""
            ...
    
        def load_plugin(
            self, 
            plugin_data:Any,  # Plugin metadata/data from discovery
            config:Dict[str, Any]  # Plugin configuration dictionary
        ) -> None
        "Discover available plugins."
    
    def load_plugin(
            self, 
            plugin_data:Any,  # Plugin metadata/data from discovery
            config:Dict[str, Any]  # Plugin configuration dictionary
        ) -> None
        "Load a plugin with configuration."
    
    def execute_plugin(
            self, 
            plugin_name:str,  # Name of the plugin to execute
            **params  # Plugin-specific parameters
        ) -> Any:  # Plugin execution result
        "Execute a plugin with given parameters."
    
    def execute_plugin_stream(
            self, 
            plugin_name:str,  # Name of the plugin to execute
            **params  # Plugin-specific parameters
        ) -> Iterator[str]:  # String chunks from plugin execution
        "Execute a plugin with streaming output."
    
    def reload_plugin(
            self, 
            plugin_name:str,  # Name of the plugin to reload
            config:Optional[Dict[str, Any]]=None  # New configuration (None to unload)
        ) -> None
        "Reload a plugin with new configuration."
    
    def unload_plugin(
            self, 
            plugin_name:str  # Name of the plugin to unload
        ) -> None
        "Unload a plugin to free resources."
    
    def check_streaming_support(
            self, 
            plugin_name:str  # Name of the plugin to check
        ) -> bool:  # True if plugin supports streaming
        "Check if a plugin supports streaming execution."
```

### protocols (`protocols.ipynb`)

> Optional integration protocols for plugin registries, resource
> management, and event broadcasting.

#### Import

``` python
from cjm_fasthtml_workers.extensions.protocols import (
    PluginRegistryProtocol,
    ResourceManagerProtocol,
    EventBroadcasterProtocol
)
```

#### Classes

``` python
class PluginRegistryProtocol(Protocol):
    "Protocol for plugin registry integration."
    
    def get_plugins_by_category(
            self, 
            category:Any  # Plugin category (can be enum, string, etc.)
        ) -> list:  # List of plugin metadata objects
        "Get all plugins in a specific category."
    
    def get_plugin(
            self, 
            plugin_id:str  # Unique plugin identifier
        ) -> Any:  # Plugin metadata object or None
        "Get a specific plugin by ID."
    
    def load_plugin_config(
            self, 
            plugin_id:str  # Unique plugin identifier
        ) -> Dict[str, Any]:  # Plugin configuration dictionary
        "Load configuration for a plugin."
```

``` python
class ResourceManagerProtocol(Protocol):
    "Protocol for resource management integration."
    
    def register_worker(
            self,
            pid:int,  # Worker process ID
            worker_type:str  # Type of worker (e.g., 'transcription', 'llm')
        ) -> None
        "Register a new worker process."
    
    def unregister_worker(
            self, 
            pid:int  # Process ID of the worker to unregister
        ) -> None
        "Unregister a worker process."
    
    def update_worker_state(
            self,
            pid:int,  # Worker process ID
            status:Optional[str]=None,  # Worker status: 'idle', 'running', etc.
            job_id:Optional[str]=None,  # Current job ID (None if idle)
            plugin_name:Optional[str]=None,  # Currently loaded plugin name
            plugin_id:Optional[str]=None,  # Currently loaded plugin ID
            loaded_plugin_resource:Optional[str]=None,  # Currently loaded plugin resource identifier
            config:Optional[Dict[str, Any]]=None,  # Current plugin configuration
        ) -> None
        "Update worker state information."
    
    def check_gpu_availability(self) -> Any:  # Returns ResourceConflict object
            """Check GPU availability and identify conflicts.
            
            Returns an object with:
            - status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)
            - app_pids: List of application PIDs using GPU
            - external_pids: List of external PIDs using GPU
            - app_processes: Detailed info about app processes
            - external_processes: Detailed info about external processes
            """
            ...
        
        def get_worker_by_pid(
            self, 
            pid:int  # Worker process ID
        ) -> Optional[Any]:  # Returns WorkerState object or None
        "Check GPU availability and identify conflicts.

Returns an object with:
- status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)
- app_pids: List of application PIDs using GPU
- external_pids: List of external PIDs using GPU
- app_processes: Detailed info about app processes
- external_processes: Detailed info about external processes"
    
    def get_worker_by_pid(
            self, 
            pid:int  # Worker process ID
        ) -> Optional[Any]:  # Returns WorkerState object or None
        "Get worker state by PID.

Returns worker state object with attributes:
- pid: Worker process ID
- worker_type: Type of worker
- status: Worker status ('idle', 'running', 'busy')
- job_id: Current job ID (None if idle)
- plugin_name: Currently loaded plugin name
- plugin_id: Currently loaded plugin ID
- loaded_plugin_resource: Currently loaded resource identifier
- config: Current plugin configuration"
```

``` python
class EventBroadcasterProtocol(Protocol):
    "Protocol for SSE event broadcasting."
    
    async def broadcast(
            self,
            event_type:str,  # Event type identifier
            data:Dict[str, Any]  # Event data payload
        ) -> None
        "Broadcast an event to all connected clients."
```

### worker (`worker.ipynb`)

> Generic worker process for executing plugin-based jobs in isolated
> subprocesses.

#### Import

``` python
from cjm_fasthtml_workers.core.worker import (
    base_worker_process
)
```

#### Functions

``` python
def base_worker_process(
    request_queue:multiprocessing.Queue,  # Queue for receiving job requests from parent
    result_queue:multiprocessing.Queue,  # Queue for sending job results back to parent
    response_queue:multiprocessing.Queue,  # Queue for sending command responses back to parent
    plugin_manager_factory:Callable[[], PluginManagerAdapter],  # Factory function that creates a plugin manager instance
    result_adapter:Optional[Callable[[Any], Dict[str, Any]]]=None,  # Optional function to adapt plugin results to dict format
    supports_streaming:bool=False  # Whether this worker supports streaming execution
)
    "Generic long-lived worker process that handles job execution."
```

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/cj-mills/cjm-fasthtml-workers",
    "name": "cjm-fasthtml-workers",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "nbdev jupyter notebook python",
    "author": "Christian J. Mills",
    "author_email": "9126128+cj-mills@users.noreply.github.com",
    "download_url": "https://files.pythonhosted.org/packages/b9/bb/53fe7cb65a32d8646cfdaa6e91ddad71ad15604b7f244c5d2a8e3fabdbcd/cjm_fasthtml_workers-0.0.7.tar.gz",
    "platform": null,
    "description": "# cjm-fasthtml-workers\n\n\n<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->\n\n## Install\n\n``` bash\npip install cjm_fasthtml_workers\n```\n\n## Project Structure\n\n    nbs/\n    \u251c\u2500\u2500 core/ (4)\n    \u2502   \u251c\u2500\u2500 adapters.ipynb  # Adapter utilities for making plugin managers compatible with the worker system.\n    \u2502   \u251c\u2500\u2500 config.ipynb    # Configuration for worker processes including restart policies, timeouts, and queue sizes.\n    \u2502   \u251c\u2500\u2500 protocol.ipynb  # Protocol definitions for worker communication and plugin manager integration.\n    \u2502   \u2514\u2500\u2500 worker.ipynb    # Generic worker process for executing plugin-based jobs in isolated subprocesses.\n    \u251c\u2500\u2500 extensions/ (2)\n    \u2502   \u251c\u2500\u2500 adapters.ipynb   # Common adapters for integrating popular FastHTML libraries with the worker system.\n    \u2502   \u2514\u2500\u2500 protocols.ipynb  # Optional integration protocols for plugin registries, resource management, and event broadcasting.\n    \u2514\u2500\u2500 managers/ (1)\n        \u2514\u2500\u2500 base.ipynb  # Abstract base class for managing background jobs with worker processes.\n\nTotal: 7 notebooks across 3 directories\n\n## Module Dependencies\n\n``` mermaid\ngraph LR\n    core_adapters[core.adapters<br/>adapters]\n    core_config[core.config<br/>config]\n    core_protocol[core.protocol<br/>protocol]\n    core_worker[core.worker<br/>worker]\n    extensions_adapters[extensions.adapters<br/>adapters]\n    extensions_protocols[extensions.protocols<br/>protocols]\n    managers_base[managers.base<br/>base]\n\n    core_adapters --> core_protocol\n    core_worker --> core_protocol\n    extensions_adapters --> extensions_protocols\n    managers_base --> extensions_protocols\n    managers_base --> core_protocol\n    managers_base --> core_config\n```\n\n*6 cross-module dependencies detected*\n\n## CLI Reference\n\nNo CLI commands found in this project.\n\n## Module Overview\n\nDetailed documentation for each module in the project:\n\n### adapters (`adapters.ipynb`)\n\n> Adapter utilities for making plugin managers compatible with the\n> worker system.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.core.adapters import (\n    create_simple_adapter,\n    default_result_adapter\n)\n```\n\n#### Functions\n\n``` python\ndef create_simple_adapter(\n    plugin_manager:Any,  # The plugin manager instance to adapt\n    result_adapter:Optional[callable]=None  # Optional function to convert plugin results to dict\n) -> PluginManagerAdapter:  # Adapter that satisfies PluginManagerAdapter protocol\n    \"Create a simple adapter for a plugin manager.\"\n```\n\n``` python\ndef default_result_adapter(\n    result:Any  # Plugin execution result\n) -> Dict[str, Any]:  # Dictionary with text and metadata\n    \"Default adapter for converting plugin results to dictionaries.\"\n```\n\n### adapters (`adapters.ipynb`)\n\n> Common adapters for integrating popular FastHTML libraries with the\n> worker system.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.extensions.adapters import (\n    UnifiedPluginRegistryAdapter,\n    ResourceManagerAdapter,\n    SSEBroadcasterAdapter,\n    create_standard_adapters\n)\n```\n\n#### Functions\n\n```` python\ndef create_standard_adapters(\n    plugin_registry=None,  # UnifiedPluginRegistry instance (optional)\n    resource_manager=None,  # ResourceManager instance (optional)\n    sse_manager=None  # SSEBroadcastManager instance (optional)\n) -> tuple:  # (plugin_adapter, resource_adapter, sse_adapter)\n    \"\"\"\n    Create standard adapters for common FastHTML libraries.\n    \n    This factory function creates adapters for:\n    - cjm-fasthtml-plugins UnifiedPluginRegistry\n    - cjm-fasthtml-resources ResourceManager\n    - cjm-fasthtml-sse SSEBroadcastManager\n    \n    All parameters are optional. Pass None for services you don't use.\n    \n    Example:\n        ```python\n        from cjm_fasthtml_workers.extensions.adapters import create_standard_adapters\n        from cjm_fasthtml_plugins.core.registry import UnifiedPluginRegistry\n        from cjm_fasthtml_resources.core.manager import ResourceManager\n        \n        # Create services\n        plugin_registry = UnifiedPluginRegistry()\n        resource_manager = ResourceManager()\n        \n        # Create adapters\n        plugin_adapter, resource_adapter, sse_adapter = create_standard_adapters(\n            plugin_registry=plugin_registry,\n            resource_manager=resource_manager\n        )\n        \n        # Pass to job manager\n        manager = MyJobManager(\n            plugin_registry=plugin_adapter,\n            resource_manager=resource_adapter\n        )\n        ```\n    \n    Returns:\n        Tuple of (plugin_adapter, resource_adapter, sse_adapter)\n        Each adapter will be None if the corresponding service was not provided.\n    \"\"\"\n````\n\n#### Classes\n\n``` python\nclass UnifiedPluginRegistryAdapter:\n    def __init__(self, registry):\n        \"\"\"\n        Initialize adapter with UnifiedPluginRegistry instance.\n\n        Args:\n            registry: UnifiedPluginRegistry from cjm-fasthtml-plugins\n        \"\"\"\n        self._registry = registry\n\n    def get_plugins_by_category(self, category) -> list\n    \"Adapter for cjm-fasthtml-plugins UnifiedPluginRegistry.\"\n    \n    def __init__(self, registry):\n            \"\"\"\n            Initialize adapter with UnifiedPluginRegistry instance.\n    \n            Args:\n                registry: UnifiedPluginRegistry from cjm-fasthtml-plugins\n            \"\"\"\n            self._registry = registry\n    \n        def get_plugins_by_category(self, category) -> list\n        \"Initialize adapter with UnifiedPluginRegistry instance.\n\nArgs:\n    registry: UnifiedPluginRegistry from cjm-fasthtml-plugins\"\n    \n    def get_plugins_by_category(self, category) -> list:\n            \"\"\"Get all plugins in a specific category.\"\"\"\n            return self._registry.get_plugins_by_category(category)\n    \n        def get_plugin(self, plugin_id: str)\n        \"Get all plugins in a specific category.\"\n    \n    def get_plugin(self, plugin_id: str):\n            \"\"\"Get a specific plugin by ID.\"\"\"\n            return self._registry.get_plugin(plugin_id)\n    \n        def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]\n        \"Get a specific plugin by ID.\"\n    \n    def load_plugin_config(self, plugin_id: str) -> Dict[str, Any]\n        \"Load configuration for a plugin.\"\n```\n\n``` python\nclass ResourceManagerAdapter:\n    def __init__(self, resource_manager):\n        \"\"\"\n        Initialize adapter with ResourceManager instance.\n\n        Args:\n            resource_manager: ResourceManager from cjm-fasthtml-resources\n        \"\"\"\n        self._resource_manager = resource_manager\n\n    def register_worker(self, pid: int, worker_type: str) -> None\n    \"Adapter for cjm-fasthtml-resources ResourceManager.\"\n    \n    def __init__(self, resource_manager):\n            \"\"\"\n            Initialize adapter with ResourceManager instance.\n    \n            Args:\n                resource_manager: ResourceManager from cjm-fasthtml-resources\n            \"\"\"\n            self._resource_manager = resource_manager\n    \n        def register_worker(self, pid: int, worker_type: str) -> None\n        \"Initialize adapter with ResourceManager instance.\n\nArgs:\n    resource_manager: ResourceManager from cjm-fasthtml-resources\"\n    \n    def register_worker(self, pid: int, worker_type: str) -> None:\n            \"\"\"Register a new worker process.\"\"\"\n            self._resource_manager.register_worker(pid, worker_type)\n    \n        def unregister_worker(self, pid: int) -> None\n        \"Register a new worker process.\"\n    \n    def unregister_worker(self, pid: int) -> None:\n            \"\"\"Unregister a worker process.\"\"\"\n            self._resource_manager.unregister_worker(pid)\n    \n        def update_worker_state(\n            self,\n            pid: int,\n            status: Optional[str] = None,\n            job_id: Optional[str] = None,\n            plugin_name: Optional[str] = None,\n            plugin_id: Optional[str] = None,\n            loaded_plugin_resource: Optional[str] = None,\n            config: Optional[Dict[str, Any]] = None,\n        ) -> None\n        \"Unregister a worker process.\"\n    \n    def update_worker_state(\n            self,\n            pid: int,\n            status: Optional[str] = None,\n            job_id: Optional[str] = None,\n            plugin_name: Optional[str] = None,\n            plugin_id: Optional[str] = None,\n            loaded_plugin_resource: Optional[str] = None,\n            config: Optional[Dict[str, Any]] = None,\n        ) -> None\n        \"Update worker state information.\"\n    \n    def check_gpu_availability(self):\n            \"\"\"Check GPU availability and identify conflicts.\"\"\"\n            return self._resource_manager.check_gpu_availability()\n    \n        def get_worker_by_pid(self, pid: int)\n        \"Check GPU availability and identify conflicts.\"\n    \n    def get_worker_by_pid(self, pid: int)\n        \"Get worker state by PID.\"\n```\n\n``` python\nclass SSEBroadcasterAdapter:\n    def __init__(self, sse_manager):\n        \"\"\"\n        Initialize adapter with SSE broadcast manager.\n\n        Args:\n            sse_manager: SSEBroadcastManager from cjm-fasthtml-sse\n        \"\"\"\n        self._sse_manager = sse_manager\n\n    async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None\n    \"Adapter for cjm-fasthtml-sse SSEBroadcastManager.\"\n    \n    def __init__(self, sse_manager):\n            \"\"\"\n            Initialize adapter with SSE broadcast manager.\n    \n            Args:\n                sse_manager: SSEBroadcastManager from cjm-fasthtml-sse\n            \"\"\"\n            self._sse_manager = sse_manager\n    \n        async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None\n        \"Initialize adapter with SSE broadcast manager.\n\nArgs:\n    sse_manager: SSEBroadcastManager from cjm-fasthtml-sse\"\n    \n    async def broadcast(self, event_type: str, data: Dict[str, Any]) -> None:\n            \"\"\"\n            Broadcast an event to all connected SSE clients.\n    \n            Args:\n                event_type: Event type identifier (e.g., 'transcription:started')\n                data: Event data payload\n            \"\"\"\n            # Broadcast using the SSE manager\n            # Note: SSEBroadcastManager.broadcast expects event_type and data\n        \"Broadcast an event to all connected SSE clients.\n\nArgs:\n    event_type: Event type identifier (e.g., 'transcription:started')\n    data: Event data payload\"\n```\n\n### base (`base.ipynb`)\n\n> Abstract base class for managing background jobs with worker\n> processes.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.managers.base import (\n    JobType,\n    BaseJob,\n    BaseJobManager\n)\n```\n\n#### Functions\n\n``` python\n@patch\n@abstractmethod\ndef create_job(\n    self:BaseJobManager,\n    plugin_id:str,  # Plugin unique identifier\n    **kwargs  # Domain-specific job parameters\n) -> JobType:  # Created job instance\n    \"Factory method for creating domain-specific jobs.\"\n```\n\n``` python\n@patch\n@abstractmethod\ndef get_worker_entry_point(\n    self:BaseJobManager\n) -> Callable:  # Worker process entry point function\n    \"Return the worker process function for this manager.\"\n```\n\n``` python\n@patch\n@abstractmethod\ndef prepare_execute_request(\n    self:BaseJobManager,\n    job:JobType  # The job to prepare for execution\n) -> Dict[str, Any]:  # Dictionary of parameters for the worker execute request\n    \"Convert job to worker execute request parameters.\"\n```\n\n``` python\n@patch\n@abstractmethod\ndef extract_job_result(\n    self:BaseJobManager,\n    job:JobType,  # The job that was executed\n    result_data:Dict[str, Any]  # Raw result data from worker\n) -> Dict[str, Any]:  # Formatted result for storage\n    \"Extract and format job result from worker response.\"\n```\n\n``` python\n@patch\ndef _extract_plugin_resource_identifier(\n    self:BaseJobManager,\n    config:Dict[str, Any]  # Plugin configuration dictionary\n) -> str:  # Plugin resource identifier string\n    \"Extract plugin resource identifier from plugin configuration.\"\n```\n\n``` python\n@patch\nasync def _validate_resources(\n    self:BaseJobManager,\n    plugin_id:str,  # Plugin unique identifier\n    plugin_config:Dict[str, Any]  # Plugin configuration\n) -> Optional[str]:  # Error message if validation fails, None if successful\n    \"Validate resources before starting a job.\"\n```\n\n``` python\n@patch\ndef _on_job_completed(\n    self:BaseJobManager,\n    job_id:str  # ID of the completed job\n) -> None\n    \"Hook called when a job completes successfully.\"\n```\n\n``` python\n@patch\ndef _start_worker(self:BaseJobManager):\n    \"\"\"Start the worker process and result monitor.\"\"\"\n    if self.worker_process and self.worker_process.is_alive()\n    \"Start the worker process and result monitor.\"\n```\n\n``` python\n@patch\ndef _init_worker(self:BaseJobManager):\n    \"\"\"Send initialization message to worker with plugin configurations.\"\"\"\n    # Only send plugin configs if plugin registry is available\n    plugin_configs = {}\n    \n    if self.plugin_registry\n    \"Send initialization message to worker with plugin configurations.\"\n```\n\n``` python\n@patch\ndef _restart_worker(self:BaseJobManager):\n    \"\"\"Restart the worker process after an error or cancellation.\"\"\"\n    # Track restart\n    self.restart_count += 1\n    self.last_restart_time = time.time()\n\n    # Unregister old worker from resource manager if available\n    if self.resource_manager and self.worker_process\n    \"Restart the worker process after an error or cancellation.\"\n```\n\n``` python\n@patch\ndef _monitor_results(self:BaseJobManager):\n    \"\"\"Monitor the result queue in a background thread.\"\"\"\n    while self.monitor_running\n    \"Monitor the result queue in a background thread.\"\n```\n\n``` python\n@patch\ndef _handle_job_result(\n    self:BaseJobManager, \n    result:Dict[str, Any]  # Result data from worker\n)\n    \"Handle a job result from the worker.\"\n```\n\n``` python\n@patch\ndef _handle_stream_chunk(\n    self:BaseJobManager, \n    chunk_data:Dict[str, Any]  # Chunk data from worker\n)\n    \"Handle a streaming chunk from the worker.\"\n```\n\n``` python\n@patch\ndef _handle_worker_error(self:BaseJobManager):\n    \"\"\"Handle worker fatal error based on restart policy.\"\"\"\n    policy = self.worker_config.restart_policy\n\n    if policy == RestartPolicy.NEVER\n    \"Handle worker fatal error based on restart policy.\"\n```\n\n``` python\n@patch\ndef get_plugin_name(\n    self:BaseJobManager,\n    plugin_id:str  # Plugin unique identifier\n) -> Optional[str]:  # Plugin name or None\n    \"Get plugin name from plugin ID (requires plugin registry).\"\n```\n\n``` python\n@patch\nasync def unload_plugin(\n    self:BaseJobManager,\n    plugin_name:str  # Name of the plugin to unload\n) -> bool:  # True if successful, False otherwise\n    \"Unload a plugin from the worker to free resources.\"\n```\n\n``` python\n@patch\nasync def reload_plugin(\n    self:BaseJobManager,\n    plugin_name:str,  # Name of the plugin to reload\n    config:Dict[str, Any]  # New configuration\n) -> bool:  # True if successful, False otherwise\n    \"Reload a plugin with new configuration.\"\n```\n\n``` python\n@patch\nasync def start_job(\n    self:BaseJobManager,\n    plugin_id:str,  # Plugin unique identifier\n    **kwargs  # Domain-specific job parameters\n) -> JobType:  # Created and started job\n    \"Start a new job.\"\n```\n\n``` python\n@patch\nasync def cancel_job(\n    self:BaseJobManager,\n    job_id:str  # ID of the job to cancel\n) -> bool:  # True if cancellation successful\n    \"Cancel a running job by terminating the worker process.\"\n```\n\n``` python\n@patch\ndef get_job(\n    self:BaseJobManager,\n    job_id:str  # Unique job identifier\n) -> Optional[JobType]:  # Job object or None\n    \"Get a job by ID.\"\n```\n\n``` python\n@patch\ndef get_all_jobs(\n    self:BaseJobManager\n) -> List[JobType]:  # List of all jobs\n    \"Get all jobs.\"\n```\n\n``` python\n@patch\ndef get_job_result(\n    self:BaseJobManager,\n    job_id:str  # Unique job identifier\n) -> Optional[Dict[str, Any]]:  # Job result or None\n    \"Get job result.\"\n```\n\n``` python\n@patch\ndef clear_completed_jobs(\n    self:BaseJobManager\n) -> int:  # Number of jobs cleared\n    \"Clear completed, failed, and cancelled jobs.\"\n```\n\n``` python\n@patch\nasync def broadcast_event(\n    self:BaseJobManager,\n    event_type:str,  # Event type identifier\n    data:Dict[str, Any]  # Event data payload\n)\n    \"Broadcast an event to all connected SSE clients (requires event broadcaster).\"\n```\n\n``` python\n@patch\ndef check_streaming_support(\n    self:BaseJobManager,\n    plugin_id:str  # Plugin unique identifier\n) -> bool:  # True if streaming supported\n    \"Check if a plugin supports streaming.\"\n```\n\n``` python\n@patch\ndef shutdown(self:BaseJobManager):\n    \"\"\"Shutdown the manager and cleanup resources.\"\"\"\n    # Stop result monitor\n    self.monitor_running = False\n    if self.result_monitor_thread\n    \"Shutdown the manager and cleanup resources.\"\n```\n\n#### Classes\n\n``` python\n@dataclass\nclass BaseJob:\n    \"Base class for all job types.\"\n    \n    id: str  # Unique job identifier\n    plugin_id: str  # Plugin identifier for this job\n    status: str = 'pending'  # Job status: pending, running, completed, failed, cancelled\n    created_at: str = field(...)  # ISO format timestamp\n    started_at: Optional[str]  # When job started executing\n    completed_at: Optional[str]  # When job finished\n    result: Optional[Dict[str, Any]]  # Job result data\n    error: Optional[str]  # Error message if failed\n    metadata: Dict[str, Any] = field(...)  # Additional job metadata\n    worker_pid: Optional[int]  # Process ID of worker handling this job\n```\n\n``` python\nclass BaseJobManager:\n    def __init__(\n        self,\n        worker_type:str,  # Type identifier (e.g., \"transcription\", \"llm\", \"image-gen\")\n        category:Any,  # Plugin category this manager handles\n        supports_streaming:bool=False,  # Whether this manager supports streaming jobs\n        worker_config:Optional[WorkerConfig]=None,  # Worker configuration (uses defaults if None)\n        plugin_registry:Optional[PluginRegistryProtocol]=None,  # Optional plugin registry integration\n        resource_manager:Optional[ResourceManagerProtocol]=None,  # Optional resource manager integration\n        event_broadcaster:Optional[EventBroadcasterProtocol]=None,  # Optional SSE event broadcaster\n    )\n    \"\"\"\n    Abstract base class for managing jobs using worker processes.\n    \n    Features:\n    - Jobs processed sequentially in subprocess\n    - Plugin resources loaded once and reused\n    - True cancellation via subprocess termination\n    - Automatic worker restart based on policy\n    - Isolated worker process avoids duplicating web app initialization\n    - Optional streaming support for incremental results\n    - Optional dependency injection for plugin registry, resource manager, and event broadcaster\n    \"\"\"\n    \n    def __init__(\n            self,\n            worker_type:str,  # Type identifier (e.g., \"transcription\", \"llm\", \"image-gen\")\n            category:Any,  # Plugin category this manager handles\n            supports_streaming:bool=False,  # Whether this manager supports streaming jobs\n            worker_config:Optional[WorkerConfig]=None,  # Worker configuration (uses defaults if None)\n            plugin_registry:Optional[PluginRegistryProtocol]=None,  # Optional plugin registry integration\n            resource_manager:Optional[ResourceManagerProtocol]=None,  # Optional resource manager integration\n            event_broadcaster:Optional[EventBroadcasterProtocol]=None,  # Optional SSE event broadcaster\n        )\n        \"Initialize the job manager.\"\n```\n\n### config (`config.ipynb`)\n\n> Configuration for worker processes including restart policies,\n> timeouts, and queue sizes.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.core.config import (\n    RestartPolicy,\n    WorkerConfig\n)\n```\n\n#### Classes\n\n``` python\nclass RestartPolicy(Enum):\n    \"Policy for restarting worker processes after failures.\"\n```\n\n``` python\n@dataclass\nclass WorkerConfig:\n    \"Configuration for worker process behavior.\"\n    \n    request_queue_size: int = 0  # 0 = unlimited\n    result_queue_size: int = 100  # Larger for streaming results\n    response_queue_size: int = 10  # For synchronous command responses\n    restart_policy: RestartPolicy = RestartPolicy.ON_CANCELLATION\n    max_restart_attempts: int = 3\n    restart_backoff_base_seconds: float = 1.0  # Base delay for exponential backoff\n    restart_backoff_max_seconds: float = 60.0  # Max delay for backoff\n    max_workers: int = 1  # Currently only 1 is supported\n    worker_start_timeout_seconds: float = 30.0\n    reload_timeout_seconds: float = 30.0\n    unload_timeout_seconds: float = 10.0\n    shutdown_timeout_seconds: float = 5.0\n    result_monitor_poll_interval_seconds: float = 0.5\n    \n```\n\n### protocol (`protocol.ipynb`)\n\n> Protocol definitions for worker communication and plugin manager\n> integration.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.core.protocol import (\n    WorkerRequestType,\n    WorkerResponseType,\n    WorkerRequest,\n    WorkerResponse,\n    WorkerStreamChunk,\n    WorkerResult,\n    PluginManagerAdapter\n)\n```\n\n#### Classes\n\n``` python\nclass WorkerRequestType(Enum):\n    \"Types of requests sent to worker process.\"\n```\n\n``` python\nclass WorkerResponseType(Enum):\n    \"Types of responses from worker process.\"\n```\n\n``` python\n@dataclass\nclass WorkerRequest:\n    \"Base structure for worker requests.\"\n    \n    type: WorkerRequestType\n    data: Dict[str, Any]\n    \n    def to_dict(self) -> Dict[str, Any]:\n            \"\"\"Convert to dictionary for queue serialization.\"\"\"\n            return {\n                'type': self.type.value,\n        \"Convert to dictionary for queue serialization.\"\n    \n    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerRequest':\n            \"\"\"Create from dictionary received from queue.\"\"\"\n            req_type = WorkerRequestType(data['type'])\n            request_data = {k: v for k, v in data.items() if k != 'type'}\n        \"Create from dictionary received from queue.\"\n```\n\n``` python\n@dataclass\nclass WorkerResponse:\n    \"Base structure for worker responses.\"\n    \n    type: WorkerResponseType\n    data: Dict[str, Any]\n    \n    def to_dict(self) -> Dict[str, Any]:\n            \"\"\"Convert to dictionary for queue serialization.\"\"\"\n            return {\n                'type': self.type.value,\n        \"Convert to dictionary for queue serialization.\"\n    \n    def from_dict(cls, data: Dict[str, Any]) -> 'WorkerResponse':\n            \"\"\"Create from dictionary received from queue.\"\"\"\n            resp_type = WorkerResponseType(data['type'])\n            response_data = {k: v for k, v in data.items() if k != 'type'}\n        \"Create from dictionary received from queue.\"\n```\n\n``` python\n@dataclass\nclass WorkerStreamChunk:\n    \"Structure for streaming job results.\"\n    \n    job_id: str  # Unique identifier for the job\n    chunk: str  # Text chunk from streaming output\n    is_final: bool = False  # Whether this is the final chunk\n    metadata: Optional[Dict[str, Any]]  # Optional metadata\n    \n    def to_dict(self) -> Dict[str, Any]:\n            \"\"\"Convert to dictionary for queue serialization.\"\"\"\n            return {\n                'type': WorkerResponseType.STREAM_CHUNK.value,\n        \"Convert to dictionary for queue serialization.\"\n```\n\n``` python\n@dataclass\nclass WorkerResult:\n    \"Structure for job execution results.\"\n    \n    job_id: str  # Unique identifier for the job\n    status: str  # 'success' or 'error'\n    data: Optional[Dict[str, Any]]  # Result data on success\n    error: Optional[str]  # Error message on failure\n    \n    def to_dict(self) -> Dict[str, Any]:\n            \"\"\"Convert to dictionary for queue serialization.\"\"\"\n            result = {\n                'type': WorkerResponseType.RESULT.value,\n        \"Convert to dictionary for queue serialization.\"\n```\n\n``` python\nclass PluginManagerAdapter(Protocol):\n    \"\"\"\n    Protocol that plugin managers must satisfy for worker integration.\n    \n    Uses structural subtyping (duck typing) - plugin managers don't need to\n    explicitly inherit from this, they just need to implement these methods.\n    \"\"\"\n    \n    def discover_plugins(self) -> list:  # List of plugin metadata/data objects\n            \"\"\"Discover available plugins.\"\"\"\n            ...\n    \n        def load_plugin(\n            self, \n            plugin_data:Any,  # Plugin metadata/data from discovery\n            config:Dict[str, Any]  # Plugin configuration dictionary\n        ) -> None\n        \"Discover available plugins.\"\n    \n    def load_plugin(\n            self, \n            plugin_data:Any,  # Plugin metadata/data from discovery\n            config:Dict[str, Any]  # Plugin configuration dictionary\n        ) -> None\n        \"Load a plugin with configuration.\"\n    \n    def execute_plugin(\n            self, \n            plugin_name:str,  # Name of the plugin to execute\n            **params  # Plugin-specific parameters\n        ) -> Any:  # Plugin execution result\n        \"Execute a plugin with given parameters.\"\n    \n    def execute_plugin_stream(\n            self, \n            plugin_name:str,  # Name of the plugin to execute\n            **params  # Plugin-specific parameters\n        ) -> Iterator[str]:  # String chunks from plugin execution\n        \"Execute a plugin with streaming output.\"\n    \n    def reload_plugin(\n            self, \n            plugin_name:str,  # Name of the plugin to reload\n            config:Optional[Dict[str, Any]]=None  # New configuration (None to unload)\n        ) -> None\n        \"Reload a plugin with new configuration.\"\n    \n    def unload_plugin(\n            self, \n            plugin_name:str  # Name of the plugin to unload\n        ) -> None\n        \"Unload a plugin to free resources.\"\n    \n    def check_streaming_support(\n            self, \n            plugin_name:str  # Name of the plugin to check\n        ) -> bool:  # True if plugin supports streaming\n        \"Check if a plugin supports streaming execution.\"\n```\n\n### protocols (`protocols.ipynb`)\n\n> Optional integration protocols for plugin registries, resource\n> management, and event broadcasting.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.extensions.protocols import (\n    PluginRegistryProtocol,\n    ResourceManagerProtocol,\n    EventBroadcasterProtocol\n)\n```\n\n#### Classes\n\n``` python\nclass PluginRegistryProtocol(Protocol):\n    \"Protocol for plugin registry integration.\"\n    \n    def get_plugins_by_category(\n            self, \n            category:Any  # Plugin category (can be enum, string, etc.)\n        ) -> list:  # List of plugin metadata objects\n        \"Get all plugins in a specific category.\"\n    \n    def get_plugin(\n            self, \n            plugin_id:str  # Unique plugin identifier\n        ) -> Any:  # Plugin metadata object or None\n        \"Get a specific plugin by ID.\"\n    \n    def load_plugin_config(\n            self, \n            plugin_id:str  # Unique plugin identifier\n        ) -> Dict[str, Any]:  # Plugin configuration dictionary\n        \"Load configuration for a plugin.\"\n```\n\n``` python\nclass ResourceManagerProtocol(Protocol):\n    \"Protocol for resource management integration.\"\n    \n    def register_worker(\n            self,\n            pid:int,  # Worker process ID\n            worker_type:str  # Type of worker (e.g., 'transcription', 'llm')\n        ) -> None\n        \"Register a new worker process.\"\n    \n    def unregister_worker(\n            self, \n            pid:int  # Process ID of the worker to unregister\n        ) -> None\n        \"Unregister a worker process.\"\n    \n    def update_worker_state(\n            self,\n            pid:int,  # Worker process ID\n            status:Optional[str]=None,  # Worker status: 'idle', 'running', etc.\n            job_id:Optional[str]=None,  # Current job ID (None if idle)\n            plugin_name:Optional[str]=None,  # Currently loaded plugin name\n            plugin_id:Optional[str]=None,  # Currently loaded plugin ID\n            loaded_plugin_resource:Optional[str]=None,  # Currently loaded plugin resource identifier\n            config:Optional[Dict[str, Any]]=None,  # Current plugin configuration\n        ) -> None\n        \"Update worker state information.\"\n    \n    def check_gpu_availability(self) -> Any:  # Returns ResourceConflict object\n            \"\"\"Check GPU availability and identify conflicts.\n            \n            Returns an object with:\n            - status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)\n            - app_pids: List of application PIDs using GPU\n            - external_pids: List of external PIDs using GPU\n            - app_processes: Detailed info about app processes\n            - external_processes: Detailed info about external processes\n            \"\"\"\n            ...\n        \n        def get_worker_by_pid(\n            self, \n            pid:int  # Worker process ID\n        ) -> Optional[Any]:  # Returns WorkerState object or None\n        \"Check GPU availability and identify conflicts.\n\nReturns an object with:\n- status: ResourceStatus enum (AVAILABLE, APP_BUSY, EXTERNAL_BUSY)\n- app_pids: List of application PIDs using GPU\n- external_pids: List of external PIDs using GPU\n- app_processes: Detailed info about app processes\n- external_processes: Detailed info about external processes\"\n    \n    def get_worker_by_pid(\n            self, \n            pid:int  # Worker process ID\n        ) -> Optional[Any]:  # Returns WorkerState object or None\n        \"Get worker state by PID.\n\nReturns worker state object with attributes:\n- pid: Worker process ID\n- worker_type: Type of worker\n- status: Worker status ('idle', 'running', 'busy')\n- job_id: Current job ID (None if idle)\n- plugin_name: Currently loaded plugin name\n- plugin_id: Currently loaded plugin ID\n- loaded_plugin_resource: Currently loaded resource identifier\n- config: Current plugin configuration\"\n```\n\n``` python\nclass EventBroadcasterProtocol(Protocol):\n    \"Protocol for SSE event broadcasting.\"\n    \n    async def broadcast(\n            self,\n            event_type:str,  # Event type identifier\n            data:Dict[str, Any]  # Event data payload\n        ) -> None\n        \"Broadcast an event to all connected clients.\"\n```\n\n### worker (`worker.ipynb`)\n\n> Generic worker process for executing plugin-based jobs in isolated\n> subprocesses.\n\n#### Import\n\n``` python\nfrom cjm_fasthtml_workers.core.worker import (\n    base_worker_process\n)\n```\n\n#### Functions\n\n``` python\ndef base_worker_process(\n    request_queue:multiprocessing.Queue,  # Queue for receiving job requests from parent\n    result_queue:multiprocessing.Queue,  # Queue for sending job results back to parent\n    response_queue:multiprocessing.Queue,  # Queue for sending command responses back to parent\n    plugin_manager_factory:Callable[[], PluginManagerAdapter],  # Factory function that creates a plugin manager instance\n    result_adapter:Optional[Callable[[Any], Dict[str, Any]]]=None,  # Optional function to adapt plugin results to dict format\n    supports_streaming:bool=False  # Whether this worker supports streaming execution\n)\n    \"Generic long-lived worker process that handles job execution.\"\n```\n",
    "bugtrack_url": null,
    "license": "Apache Software License 2.0",
    "summary": "Background worker system for FastHTML with multiprocess job execution, cancellation support, and streaming capabilities.",
    "version": "0.0.7",
    "project_urls": {
        "Homepage": "https://github.com/cj-mills/cjm-fasthtml-workers"
    },
    "split_keywords": [
        "nbdev",
        "jupyter",
        "notebook",
        "python"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "4d4e43a948ba10cba6c1d58567b7da72516e06835d6eb331c5e31a837de3fab9",
                "md5": "555c82c97b2ec448a65fc7d14a0817c0",
                "sha256": "5ddd26d6ef9859870688a84c6ab567076e9c3414f97a765db349033688dec4db"
            },
            "downloads": -1,
            "filename": "cjm_fasthtml_workers-0.0.7-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "555c82c97b2ec448a65fc7d14a0817c0",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 31841,
            "upload_time": "2025-10-27T22:28:52",
            "upload_time_iso_8601": "2025-10-27T22:28:52.304497Z",
            "url": "https://files.pythonhosted.org/packages/4d/4e/43a948ba10cba6c1d58567b7da72516e06835d6eb331c5e31a837de3fab9/cjm_fasthtml_workers-0.0.7-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "b9bb53fe7cb65a32d8646cfdaa6e91ddad71ad15604b7f244c5d2a8e3fabdbcd",
                "md5": "8f553564dc8fef57f5b2570c8cc81491",
                "sha256": "14596717d7e743416b757d478da51d404a26b05a3fa3ef75127298ae077864eb"
            },
            "downloads": -1,
            "filename": "cjm_fasthtml_workers-0.0.7.tar.gz",
            "has_sig": false,
            "md5_digest": "8f553564dc8fef57f5b2570c8cc81491",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 39220,
            "upload_time": "2025-10-27T22:28:53",
            "upload_time_iso_8601": "2025-10-27T22:28:53.642262Z",
            "url": "https://files.pythonhosted.org/packages/b9/bb/53fe7cb65a32d8646cfdaa6e91ddad71ad15604b7f244c5d2a8e3fabdbcd/cjm_fasthtml_workers-0.0.7.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-27 22:28:53",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "cj-mills",
    "github_project": "cjm-fasthtml-workers",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "cjm-fasthtml-workers"
}
        
Elapsed time: 4.08015s