pythia-framework


- **Name**: pythia-framework
- **Version**: 1.0.1
- **Home page**: None
- **Summary**: A modern library for creating efficient and scalable workers in Python
- **Upload time**: 2025-09-02 22:52:18
- **Maintainer**: Pythia Contributors
- **Docs URL**: None
- **Author**: Pythia Contributors
- **Requires Python**: >=3.11
- **License**: None
- **Keywords**: async, background-jobs, distributed-systems, kafka, message-queue, microservices, rabbitmq, redis, worker
- **Requirements**: No requirements were recorded.

            <div align="center">
  <img src="static/logo_full_horizontal.webp">

  # Pythia - Modern Python Worker Framework

  **A modern library for creating efficient and scalable workers in Python**

  [![PyPI version](https://badge.fury.io/py/pythia-framework.svg)](https://badge.fury.io/py/pythia-framework)
  [![Python Support](https://img.shields.io/pypi/pyversions/pythia-framework.svg)](https://pypi.org/project/pythia-framework/)
  [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![CI/CD Pipeline](https://github.com/Ralonso20/Pythia-framework/actions/workflows/ci.yml/badge.svg)](https://github.com/Ralonso20/Pythia-framework/actions/workflows/ci.yml)
  [![codecov](https://codecov.io/github/Ralonso20/Pythia-framework/graph/badge.svg?token=b6n9A4sNae)](https://codecov.io/github/Ralonso20/Pythia-framework)
  [![Documentation Status](https://img.shields.io/badge/docs-mkdocs-blue.svg)](https://pythia-framework.github.io/pythia/)
  [![Code style: ruff](https://img.shields.io/badge/code%20style-ruff-000000.svg)](https://github.com/astral-sh/ruff)
  [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/pythia-framework/pythia/pulls)

</div>

Pythia is a framework for building message-processing workers, background jobs, and asynchronous tasks. It builds on production-proven patterns, hides most of the configuration complexity, and lets you create robust workers in minutes.

## ✨ Key Features

- **⚡ Rapid development**: Create a functional worker in minutes instead of days
- **🔄 Flexibility**: Support for multiple message brokers (Kafka, RabbitMQ, Redis, Database CDC)
- **🚀 Production ready**: Best practices included by default
- **🎯 Zero configuration**: Automatic configuration from environment variables
- **📝 Type safety**: Native integration with Pydantic v2
- **📊 Observability**: Structured logging and automatic metrics
- **⚙️ Lifecycle management**: Graceful startup/shutdown with signal handling
- **🗄️ Database Workers**: Change Data Capture (CDC) and database synchronization
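
To make the lifecycle bullet concrete, here is a minimal standard-library sketch of graceful shutdown with signal handling. It does not use Pythia; `GracefulWorker`, `request_stop`, and `demo_shutdown` are hypothetical names chosen for illustration, and the framework's own implementation may differ.

```python
import asyncio
import signal


class GracefulWorker:
    """Hypothetical worker that stops cleanly when asked to (not Pythia's API)."""

    def __init__(self) -> None:
        self._stop = asyncio.Event()
        self.processed: list[int] = []
        self.closed = False

    def request_stop(self) -> None:
        # Called by the signal handler (or by a test) to end the loop.
        self._stop.set()

    async def process(self, message: int) -> None:
        self.processed.append(message)

    async def shutdown(self) -> None:
        # A real worker would release connections and flush buffers here.
        self.closed = True

    async def run(self, source) -> None:
        loop = asyncio.get_running_loop()
        installed = []
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, self.request_stop)
                installed.append(sig)
            except (NotImplementedError, RuntimeError, ValueError):
                pass  # e.g. Windows event loops or a non-main thread
        try:
            async for message in source:
                if self._stop.is_set():
                    break  # stop before picking up more work
                await self.process(message)
        finally:
            for sig in installed:
                loop.remove_signal_handler(sig)
            await self.shutdown()


async def demo_shutdown() -> GracefulWorker:
    worker = GracefulWorker()

    async def source():
        for i in range(5):
            if i == 3:
                worker.request_stop()  # simulate SIGTERM arriving
            yield i

    await worker.run(source())
    return worker


worker = asyncio.run(demo_shutdown())
print(worker.processed, worker.closed)
```

The stop flag is checked between messages, so a message that is already being processed finishes before `shutdown` runs.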

## 📦 Installation

```bash
# Basic installation
uv add pythia-framework
# or with pip
pip install pythia-framework

# Individual brokers (extras are quoted so shells such as zsh do not expand the brackets)
uv add "pythia-framework[kafka]"          # Apache Kafka only
uv add "pythia-framework[rabbitmq]"       # RabbitMQ only
uv add "pythia-framework[redis]"          # Redis only
uv add "pythia-framework[database]"       # Database CDC only

# Cloud providers
uv add "pythia-framework[aws]"            # AWS SQS/SNS only
uv add "pythia-framework[gcp]"            # Google Pub/Sub only
uv add "pythia-framework[azure]"          # Azure Service Bus only

# Complete bundles
uv add "pythia-framework[brokers]"        # All traditional brokers
uv add "pythia-framework[cloud]"          # All cloud providers
uv add "pythia-framework[all]"            # Everything included
```

## 🚀 Quick Start

### Basic Worker

```python
from pythia import Worker
from pythia.brokers.kafka import KafkaConsumer
from pydantic import BaseModel

class ApprovalEvent(BaseModel):
    id: str
    status: str
    user_id: str

class ApprovalWorker(Worker):
    source = KafkaConsumer(
        topics=["approvals"],
        group_id="approval-worker"
    )

    async def process(self, event: ApprovalEvent):
        print(f"Processing approval {event.id}: {event.status}")
        # Your business logic here

# Run
if __name__ == "__main__":
    ApprovalWorker().run_sync()
```
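
The example above receives a typed `ApprovalEvent`, which implies that the raw broker payload is decoded and validated before `process` is called. How Pythia does this is not shown here. The following standard-library sketch illustrates the general idea with a dataclass instead of Pydantic; `decode_event` is a hypothetical helper, not part of the framework.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class ApprovalEvent:
    id: str
    status: str
    user_id: str


def decode_event(raw: bytes) -> ApprovalEvent:
    """Hypothetical helper: turn a JSON payload into a typed event."""
    payload = json.loads(raw)
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    expected = [f.name for f in fields(ApprovalEvent)]
    missing = [name for name in expected if name not in payload]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    # Unknown keys are ignored; known keys are coerced to str.
    return ApprovalEvent(**{name: str(payload[name]) for name in expected})


event = decode_event(b'{"id": "a1", "status": "approved", "user_id": "u7", "extra": 1}')
print(event)
```

Rejecting malformed payloads at the boundary keeps the business logic in `process` free of defensive checks.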

### Database CDC Worker

```python
from pythia.brokers.database import CDCWorker, DatabaseChange

class UserCDCWorker(CDCWorker):
    def __init__(self):
        super().__init__(
            connection_string="postgresql://user:pass@localhost/db",
            tables=["users", "orders"],
            poll_interval=5.0
        )

    async def process_change(self, change: DatabaseChange):
        print(f"Change detected in {change.table}: {change.change_type}")
        # Process database change
        return {"processed": True}

# Run
if __name__ == "__main__":
    UserCDCWorker().run_sync()
```

## 📡 Supported Brokers

| Broker | Status | Description |
|--------|--------|-------------|
| **Kafka** | ✅ Stable | Consumer/Producer with confluent-kafka |
| **RabbitMQ** | ✅ Stable | AMQP with aio-pika |
| **Redis** | ✅ Stable | Pub/Sub and Streams |
| **Database CDC** | ✅ Stable | Change Data Capture with SQLAlchemy |
| **HTTP** | ✅ Stable | HTTP polling and webhooks |
| **AWS SQS/SNS** | ✅ Stable | Amazon SQS consumer and SNS producer |
| **GCP Pub/Sub** | ✅ Stable | Google Cloud Pub/Sub subscriber and publisher |
| **Azure Service Bus** | ✅ Stable | Azure Service Bus consumer and producer |
| **Azure Storage Queue** | ✅ Stable | Azure Storage Queue consumer and producer |

## 🗄️ Database Workers

### Change Data Capture (CDC)

```python
from pythia.brokers.database import CDCWorker, DatabaseChange, ChangeType

class OrderCDCWorker(CDCWorker):
    def __init__(self):
        super().__init__(
            connection_string="postgresql://localhost/ecommerce",
            tables=["orders", "payments"],
            poll_interval=2.0,
            timestamp_column="updated_at"
        )

    async def process_change(self, change: DatabaseChange):
        # handle_new_record and handle_updated_record are your own methods;
        # define them on this class.
        if change.change_type == ChangeType.INSERT:
            await self.handle_new_record(change)
        elif change.change_type == ChangeType.UPDATE:
            await self.handle_updated_record(change)

        return {"status": "processed", "table": change.table}
```
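
The `poll_interval` and `timestamp_column` parameters suggest timestamp-based polling: on each tick, select the rows whose timestamp is newer than the last one seen. The sketch below shows that technique with the standard-library `sqlite3` module. It is an illustration under that assumption, not Pythia's implementation, and `fetch_changes` is a hypothetical helper.

```python
import sqlite3


def fetch_changes(conn, table, timestamp_column, since):
    """Return rows changed after `since` plus the new high-water mark."""
    # Identifiers cannot be bound as SQL parameters, so validate them first.
    if not (table.isidentifier() and timestamp_column.isidentifier()):
        raise ValueError("unexpected table or column name")
    query = (
        f"SELECT * FROM {table} "
        f"WHERE {timestamp_column} > ? ORDER BY {timestamp_column}"
    )
    rows = conn.execute(query, (since,)).fetchall()
    new_since = rows[-1][timestamp_column] if rows else since
    return rows, new_since


conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # allows row["column"] access
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL, updated_at REAL)"
)
conn.executemany(
    "INSERT INTO orders (total, updated_at) VALUES (?, ?)",
    [(10.0, 1.0), (25.5, 2.0)],
)

# First poll sees both rows; the second sees only the row updated afterwards.
first, mark = fetch_changes(conn, "orders", "updated_at", since=0.0)
conn.execute("UPDATE orders SET total = 12.0, updated_at = 3.0 WHERE id = 1")
second, mark = fetch_changes(conn, "orders", "updated_at", since=mark)
print(len(first), [row["id"] for row in second], mark)
```

Polling by timestamp cannot observe deleted rows, and rows that share the exact timestamp of the high-water mark can be missed, so the timestamp column should be updated on every write and be fine-grained.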

### Database Synchronization

```python
from pythia.brokers.database import SyncWorker

class DataSyncWorker(SyncWorker):
    def __init__(self):
        super().__init__(
            source_connection="postgresql://prod-db/main",
            target_connection="postgresql://analytics-db/replica",
            sync_config={
                "mode": "incremental",  # or "full"
                "batch_size": 1000,
                "timestamp_column": "updated_at"
            }
        )

    async def process(self):
        # Sync specific tables
        await self.sync_table("users")
        await self.sync_table("orders")
```

## 📡 HTTP Workers

### API Polling Worker

```python
from pythia.brokers.http import PollerWorker

class PaymentStatusPoller(PollerWorker):
    def __init__(self):
        super().__init__(
            url="https://api.payments.com/status",
            interval=30,  # Poll every 30 seconds
            method="GET",
            headers={"Authorization": "Bearer your-token"}
        )

    async def process_message(self, message):
        # Process API response
        data = message.body
        if data.get("status") == "completed":
            # handle_payment_completed is your own method; define it on this class.
            await self.handle_payment_completed(data)

        return {"processed": True}
```
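
Conceptually, a polling worker repeats a fetch, hands the response to a handler, and sleeps for the configured interval. The following sketch shows that loop with `asyncio` only. The `poll` function, `fake_fetch`, and the `max_polls` limit are hypothetical and exist so the example terminates; a real worker would issue an HTTP request and run until stopped.

```python
import asyncio


async def poll(fetch, handle, interval: float, max_polls: int) -> None:
    """Call `fetch`, pass the result to `handle`, then wait `interval` seconds."""
    for attempt in range(max_polls):
        payload = await fetch()
        await handle(payload)
        if attempt < max_polls - 1:
            await asyncio.sleep(interval)


async def demo_poll() -> list[dict]:
    # Canned responses stand in for the remote API.
    responses = iter([
        {"id": "p1", "status": "pending"},
        {"id": "p1", "status": "pending"},
        {"id": "p1", "status": "completed"},
    ])
    completed: list[dict] = []

    async def fake_fetch() -> dict:
        return next(responses)

    async def handle(payload: dict) -> None:
        if payload.get("status") == "completed":
            completed.append(payload)

    await poll(fake_fetch, handle, interval=0.01, max_polls=3)
    return completed


completed = asyncio.run(demo_poll())
print(completed)
```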

### Webhook Sender Worker

```python
from pythia.brokers.http import WebhookSenderWorker

class NotificationWebhook(WebhookSenderWorker):
    def __init__(self):
        super().__init__(base_url="https://hooks.example.com")

    async def process(self):
        # Send webhooks based on your logic
        await self.send_webhook(
            endpoint="/notifications",
            data={"event": "user_created", "user_id": 123}
        )

        # Broadcast to multiple endpoints
        await self.broadcast_webhook(
            endpoints=["/webhook1", "/webhook2"],
            data={"event": "system_alert", "level": "warning"}
        )
```
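
Broadcasting usually means sending the same payload to several endpoints concurrently and not letting one failing receiver block the others. The sketch below shows one way to do that with `asyncio.gather`. The `broadcast` and `fake_send` functions are hypothetical stand-ins, and whether `broadcast_webhook` behaves this way is an assumption.

```python
import asyncio


async def broadcast(send, endpoints, data):
    """Send `data` to every endpoint concurrently and report per-endpoint results."""
    results = await asyncio.gather(
        *(send(endpoint, data) for endpoint in endpoints),
        return_exceptions=True,  # collect failures instead of raising the first one
    )
    return {
        endpoint: f"error: {result}" if isinstance(result, Exception) else "ok"
        for endpoint, result in zip(endpoints, results)
    }


async def fake_send(endpoint, data):
    # Stand-in for an HTTP POST; the second endpoint always fails.
    await asyncio.sleep(0)
    if endpoint == "/webhook2":
        raise ConnectionError("receiver unavailable")


report = asyncio.run(
    broadcast(fake_send, ["/webhook1", "/webhook2"], {"event": "system_alert"})
)
print(report)
```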

## ☁️ Cloud Workers

### AWS SQS Consumer

```python
from pythia.brokers.cloud.aws import SQSConsumer
from pythia.config.cloud import AWSConfig

class OrderProcessor(SQSConsumer):
    def __init__(self):
        aws_config = AWSConfig(
            region="us-east-1",
            queue_url="https://sqs.us-east-1.amazonaws.com/123/orders"
        )
        super().__init__(
            queue_url="https://sqs.us-east-1.amazonaws.com/123/orders",
            aws_config=aws_config
        )

    async def process_message(self, message):
        # Process SQS message
        order_data = message.body
        print(f"Processing order: {order_data.get('order_id')}")
        return {"processed": True}

# Run
if __name__ == "__main__":
    OrderProcessor().run_sync()
```

### AWS SNS Producer

```python
from pythia.brokers.cloud.aws import SNSProducer

class NotificationSender(SNSProducer):
    def __init__(self):
        super().__init__(topic_arn="arn:aws:sns:us-east-1:123:notifications")

    async def send_user_notification(self, user_data):
        await self.publish_message(
            message={"event": "user_created", "user_id": user_data["id"]},
            subject="New User Registration"
        )
```

### GCP Pub/Sub Subscriber

```python
from pythia.brokers.cloud.gcp import PubSubSubscriber
from pythia.config.cloud import GCPConfig

class MessageProcessor(PubSubSubscriber):
    def __init__(self):
        gcp_config = GCPConfig(
            project_id="my-gcp-project",
            subscription_name="message-subscription"
        )
        super().__init__(
            subscription_path="projects/my-gcp-project/subscriptions/message-subscription",
            gcp_config=gcp_config
        )

    async def process_message(self, message):
        # Process Pub/Sub message
        data = message.body
        print(f"Processing message: {data.get('event_type')}")
        return {"processed": True}

# Run
if __name__ == "__main__":
    MessageProcessor().run_sync()
```

### GCP Pub/Sub Publisher

```python
from pythia.brokers.cloud.gcp import PubSubPublisher

class EventPublisher(PubSubPublisher):
    def __init__(self):
        super().__init__(topic_path="projects/my-gcp-project/topics/events")

    async def publish_event(self, event_data):
        await self.publish_message(
            message={"event": "user_activity", "data": event_data},
            attributes={"source": "user-service", "timestamp": "2025-09-01"},
            ordering_key=f"user-{event_data.get('user_id')}"
        )
```

### Azure Service Bus Consumer

```python
from pythia.brokers.cloud.azure import ServiceBusConsumer
from pythia.config.cloud import AzureConfig

class OrderProcessor(ServiceBusConsumer):
    def __init__(self):
        azure_config = AzureConfig(
            service_bus_connection_string="Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test",
            service_bus_queue_name="orders"
        )
        super().__init__(
            queue_name="orders",
            connection_string="Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test",
            azure_config=azure_config
        )

    async def process_message(self, message):
        # Process Service Bus message
        order_data = message.body
        print(f"Processing order: {order_data.get('order_id')}")
        return {"processed": True}

# Run
if __name__ == "__main__":
    OrderProcessor().run_sync()
```

### Azure Storage Queue Producer

```python
from pythia.brokers.cloud.azure import StorageQueueProducer

class TaskProducer(StorageQueueProducer):
    def __init__(self):
        super().__init__(
            queue_name="tasks",
            connection_string="DefaultEndpointsProtocol=https;AccountName=test;AccountKey=test;EndpointSuffix=core.windows.net"
        )

    async def send_task(self, task_data):
        await self.send_message(
            message={"task": "process_image", "image_id": task_data["id"]},
            visibility_timeout=60,
            time_to_live=3600  # 1 hour TTL
        )
```

## 🔧 Configuration

### Environment Variables

```bash
# Worker config
PYTHIA_WORKER_NAME=my-worker
PYTHIA_LOG_LEVEL=INFO
PYTHIA_MAX_RETRIES=3

# Kafka config
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_GROUP_ID=my-group
KAFKA_TOPICS=events,notifications

# Database config
DATABASE_URL=postgresql://user:pass@localhost/db
DATABASE_POLL_INTERVAL=5.0

# AWS config
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123/queue
SNS_TOPIC_ARN=arn:aws:sns:us-east-1:123:topic

# GCP config
GCP_PROJECT_ID=my-gcp-project
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
PUBSUB_SUBSCRIPTION=projects/my-project/subscriptions/my-subscription
PUBSUB_TOPIC=projects/my-project/topics/my-topic

# Azure config
AZURE_SERVICE_BUS_CONNECTION_STRING=Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test
SERVICE_BUS_QUEUE_NAME=orders
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=test;AccountKey=test;EndpointSuffix=core.windows.net
STORAGE_QUEUE_NAME=tasks
```
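
As an illustration of how variables like these can be turned into typed settings, here is a standard-library sketch. `WorkerSettings`, `load_settings`, and the default values are assumptions made for this example; Pythia's own configuration classes are not shown in this document.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerSettings:
    """Hypothetical settings object (not Pythia's configuration class)."""
    worker_name: str
    log_level: str
    max_retries: int
    kafka_topics: tuple[str, ...]


def load_settings(env=None) -> WorkerSettings:
    """Read settings from a mapping, defaulting to the process environment."""
    env = os.environ if env is None else env
    topics = tuple(
        topic.strip()
        for topic in env.get("KAFKA_TOPICS", "").split(",")
        if topic.strip()
    )
    return WorkerSettings(
        worker_name=env.get("PYTHIA_WORKER_NAME", "worker"),  # assumed default
        log_level=env.get("PYTHIA_LOG_LEVEL", "INFO").upper(),
        max_retries=int(env.get("PYTHIA_MAX_RETRIES", "3")),
        kafka_topics=topics,
    )


settings = load_settings({
    "PYTHIA_WORKER_NAME": "my-worker",
    "PYTHIA_MAX_RETRIES": "5",
    "KAFKA_TOPICS": "events, notifications",
})
print(settings)
```

Passing a plain dictionary instead of `os.environ` keeps the loader easy to test.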

## 🧪 Testing

```bash
# Install development dependencies
uv sync --all-extras

# Run tests
uv run pytest

# Tests with coverage
uv run pytest --cov=pythia

# Linting
uv run ruff check .
uv run ruff format .
```

## 🏗️ Project Structure

```
pythia/
├── core/                   # Core framework
│   ├── worker.py          # Base Worker class
│   ├── message.py         # Message abstraction
│   └── lifecycle.py       # Lifecycle management
├── brokers/               # Broker adapters
│   ├── kafka/            # Kafka (Confluent)
│   ├── rabbitmq/         # RabbitMQ (aio-pika)
│   ├── redis/            # Redis Pub/Sub and Streams
│   └── database/         # Database CDC and Sync
├── config/               # Configuration system
├── logging/              # Logging with Loguru
└── utils/                # Utilities
```

## 🗺️ Roadmap

- [x] **Core Framework** - Worker base, lifecycle, configuration
- [x] **Kafka Integration** - Consumer/Producer with confluent-kafka
- [x] **RabbitMQ Support** - Consumer/Producer with aio-pika
- [x] **Redis Support** - Pub/Sub and Streams
- [x] **Database Workers** - CDC and synchronization with SQLAlchemy
- [x] **HTTP Workers** - Webhooks, polling, HTTP clients
- [x] **CLI Tools** - Worker generation, monitoring
- [x] **Cloud Brokers** - AWS SQS/SNS, GCP Pub/Sub, Azure Service Bus
- [ ] **Monitoring Dashboard** - Metrics, health checks, web UI

## 📖 Documentation

Visit our complete documentation at: [https://ralonso20.github.io/pythia/](https://ralonso20.github.io/pythia/) (coming soon)

## 🤝 Contributing

1. Fork the project
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

## 📄 License

This project is licensed under the MIT License. See [LICENSE](LICENSE) for more details.

## 🎯 Inspiration

Pythia is inspired by frameworks like:
- **Celery** (Python) - For distributed workers
- **Apache Kafka Streams** (Java) - For stream processing
- **Spring Boot** (Java) - For automatic configuration
- **Sidekiq** (Ruby) - For simplicity and elegance

## 📞 Support

- 📧 **Issues**: [GitHub Issues](https://github.com/ralonso20/pythia/issues)
- 📚 **Documentation**: [**Documentation**](https://ralonso20.github.io/pythia/)
- 💬 **Discussions**: [GitHub Discussions](https://github.com/ralonso20/pythia/discussions)

---

**Pythia**: *From configuration complexity to business logic simplicity.*

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "pythia-framework",
    "maintainer": "Pythia Contributors",
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "async, background-jobs, distributed-systems, kafka, message-queue, microservices, rabbitmq, redis, worker",
    "author": "Pythia Contributors",
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/a3/4e/1b74c7a2ba865736bdc9592fb98c03323d287adc8c6fce43255f9ec5392e/pythia_framework-1.0.1.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "A modern library for creating efficient and scalable workers in Python",
    "version": "1.0.1",
    "project_urls": {
        "Bug Tracker": "https://github.com/pythia-framework/pythia/issues",
        "Documentation": "https://pythia-framework.github.io/pythia/",
        "Homepage": "https://github.com/pythia-framework/pythia",
        "Repository": "https://github.com/pythia-framework/pythia"
    },
    "split_keywords": [
        "async",
        " background-jobs",
        " distributed-systems",
        " kafka",
        " message-queue",
        " microservices",
        " rabbitmq",
        " redis",
        " worker"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "a8dd56674828453fef004191ba4f673bbd5410c4b77ac405b59c41cdda27289b",
                "md5": "afb0ec35b812b1d48e2138a4da494f1e",
                "sha256": "bf88e43369bf241d564e35115cb97de2e381dc076751fd4e4131a13f0d276a49"
            },
            "downloads": -1,
            "filename": "pythia_framework-1.0.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "afb0ec35b812b1d48e2138a4da494f1e",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 191151,
            "upload_time": "2025-09-02T22:52:17",
            "upload_time_iso_8601": "2025-09-02T22:52:17.236099Z",
            "url": "https://files.pythonhosted.org/packages/a8/dd/56674828453fef004191ba4f673bbd5410c4b77ac405b59c41cdda27289b/pythia_framework-1.0.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "a34e1b74c7a2ba865736bdc9592fb98c03323d287adc8c6fce43255f9ec5392e",
                "md5": "e06bd6e4b83d2b1eec011d671e5dfee5",
                "sha256": "577fe11957fbfee86d507dd456a5a563e9401b921cd87943909ca4fb10a3c998"
            },
            "downloads": -1,
            "filename": "pythia_framework-1.0.1.tar.gz",
            "has_sig": false,
            "md5_digest": "e06bd6e4b83d2b1eec011d671e5dfee5",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 2072784,
            "upload_time": "2025-09-02T22:52:18",
            "upload_time_iso_8601": "2025-09-02T22:52:18.507557Z",
            "url": "https://files.pythonhosted.org/packages/a3/4e/1b74c7a2ba865736bdc9592fb98c03323d287adc8c6fce43255f9ec5392e/pythia_framework-1.0.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-09-02 22:52:18",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "pythia-framework",
    "github_project": "pythia",
    "github_not_found": true,
    "lcname": "pythia-framework"
}
        