cjm-tqdm-capture

Name: cjm-tqdm-capture
Version: 0.0.5
Home page: https://github.com/cj-mills/cjm-tqdm-capture
Summary: Intercept and capture progress information from tqdm progress bars via callbacks without modifying existing code.
Upload time: 2025-08-23 00:08:38
Author: Christian J. Mills
Requires Python: >=3.11
License: Apache Software License 2.0
Keywords: nbdev, jupyter, notebook, python
Requirements: none recorded
# cjm-tqdm-capture


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm-tqdm-capture
```

## Project Structure

    nbs/
    ├── job_runner.ipynb       # Executes functions in background threads with automatic tqdm progress capture.
    ├── patch_tqdm.ipynb       # Provides the patching mechanism to intercept tqdm and emit progress callbacks
    ├── progress_info.ipynb    # Defines the data structure used to represent progress state
    ├── progress_monitor.ipynb # Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.
    └── streaming.ipynb        # Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Total: 5 notebooks

## Module Dependencies

``` mermaid
graph LR
    job_runner[job_runner<br/>job runner]
    patch_tqdm[patch_tqdm<br/>patch tqdm]
    progress_info[progress_info<br/>progress info]
    progress_monitor[progress_monitor<br/>progress monitor]
    streaming[streaming<br/>streaming]

    job_runner --> patch_tqdm
    job_runner --> progress_info
    job_runner --> progress_monitor
    patch_tqdm --> progress_info
    progress_monitor --> patch_tqdm
    progress_monitor --> progress_info
    streaming --> job_runner
    streaming --> progress_monitor
```

*8 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### job runner (`job_runner.ipynb`)

> Executes functions in background threads with automatic tqdm progress
> capture.

#### Import

``` python
from cjm_tqdm_capture.job_runner import (
    JobRunner
)
```

#### Classes

``` python
class JobRunner:
    def __init__(
        self,
        monitor: ProgressMonitor  # Progress monitor instance to receive updates
    )
    """
    Runs a callable in a background thread, patches tqdm inside the job,
    and forwards ProgressInfo updates to a ProgressMonitor under job_id.
    """
    
    def __init__(
            self,
            monitor: ProgressMonitor  # Progress monitor instance to receive updates
        )
        "Initialize a job runner with a progress monitor"
    
    def start(
            self,
            job_id: str,  # Unique identifier for this job
            fn: Callable[..., Any],
            *args,
            patch_kwargs: Optional[Dict[str, Any]] = None,
            **kwargs
        ) -> threading.Thread:  # The thread running the job
        "Start a job in a background thread with automatic tqdm patching"
    
    def is_alive(
            self,
            job_id: str  # Unique identifier of the job to check
        ) -> bool:  # True if the job thread is still running
        "Check if a job's thread is still running"
    
    def join(
            self,
            job_id: str,  # Unique identifier of the job to wait for
            timeout: Optional[float] = None  # Maximum seconds to wait (None for indefinite)
        ) -> None:  # Returns when thread completes or timeout expires
        "Wait for a job's thread to complete"
```
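The class reference above implies a simple pattern: start a job, let tqdm updates flow into the monitor, and read snapshots keyed by `job_id`. Below is a hedged, standard-library-only sketch of that pattern; it does not import the package, and `MiniMonitor`, `run_job`, and the dict payload are illustrative stand-ins, not the library's API:

``` python
import threading

# Illustrative stand-in for ProgressMonitor: a lock-protected store of the
# latest progress payload per job_id.
class MiniMonitor:
    def __init__(self):
        self._lock = threading.Lock()
        self._jobs = {}

    def update(self, job_id, info):
        with self._lock:
            self._jobs[job_id] = info

    def snapshot(self, job_id):
        with self._lock:
            return self._jobs.get(job_id)

def run_job(monitor, job_id, n):
    # The real JobRunner would patch tqdm inside the job; here the job
    # pushes updates to the monitor directly.
    for i in range(1, n + 1):
        monitor.update(job_id, {"progress": 100.0 * i / n, "current": i, "total": n})

monitor = MiniMonitor()
t = threading.Thread(target=run_job, args=(monitor, "job-1", 4))
t.start()
t.join()
print(monitor.snapshot("job-1")["progress"])  # 100.0
```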

### patch tqdm (`patch_tqdm.ipynb`)

> Provides the patching mechanism to intercept tqdm and emit progress
> callbacks

#### Import

``` python
from cjm_tqdm_capture.patch_tqdm import (
    patch_tqdm
)
```

#### Functions

``` python
def _make_callback_class(
    BaseTqdm: type,  # Base tqdm class to extend with callback functionality
    default_cb: Optional[Callable[[ProgressInfo], None]],
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 1.0,  # Emit only if the percentage moves by at least this amount
    emit_initial: bool = False  # Whether to emit a callback at 0% progress
)
    "Create a tqdm subclass that emits progress callbacks during iteration"
```

``` python
@contextmanager
def patch_tqdm(
    progress_callback: Optional[Callable[[ProgressInfo], None]],  # Function to call with progress updates
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 10.0,  # Emit only if the percentage moves by at least this amount (e.g., every ~10%)
    emit_initial: bool = False  # Whether to emit callback at 0% progress
)
    "Context manager that patches tqdm to emit progress callbacks"
```
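The `min_update_interval` and `min_delta_pct` parameters describe a throttle on callback emission. A minimal sketch of how such throttling could work, assuming (this is not the package's actual logic) that an update is emitted only when both the time and percentage thresholds are crossed:

``` python
import time

# Hypothetical throttle mirroring the documented parameter names; the
# combination rule (time AND percentage gates) is an assumption.
class ThrottledEmitter:
    def __init__(self, callback, min_update_interval=0.1, min_delta_pct=1.0):
        self.callback = callback
        self.min_update_interval = min_update_interval
        self.min_delta_pct = min_delta_pct
        self._last_time = 0.0
        self._last_pct = None

    def maybe_emit(self, pct):
        now = time.monotonic()
        # Skip if not enough wall-clock time has passed since the last emit.
        if now - self._last_time < self.min_update_interval:
            return False
        # Skip if the percentage has not moved by at least min_delta_pct.
        if self._last_pct is not None and abs(pct - self._last_pct) < self.min_delta_pct:
            return False
        self._last_time = now
        self._last_pct = pct
        self.callback(pct)
        return True

seen = []
emitter = ThrottledEmitter(seen.append, min_update_interval=0.0, min_delta_pct=10.0)
for pct in (0, 5, 12, 14, 25):
    emitter.maybe_emit(pct)
print(seen)  # [0, 12, 25]
```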

#### Variables

``` python
_BAR_COUNTER
```

### progress info (`progress_info.ipynb`)

> Defines the data structure used to represent progress state

#### Import

``` python
from cjm_tqdm_capture.progress_info import (
    ProgressInfo,
    serialize_job_snapshot,
    serialize_all_jobs
)
```

#### Functions

``` python
def serialize_job_snapshot(
    snapshot: Optional[Dict[str, Any]]  # Job snapshot dictionary from ProgressMonitor
) -> Optional[Dict[str, Any]]:  # JSON-serializable dictionary or None if input is None
    "Convert a job snapshot with ProgressInfo objects to a JSON-serializable format."
```

``` python
def serialize_all_jobs(
    jobs: Dict[str, Dict[str, Any]]  # Dictionary mapping job IDs to job snapshots
) -> Dict[str, Optional[Dict[str, Any]]]:  # Dictionary mapping job IDs to serialized snapshots
    "Convert all jobs from monitor.all() to JSON-serializable format."
```

#### Classes

``` python
@dataclass
class ProgressInfo:
    "Structured progress information"
    
    progress: float  # Percentage completion (0-100)
    current: Optional[int]  # Current iteration count
    total: Optional[int]  # Total iterations expected
    rate: Optional[str]  # Processing rate (e.g., "50.5 it/s")
    elapsed: Optional[str]  # Time elapsed since start
    remaining: Optional[str]  # Estimated time remaining
    description: Optional[str]  # Progress bar description/label
    raw_output: str = ''  # Raw output string (if any)
    timestamp: float  # Unix timestamp when created
    bar_id: Optional[str]  # Unique identifier for this progress bar
    position: Optional[int]  # Display position for multi-bar scenarios
    
    def to_dict(self):
            "Convert to dictionary for JSON serialization"
```
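A standard-library mirror of a subset of the documented fields shows one way `to_dict` can be realized via `dataclasses.asdict`. Note that this sketch assigns defaults to the fields after `raw_output`, since a valid Python dataclass cannot place non-defaulted fields after defaulted ones; the package's actual class layout may differ:

``` python
from dataclasses import dataclass, asdict, field
from typing import Optional
import time

# Illustrative mirror of ProgressInfo (not the package's actual class);
# the defaults here are assumptions made so the dataclass is valid Python.
@dataclass
class ProgressInfoSketch:
    progress: float                      # Percentage completion (0-100)
    current: Optional[int] = None        # Current iteration count
    total: Optional[int] = None          # Total iterations expected
    rate: Optional[str] = None           # Processing rate, e.g. "50.5 it/s"
    raw_output: str = ''                 # Raw output string (if any)
    timestamp: float = field(default_factory=time.time)  # Unix timestamp

    def to_dict(self):
        """Convert to a JSON-serializable dictionary."""
        return asdict(self)

info = ProgressInfoSketch(progress=50.0, current=5, total=10)
d = info.to_dict()
print(d["progress"], d["current"], d["total"])  # 50.0 5 10
```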

### progress monitor (`progress_monitor.ipynb`)

> Thread-safe monitor for tracking and aggregating progress from
> multiple concurrent jobs.

#### Import

``` python
from cjm_tqdm_capture.progress_monitor import (
    ProgressMonitor
)
```

#### Classes

``` python
class ProgressMonitor:
    def __init__(
        self,
        keep_history: bool = False,  # Whether to maintain a history of progress updates
        history_limit: int = 500  # Maximum number of historical updates to keep per job
    )
    "Thread-safe monitor for tracking progress of multiple concurrent jobs"
    
    def __init__(
            self,
            keep_history: bool = False,  # Whether to maintain a history of progress updates
            history_limit: int = 500  # Maximum number of historical updates to keep per job
        )
        "Initialize a new progress monitor with optional history tracking"
    
    def update(
            self,
            job_id: str,  # Unique identifier for the job being tracked
            info: ProgressInfo  # Progress information update for the job
        )
        "Record a progress update for the specified job"
    
    def snapshot(
            self,
            job_id: str  # Unique identifier of the job to snapshot
        ) -> Optional[Dict[str, Any]]:  # Job state dictionary or None if job not found
        "Get a point-in-time snapshot of a specific job's progress state"
    
    def all(
            self
        ) -> Dict[str, Dict[str, Any]]:  # Dictionary mapping job IDs to their state snapshots
        "Get snapshots of all tracked jobs"
    
    def clear_completed(
            self,
            older_than_seconds: float = 3600  # Age threshold in seconds for removing completed jobs
        )
        "Remove completed jobs that finished more than the specified seconds ago"
```

### streaming (`streaming.ipynb`)

> Server-Sent Events (SSE) generator for real-time progress streaming to
> web clients.

#### Import

``` python
from cjm_tqdm_capture.streaming import (
    sse_stream,
    sse_stream_async
)
```

#### Functions

``` python
def sse_stream(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # Whether to wait for job to start before ending stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> Iterator[str]:  # SSE-formatted strings ready to send to client
    """
    Framework-agnostic SSE generator.
    - Yields 'data: {json}\n\n' when progress changes.
    - Sends ': keep-alive' comments every `heartbeat` seconds when idle.
    - If `wait_for_start` is True, it will wait up to `start_timeout` for
      the first snapshot before ending (avoids race at job startup).
    """
```

``` python
async def sse_stream_async(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # Whether to wait for job to start before ending stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> AsyncIterator[str]:  # SSE-formatted strings ready to send to client
    """
    Async version of SSE generator for frameworks that require async iteration.
    - Yields 'data: {json}\n\n' when progress changes.
    - Sends ': keep-alive' comments every `heartbeat` seconds when idle.
    - If `wait_for_start` is True, it will wait up to `start_timeout` for
      the first snapshot before ending (avoids race at job startup).
    """
```
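Both generators emit the standard SSE wire format described in their docstrings. A tiny helper showing what the yielded strings look like (the name `sse_event` is illustrative, not part of the package):

``` python
import json

# Each SSE event is a "data:" line with a JSON payload, terminated by a
# blank line; idle keep-alives are comment lines starting with ":".
def sse_event(payload: dict) -> str:
    return f"data: {json.dumps(payload)}\n\n"

KEEP_ALIVE = ": keep-alive\n\n"

event = sse_event({"progress": 42.0})
print(event, end="")
```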

            
