- Name: tchu
- Version: 0.1.2
- Summary: A simple RabbitMQ/Pika wrapper for publishing and consuming events
- Home page: https://github.com/sigularusrex/tchu
- Author: David Sigley
- License: MIT
- Requires Python: >=3.7, <4.0
- Keywords: rabbitmq, pika, amqp, messaging
- Requirements: pika >=1.2.0
- Upload time: 2025-09-02 19:47:41

# tchu

`tchu` is a lightweight Python wrapper around Pika/RabbitMQ that simplifies event publishing and consuming in distributed systems. It provides intuitive abstractions for common messaging patterns while handling the underlying RabbitMQ connection management.

[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![PyPI version](https://badge.fury.io/py/tchu.svg)](https://badge.fury.io/py/tchu)

## Features

- **Simple API** for publishing and consuming events
- **ThreadedConsumer** for concurrent message processing
- **RPC-style messaging** with request-response pattern support
- **Automatic retries** with configurable backoff
- **Message deduplication** support with optional cache integration
- **Idle handlers** for periodic maintenance tasks
- **Comprehensive logging** of all messaging operations

## Installation

```bash
pip install tchu
```

## Usage

### Producer: Publishing Events

```python
from tchu import Producer

# Initialize a producer
producer = Producer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic"
)

# Publish a message
producer.publish(
    routing_key="user.created",
    body={"user_id": "123", "name": "John Doe", "email": "john@example.com"}
)

# Publish a message and wait for a response (RPC-style)
try:
    response = producer.call(
        routing_key="user.validate",
        body={"user_id": "123", "email": "john@example.com"},
        timeout=5  # seconds
    )
    print(f"Response received: {response}")
except TimeoutError:
    print("No response received within timeout period")
```
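The `amqp_url` argument above embeds credentials, host, port and virtual host in one string. When those values come from separate settings, a small helper can assemble the URL and percent-encode any special characters. This is a minimal sketch using only the standard library; `build_amqp_url` and its parameters are hypothetical names and not part of `tchu`.

```python
from urllib.parse import quote, unquote, urlsplit

def build_amqp_url(user, password, host="localhost", port=5672, vhost="/"):
    """Assemble an AMQP URL, percent-encoding the credentials and the vhost."""
    return (
        f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )

# A password containing '@' and '/' would otherwise break the URL structure
url = build_amqp_url("guest", "p@ss/word")
parts = urlsplit(url)
print(url)
print(parts.hostname, parts.port, unquote(parts.password))
```

The resulting string can then be passed as `amqp_url` to `Producer` or `Consumer`.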

### Consumer: Processing Events

#### Basic Consumer

```python
import json

from tchu import Consumer

def message_handler(ch, method, properties, body, is_rpc):
    print(f"Received message: {body}")
    if is_rpc:
        # For RPC calls, return a response
        return json.dumps({"status": "success", "message": "Validation completed"})

# Initialize a consumer
consumer = Consumer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
    routing_keys=["user.*"],  # Listen to all user events
    callback=message_handler,
    prefetch_count=10  # Allow up to 10 unacknowledged messages in flight
)

# Start consuming messages
consumer.run()
```
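The `routing_keys=["user.*"]` binding relies on RabbitMQ topic-exchange matching, where `*` stands for exactly one dot-separated word and `#` for zero or more words. The sketch below reimplements that rule in plain Python purely to show which routing keys a pattern would receive; the broker does the real matching, and `topic_matches` is a hypothetical helper, not part of `tchu` or Pika.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key."""

    def match(p, k):
        if not p:
            # Pattern exhausted: match only if the key is exhausted too
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero or more words
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' consumes exactly one word; a literal must match exactly
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))

for key in ("user.created", "user.profile.updated", "order.created"):
    print(key, topic_matches("user.*", key), topic_matches("user.#", key))
```

Note that `user.*` does not match `user.profile.updated`; a binding such as `user.#` would be needed for keys with more than two words.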

#### Threaded Consumer with Django Management Command

```python
# management/commands/listen_for_events.py
from tchu import ThreadedConsumer
from django.core.management.base import BaseCommand
from django.conf import settings
import json

def event_callback(ch, method, properties, body, is_rpc):
    try:
        print(f"Received event: {method.routing_key}")
        data = json.loads(body)
        
        # Process the event data
        # ...
        
        print("Event processed successfully")
    except Exception as e:
        print(f"Error processing event: {e}")


class Command(BaseCommand):
    help = "Starts a listener for RabbitMQ events"

    def handle(self, *args, **options):
        consumer = ThreadedConsumer(
            amqp_url=settings.RABBITMQ_BROKER_URL,
            exchange="app-events",
            exchange_type="topic",
            threads=5,  # Use 5 worker threads
            routing_keys=["user.*", "order.created"],
            callback=event_callback,
        )
        
        # Start consuming messages in a separate thread
        consumer.start()
        
        # Keep the main thread running
        self.stdout.write("Event listener started. Press Ctrl+C to stop.")
        try:
            consumer.join()
        except KeyboardInterrupt:
            self.stdout.write("Stopping event listener...")
```
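When one consumer is bound to several routing keys, as above, the callback usually has to branch on `method.routing_key`. One possible way to organize that is a small registry of per-event handlers. This is a sketch only: `HANDLERS`, `handles` and the handler functions are hypothetical names, the payload fields are made up for illustration, and the callback signature is the one shown in the examples above.

```python
import json
from types import SimpleNamespace

HANDLERS = {}   # routing key -> handler function
processed = []  # records what the handlers did, for the local check below

def handles(routing_key):
    """Register the decorated function as the handler for one routing key."""
    def register(func):
        HANDLERS[routing_key] = func
        return func
    return register

@handles("user.created")
def on_user_created(data):
    processed.append(("user.created", data["user_id"]))

@handles("order.created")
def on_order_created(data):
    processed.append(("order.created", data["order_id"]))

def event_callback(ch, method, properties, body, is_rpc):
    """Decode the JSON body and hand it to the handler for this routing key."""
    handler = HANDLERS.get(method.routing_key)
    if handler is None:
        print(f"No handler for {method.routing_key}")
        return
    handler(json.loads(body))

# Local check without a broker, using a stand-in for the delivery object
fake_method = SimpleNamespace(routing_key="user.created")
event_callback(None, fake_method, None, b'{"user_id": "123"}', False)
print(processed)
```

Because the callback only reads `method.routing_key` and `body`, it can be exercised in unit tests without a running RabbitMQ instance.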

### Advanced Features

#### Using with Cache for Message Deduplication

```python
from django.core.cache import cache
from tchu import ThreadedConsumer

# Cache adapter that implements the required interface
class DjangoCache:
    def add(self, key, value, timeout=300):
        return cache.add(key, value, timeout)

# Initialize consumer with cache
consumer = ThreadedConsumer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
    routing_keys=["user.*"],
    callback=message_handler,  # Handler defined in the Basic Consumer example
    cache=DjangoCache(),
    cache_key_prefix="myapp"  # Prefix for cache keys
)
```
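Outside Django, a comparable adapter could be backed by a plain dictionary. The sketch below assumes, based on the adapter above, that the consumer only needs an `add(key, value, timeout)` method that returns `True` when the key was newly stored and `False` when it already exists, which is how Django's `cache.add` behaves. `InMemoryCache` and the example key are hypothetical, and a process-local cache only deduplicates within a single process.

```python
import threading
import time

class InMemoryCache:
    """Process-local cache exposing add(key, value, timeout)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = {}  # key -> (value, monotonic deadline)

    def add(self, key, value, timeout=300):
        """Store value only if key is absent or expired; return True if stored."""
        now = time.monotonic()
        with self._lock:
            entry = self._entries.get(key)
            if entry is not None and entry[1] > now:
                return False  # Key is still live, so this is a duplicate
            self._entries[key] = (value, now + timeout)
            return True

dedup_cache = InMemoryCache()
print(dedup_cache.add("myapp:message-1", True))  # first sighting
print(dedup_cache.add("myapp:message-1", True))  # duplicate within the timeout
```

An instance could then be passed as `cache=InMemoryCache()` in place of `DjangoCache()`.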

#### Idle Handler for Periodic Tasks

```python
def maintenance_task():
    print("Performing periodic maintenance...")
    # Clean up resources, update statistics, etc.

consumer = Consumer(
    # ... other parameters
    idle_handler=maintenance_task,
    idle_interval=3600  # Run maintenance every hour
)
```
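The example above passes a function that takes no arguments. Assuming `idle_handler` is indeed called without arguments, a task that needs its own state can be bound ahead of time with `functools.partial`. This is a sketch; `prune_older_than` and the `seen` dictionary are hypothetical.

```python
import functools
import time

def prune_older_than(store, max_age_seconds):
    """Drop entries whose timestamp is older than max_age_seconds; return the count removed."""
    cutoff = time.time() - max_age_seconds
    stale = [key for key, stamp in store.items() if stamp < cutoff]
    for key in stale:
        del store[key]
    return len(stale)

# Timestamps of recently seen items: one two hours old, one fresh
seen = {"a": time.time() - 7200, "b": time.time()}

# Bind the arguments so the resulting callable takes none
maintenance_task = functools.partial(prune_older_than, seen, 3600)
removed = maintenance_task()
print(removed, sorted(seen))
```

The bound callable can be passed as `idle_handler=maintenance_task`.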

## API Reference

### AMQPClient

Base class for both Producer and Consumer.

- `__init__(amqp_url="amqp://guest:guest@localhost:5672/")`
- `setup_exchange(exchange, exchange_type)`
- `close()`

### Producer

- `__init__(amqp_url, exchange, exchange_type)`
- `publish(routing_key, body, content_type, delivery_mode)`
- `call(routing_key, body, content_type, delivery_mode, timeout)`

### Consumer

- `__init__(amqp_url, exchange, exchange_type, threads, routing_keys, callback, idle_handler, idle_interval, prefetch_count, cache, cache_key_prefix)`
- `run()`

### ThreadedConsumer

Extends Consumer to run in a separate thread.

## Development

1. Clone the repository
2. Install dependencies: `poetry install`
3. Run tests: `poetry run pytest`

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

This project is licensed under the MIT License - see the LICENSE file for details.

            
