zephyrflow

Name: zephyrflow
Version: 0.1.0
Summary: A messaging library supporting multiple backends
Author: Zbigniew Mastylo (z.mastylo@protonmail.com)
Requires-Python: <4.0,>=3.9
Upload time: 2025-01-16 02:16:46
Requirements: No requirements were recorded.
# MsgFlow

MsgFlow is a powerful and flexible messaging library that provides a unified interface for working with multiple message brokers. It currently supports Kafka, RabbitMQ, and Redis, offering both synchronous and asynchronous clients.

## Features

- **Multiple Broker Support**: 
  - Apache Kafka
  - RabbitMQ
  - Redis Streams
- **Unified Interface**: Consistent API across all message brokers
- **Async Support**: Native async/await support for all clients
- **Type Safety**: Full type hints support
- **Error Handling**: Robust error handling and recovery mechanisms
- **Consumer Groups**: Support for consumer groups in Kafka, RabbitMQ, and Redis Streams
- **Exchange Bindings**: Advanced RabbitMQ exchange and queue bindings
- **Stream Processing**: Redis Streams support for stream processing

## Requirements

- Python 3.9+
- Redis 5.0+ (for Redis Streams support)
- Kafka 2.0+
- RabbitMQ 3.8+

## Installation

```bash
# Install with poetry (recommended)
poetry add zephyrflow

# Install with pip
pip install zephyrflow
```

## Quick Start

### Async Iterator Pattern

All async clients in MsgFlow implement the async iterator pattern, allowing you to use them in async for loops:

```python
async with client:  # Automatically connects and closes
    async for message in client:  # Uses receive() under the hood
        print(f"Received: {message}")
```

### Kafka Example

```python
from msgflow.kafka.async_client import AsyncKafkaClient

async def kafka_example():
    # Create a client
    client = AsyncKafkaClient(
        stream_name="my-topic",
        bootstrap_servers="localhost:9092"
    )
    
    # Using async context manager
    async with client:
        # Send messages
        await client.send("Hello Kafka!")
        
        # Receive messages
        async for message in client:
            print(f"Received: {message}")
```

### RabbitMQ Example

```python
from msgflow.rabbit.async_client import AsyncRabbitClient

async def rabbitmq_example():
    # Create a client
    client = AsyncRabbitClient(
        stream_name="my-routing-key",
        queue_name="my-queue",
        rabbitmq_url="amqp://guest:guest@localhost:5672/"
    )
    
    # Connect
    await client.connect()
    
    # Send messages
    await client.send("Hello RabbitMQ!")
    
    # Receive messages
    async for message in client.receive():
        print(f"Received: {message}")
        break
    
    # Close connection
    await client.close()
```

### Redis Example

```python
from msgflow.redis.async_client import AsyncRedisClient

async def redis_example():
    # Create a client
    client = AsyncRedisClient(
        stream_name="my-stream",
        redis_url="redis://localhost:6379"
    )
    
    # Connect
    await client.connect()
    
    # Send messages
    await client.send("Hello Redis!")
    
    # Receive messages
    async for message in client.receive():
        print(f"Received: {message}")
        break
    
    # Close connection
    await client.close()
```
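
The example coroutines above only define the message flows; as with any asyncio code, they are executed from an event loop, for example with `asyncio.run`:

```python
import asyncio

if __name__ == "__main__":
    # Run one of the coroutines defined in the examples above
    asyncio.run(redis_example())
```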

## Configuration

### Environment Variables

- `KAFKA_BOOTSTRAP_SERVERS`: Kafka bootstrap servers (default: "localhost:9092")
- `RABBITMQ_URL`: RabbitMQ connection URL (default: "amqp://guest:guest@localhost:5672/")
- `REDIS_URL`: Redis connection URL (default: "redis://localhost:6379")
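
The README does not state whether the clients read these variables themselves, so a conservative pattern is to resolve them explicitly and pass the values through. A minimal sketch using the Kafka client from the Quick Start:

```python
import os

from msgflow.kafka.async_client import AsyncKafkaClient

# Resolve the connection setting explicitly, falling back to the documented default
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

client = AsyncKafkaClient(
    stream_name="my-topic",
    bootstrap_servers=bootstrap_servers,
)
```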

### Client Configuration

Each client accepts additional configuration parameters:

#### Kafka Client
- `group_id`: Consumer group ID
- `auto_offset_reset`: Offset reset strategy
- `security_protocol`: Security protocol
- `sasl_mechanism`: SASL mechanism
- `sasl_plain_username`: SASL username
- `sasl_plain_password`: SASL password
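
A hedged sketch of how these options might be supplied, assuming they are accepted as constructor keyword arguments in the same way `group_id` is in the Consumer Groups example below; the SASL values here are placeholders, not a verified configuration:

```python
from msgflow.kafka.async_client import AsyncKafkaClient

# Illustrative SASL/PLAIN setup; credential values are placeholders
client = AsyncKafkaClient(
    stream_name="my-topic",
    bootstrap_servers="localhost:9092",
    group_id="my-group",
    auto_offset_reset="earliest",       # standard Kafka values: "earliest"/"latest"
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="user",
    sasl_plain_password="secret",
)
```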

#### RabbitMQ Client
- `exchange_name`: Exchange name
- `exchange_type`: Exchange type
- `routing_key`: Routing key
- `queue_name`: Queue name
- `durable`: Queue durability
- `auto_delete`: Auto-delete queue
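
Again a hedged sketch, assuming these options are keyword arguments on `AsyncRabbitClient`; the exchange and queue values are illustrative only:

```python
from msgflow.rabbit.async_client import AsyncRabbitClient

# Illustrative values; how `stream_name` and `routing_key` interact is not
# documented here, so treat this as a sketch rather than a verified setup.
client = AsyncRabbitClient(
    stream_name="my-routing-key",
    rabbitmq_url="amqp://guest:guest@localhost:5672/",
    exchange_name="events",
    exchange_type="topic",
    routing_key="my-routing-key",
    queue_name="my-queue",
    durable=True,       # survive broker restarts
    auto_delete=False,  # keep the queue when the last consumer disconnects
)
```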

#### Redis Client
- `stream_max_len`: Maximum stream length
- `consumer_group`: Consumer group name
- `consumer_name`: Consumer name
- `block_ms`: Blocking time in milliseconds
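
A corresponding sketch for the Redis client, assuming keyword-argument configuration as in the Consumer Groups example below; the numeric values are illustrative:

```python
from msgflow.redis.async_client import AsyncRedisClient

# Illustrative consumer-group setup for Redis Streams
client = AsyncRedisClient(
    stream_name="my-stream",
    redis_url="redis://localhost:6379",
    stream_max_len=10_000,    # cap stream length (MAXLEN-style trimming)
    consumer_group="my-group",
    consumer_name="worker-1",
    block_ms=5_000,           # block up to 5 seconds waiting for new entries
)
```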

## Advanced Usage

### Consumer Groups

```python
# Kafka Consumer Group
client = AsyncKafkaClient(
    stream_name="my-topic",
    group_id="my-group",
    bootstrap_servers="localhost:9092"
)

# RabbitMQ Consumer Group
client = AsyncRabbitClient(
    stream_name="my-routing-key",
    queue_name="my-queue",
    consumer_group="my-group",
    rabbitmq_url="amqp://guest:guest@localhost:5672/"
)

# Redis Consumer Group
client = AsyncRedisClient(
    stream_name="my-stream",
    consumer_group="my-group",
    redis_url="redis://localhost:6379"
)
```
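
Consuming as a group member then follows the same async-iterator pattern from the Quick Start; for instance, with the Redis client created above:

```python
async def consume_as_group_member():
    # `client` is the Redis consumer-group client from the snippet above
    async with client:
        async for message in client:
            print(f"my-group received: {message}")
```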

### Error Handling

```python
try:
    await client.connect()
    await client.send("message")
except ConnectionError:
    # Handle connection errors
    pass
except TimeoutError:
    # Handle timeout errors
    pass
except Exception:
    # Handle other errors
    pass
finally:
    await client.close()
```
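
The library's built-in recovery behavior is not detailed here; a common pattern on top of any client is a bounded retry loop around the send. A minimal sketch, not part of the documented API:

```python
import asyncio

async def send_with_retry(client, payload, attempts=3, base_delay=1.0):
    """Illustrative helper, not part of the documented API.

    `client` is any MsgFlow async client; assumes it can be reused across
    connect/close cycles via the async context manager from the Quick Start.
    """
    for attempt in range(1, attempts + 1):
        try:
            async with client:           # connects on entry, closes on exit
                await client.send(payload)
            return
        except (ConnectionError, TimeoutError):
            if attempt == attempts:
                raise
            # Exponential backoff: 1 s, 2 s, 4 s, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```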

## Development

### Prerequisites

- Python 3.10+
- Poetry
- Docker (for running integration tests)

### Setup

```bash
# Clone the repository
git clone https://github.com/yourusername/msgflow.git
cd msgflow

# Install dependencies
poetry install

# Run tests
poetry run pytest
```

### Running Integration Tests

Start the required services:

```bash
docker-compose up -d
```

Run the integration tests:

```bash
poetry run pytest tests/integration
```

## Contributing

1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Acknowledgments

- [Apache Kafka](https://kafka.apache.org/)
- [RabbitMQ](https://www.rabbitmq.com/)
- [Redis](https://redis.io/)
- [aiokafka](https://github.com/aio-libs/aiokafka)
- [aio-pika](https://github.com/mosquito/aio-pika)
- [redis-py](https://github.com/redis/redis-py)

            
