# tchu
`tchu` is a lightweight Python wrapper around Pika/RabbitMQ that simplifies event publishing and consuming in distributed systems. It provides intuitive abstractions for common messaging patterns while handling the underlying RabbitMQ connection management.
## Features
- **Simple API** for publishing and consuming events
- **ThreadedConsumer** for concurrent message processing
- **RPC-style messaging** with request-response pattern support
- **Automatic retries** with configurable backoff
- **Message deduplication** support with optional cache integration
- **Idle handlers** for periodic maintenance tasks
- **Comprehensive logging** of all messaging operations
## Installation
```bash
pip install tchu
```
## Usage
### Producer: Publishing Events
```python
from tchu import Producer

# Initialize a producer
producer = Producer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
)

# Publish a message
producer.publish(
    routing_key="user.created",
    body={"user_id": "123", "name": "John Doe", "email": "john@example.com"},
)

# Publish a message and wait for a response (RPC-style)
try:
    response = producer.call(
        routing_key="user.validate",
        body={"user_id": "123", "email": "john@example.com"},
        timeout=5,  # seconds
    )
    print(f"Response received: {response}")
except TimeoutError:
    print("No response received within timeout period")
```
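When a missing reply should not be fatal, the `try`/`except` above can be folded into a small helper. The sketch below is hypothetical and not part of `tchu`: `call_with_fallback` and `FakeProducer` are names invented for illustration, and the only behavior assumed is what the example above shows, namely that `producer.call(...)` raises `TimeoutError` when no response arrives.

```python
from typing import Any, Optional


def call_with_fallback(producer: Any, routing_key: str, body: dict,
                       timeout: float = 5, default: Optional[Any] = None) -> Any:
    """Return the RPC response, or `default` if no reply arrives in time.

    Hypothetical helper (not part of tchu). Assumes producer.call(...)
    raises TimeoutError on timeout, as in the README example.
    """
    try:
        return producer.call(routing_key=routing_key, body=body, timeout=timeout)
    except TimeoutError:
        return default


class FakeProducer:
    """Stand-in for tchu.Producer so the sketch runs without a broker."""

    def __init__(self, reply=None):
        self.reply = reply

    def call(self, routing_key, body, timeout):
        # Simulate a timeout when no reply has been configured.
        if self.reply is None:
            raise TimeoutError("no reply")
        return self.reply


ok = call_with_fallback(FakeProducer({"valid": True}), "user.validate", {"user_id": "123"})
missing = call_with_fallback(FakeProducer(), "user.validate", {"user_id": "123"},
                             default={"valid": False})
```

With a real `Producer`, the same helper would be called with the producer instance in place of `FakeProducer`.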
### Consumer: Processing Events
#### Basic Consumer
```python
import json

from tchu import Consumer


def message_handler(ch, method, properties, body, is_rpc):
    print(f"Received message: {body}")
    if is_rpc:
        # For RPC calls, return a response
        return json.dumps({"status": "success", "message": "Validation completed"})


# Initialize a consumer
consumer = Consumer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
    routing_keys=["user.*"],  # Listen to all user events
    callback=message_handler,
    prefetch_count=10,  # Prefetch up to 10 unacknowledged messages
)

# Start consuming messages
consumer.run()
```
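The `user.*` binding relies on AMQP topic-exchange matching, which RabbitMQ performs on the broker: routing keys are split into dot-separated words, `*` matches exactly one word, and `#` matches zero or more words. The function below is an illustrative re-implementation of those rules for checking a pattern locally; `topic_matches` is a hypothetical name and is not part of `tchu` or Pika.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if `routing_key` matches the AMQP topic `pattern`.

    Illustrative only: the broker does the real matching.
    """
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: match only if the key is exhausted too.
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more words.
            return any(match(i + 1, nxt) for nxt in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            # '*' matches exactly one word; otherwise words must be equal.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


one_word = topic_matches("user.*", "user.created")
two_words = topic_matches("user.*", "user.created.v2")
hash_many = topic_matches("user.#", "user.created.v2")
```

Here `user.*` accepts `user.created` but not `user.created.v2`; a binding of `user.#` would accept both.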
#### Threaded Consumer with Django Management Command
```python
# management/commands/listen_for_events.py
import json

from django.conf import settings
from django.core.management.base import BaseCommand
from tchu import ThreadedConsumer


def event_callback(ch, method, properties, body, is_rpc):
    try:
        print(f"Received event: {method.routing_key}")
        data = json.loads(body)

        # Process the event data
        # ...

        print("Event processed successfully")
    except Exception as e:
        print(f"Error processing event: {e}")


class Command(BaseCommand):
    help = "Starts a listener for RabbitMQ events"

    def handle(self, *args, **options):
        consumer = ThreadedConsumer(
            amqp_url=settings.RABBITMQ_BROKER_URL,
            exchange="app-events",
            exchange_type="topic",
            threads=5,  # Use 5 worker threads
            routing_keys=["user.*", "order.created"],
            callback=event_callback,
        )

        # Start consuming messages in a separate thread
        consumer.start()

        # Keep the main thread running
        self.stdout.write("Event listener started. Press Ctrl+C to stop.")
        try:
            consumer.join()
        except KeyboardInterrupt:
            self.stdout.write("Stopping event listener...")
```
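Once a consumer listens to several routing keys, the "process the event data" step often becomes a dispatch on `method.routing_key`. The sketch below shows one way to do that with a small registry. It is an assumption-laden illustration: `HANDLERS`, `on`, and `handle_user_created` are hypothetical names, and the only `tchu` behavior relied on is the callback signature and the JSON-string RPC return value shown in the examples above.

```python
import json
from types import SimpleNamespace

# Hypothetical registry mapping an exact routing key to a handler function.
HANDLERS = {}


def on(routing_key):
    """Decorator that registers a handler for one exact routing key."""
    def register(func):
        HANDLERS[routing_key] = func
        return func
    return register


@on("user.created")
def handle_user_created(data):
    return f"welcome {data['name']}"


def event_callback(ch, method, properties, body, is_rpc):
    """Decode the JSON body and route it by method.routing_key."""
    handler = HANDLERS.get(method.routing_key)
    if handler is None:
        return None  # No handler registered: ignore the event.
    result = handler(json.loads(body))
    # Mirror the README's convention: only RPC calls return a response.
    return json.dumps({"result": result}) if is_rpc else None


# Simulate a delivery without a broker; SimpleNamespace stands in for
# the Pika method frame, of which only routing_key is used here.
method = SimpleNamespace(routing_key="user.created")
reply = event_callback(None, method, None, b'{"name": "John"}', True)
```

The registry only matches exact keys; wildcard bindings such as `user.*` still decide which messages reach the callback in the first place.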
### Advanced Features
#### Using with Cache for Message Deduplication
```python
from django.core.cache import cache
from tchu import ThreadedConsumer


# Cache adapter that implements the required interface
class DjangoCache:
    def add(self, key, value, timeout=300):
        return cache.add(key, value, timeout)


# Initialize consumer with cache
# (message_handler is the callback defined in the Basic Consumer example)
consumer = ThreadedConsumer(
    amqp_url="amqp://guest:guest@localhost:5672/",
    exchange="my-exchange",
    exchange_type="topic",
    routing_keys=["user.*"],
    callback=message_handler,
    cache=DjangoCache(),
    cache_key_prefix="myapp",  # Prefix for cache keys
)
```
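Outside Django, or in tests, an in-memory object with the same `add(key, value, timeout)` method may be enough. The sketch below assumes, based only on the adapter above, that `tchu` needs nothing beyond `add` and that it follows Django's `cache.add` semantics (return `True` when the key was stored, `False` when it already exists). `InMemoryCache` is a hypothetical name, and being process-local it would not deduplicate across multiple worker processes.

```python
import threading
import time


class InMemoryCache:
    """Minimal process-local cache exposing add(key, value, timeout)."""

    def __init__(self):
        self._entries = {}  # key -> (value, expiry on the monotonic clock)
        self._lock = threading.Lock()

    def add(self, key, value, timeout=300):
        """Store the key only if absent or expired; return True when stored."""
        now = time.monotonic()
        with self._lock:
            entry = self._entries.get(key)
            if entry is not None and entry[1] > now:
                return False  # Still live: the caller has seen this key before.
            self._entries[key] = (value, now + timeout)
            return True


dedupe = InMemoryCache()
first = dedupe.add("myapp:msg-1", True)
second = dedupe.add("myapp:msg-1", True)
```

The first `add` for a key succeeds and the second is rejected until the timeout elapses, which is the property deduplication depends on.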
#### Idle Handler for Periodic Tasks
```python
from tchu import Consumer


def maintenance_task():
    print("Performing periodic maintenance...")
    # Clean up resources, update statistics, etc.


consumer = Consumer(
    # ... other parameters
    idle_handler=maintenance_task,
    idle_interval=3600,  # Run maintenance every hour
)
```
## API Reference
### AMQPClient
Base class for both Producer and Consumer.
- `__init__(amqp_url="amqp://guest:guest@localhost:5672/")`
- `setup_exchange(exchange, exchange_type)`
- `close()`
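Because the base class exposes `close()`, the standard library's `contextlib.closing` can make sure a connection is released even if publishing raises. The sketch below uses a stand-in class so it runs without a broker. `FakeClient` is hypothetical, and whether `tchu` clients also implement the context-manager protocol themselves is not stated in this README.

```python
from contextlib import closing


class FakeClient:
    """Stand-in for an AMQPClient subclass; only close() matters here."""

    def __init__(self):
        self.closed = False
        self.sent = []

    def publish(self, routing_key, body):
        self.sent.append((routing_key, body))

    def close(self):
        self.closed = True


client = FakeClient()
# closing() calls client.close() when the block exits, even on error.
with closing(client) as producer:
    producer.publish(routing_key="user.created", body={"user_id": "123"})
```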
### Producer
- `__init__(amqp_url, exchange, exchange_type)`
- `publish(routing_key, body, content_type, delivery_mode)`
- `call(routing_key, body, content_type, delivery_mode, timeout)`
### Consumer
- `__init__(amqp_url, exchange, exchange_type, threads, routing_keys, callback, idle_handler, idle_interval, prefetch_count, cache, cache_key_prefix)`
- `run()`
### ThreadedConsumer
Extends `Consumer` to run in a separate thread. The Django management command example above drives it with `start()` and `join()`.
## Development
1. Clone the repository
2. Install dependencies: `poetry install`
3. Run tests: `poetry run pytest`
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Package Metadata
- **Name**: tchu
- **Version**: 0.1.2
- **Summary**: A simple RabbitMQ/Pika wrapper for publishing and consuming events
- **Author**: David Sigley
- **License**: MIT
- **Requires Python**: >=3.7, <4.0
- **Dependencies**: pika >= 1.2.0
- **Keywords**: rabbitmq, pika, amqp, messaging
- **Homepage / Repository**: https://github.com/sigularusrex/tchu
- **Released**: 2025-09-02 (wheel `tchu-0.1.2-py3-none-any.whl`, sdist `tchu-0.1.2.tar.gz`)