cjm-tqdm-capture

Name: cjm-tqdm-capture
Version: 0.0.6
Home page: https://github.com/cj-mills/cjm-tqdm-capture
Summary: Intercept and capture progress information from tqdm progress bars via callbacks without modifying existing code.
Upload time: 2025-10-25 00:28:09
Author: Christian J. Mills
Requires Python: >=3.11
License: Apache Software License 2.0
Keywords: nbdev, jupyter, notebook, python
# cjm-tqdm-capture


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm-tqdm-capture
```

## Project Structure

    nbs/
    ├── job_runner.ipynb       # Executes functions in background threads with automatic tqdm progress capture.
    ├── patch_tqdm.ipynb       # Provides the patching mechanism to intercept tqdm and emit callbacks with captured progress information.
    ├── progress_info.ipynb    # Defines the data structure used to represent progress state
    ├── progress_monitor.ipynb # Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs.
    └── streaming.ipynb        # Server-Sent Events (SSE) generator for real-time progress streaming to web clients.

Total: 5 notebooks

## Module Dependencies

``` mermaid
graph LR
    job_runner[job_runner<br/>job runner]
    patch_tqdm[patch_tqdm<br/>patch tqdm]
    progress_info[progress_info<br/>progress info]
    progress_monitor[progress_monitor<br/>progress monitor]
    streaming[streaming<br/>streaming]

    job_runner --> progress_info
    job_runner --> progress_monitor
    job_runner --> patch_tqdm
    patch_tqdm --> progress_info
    progress_monitor --> progress_info
    progress_monitor --> patch_tqdm
    streaming --> progress_monitor
    streaming --> job_runner
```

*8 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### job runner (`job_runner.ipynb`)

> Executes functions in background threads with automatic tqdm progress
> capture.

#### Import

``` python
from cjm_tqdm_capture.job_runner import (
    JobRunner
)
```

#### Classes

``` python
class JobRunner:
    "Runs functions in background threads with automatic tqdm progress capture"

    def __init__(
            self,
            monitor: ProgressMonitor  # Progress monitor instance to receive updates
        )
        "Initialize a job runner with a progress monitor"
    
    def start(
            self,
            job_id: str,  # Unique identifier for this job
            fn: Callable[..., Any],
            *args,
            patch_kwargs: Optional[Dict[str, Any]] = None,
            **kwargs
        ) -> threading.Thread:  # The thread running the job
        "Start a job in a background thread with automatic tqdm patching"
    
    def is_alive(
            self,
            job_id: str  # Unique identifier of the job to check
        ) -> bool:  # True if the job thread is still running
        "Check if a job's thread is still running"
    
    def join(
            self,
            job_id: str,  # Unique identifier of the job to wait for
            timeout: Optional[float] = None  # Maximum seconds to wait (None for indefinite)
        ) -> None:  # Returns when thread completes or timeout expires
        "Wait for a job's thread to complete"
```
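The background-thread pattern that `JobRunner` documents can be sketched with the standard library alone. The `MiniJobRunner` class below is an illustrative stand-in (not the package's implementation): one daemon thread per `job_id`, with `start`, `is_alive`, and `join` mirroring the documented signatures.

``` python
import threading
from typing import Any, Callable, Dict, Optional

class MiniJobRunner:
    """Toy stand-in for JobRunner: runs each job's function in a daemon thread."""

    def __init__(self) -> None:
        self._threads: Dict[str, threading.Thread] = {}
        self.results: Dict[str, Any] = {}

    def start(self, job_id: str, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> threading.Thread:
        def _target() -> None:
            # Store the return value so callers can retrieve it after join()
            self.results[job_id] = fn(*args, **kwargs)
        t = threading.Thread(target=_target, daemon=True)
        self._threads[job_id] = t
        t.start()
        return t

    def is_alive(self, job_id: str) -> bool:
        return self._threads[job_id].is_alive()

    def join(self, job_id: str, timeout: Optional[float] = None) -> None:
        self._threads[job_id].join(timeout)

runner = MiniJobRunner()
runner.start("sum-job", lambda: sum(range(10)))
runner.join("sum-job")
```

The real `JobRunner` additionally wires the thread's tqdm output into a `ProgressMonitor` via `patch_kwargs`; this sketch only shows the threading skeleton.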

### patch tqdm (`patch_tqdm.ipynb`)

> Provides the patching mechanism to intercept tqdm and emit callbacks
> with captured progress information

#### Import

``` python
from cjm_tqdm_capture.patch_tqdm import (
    patch_tqdm
)
```

#### Functions

``` python
def _make_callback_class(
    BaseTqdm: type,  # Base tqdm class to extend with callback functionality
    default_cb: Optional[Callable[[ProgressInfo], None]],
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 1.0,      # Emit only when the percentage advances by at least this amount
    emit_initial: bool = False       # Whether to emit a callback at 0% progress
) -> type: # Extended tqdm class with callback support
    "Create a tqdm subclass that emits progress callbacks during iteration"
```

``` python
@contextmanager
def patch_tqdm(
    progress_callback: Optional[Callable[[ProgressInfo], None]],  # Function to call with progress updates
    min_update_interval: float = 0.1,  # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 10.0,   # Minimum percentage change between emissions (e.g., roughly every 10%)
    emit_initial: bool = False  # Whether to emit callback at 0% progress
): # Context manager that temporarily patches tqdm modules
    "Context manager that patches tqdm to emit progress callbacks"
```

#### Variables

``` python
_BAR_COUNTER
```
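The throttling behaviour documented for `_make_callback_class` (invoke the callback only when the percentage has advanced by at least `min_delta_pct`) can be illustrated without tqdm at all. `make_callback_bar` and `CallbackBar` below are hypothetical names for a stdlib-only sketch, not part of this package:

``` python
from typing import Callable, List, Optional

def make_callback_bar(min_delta_pct: float = 10.0, emit_initial: bool = False):
    """Build a toy progress-bar class that throttles callback emissions by percentage delta."""
    class CallbackBar:
        def __init__(self, total: int, cb: Callable[[float], None]) -> None:
            self.total, self.cb, self.n = total, cb, 0
            self._last_pct: Optional[float] = None
            if emit_initial:
                self._emit(0.0)

        def _emit(self, pct: float) -> None:
            self._last_pct = pct
            self.cb(pct)

        def update(self, k: int = 1) -> None:
            self.n += k
            pct = 100.0 * self.n / self.total
            # Emit on first update, when the delta threshold is crossed, or at completion
            if self._last_pct is None or pct - self._last_pct >= min_delta_pct or pct >= 100.0:
                self._emit(pct)
    return CallbackBar

seen: List[float] = []
Bar = make_callback_bar(min_delta_pct=25.0)
bar = Bar(total=100, cb=seen.append)
for _ in range(100):
    bar.update()
# With a 25% threshold, 100 updates collapse to a handful of emissions
```

The real implementation also throttles by wall-clock time (`min_update_interval`) and emits rich `ProgressInfo` objects rather than bare percentages.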

### progress info (`progress_info.ipynb`)

> Defines the data structure used to represent progress state

#### Import

``` python
from cjm_tqdm_capture.progress_info import (
    ProgressInfo,
    serialize_job_snapshot,
    serialize_all_jobs
)
```

#### Functions

``` python
def serialize_job_snapshot(
    snapshot: Optional[Dict[str, Any]]  # Job snapshot dictionary from ProgressMonitor
) -> Optional[Dict[str, Any]]:  # JSON-serializable dictionary or None if input is None
    "Convert a job snapshot to JSON-serializable format"
```

``` python
def serialize_all_jobs(
    jobs: Dict[str, Dict[str, Any]]  # Dictionary mapping job IDs to job snapshots
) -> Dict[str, Optional[Dict[str, Any]]]:  # Dictionary mapping job IDs to serialized snapshots
    "Convert all jobs from monitor.all() to JSON-serializable format"
```

#### Classes

``` python
@dataclass
class ProgressInfo:
    "Structured progress information"
    
    progress: float  # Percentage completion (0-100)
    current: Optional[int]  # Current iteration count
    total: Optional[int]  # Total iterations expected
    rate: Optional[str]  # Processing rate (e.g., "50.5 it/s")
    elapsed: Optional[str]  # Time elapsed since start
    remaining: Optional[str]  # Estimated time remaining
    description: Optional[str]  # Progress bar description/label
    raw_output: str = ''  # Raw output string (if any)
    timestamp: float  # Unix timestamp when created
    bar_id: Optional[str]  # Unique identifier for this progress bar
    position: Optional[int]  # Display position for multi-bar scenarios
    
    def to_dict(
            self
        ) -> dict: # Dictionary with all progress fields for JSON serialization
        "Convert to dictionary for JSON serialization"
```
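The `to_dict` pattern that `ProgressInfo` documents, converting a flat dataclass into a JSON-serializable dictionary, can be sketched with `dataclasses.asdict`. `MiniProgressInfo` is a simplified, hypothetical stand-in carrying only a few of the real fields:

``` python
import time
from dataclasses import asdict, dataclass, field
from typing import Optional

@dataclass
class MiniProgressInfo:
    """Simplified stand-in for ProgressInfo, illustrating the to_dict pattern."""
    progress: float                  # Percentage completion (0-100)
    current: Optional[int] = None    # Current iteration count
    total: Optional[int] = None      # Total iterations expected
    timestamp: float = field(default_factory=time.time)  # Unix timestamp when created

    def to_dict(self) -> dict:
        # For a flat dataclass, asdict() yields a JSON-serializable dict directly
        return asdict(self)

info = MiniProgressInfo(progress=50.0, current=5, total=10)
d = info.to_dict()
```

`serialize_job_snapshot` and `serialize_all_jobs` presumably apply the same conversion to whole monitor snapshots so they can be shipped over SSE or a JSON API.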

### progress monitor (`progress_monitor.ipynb`)

> Thread-safe monitor for tracking and aggregating progress from
> multiple concurrent jobs.

#### Import

``` python
from cjm_tqdm_capture.progress_monitor import (
    ProgressMonitor
)
```

#### Classes

``` python
class ProgressMonitor:
    "Thread-safe monitor for tracking progress of multiple concurrent jobs"

    def __init__(
            self,
            keep_history: bool = False,  # Whether to maintain a history of progress updates
            history_limit: int = 500  # Maximum number of historical updates to keep per job
        )
        "Initialize a new progress monitor with optional history tracking"
    
    def update(
            self,
            job_id: str,  # Unique identifier for the job being tracked
            info: ProgressInfo  # Progress information update for the job
        )
        "Record a progress update for a job and recompute its completion status"
    
    def snapshot(
            self,
            job_id: str  # Unique identifier of the job to snapshot
        ) -> Optional[Dict[str, Any]]:  # Job state dictionary or None if job not found
        "Get a point-in-time snapshot of a specific job's progress state"
    
    def all(
            self
        ) -> Dict[str, Dict[str, Any]]:  # Dictionary mapping job IDs to their state snapshots
        "Get snapshots of all tracked jobs"
    
    def clear_completed(
            self,
            older_than_seconds: float = 3600  # Age threshold in seconds for removing completed jobs
        )
        "Remove completed jobs that finished more than the specified seconds ago"
```
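The thread-safety contract above, where many worker threads call `update` while readers call `snapshot` or `all`, boils down to guarding the shared job table with a lock and handing out copies. A minimal stdlib sketch (the `MiniMonitor` name and its simplified state are assumptions, not the package's code):

``` python
import threading
from typing import Any, Dict, Optional

class MiniMonitor:
    """Toy thread-safe monitor: latest snapshot per job, guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._jobs: Dict[str, Dict[str, Any]] = {}

    def update(self, job_id: str, progress: float) -> None:
        with self._lock:
            self._jobs[job_id] = {"progress": progress, "done": progress >= 100.0}

    def snapshot(self, job_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            snap = self._jobs.get(job_id)
            # Return a copy so callers can't mutate shared state outside the lock
            return dict(snap) if snap is not None else None

    def all(self) -> Dict[str, Dict[str, Any]]:
        with self._lock:
            return {k: dict(v) for k, v in self._jobs.items()}

mon = MiniMonitor()
mon.update("job-1", 40.0)
mon.update("job-1", 100.0)
```

The real monitor tracks much richer state (full `ProgressInfo`, optional bounded history via `history_limit`, completion timestamps for `clear_completed`), but the lock-and-copy discipline is the same.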

### streaming (`streaming.ipynb`)

> Server-Sent Events (SSE) generator for real-time progress streaming to
> web clients.

#### Import

``` python
from cjm_tqdm_capture.streaming import (
    sse_stream,
    sse_stream_async
)
```

#### Functions

``` python
def sse_stream(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # Whether to wait for job to start before ending stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> Iterator[str]:  # SSE-formatted strings ready to send to client
    "Framework-agnostic SSE generator for streaming job progress"
```

``` python
async def sse_stream_async(
    monitor: ProgressMonitor,  # Progress monitor instance to read job updates from
    job_id: str,  # Unique identifier of the job to stream
    interval: float = 0.25,  # Polling interval in seconds for checking progress updates
    heartbeat: float = 15.0,  # Seconds between keep-alive messages when no updates
    wait_for_start: bool = True,  # Whether to wait for job to start before ending stream
    start_timeout: float = 5.0,  # Maximum seconds to wait for job to start if wait_for_start is True
) -> AsyncIterator[str]:  # SSE-formatted strings ready to send to client
    "Async version of SSE generator for streaming job progress"
```
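The strings these generators yield follow the standard Server-Sent Events wire format: an optional `event:` line, a `data:` line carrying the payload, and a blank line terminating the message, with comment lines (leading colon) serving as keep-alive heartbeats. A small sketch of that framing (helper names are illustrative, not the package's API):

``` python
import json
from typing import Any, Dict, Optional

def sse_event(payload: Dict[str, Any], event: Optional[str] = None) -> str:
    """Format one SSE message: optional `event:` line, a JSON `data:` line,
    and the blank-line terminator required by the protocol."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(payload)}")
    return "\n".join(lines) + "\n\n"

def sse_heartbeat() -> str:
    # A comment line (leading colon) is the conventional SSE keep-alive
    return ": keep-alive\n\n"

msg = sse_event({"progress": 42.0}, event="progress")
```

Because `sse_stream` is framework-agnostic, its output can be fed directly into any response object that accepts an iterator of strings (e.g. a streaming response with the `text/event-stream` content type).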
