# cjm-tqdm-capture
<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->
## Install
``` bash
pip install cjm-tqdm-capture
```
## Project Structure
``` text
nbs/
├── job_runner.ipynb        # Executes functions in background threads with automatic tqdm progress capture
├── patch_tqdm.ipynb        # Provides the patching mechanism to intercept tqdm and emit callbacks with the captured progress information
├── progress_info.ipynb     # Defines the data structure used to represent progress state
├── progress_monitor.ipynb  # Thread-safe monitor for tracking and aggregating progress from multiple concurrent jobs
└── streaming.ipynb         # Server-Sent Events (SSE) generator for real-time progress streaming to web clients
```
Total: 5 notebooks
## Module Dependencies
``` mermaid
graph LR
job_runner[job_runner<br/>job runner]
patch_tqdm[patch_tqdm<br/>patch tqdm]
progress_info[progress_info<br/>progress info]
progress_monitor[progress_monitor<br/>progress monitor]
streaming[streaming<br/>streaming]
job_runner --> patch_tqdm
job_runner --> progress_info
job_runner --> progress_monitor
patch_tqdm --> progress_info
progress_monitor --> patch_tqdm
progress_monitor --> progress_info
streaming --> job_runner
streaming --> progress_monitor
```
*8 cross-module dependencies detected*
## CLI Reference
No CLI commands found in this project.
## Module Overview
Detailed documentation for each module in the project:
### job runner (`job_runner.ipynb`)
> Executes functions in background threads with automatic tqdm progress
> capture.
#### Import
``` python
from cjm_tqdm_capture.job_runner import (
JobRunner
)
```
#### Classes
``` python
class JobRunner:
    """
    Runs a callable in a background thread, patches tqdm inside the job,
    and forwards ProgressInfo updates to a ProgressMonitor under job_id.
    """
    def __init__(
        self,
        monitor: ProgressMonitor # Progress monitor instance to receive updates
    )
    "Initialize a job runner with a progress monitor"
    def start(
        self,
        job_id: str, # Unique identifier for this job
        fn: Callable[..., Any], # Function to execute in the background thread
        *args, # Positional arguments passed to fn
        patch_kwargs: Optional[Dict[str, Any]] = None, # Optional keyword arguments for the tqdm patch
        **kwargs # Keyword arguments passed to fn
    ) -> threading.Thread: # The thread running the job
"Start a job in a background thread with automatic tqdm patching"
def is_alive(
self,
job_id: str # Unique identifier of the job to check
) -> bool: # True if the job thread is still running
"Check if a job's thread is still running"
def join(
self,
job_id: str, # Unique identifier of the job to wait for
timeout: Optional[float] = None # Maximum seconds to wait (None for indefinite)
) -> None: # Returns when thread completes or timeout expires
"Wait for a job's thread to complete"
```
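Putting the pieces together, a minimal usage sketch based on the signatures above; the `work` function and job id are illustrative, and tqdm is imported inside the job so the patched class is picked up (assuming the patch swaps the `tqdm.tqdm` module attribute for the duration of the job):

``` python
import time

from cjm_tqdm_capture.job_runner import JobRunner
from cjm_tqdm_capture.progress_monitor import ProgressMonitor

def work(n: int):
    from tqdm import tqdm  # imported inside the job so the patched class is used
    for _ in tqdm(range(n), desc="processing"):
        time.sleep(0.01)  # stand-in for real work

monitor = ProgressMonitor()
runner = JobRunner(monitor)

runner.start("demo-job", work, 200)  # returns the background thread
runner.join("demo-job")              # block until the job finishes

print(monitor.snapshot("demo-job"))  # final recorded state for the job
```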
### patch tqdm (`patch_tqdm.ipynb`)
> Provides the patching mechanism to intercept tqdm and emit callbacks
> with the captured progress information.
#### Import
``` python
from cjm_tqdm_capture.patch_tqdm import (
patch_tqdm
)
```
#### Functions
``` python
def _make_callback_class(
    BaseTqdm: type, # Base tqdm class to extend with callback functionality
    default_cb: Optional[Callable[[ProgressInfo], None]], # Default callback invoked with progress updates
    min_update_interval: float = 0.1, # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 1.0, # Emit only if the percentage moves by at least this amount
    emit_initial: bool = False # Whether to emit a callback at 0% progress
)
    "Create a tqdm subclass that emits progress callbacks during iteration"
```
``` python
@contextmanager
def patch_tqdm(
progress_callback: Optional[Callable[[ProgressInfo], None]], # Function to call with progress updates
min_update_interval: float = 0.1, # Minimum time between callback invocations (seconds)
    min_delta_pct: float = 10.0, # Minimum percentage change between emissions (e.g., roughly every 10%)
emit_initial: bool = False # Whether to emit callback at 0% progress
)
"Context manager that patches tqdm to emit progress callbacks"
```
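For direct use without a `JobRunner`, a sketch of the context manager; it assumes the patch replaces the `tqdm.tqdm` module attribute while the `with` block is active, so the bar is created through the module rather than a pre-imported name:

``` python
import tqdm
from cjm_tqdm_capture.patch_tqdm import patch_tqdm

def on_progress(info):
    # Called with a ProgressInfo for each throttled update
    print(f"{info.description}: {info.progress:.0f}%")

# Emit at most every 0.1 s, and only when progress advances by >= 10%
with patch_tqdm(on_progress, min_update_interval=0.1, min_delta_pct=10.0):
    for _ in tqdm.tqdm(range(1000), desc="demo"):
        pass
```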
#### Variables
``` python
_BAR_COUNTER
```
### progress info (`progress_info.ipynb`)
> Defines the data structure used to represent progress state
#### Import
``` python
from cjm_tqdm_capture.progress_info import (
ProgressInfo,
serialize_job_snapshot,
serialize_all_jobs
)
```
#### Functions
``` python
def serialize_job_snapshot(
snapshot: Optional[Dict[str, Any]] # Job snapshot dictionary from ProgressMonitor
) -> Optional[Dict[str, Any]]: # JSON-serializable dictionary or None if input is None
"Convert a job snapshot with ProgressInfo objects to a JSON-serializable format."
```
``` python
def serialize_all_jobs(
jobs: Dict[str, Dict[str, Any]] # Dictionary mapping job IDs to job snapshots
) -> Dict[str, Optional[Dict[str, Any]]]: # Dictionary mapping job IDs to serialized snapshots
"Convert all jobs from monitor.all() to JSON-serializable format."
```
#### Classes
``` python
@dataclass
class ProgressInfo:
"Structured progress information"
progress: float # Percentage completion (0-100)
current: Optional[int] # Current iteration count
total: Optional[int] # Total iterations expected
rate: Optional[str] # Processing rate (e.g., "50.5 it/s")
elapsed: Optional[str] # Time elapsed since start
remaining: Optional[str] # Estimated time remaining
description: Optional[str] # Progress bar description/label
raw_output: str = '' # Raw output string (if any)
timestamp: float # Unix timestamp when created
bar_id: Optional[str] # Unique identifier for this progress bar
position: Optional[int] # Display position for multi-bar scenarios
    def to_dict(self):
        "Convert to dictionary for JSON serialization"
```
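As a sketch of the serialization path, using only the helpers documented above; `monitor` is a `ProgressMonitor` (covered in the next section) that jobs report into, e.g. via a `JobRunner`:

``` python
import json

from cjm_tqdm_capture.progress_info import serialize_job_snapshot, serialize_all_jobs
from cjm_tqdm_capture.progress_monitor import ProgressMonitor

monitor = ProgressMonitor()
# ... jobs report progress into `monitor` (e.g., via JobRunner) ...

# Snapshots may contain ProgressInfo objects; the helper makes them JSON-safe
payload = serialize_job_snapshot(monitor.snapshot("demo-job"))  # dict or None
print(json.dumps(payload, indent=2))

# Serialize every tracked job in one call
print(json.dumps(serialize_all_jobs(monitor.all()), indent=2))
```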
### progress monitor (`progress_monitor.ipynb`)
> Thread-safe monitor for tracking and aggregating progress from
> multiple concurrent jobs.
#### Import
``` python
from cjm_tqdm_capture.progress_monitor import (
ProgressMonitor
)
```
#### Classes
``` python
class ProgressMonitor:
    "Thread-safe monitor for tracking progress of multiple concurrent jobs"
    def __init__(
        self,
        keep_history: bool = False, # Whether to maintain a history of progress updates
        history_limit: int = 500 # Maximum number of historical updates to keep per job
    )
    "Initialize a new progress monitor with optional history tracking"
def update(
self,
job_id: str, # Unique identifier for the job being tracked
info: ProgressInfo # Progress information update for the job
)
"TODO: Add function description"
def snapshot(
self,
job_id: str # Unique identifier of the job to snapshot
) -> Optional[Dict[str, Any]]: # Job state dictionary or None if job not found
"Get a point-in-time snapshot of a specific job's progress state"
def all(
self
) -> Dict[str, Dict[str, Any]]: # Dictionary mapping job IDs to their state snapshots
"Get snapshots of all tracked jobs"
def clear_completed(
self,
older_than_seconds: float = 3600 # Age threshold in seconds for removing completed jobs
)
"Remove completed jobs that finished more than the specified seconds ago"
```
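A polling sketch, assuming a `JobRunner` (see above) is feeding the monitor from another thread; the exact snapshot layout is whatever the monitor records:

``` python
import time
from cjm_tqdm_capture.progress_monitor import ProgressMonitor

monitor = ProgressMonitor(keep_history=True, history_limit=100)
# ... hand `monitor` to a JobRunner and start a job under "demo-job" ...

for _ in range(20):                      # poll for up to ~5 seconds
    snap = monitor.snapshot("demo-job")  # None until the job reports progress
    if snap is not None:
        print(snap)
    time.sleep(0.25)

print(list(monitor.all()))                        # every tracked job id
monitor.clear_completed(older_than_seconds=3600)  # prune hour-old finished jobs
```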
### streaming (`streaming.ipynb`)
> Server-Sent Events (SSE) generator for real-time progress streaming to
> web clients.
#### Import
``` python
from cjm_tqdm_capture.streaming import (
sse_stream,
sse_stream_async
)
```
#### Functions
``` python
def sse_stream(
monitor: ProgressMonitor, # Progress monitor instance to read job updates from
job_id: str, # Unique identifier of the job to stream
interval: float = 0.25, # Polling interval in seconds for checking progress updates
heartbeat: float = 15.0, # Seconds between keep-alive messages when no updates
wait_for_start: bool = True, # Whether to wait for job to start before ending stream
start_timeout: float = 5.0, # Maximum seconds to wait for job to start if wait_for_start is True
) -> Iterator[str]: # SSE-formatted strings ready to send to client
"""
Framework-agnostic SSE generator.
- Yields 'data: {json}\n\n' when progress changes.
- Sends ': keep-alive' comments every `heartbeat` seconds when idle.
- If `wait_for_start` is True, it will wait up to `start_timeout` for
the first snapshot before ending (avoids race at job startup).
"""
```
``` python
async def sse_stream_async(
monitor: ProgressMonitor, # Progress monitor instance to read job updates from
job_id: str, # Unique identifier of the job to stream
interval: float = 0.25, # Polling interval in seconds for checking progress updates
heartbeat: float = 15.0, # Seconds between keep-alive messages when no updates
wait_for_start: bool = True, # Whether to wait for job to start before ending stream
start_timeout: float = 5.0, # Maximum seconds to wait for job to start if wait_for_start is True
) -> AsyncIterator[str]: # SSE-formatted strings ready to send to client
"""
Async version of SSE generator for frameworks that require async iteration.
- Yields 'data: {json}\n\n' when progress changes.
- Sends ': keep-alive' comments every `heartbeat` seconds when idle.
- If `wait_for_start` is True, it will wait up to `start_timeout` for
the first snapshot before ending (avoids race at job startup).
"""
```
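The generators are framework-agnostic; as one wiring sketch, here is how the async variant might be mounted in FastAPI (the app, route, and shared `monitor` are illustrative, not part of this package):

``` python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

from cjm_tqdm_capture.progress_monitor import ProgressMonitor
from cjm_tqdm_capture.streaming import sse_stream_async

app = FastAPI()
monitor = ProgressMonitor()  # shared with the JobRunner that starts jobs

@app.get("/progress/{job_id}")
async def progress(job_id: str):
    # Stream SSE messages until the job completes or the client disconnects
    return StreamingResponse(
        sse_stream_async(monitor, job_id),
        media_type="text/event-stream",
    )
```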