<div align="center">
<img src="static/logo_full_horizontal.webp">
# Pythia - Modern Python Worker Framework
**A modern library for creating efficient and scalable workers in Python**
[](https://badge.fury.io/py/pythia-framework)
[](https://pypi.org/project/pythia-framework/)
[](https://opensource.org/licenses/MIT)
[](https://github.com/Ralonso20/Pythia-framework/actions/workflows/ci.yml)
[](https://codecov.io/github/Ralonso20/Pythia-framework)
[](https://pythia-framework.github.io/pythia/)
[](https://github.com/astral-sh/ruff)
[](https://github.com/pythia-framework/pythia/pulls)
</div>
Pythia is a framework that simplifies creating message processing workers, background jobs, and asynchronous tasks. Based on production-proven patterns, it abstracts configuration complexity and allows you to create robust workers in minutes.
## ✨ Key Features

- **⚡ Rapid development**: Build a functional worker in minutes instead of days
- **🔄 Flexibility**: Support for multiple message brokers (Kafka, RabbitMQ, Redis, Database CDC)
- **🚀 Production ready**: Best practices included by default
- **🎯 Zero configuration**: Automatic configuration from environment variables
- **📝 Type safety**: Native integration with Pydantic v2
- **📊 Observability**: Structured logging and automatic metrics
- **⚙️ Lifecycle management**: Graceful startup/shutdown with signal handling
- **🗄️ Database Workers**: Change Data Capture (CDC) and database synchronization
## 📦 Installation
```bash
# Basic installation
uv add pythia-framework
# or with pip
pip install pythia-framework

# Individual brokers
uv add pythia-framework[kafka]     # Apache Kafka only
uv add pythia-framework[rabbitmq]  # RabbitMQ only
uv add pythia-framework[redis]     # Redis only
uv add pythia-framework[database]  # Database CDC only

# Cloud providers
uv add pythia-framework[aws]       # AWS SQS/SNS only
uv add pythia-framework[gcp]       # Google Pub/Sub only
uv add pythia-framework[azure]     # Azure Service Bus only

# Complete bundles
uv add pythia-framework[brokers]   # All traditional brokers
uv add pythia-framework[cloud]     # All cloud providers
uv add pythia-framework[all]       # Everything included
```
## 🚀 Quick Start
### Basic Worker
```python
from pythia import Worker
from pythia.brokers.kafka import KafkaConsumer
from pydantic import BaseModel

class ApprovalEvent(BaseModel):
    id: str
    status: str
    user_id: str

class ApprovalWorker(Worker):
    source = KafkaConsumer(
        topics=["approvals"],
        group_id="approval-worker"
    )

    async def process(self, event: ApprovalEvent):
        print(f"Processing approval {event.id}: {event.status}")
        # Your business logic here

# Run
if __name__ == "__main__":
    ApprovalWorker().run_sync()
```
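Because `process` is annotated with a Pydantic model, the raw message body is validated into a typed event before your code runs. The equivalent decode step, sketched with the stdlib only (the `decode` helper is invented for illustration; Pythia's real integration uses Pydantic):

```python
import json
from dataclasses import dataclass

@dataclass
class ApprovalEvent:
    id: str
    status: str
    user_id: str

def decode(raw: bytes) -> ApprovalEvent:
    """Parse a raw message body and reject payloads missing required fields."""
    payload = json.loads(raw)
    missing = {"id", "status", "user_id"} - payload.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    # Keep only the known fields, ignoring extras in the payload
    return ApprovalEvent(**{k: payload[k] for k in ("id", "status", "user_id")})

if __name__ == "__main__":
    event = decode(b'{"id": "a1", "status": "approved", "user_id": "u7"}')
    print(event.status)
```

Malformed messages fail at the boundary with a clear error instead of surfacing as attribute errors deep inside business logic.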
### Database CDC Worker
```python
from pythia.brokers.database import CDCWorker, DatabaseChange

class UserCDCWorker(CDCWorker):
    def __init__(self):
        super().__init__(
            connection_string="postgresql://user:pass@localhost/db",
            tables=["users", "orders"],
            poll_interval=5.0
        )

    async def process_change(self, change: DatabaseChange):
        print(f"Change detected in {change.table}: {change.change_type}")
        # Process database change
        return {"processed": True}

# Run
if __name__ == "__main__":
    UserCDCWorker().run_sync()
```
## 📡 Supported Brokers
| Broker | Status | Description |
|--------|--------|-------------|
| **Kafka** | ✅ Stable | Consumer/Producer with confluent-kafka |
| **RabbitMQ** | ✅ Stable | AMQP with aio-pika |
| **Redis** | ✅ Stable | Pub/Sub and Streams |
| **Database CDC** | ✅ Stable | Change Data Capture with SQLAlchemy |
| **HTTP** | ✅ Stable | HTTP polling and webhooks |
| **AWS SQS/SNS** | ✅ Stable | Amazon SQS consumer and SNS producer |
| **GCP Pub/Sub** | ✅ Stable | Google Cloud Pub/Sub subscriber and publisher |
| **Azure Service Bus** | ✅ Stable | Azure Service Bus consumer and producer |
| **Azure Storage Queue** | ✅ Stable | Azure Storage Queue consumer and producer |
## 🗄️ Database Workers
### Change Data Capture (CDC)
```python
from pythia.brokers.database import CDCWorker, DatabaseChange, ChangeType

class OrderCDCWorker(CDCWorker):
    def __init__(self):
        super().__init__(
            connection_string="postgresql://localhost/ecommerce",
            tables=["orders", "payments"],
            poll_interval=2.0,
            timestamp_column="updated_at"
        )

    async def process_change(self, change: DatabaseChange):
        if change.change_type == ChangeType.INSERT:
            await self.handle_new_record(change)
        elif change.change_type == ChangeType.UPDATE:
            await self.handle_updated_record(change)

        return {"status": "processed", "table": change.table}
```
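Under the hood, timestamp-column CDC amounts to repeatedly selecting rows newer than a high-water mark and advancing the mark. A minimal sketch with stdlib `sqlite3` (table and column names are illustrative; Pythia's real implementation uses SQLAlchemy):

```python
import sqlite3

def poll_changes(conn: sqlite3.Connection, table: str,
                 last_seen: str) -> tuple[list[tuple], str]:
    """Return rows updated after `last_seen` plus the new high-water mark.

    `table` is interpolated directly, so it must come from trusted config,
    never from user input.
    """
    rows = conn.execute(
        f"SELECT id, status, updated_at FROM {table} "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_seen,),
    ).fetchall()
    new_mark = rows[-1][2] if rows else last_seen
    return rows, new_mark

if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER, status TEXT, updated_at TEXT)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [(1, "new", "2025-01-01T10:00"), (2, "paid", "2025-01-01T11:00")],
    )
    changes, mark = poll_changes(conn, "orders", "2025-01-01T10:30")
    print(changes, mark)  # only order 2 is newer than the mark
```

Each `poll_interval`, the worker calls this with the mark from the previous round, so every committed change is seen exactly once as long as `updated_at` is monotonically maintained.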
### Database Synchronization
```python
from pythia.brokers.database import SyncWorker

class DataSyncWorker(SyncWorker):
    def __init__(self):
        super().__init__(
            source_connection="postgresql://prod-db/main",
            target_connection="postgresql://analytics-db/replica",
            sync_config={
                "mode": "incremental",  # or "full"
                "batch_size": 1000,
                "timestamp_column": "updated_at"
            }
        )

    async def process(self):
        # Sync specific tables
        await self.sync_table("users")
        await self.sync_table("orders")
```
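The `incremental` mode with `batch_size` and `timestamp_column` implies copying only rows newer than the target's latest timestamp, one batch at a time. A rough sketch of that logic with two stdlib `sqlite3` connections (the `sync_table` function here is an invented illustration, not Pythia's API):

```python
import sqlite3

def sync_table(source: sqlite3.Connection, target: sqlite3.Connection,
               table: str, batch_size: int = 1000) -> int:
    """Copy rows newer than the target's max updated_at, in batches."""
    # High-water mark: the newest timestamp already present in the target
    (mark,) = target.execute(
        f"SELECT COALESCE(MAX(updated_at), '') FROM {table}"
    ).fetchone()
    copied = 0
    while True:
        batch = source.execute(
            f"SELECT id, updated_at FROM {table} WHERE updated_at > ? "
            "ORDER BY updated_at LIMIT ?",
            (mark, batch_size),
        ).fetchall()
        if not batch:
            return copied
        target.executemany(f"INSERT INTO {table} VALUES (?, ?)", batch)
        mark = batch[-1][1]  # advance the mark past this batch
        copied += len(batch)
```

Batching bounds memory use and keeps transactions short on large tables; a full sync would simply start with an empty mark.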
## 📡 HTTP Workers
### API Polling Worker
```python
from pythia.brokers.http import PollerWorker

class PaymentStatusPoller(PollerWorker):
    def __init__(self):
        super().__init__(
            url="https://api.payments.com/status",
            interval=30,  # Poll every 30 seconds
            method="GET",
            headers={"Authorization": "Bearer your-token"}
        )

    async def process_message(self, message):
        # Process API response
        data = message.body
        if data.get("status") == "completed":
            await self.handle_payment_completed(data)

        return {"processed": True}
```
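The poller above boils down to a fetch/handle/sleep loop. A generic sketch of that loop with an injectable fetcher (all names here are invented for the example; Pythia's `PollerWorker` handles this internally):

```python
import asyncio
from typing import Awaitable, Callable

async def poll(fetch: Callable[[], Awaitable[dict]],
               handle: Callable[[dict], Awaitable[None]],
               interval: float, max_polls: int) -> None:
    """Call fetch() every `interval` seconds and pass each result to handle()."""
    for _ in range(max_polls):
        data = await fetch()
        await handle(data)
        await asyncio.sleep(interval)

if __name__ == "__main__":
    seen: list[str] = []

    async def fake_fetch() -> dict:
        # Stand-in for an HTTP GET against the status endpoint
        return {"status": "completed"}

    async def on_data(data: dict) -> None:
        if data.get("status") == "completed":
            seen.append(data["status"])

    asyncio.run(poll(fake_fetch, on_data, interval=0.01, max_polls=3))
    print(seen)
```

Injecting the fetcher keeps the loop trivially testable without a live endpoint, which is the same seam a framework-level poller gives you.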
### Webhook Sender Worker
```python
from pythia.brokers.http import WebhookSenderWorker

class NotificationWebhook(WebhookSenderWorker):
    def __init__(self):
        super().__init__(base_url="https://hooks.example.com")

    async def process(self):
        # Send webhooks based on your logic
        await self.send_webhook(
            endpoint="/notifications",
            data={"event": "user_created", "user_id": 123}
        )

        # Broadcast to multiple endpoints
        await self.broadcast_webhook(
            endpoints=["/webhook1", "/webhook2"],
            data={"event": "system_alert", "level": "warning"}
        )
```
## ☁️ Cloud Workers
### AWS SQS Consumer
```python
from pythia.brokers.cloud.aws import SQSConsumer
from pythia.config.cloud import AWSConfig

class OrderProcessor(SQSConsumer):
    def __init__(self):
        aws_config = AWSConfig(
            region="us-east-1",
            queue_url="https://sqs.us-east-1.amazonaws.com/123/orders"
        )
        super().__init__(
            queue_url="https://sqs.us-east-1.amazonaws.com/123/orders",
            aws_config=aws_config
        )

    async def process_message(self, message):
        # Process SQS message
        order_data = message.body
        print(f"Processing order: {order_data.get('order_id')}")
        return {"processed": True}

# Run
if __name__ == "__main__":
    OrderProcessor().run_sync()
```
### AWS SNS Producer
```python
from pythia.brokers.cloud.aws import SNSProducer

class NotificationSender(SNSProducer):
    def __init__(self):
        super().__init__(topic_arn="arn:aws:sns:us-east-1:123:notifications")

    async def send_user_notification(self, user_data):
        await self.publish_message(
            message={"event": "user_created", "user_id": user_data["id"]},
            subject="New User Registration"
        )
```
### GCP Pub/Sub Subscriber
```python
from pythia.brokers.cloud.gcp import PubSubSubscriber
from pythia.config.cloud import GCPConfig

class MessageProcessor(PubSubSubscriber):
    def __init__(self):
        gcp_config = GCPConfig(
            project_id="my-gcp-project",
            subscription_name="message-subscription"
        )
        super().__init__(
            subscription_path="projects/my-gcp-project/subscriptions/message-subscription",
            gcp_config=gcp_config
        )

    async def process_message(self, message):
        # Process Pub/Sub message
        data = message.body
        print(f"Processing message: {data.get('event_type')}")
        return {"processed": True}

# Run
if __name__ == "__main__":
    MessageProcessor().run_sync()
```
### GCP Pub/Sub Publisher
```python
from pythia.brokers.cloud.gcp import PubSubPublisher

class EventPublisher(PubSubPublisher):
    def __init__(self):
        super().__init__(topic_path="projects/my-gcp-project/topics/events")

    async def publish_event(self, event_data):
        await self.publish_message(
            message={"event": "user_activity", "data": event_data},
            attributes={"source": "user-service", "timestamp": "2025-09-01"},
            ordering_key=f"user-{event_data.get('user_id')}"
        )
```
### Azure Service Bus Consumer
```python
from pythia.brokers.cloud.azure import ServiceBusConsumer
from pythia.config.cloud import AzureConfig

class OrderProcessor(ServiceBusConsumer):
    def __init__(self):
        azure_config = AzureConfig(
            service_bus_connection_string="Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test",
            service_bus_queue_name="orders"
        )
        super().__init__(
            queue_name="orders",
            connection_string="Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test",
            azure_config=azure_config
        )

    async def process_message(self, message):
        # Process Service Bus message
        order_data = message.body
        print(f"Processing order: {order_data.get('order_id')}")
        return {"processed": True}

# Run
if __name__ == "__main__":
    OrderProcessor().run_sync()
```
### Azure Storage Queue Producer
```python
from pythia.brokers.cloud.azure import StorageQueueProducer

class TaskProducer(StorageQueueProducer):
    def __init__(self):
        super().__init__(
            queue_name="tasks",
            connection_string="DefaultEndpointsProtocol=https;AccountName=test;AccountKey=test;EndpointSuffix=core.windows.net"
        )

    async def send_task(self, task_data):
        await self.send_message(
            message={"task": "process_image", "image_id": task_data["id"]},
            visibility_timeout=60,
            time_to_live=3600  # 1 hour TTL
        )
```
## 🔧 Configuration
### Environment Variables
```bash
# Worker config
PYTHIA_WORKER_NAME=my-worker
PYTHIA_LOG_LEVEL=INFO
PYTHIA_MAX_RETRIES=3

# Kafka config
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_GROUP_ID=my-group
KAFKA_TOPICS=events,notifications

# Database config
DATABASE_URL=postgresql://user:pass@localhost/db
DATABASE_POLL_INTERVAL=5.0

# AWS config
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123/queue
SNS_TOPIC_ARN=arn:aws:sns:us-east-1:123:topic

# GCP config
GCP_PROJECT_ID=my-gcp-project
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
PUBSUB_SUBSCRIPTION=projects/my-project/subscriptions/my-subscription
PUBSUB_TOPIC=projects/my-project/topics/my-topic

# Azure config
AZURE_SERVICE_BUS_CONNECTION_STRING=Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test
SERVICE_BUS_QUEUE_NAME=orders
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=test;AccountKey=test;EndpointSuffix=core.windows.net
STORAGE_QUEUE_NAME=tasks
```
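The zero-configuration feature means variables like `KAFKA_TOPICS` above are read and turned into typed settings at startup. A rough sketch of that kind of environment parsing with the stdlib only (the `KafkaSettings` class and parsing details are assumptions for illustration; Pythia's real configuration system is Pydantic-based):

```python
import os
from dataclasses import dataclass, field

@dataclass
class KafkaSettings:
    bootstrap_servers: str = "localhost:9092"
    group_id: str = "default-group"
    topics: list[str] = field(default_factory=list)

    @classmethod
    def from_env(cls, env=None) -> "KafkaSettings":
        """Build settings from an env mapping, falling back to defaults."""
        env = os.environ if env is None else env
        return cls(
            bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS",
                                      cls.bootstrap_servers),
            group_id=env.get("KAFKA_GROUP_ID", cls.group_id),
            # Comma-separated list, tolerating stray whitespace
            topics=[t.strip() for t in env.get("KAFKA_TOPICS", "").split(",")
                    if t.strip()],
        )

if __name__ == "__main__":
    settings = KafkaSettings.from_env(
        {"KAFKA_GROUP_ID": "my-group", "KAFKA_TOPICS": "events,notifications"}
    )
    print(settings)
```

Passing the mapping explicitly (instead of always reading `os.environ`) keeps the parsing testable and makes defaults easy to audit.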
## 🧪 Testing

```bash
# Install development dependencies
uv sync --all-extras

# Run tests
uv run pytest

# Tests with coverage
uv run pytest --cov=pythia

# Linting
uv run ruff check .
uv run ruff format .
```
## 🏗️ Project Structure

```
pythia/
├── core/          # Core framework
│   ├── worker.py      # Base Worker class
│   ├── message.py     # Message abstraction
│   └── lifecycle.py   # Lifecycle management
├── brokers/       # Broker adapters
│   ├── kafka/         # Kafka (Confluent)
│   ├── rabbitmq/      # RabbitMQ (aio-pika)
│   ├── redis/         # Redis Pub/Sub and Streams
│   └── database/      # Database CDC and Sync
├── config/        # Configuration system
├── logging/       # Logging with Loguru
└── utils/         # Utilities
```
## 🗺️ Roadmap

- [x] **Core Framework** - Worker base, lifecycle, configuration
- [x] **Kafka Integration** - Consumer/Producer with confluent-kafka
- [x] **RabbitMQ Support** - Consumer/Producer with aio-pika
- [x] **Redis Support** - Pub/Sub and Streams
- [x] **Database Workers** - CDC and synchronization with SQLAlchemy
- [x] **HTTP Workers** - Webhooks, polling, HTTP clients
- [x] **CLI Tools** - Worker generation, monitoring
- [x] **Cloud Brokers** - AWS SQS/SNS, GCP Pub/Sub, Azure Service Bus
- [ ] **Monitoring Dashboard** - Metrics, health checks, web UI
- [ ] **Monitoring Dashboard** - Metrics, health checks, web UI
## 📖 Documentation
Visit our complete documentation at: [https://ralonso20.github.io/pythia/](https://ralonso20.github.io/pythia/) (coming soon)
## 🤝 Contributing
1. Fork the project
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## 📄 License
This project is licensed under the MIT License. See [LICENSE](LICENSE) for more details.
## 🎯 Inspiration
Pythia is inspired by frameworks like:
- **Celery** (Python) - For distributed workers
- **Apache Kafka Streams** (Java) - For stream processing
- **Spring Boot** (Java) - For automatic configuration
- **Sidekiq** (Ruby) - For simplicity and elegance
## 📞 Support

- 📧 **Issues**: [GitHub Issues](https://github.com/ralonso20/pythia/issues)
- 📚 **Documentation**: [https://ralonso20.github.io/pythia/](https://ralonso20.github.io/pythia/)
- 💬 **Discussions**: [GitHub Discussions](https://github.com/ralonso20/pythia/discussions)
---
**Pythia**: *From configuration complexity to business logic simplicity.*