# mq-python

- Name: mq-python
- Version: 0.1.0
- Summary: An extensible and scalable MongoDB-based job queue system for Python
- Author: Adnan Ahmad <viperadnan@gmail.com>
- Uploaded: 2025-08-24 11:45:27
- Requires Python: >=3.11
- Keywords: background, job, jobs, mongodb, queue, scheduler, task, threading, worker
- Homepage / Source: https://github.com/viperadnan-git/mq-python
# MongoQueue with Job Stores

This implementation of MongoQueue provides job store functionality, allowing you to organize jobs into separate stores with customizable capacity limits.

## Features

- Create named job stores with specific capacity limits
- Isolate jobs in different stores for better organization and management
- Limit the number of jobs in specific stores to control resource usage
- All MongoDB queue operations available for each job store
- Store-specific job processing with dedicated workers
- Statistics for job stores and capacity management

## Architecture

- `JobStore` - Base class for managing jobs in a specific store
- `MongoQueue` - A special JobStore that manages jobs with no specific store (main queue) and maintains a registry of all job stores
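
Since `MongoQueue` is described as a special `JobStore`, both expose the same queue operations. A minimal sketch of the relationship (the `JobStore` import path is an assumption and may differ in your installation):

```python
from mq import MongoQueue, JobStore  # JobStore import path is an assumption

mq = MongoQueue()
users = mq.get_job_store("users")

# The main queue is simply the store with no store_name; both objects
# support put(), list_jobs(), run_jobs(), and the other queue operations.
assert isinstance(mq, JobStore)
assert isinstance(users, JobStore)
```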

## Usage

### Basic Usage

```python
from mq import MongoQueue

# Initialize the main MongoQueue
mq = MongoQueue()

# Create job stores with different capacities
users_store = mq.get_job_store("users", max_capacity=5)
posts_store = mq.get_job_store("posts", max_capacity=10)
analytics_store = mq.get_job_store("analytics")  # No capacity limit

# Add jobs to different stores
mq.put({"type": "system_maintenance"})  # Job in main queue (no store_name)
users_store.put({"user_id": "user_1", "action": "update_profile"})  # Job in users store
posts_store.put({"post_id": "post_1", "action": "index_content"})  # Job in posts store

# List jobs in the users store
users_jobs = users_store.list_jobs()
print("Users store jobs:", users_jobs)

# List jobs in the main queue
main_queue_jobs = mq.list_jobs()
print("Main queue jobs:", main_queue_jobs)

# Get capacity stats for users store
users_capacity = users_store.get_capacity_stats()
print("Users store capacity:", users_capacity)

# List available job stores
stores = mq.list_job_stores()
print("Available stores:", stores)
```

## Running Jobs

### Option 1: Simple Job Processing

The simplest way to process jobs from a specific job store:

```python
# Define a job processing function
def process_job(job):
    print(f"Processing job {job.id} with payload: {job.payload}")
    # Process the job...
    job.complete()
    return True

# Process jobs from a specific store
# This will run continuously, processing jobs as they become available
users_store.run_jobs(process_job, max_workers=2, poll_interval=1)
```

### Option 2: Process Jobs from All Stores

Process jobs from all job stores in parallel:

```python
# This will run jobs from all stores (including main queue) in parallel
# It creates worker pools for each store
mq.run_all_job_stores(process_job, max_workers_per_store=2, poll_interval=1)
```

### Option 3: Using ThreadPoolExecutor

For more control, you can drive each store's job loop from a `ThreadPoolExecutor`. Note that the common `signal.alarm()` timeout pattern does not work here: Python only delivers signals to the main thread, so installing a handler from an executor worker raises `ValueError`. Bound the wait with `future.result(timeout=...)` instead:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    # Get all stores (the main queue plus every registered job store)
    stores = [mq] + [mq.get_job_store(name) for name in mq.list_job_stores()]

    # Submit the continuous job loop for each store
    futures = {
        executor.submit(store.run_jobs, process_job, max_workers=2, poll_interval=0.5): store
        for store in stores
    }

    # Bound how long we block on each loop. run_jobs runs until the process
    # stops, so a timeout here does not stop the loop itself, and leaving
    # the with-block still waits for the loops to return; in practice you
    # stop this program with a process signal (see the production setup).
    for future in futures:
        try:
            # On Python 3.11+ (required by this package), the builtin
            # TimeoutError is an alias of concurrent.futures.TimeoutError
            future.result(timeout=5)
        except TimeoutError:
            print("Store loop still running after 5s")
        except Exception as e:
            print(f"Error: {e}")
```

### Option 4: Independent Threads

Run job stores in separate threads:

```python
import threading

# Create a thread for each store
threads = []

# Add a thread for the main queue
main_thread = threading.Thread(
    target=lambda: mq.run_jobs(process_job, max_workers=2)
)
threads.append(main_thread)

# Add threads for each job store
for store_name in mq.list_job_stores():
    store = mq.get_job_store(store_name)
    thread = threading.Thread(
        target=lambda s=store: s.run_jobs(process_job, max_workers=2)
    )
    threads.append(thread)

# Start all threads
for thread in threads:
    thread.start()

# Wait for threads to complete (run_jobs loops continuously; see the bounded alternative below)
for thread in threads:
    thread.join()
```
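
Since `run_jobs` loops continuously, the plain `join()` above never returns on its own. One stdlib-only timeout mechanism, as a sketch (the 30-second window is illustrative):

```python
import time

# Variant of the start/join section above: daemon threads are killed
# automatically when the main thread exits, so the join deadline below
# actually bounds the program's lifetime.
for thread in threads:
    thread.daemon = True  # must be set before start()
    thread.start()

deadline = time.monotonic() + 30
for thread in threads:
    remaining = deadline - time.monotonic()
    thread.join(timeout=max(0.0, remaining))
```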

### Recommended Production Setup

For production environments, we recommend:

1. Using a separate process for job processing
2. Implementing proper error handling and logging
3. Using a process manager like Supervisor to manage job processing services
4. Setting up monitoring and alerting for job processing (a minimal sketch follows the Supervisor configuration below)

Example production setup:

```python
# job_processor.py
from mq import MongoQueue
import logging
import signal
import sys

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize the main MongoQueue
mq = MongoQueue()

def process_job(job):
    try:
        logger.info(f"Processing job {job.id}")
        # Process the job...
        job.complete()
        return True
    except Exception as e:
        logger.error(f"Error processing job {job.id}: {str(e)}")
        job.failed(str(e))
        return False

def handle_shutdown(signum, frame):
    logger.info("Shutdown signal received, exiting gracefully...")
    sys.exit(0)

def main():
    # Register signal handlers
    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)
    
    # Get the store name from command line arguments
    store_name = sys.argv[1] if len(sys.argv) > 1 else None
    
    logger.info(f"Starting job processor for store: {store_name or 'main'}")
    
    try:
        if store_name:
            # Process jobs from a specific store
            job_store = mq.get_job_store(store_name)
            job_store.run_jobs(process_job, max_workers=2)
        else:
            # Process jobs from the main queue
            mq.run_jobs(process_job, max_workers=2)
    except Exception as e:
        logger.error(f"Fatal error: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()
```

Supervisor configuration:

```ini
[program:mq_main]
command=python job_processor.py
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/main.err.log
stdout_logfile=/var/log/mq/main.out.log

[program:mq_users]
command=python job_processor.py users
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/users.err.log
stdout_logfile=/var/log/mq/users.out.log

[program:mq_posts]
command=python job_processor.py posts
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/posts.err.log
stdout_logfile=/var/log/mq/posts.out.log
```
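
For point 4 of the checklist, a minimal monitoring sketch that polls `mq.stats()` (the output shape is shown in the Statistics section below). The polling interval and the alert-on-any-failure rule are illustrative choices; wire in your own alerting hook:

```python
# monitor.py - minimal monitoring loop (sketch; thresholds are illustrative)
from mq import MongoQueue
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mq-monitor")

mq = MongoQueue()

while True:
    stats = mq.stats()
    failed = stats["jobs"]["failed"]
    pending = stats["jobs"]["pending"]
    if failed:
        # Hook your alerting system in here (email, Slack, PagerDuty, ...)
        logger.warning("%d failed jobs, %d pending", failed, pending)
    time.sleep(60)
```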

## Capacity Management

Job stores with capacity limits raise a `ValueError` when you attempt to exceed the limit:

```python
# This will raise ValueError if users_store already has 5 jobs
try:
    users_store.put({"user_id": "new_user", "action": "verify_email"})
except ValueError as e:
    print(f"Capacity error: {e}")
```
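
If you would rather check before inserting than catch the exception, `get_capacity_stats()` (see Statistics below) exposes the fields you need; a short sketch:

```python
# Check capacity up front instead of relying on the ValueError.
stats = users_store.get_capacity_stats()
if stats["is_full"]:
    print("users store is full, deferring job")
else:
    users_store.put({"user_id": "new_user", "action": "verify_email"})
```

Note that check-then-put is not atomic: with concurrent producers, the `try/except` form above remains the safe one.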

## Statistics

Get information about job stores and their capacities:

```python
# Get capacity stats for a specific store
capacity_stats = users_store.get_capacity_stats()
print(capacity_stats)
# Example output:
# {
#     'store_name': 'users',
#     'current_jobs': 3,
#     'max_capacity': 5,
#     'is_full': False,
#     'available_capacity': 2
# }

# Get overall stats including job stores
stats = mq.stats()
print(stats)
# Example output:
# {
#     'jobs': {
#         'total': 15,
#         'pending': 10,
#         'processing': 2,
#         'failed': 1,
#         'completed': 2,
#         'main_queue': 3,
#     },
#     'workers': {
#         'total': 5,
#         'active': 5,
#         'inactive': 0,
#     },
#     'stores': {
#         'users': 5,
#         'posts': 7,
#         'analytics': 4,
#     }
# }
```
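
These shapes lend themselves to simple health checks. As a sketch, flagging a growing backlog (the threshold of 10 pending jobs per active worker is an illustrative number, not a library default):

```python
stats = mq.stats()
pending = stats["jobs"]["pending"]
active = stats["workers"]["active"] or 1  # avoid division by zero
if pending / active > 10:
    print(f"Backlog warning: {pending} pending jobs, {active} active workers")
```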

## Configuration

You can configure default job store settings through environment variables:

```bash
MQ_JOB_STORES_DEFAULT_CAPACITY=100  # Default capacity for all stores
```
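
If the workers run under Supervisor (as in the production setup above), the variable can be set per program with Supervisor's standard `environment=` key, for example:

```ini
[program:mq_main]
command=python job_processor.py
environment=MQ_JOB_STORES_DEFAULT_CAPACITY="100"
```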

Or directly in your code:

```python
from mq.config import config

config.job_stores_default_capacity = 100  # Default capacity for all stores
config.job_stores_capacities = {
    "users": 5,
    "posts": 10,
    "analytics": None  # No limit
}
```

            
