# MongoQueue with Job Stores
This implementation of MongoQueue provides job store functionality, allowing you to organize jobs into separate stores with customizable capacity limits.
## Features
- Create named job stores with specific capacity limits
- Isolate jobs in different stores for better organization and management
- Limit the number of jobs in specific stores to control resource usage
- All MongoDB queue operations available for each job store
- Store-specific job processing with dedicated workers
- Statistics for job stores and capacity management
## Architecture
- `JobStore` - Base class for managing jobs in a specific store
- `MongoQueue` - A special JobStore that manages jobs with no specific store (the main queue) and maintains a registry of all job stores; see the sketch below
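A minimal sketch of how these two classes relate, inferred from the API used below; it is illustrative only, not the library's actual source:

```python
# Illustrative shape of the architecture (not the real implementation).
class JobStore:
    def __init__(self, store_name=None, max_capacity=None):
        self.store_name = store_name      # None means the main queue
        self.max_capacity = max_capacity  # None means no capacity limit

class MongoQueue(JobStore):
    def __init__(self):
        super().__init__(store_name=None)  # the main queue has no store name
        self._stores = {}                  # registry of all named job stores

    def get_job_store(self, name, max_capacity=None):
        # Return the already-registered store, or register a new one
        if name not in self._stores:
            self._stores[name] = JobStore(name, max_capacity)
        return self._stores[name]
```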
## Usage
### Basic Usage
```python
from mq import MongoQueue
# Initialize the main MongoQueue
mq = MongoQueue()
# Create job stores with different capacities
users_store = mq.get_job_store("users", max_capacity=5)
posts_store = mq.get_job_store("posts", max_capacity=10)
analytics_store = mq.get_job_store("analytics") # No capacity limit
# Add jobs to different stores
mq.put({"type": "system_maintenance"}) # Job in main queue (no store_name)
users_store.put({"user_id": "user_1", "action": "update_profile"}) # Job in users store
posts_store.put({"post_id": "post_1", "action": "index_content"}) # Job in posts store
# List jobs in the users store
users_jobs = users_store.list_jobs()
print("Users store jobs:", users_jobs)
# List jobs in the main queue
main_queue_jobs = mq.list_jobs()
print("Main queue jobs:", main_queue_jobs)
# Get capacity stats for users store
users_capacity = users_store.get_capacity_stats()
print("Users store capacity:", users_capacity)
# List available job stores
stores = mq.list_job_stores()
print("Available stores:", stores)
```
## Running Jobs
### Option 1: Simple Job Processing
The simplest way to process jobs from a specific job store:
```python
# Define a job processing function
def process_job(job):
    print(f"Processing job {job.id} with payload: {job.payload}")
    # Process the job...
    job.complete()
    return True
# Process jobs from a specific store
# This will run continuously, processing jobs as they become available
users_store.run_jobs(process_job, max_workers=2, poll_interval=1)
```
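The handler above only covers the happy path. A variant that also records failures, using the same `job.failed(...)` call that appears in the production example further down:

```python
# Variant of process_job that records failures on the job itself.
def process_job_safe(job):
    try:
        print(f"Processing job {job.id} with payload: {job.payload}")
        # Process the job...
        job.complete()
        return True
    except Exception as e:
        job.failed(str(e))  # mark the job as failed with the error message
        return False
```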
### Option 2: Process Jobs from All Stores
Process jobs from all job stores in parallel:
```python
# This will run jobs from all stores (including main queue) in parallel
# It creates worker pools for each store
mq.run_all_job_stores(process_job, max_workers_per_store=2, poll_interval=1)
```
### Option 3: Using ThreadPoolExecutor
For more control over job processing, you can use ThreadPoolExecutor:
```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

# NOTE: a SIGALRM-based timeout does not work here: signal handlers can
# only be installed from the main thread, so calling signal.signal()
# inside an executor worker raises ValueError. Instead, we bound how long
# we wait on each future. Caveat: result(timeout=...) only limits the
# wait; it does not stop the underlying run_jobs loop, and the with-block
# waits for all loops to finish on exit.
with ThreadPoolExecutor(max_workers=4) as executor:
    # Get all stores (the main queue plus every named store)
    stores = [mq] + [mq.get_job_store(name) for name in mq.list_job_stores()]

    # Submit a run_jobs loop for each store
    futures = {
        executor.submit(store.run_jobs, process_job, max_workers=2, poll_interval=0.5): store
        for store in stores
    }

    # Wait for each loop, giving each at most 5 seconds
    for future in futures:
        try:
            future.result(timeout=5)
        except FutureTimeout:
            print("Timeout reached")
        except Exception as e:
            print(f"Error: {e}")
```
### Option 4: Independent Threads
Run job stores in separate threads:
```python
import threading
# Create a thread for each store
threads = []
# Add a thread for the main queue
main_thread = threading.Thread(
    target=lambda: mq.run_jobs(process_job, max_workers=2)
)
threads.append(main_thread)
# Add threads for each job store
for store_name in mq.list_job_stores():
    store = mq.get_job_store(store_name)
    thread = threading.Thread(
        target=lambda s=store: s.run_jobs(process_job, max_workers=2)
    )
    threads.append(thread)
# Start all threads
for thread in threads:
    thread.start()
# Wait for threads to complete (or use a timeout mechanism)
for thread in threads:
    thread.join()
```
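Note that `run_jobs` runs continuously, so the `join()` calls above block indefinitely. A minimal alternative sketch, assuming `run_jobs` never returns on its own: mark each thread as a daemon and keep the main thread alive until interrupted.

```python
import threading
import time

# Daemon variant (sketch): daemon threads die with the process, so a
# Ctrl+C in the main thread ends everything instead of hanging on join().
stores = [mq] + [mq.get_job_store(name) for name in mq.list_job_stores()]
threads = [
    threading.Thread(
        target=lambda s=store: s.run_jobs(process_job, max_workers=2),
        daemon=True,
    )
    for store in stores
]
for thread in threads:
    thread.start()

try:
    while True:  # keep the main thread alive
        time.sleep(1)
except KeyboardInterrupt:
    print("Shutting down")  # daemon threads exit with the process
```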
### Recommended Production Setup
For production environments, we recommend:
1. Using a separate process for job processing (one per store, as in the Supervisor setup below)
2. Implementing proper error handling and logging
3. Using a process manager like Supervisor to manage job processing services
4. Setting up monitoring and alerting for job processing
Example production setup:
```python
# job_processor.py
from mq import MongoQueue
import logging
import signal
import sys
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize the main MongoQueue
mq = MongoQueue()
def process_job(job):
    try:
        logger.info(f"Processing job {job.id}")
        # Process the job...
        job.complete()
        return True
    except Exception as e:
        logger.error(f"Error processing job {job.id}: {str(e)}")
        job.failed(str(e))
        return False

def handle_shutdown(signum, frame):
    logger.info("Shutdown signal received, exiting gracefully...")
    sys.exit(0)

def main():
    # Register signal handlers
    signal.signal(signal.SIGTERM, handle_shutdown)
    signal.signal(signal.SIGINT, handle_shutdown)

    # Get the store name from command line arguments
    store_name = sys.argv[1] if len(sys.argv) > 1 else None

    logger.info(f"Starting job processor for store: {store_name or 'main'}")

    try:
        if store_name:
            # Process jobs from a specific store
            job_store = mq.get_job_store(store_name)
            job_store.run_jobs(process_job, max_workers=2)
        else:
            # Process jobs from the main queue
            mq.run_jobs(process_job, max_workers=2)
    except Exception as e:
        logger.error(f"Fatal error: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()
```
Supervisor configuration:
```ini
[program:mq_main]
command=python job_processor.py
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/main.err.log
stdout_logfile=/var/log/mq/main.out.log
[program:mq_users]
command=python job_processor.py users
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/users.err.log
stdout_logfile=/var/log/mq/users.out.log
[program:mq_posts]
command=python job_processor.py posts
autostart=true
autorestart=true
stderr_logfile=/var/log/mq/posts.err.log
stdout_logfile=/var/log/mq/posts.out.log
```
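With this configuration installed (typically under `/etc/supervisor/conf.d/`), reload Supervisor with `supervisorctl reread` followed by `supervisorctl update`, then verify the processors are running with `supervisorctl status`.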
## Capacity Management
Job stores with capacity limits raise a `ValueError` when adding a job would exceed the limit:
```python
# This will raise ValueError if users_store already has 5 jobs
try:
    users_store.put({"user_id": "new_user", "action": "verify_email"})
except ValueError as e:
    print(f"Capacity error: {e}")
```
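When you would rather degrade gracefully than catch the exception, you can check capacity first. A small sketch using the `is_full` field that `get_capacity_stats()` returns (see Statistics below); the overflow-to-main-queue fallback is just one possible policy:

```python
# Check capacity before enqueueing; fall back to the main queue when the
# store is full (an illustrative policy, not part of the library).
job = {"user_id": "new_user", "action": "verify_email"}
if users_store.get_capacity_stats()["is_full"]:
    mq.put(job)  # overflow to the main queue
else:
    # Another producer could fill the store between the check and the
    # put, so keep the try/except above as a backstop.
    users_store.put(job)
```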
## Statistics
Get information about job stores and their capacities:
```python
# Get capacity stats for a specific store
capacity_stats = users_store.get_capacity_stats()
print(capacity_stats)
# Example output:
# {
#     'store_name': 'users',
#     'current_jobs': 3,
#     'max_capacity': 5,
#     'is_full': False,
#     'available_capacity': 2
# }
# Get overall stats including job stores
stats = mq.stats()
print(stats)
# Example output:
# {
#     'jobs': {
#         'total': 15,
#         'pending': 10,
#         'processing': 2,
#         'failed': 1,
#         'completed': 2,
#         'main_queue': 3,
#     },
#     'workers': {
#         'total': 5,
#         'active': 5,
#         'inactive': 0,
#     },
#     'stores': {
#         'users': 5,
#         'posts': 7,
#         'analytics': 4,
#     }
# }
```
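As an example of combining the two shapes above, the sketch below derives per-store utilization. It assumes `get_job_store(name)` returns the already-registered store and that uncapped stores report `max_capacity` as `None`; both are inferences, not documented behavior.

```python
def store_utilization(queue):
    """Map each capped store to its fill fraction (illustrative sketch)."""
    usage = {}
    for name, count in queue.stats()["stores"].items():
        cap = queue.get_job_store(name).get_capacity_stats()["max_capacity"]
        if cap:  # skip uncapped stores (assumed to report None)
            usage[name] = count / cap
    return usage

print(store_utilization(mq))  # e.g. {'users': 1.0, 'posts': 0.7}
```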
## Configuration
You can configure default job store settings through environment variables:
```
MQ_JOB_STORES_DEFAULT_CAPACITY=100 # Default capacity for all stores
```
Or directly in your code:
```python
from mq.config import config
config.job_stores_default_capacity = 100  # Default capacity for all stores
config.job_stores_capacities = {
    "users": 5,
    "posts": 10,
    "analytics": None  # No limit
}
```