aio-websocket-pool


Nameaio-websocket-pool JSON
Version 0.1.0 PyPI version JSON
download
home_pagehttps://github.com/your-username/aio-websocket-pool
SummaryAsynchronous WebSocket connection pool for Python with connection reuse, health monitoring, and automatic cleanup
upload_time2025-07-13 20:16:07
maintainerNone
docs_urlNone
authorBoChen SHEN
requires_python<3.13,>=3.11
licenseMIT
keywords websocket connection-pool async asyncio websockets pool
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # aio-websocket-pool

A flexible, async-friendly WebSocket connection pool implementation for Python with connection reuse, health monitoring,
and automatic cleanup.

## Features

- **Connection Reuse**: Efficient connection pooling with automatic reuse of healthy connections
- **Health Monitoring**: Automatic connection health checks and cleanup of disconnected connections
- **Thread-safe**: Uses asyncio locks for concurrent access in async environments
- **Configurable Limits**: Set maximum connection limits and timeouts
- **Retry Logic**: Built-in exponential backoff retry mechanism for connection failures
- **Server Data Draining**: Optional server data detection and draining during connection release
- **Context Manager Support**: Clean, intuitive API with async context managers
- **Idle Connection Cleanup**: Automatic cleanup of idle connections to prevent resource leaks
- **Type-safe**: Full typing support with comprehensive type hints

## Installation

```bash
pip install aio-websocket-pool
```

Or using Poetry:

```bash
poetry add aio-websocket-pool
```

## Quick Start

```python
import asyncio
from aio_websocket_pool import WebsocketConnectionPool


async def main():
    # Create a connection pool
    async with WebsocketConnectionPool(uri="ws://localhost:8080") as pool:
        # Get a connection from the pool
        async with pool.get_connection() as conn:
            await conn.send("Hello, World!")
            response = await conn.recv()
            print(response)


asyncio.run(main())
```

## Advanced Usage

### Basic Pool Configuration

```python
from aio_websocket_pool import WebsocketConnectionPool

# Create a pool with custom configuration
pool = WebsocketConnectionPool(
    uri="ws://localhost:8080",
    max_connections=10,
    idle_timeout=60.0,
    connection_timeout=10.0,
    max_retries=3,
    warmup_connections=2,  # Pre-create 2 connections
    headers={"Authorization": "Bearer token123"}
)

async with pool:
    # Pool is ready to use
    conn = await pool.acquire()
    try:
        await conn.send("Hello")
        response = await conn.recv()
        print(response)
    finally:
        await pool.release(conn)
```

### Connection Health Monitoring

```python
# Enable server data checking during connection release
pool = WebsocketConnectionPool(
    uri="ws://localhost:8080",
    check_server_data_on_release=True,
    drain_timeout=5.0,
    drain_quiet_period=1.0
)

async with pool:
    async with pool.get_connection() as conn:
        await conn.send("REQUEST")
        response = await conn.recv()
        print(response)
        # Connection will be checked for additional server data
        # before being returned to the pool
```

### Custom Drain Condition

```python
def custom_drain_condition(message):
    # Only drain messages that look like server notifications
    return message.startswith(b'{"type":"notification"')


pool = WebsocketConnectionPool(
    uri="ws://localhost:8080",
    check_server_data_on_release=True,
    drain_condition=custom_drain_condition
)
```

### Force New Connection

```python
async with pool:
    # Force create a new connection instead of reusing existing ones
    async with pool.get_new_connection() as conn:
        await conn.send("Important message")
        response = await conn.recv()
        print(response)
```

### Connection Pool Monitoring

```python
async with pool:
    print(f"Total connections: {pool.total_connections}")
    print(f"Available connections: {pool.available_connections}")
    print(f"Busy connections: {pool.busy_connections}")
    print(f"Pending connections: {pool.pending_connections}")
```

### Manual Connection Management

```python
async with pool:
    # Manually acquire and release connections
    conn = await pool.acquire()
    try:
        await conn.send("Hello")
        response = await conn.recv()
        print(response)
    finally:
        # Release connection back to pool
        await pool.release(conn)

    # Force remove a connection from the pool
    conn = await pool.acquire()
    try:
        await conn.send("Hello")
        response = await conn.recv()
        print(response)
    finally:
        # Remove connection instead of returning to pool
        await pool.release(conn, force_remove=True)
```

## API Reference

### WebsocketConnectionPool

#### Constructor

```
WebsocketConnectionPool(
    uri: str,
    headers: Dict[str, str] | None = None,
    idle_timeout: float = 60.0,
    max_connections: int = 50,
    max_retries: int = 3,
    cleanup_interval: float = 5.0,
    connection_timeout: float = 10.0,
    warmup_connections: int = 0,
    check_server_data_on_release: bool = False,
    drain_timeout: float = 10.0,
    drain_quiet_period: float = 2.0,
    drain_condition: DrainConditionCallback | None = None,
    **kwargs
)
```

Creates a new WebSocket connection pool.

#### Properties

- `total_connections: int` - Total number of connections in the pool
- `available_connections: int` - Number of available connections
- `busy_connections: int` - Number of connections currently in use
- `pending_connections: int` - Number of connections being drained
- `is_closed: bool` - Whether the pool is closed
- `is_started: bool` - Whether the pool has been started

#### Methods

- `async start()` - Start the connection pool
- `async acquire() -> WebsocketConnection` - Acquire a connection from the pool
- `async acquire_new() -> WebsocketConnection` - Force acquire a new connection
- `async release(connection, *, force_remove=False)` - Release a connection back to the pool
- `async get_connection()` - Context manager for acquiring/releasing connections
- `async get_new_connection()` - Context manager for acquiring/releasing new connections
- `async close_all()` - Close all connections and shut down the pool

### WebsocketConnection

#### Properties

- `is_busy: bool` - Whether the connection is currently in use
- `is_draining: bool` - Whether the connection is being drained
- `is_connected: bool` - Whether the connection is active
- `last_activity: float` - Timestamp of last activity

#### Methods

- `async send(message: str | bytes)` - Send a message through the connection
- `async recv() -> str | bytes` - Receive a message from the connection
- `async ping()` - Send a ping to check connection health
- `async connect()` - Establish the WebSocket connection
- `async close()` - Close the connection

## Error Handling

The library provides several specific exception types:

```python
from aio_websocket_pool import (
    WebsocketError,
    ConnectionBusyError,
    ConnectionUnavailableError,
    ConnectionClosedError,
    ConnectionPoolExhaustedError,
    ConnectionPoolUnavailableError,
)

try:
    async with pool.get_connection() as conn:
        await conn.send("Hello")
        response = await conn.recv()
except ConnectionPoolExhaustedError:
    print("No connections available in pool")
except ConnectionClosedError:
    print("Connection was closed unexpectedly")
except WebsocketError as e:
    print(f"WebSocket error: {e}")
```

## Configuration Best Practices

### For High-Traffic Applications

```python
pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=100,
    warmup_connections=10,
    idle_timeout=120.0,
    connection_timeout=5.0,
    max_retries=5,
    cleanup_interval=10.0
)
```

### For Low-Latency Applications

```python
pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=50,
    warmup_connections=5,
    idle_timeout=30.0,
    connection_timeout=3.0,
    check_server_data_on_release=True,
    drain_timeout=2.0,
    drain_quiet_period=0.5
)
```

### For Resource-Constrained Environments

```python
pool = WebsocketConnectionPool(
    uri="ws://your-server.com/ws",
    max_connections=5,
    warmup_connections=1,
    idle_timeout=30.0,
    connection_timeout=10.0,
    max_retries=2,
    cleanup_interval=5.0
)
```

## Requirements

- Python 3.11+
- websockets >= 15.0.1
- tenacity >= 9.1.2

## License

This project is licensed under the MIT License.

## Development

### Setup

```bash
poetry install
```

### Running Tests

```bash
poetry run pytest
```

### Code Quality

```bash
poetry run black .
poetry run isort .
poetry run flake8 .
poetry run mypy .
```

### Building

```bash
poetry build
```
            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/your-username/aio-websocket-pool",
    "name": "aio-websocket-pool",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<3.13,>=3.11",
    "maintainer_email": null,
    "keywords": "websocket, connection-pool, async, asyncio, websockets, pool",
    "author": "BoChen SHEN",
    "author_email": "6goddddddd@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/46/51/8ed5a08c1f58aa2b4bdec8fbf30d07d95a079e576e3412d84eb77a77e4e0/aio_websocket_pool-0.1.0.tar.gz",
    "platform": null,
    "description": "# aio-websocket-pool\n\nA flexible, async-friendly WebSocket connection pool implementation for Python with connection reuse, health monitoring,\nand automatic cleanup.\n\n## Features\n\n- **Connection Reuse**: Efficient connection pooling with automatic reuse of healthy connections\n- **Health Monitoring**: Automatic connection health checks and cleanup of disconnected connections\n- **Thread-safe**: Uses asyncio locks for concurrent access in async environments\n- **Configurable Limits**: Set maximum connection limits and timeouts\n- **Retry Logic**: Built-in exponential backoff retry mechanism for connection failures\n- **Server Data Draining**: Optional server data detection and draining during connection release\n- **Context Manager Support**: Clean, intuitive API with async context managers\n- **Idle Connection Cleanup**: Automatic cleanup of idle connections to prevent resource leaks\n- **Type-safe**: Full typing support with comprehensive type hints\n\n## Installation\n\n```bash\npip install aio-websocket-pool\n```\n\nOr using Poetry:\n\n```bash\npoetry add aio-websocket-pool\n```\n\n## Quick Start\n\n```python\nimport asyncio\nfrom aio_websocket_pool import WebsocketConnectionPool\n\n\nasync def main():\n    # Create a connection pool\n    async with WebsocketConnectionPool(uri=\"ws://localhost:8080\") as pool:\n        # Get a connection from the pool\n        async with pool.get_connection() as conn:\n            await conn.send(\"Hello, World!\")\n            response = await conn.recv()\n            print(response)\n\n\nasyncio.run(main())\n```\n\n## Advanced Usage\n\n### Basic Pool Configuration\n\n```python\nfrom aio_websocket_pool import WebsocketConnectionPool\n\n# Create a pool with custom configuration\npool = WebsocketConnectionPool(\n    uri=\"ws://localhost:8080\",\n    max_connections=10,\n    idle_timeout=60.0,\n    connection_timeout=10.0,\n    max_retries=3,\n    warmup_connections=2,  # Pre-create 2 connections\n    headers={\"Authorization\": \"Bearer token123\"}\n)\n\nasync with pool:\n    # Pool is ready to use\n    conn = await pool.acquire()\n    try:\n        await conn.send(\"Hello\")\n        response = await conn.recv()\n        print(response)\n    finally:\n        await pool.release(conn)\n```\n\n### Connection Health Monitoring\n\n```python\n# Enable server data checking during connection release\npool = WebsocketConnectionPool(\n    uri=\"ws://localhost:8080\",\n    check_server_data_on_release=True,\n    drain_timeout=5.0,\n    drain_quiet_period=1.0\n)\n\nasync with pool:\n    async with pool.get_connection() as conn:\n        await conn.send(\"REQUEST\")\n        response = await conn.recv()\n        print(response)\n        # Connection will be checked for additional server data\n        # before being returned to the pool\n```\n\n### Custom Drain Condition\n\n```python\ndef custom_drain_condition(message):\n    # Only drain messages that look like server notifications\n    return message.startswith(b'{\"type\":\"notification\"')\n\n\npool = WebsocketConnectionPool(\n    uri=\"ws://localhost:8080\",\n    check_server_data_on_release=True,\n    drain_condition=custom_drain_condition\n)\n```\n\n### Force New Connection\n\n```python\nasync with pool:\n    # Force create a new connection instead of reusing existing ones\n    async with pool.get_new_connection() as conn:\n        await conn.send(\"Important message\")\n        response = await conn.recv()\n        print(response)\n```\n\n### Connection Pool Monitoring\n\n```python\nasync with pool:\n    print(f\"Total connections: {pool.total_connections}\")\n    print(f\"Available connections: {pool.available_connections}\")\n    print(f\"Busy connections: {pool.busy_connections}\")\n    print(f\"Pending connections: {pool.pending_connections}\")\n```\n\n### Manual Connection Management\n\n```python\nasync with pool:\n    # Manually acquire and release connections\n    conn = await pool.acquire()\n    try:\n        await conn.send(\"Hello\")\n        response = await conn.recv()\n        print(response)\n    finally:\n        # Release connection back to pool\n        await pool.release(conn)\n\n    # Force remove a connection from the pool\n    conn = await pool.acquire()\n    try:\n        await conn.send(\"Hello\")\n        response = await conn.recv()\n        print(response)\n    finally:\n        # Remove connection instead of returning to pool\n        await pool.release(conn, force_remove=True)\n```\n\n## API Reference\n\n### WebsocketConnectionPool\n\n#### Constructor\n\n```\nWebsocketConnectionPool(\n    uri: str,\n    headers: Dict[str, str] | None = None,\n    idle_timeout: float = 60.0,\n    max_connections: int = 50,\n    max_retries: int = 3,\n    cleanup_interval: float = 5.0,\n    connection_timeout: float = 10.0,\n    warmup_connections: int = 0,\n    check_server_data_on_release: bool = False,\n    drain_timeout: float = 10.0,\n    drain_quiet_period: float = 2.0,\n    drain_condition: DrainConditionCallback | None = None,\n    **kwargs\n)\n```\n\nCreates a new WebSocket connection pool.\n\n#### Properties\n\n- `total_connections: int` - Total number of connections in the pool\n- `available_connections: int` - Number of available connections\n- `busy_connections: int` - Number of connections currently in use\n- `pending_connections: int` - Number of connections being drained\n- `is_closed: bool` - Whether the pool is closed\n- `is_started: bool` - Whether the pool has been started\n\n#### Methods\n\n- `async start()` - Start the connection pool\n- `async acquire() -> WebsocketConnection` - Acquire a connection from the pool\n- `async acquire_new() -> WebsocketConnection` - Force acquire a new connection\n- `async release(connection, *, force_remove=False)` - Release a connection back to the pool\n- `async get_connection()` - Context manager for acquiring/releasing connections\n- `async get_new_connection()` - Context manager for acquiring/releasing new connections\n- `async close_all()` - Close all connections and shut down the pool\n\n### WebsocketConnection\n\n#### Properties\n\n- `is_busy: bool` - Whether the connection is currently in use\n- `is_draining: bool` - Whether the connection is being drained\n- `is_connected: bool` - Whether the connection is active\n- `last_activity: float` - Timestamp of last activity\n\n#### Methods\n\n- `async send(message: str | bytes)` - Send a message through the connection\n- `async recv() -> str | bytes` - Receive a message from the connection\n- `async ping()` - Send a ping to check connection health\n- `async connect()` - Establish the WebSocket connection\n- `async close()` - Close the connection\n\n## Error Handling\n\nThe library provides several specific exception types:\n\n```python\nfrom aio_websocket_pool import (\n    WebsocketError,\n    ConnectionBusyError,\n    ConnectionUnavailableError,\n    ConnectionClosedError,\n    ConnectionPoolExhaustedError,\n    ConnectionPoolUnavailableError,\n)\n\ntry:\n    async with pool.get_connection() as conn:\n        await conn.send(\"Hello\")\n        response = await conn.recv()\nexcept ConnectionPoolExhaustedError:\n    print(\"No connections available in pool\")\nexcept ConnectionClosedError:\n    print(\"Connection was closed unexpectedly\")\nexcept WebsocketError as e:\n    print(f\"WebSocket error: {e}\")\n```\n\n## Configuration Best Practices\n\n### For High-Traffic Applications\n\n```python\npool = WebsocketConnectionPool(\n    uri=\"ws://your-server.com/ws\",\n    max_connections=100,\n    warmup_connections=10,\n    idle_timeout=120.0,\n    connection_timeout=5.0,\n    max_retries=5,\n    cleanup_interval=10.0\n)\n```\n\n### For Low-Latency Applications\n\n```python\npool = WebsocketConnectionPool(\n    uri=\"ws://your-server.com/ws\",\n    max_connections=50,\n    warmup_connections=5,\n    idle_timeout=30.0,\n    connection_timeout=3.0,\n    check_server_data_on_release=True,\n    drain_timeout=2.0,\n    drain_quiet_period=0.5\n)\n```\n\n### For Resource-Constrained Environments\n\n```python\npool = WebsocketConnectionPool(\n    uri=\"ws://your-server.com/ws\",\n    max_connections=5,\n    warmup_connections=1,\n    idle_timeout=30.0,\n    connection_timeout=10.0,\n    max_retries=2,\n    cleanup_interval=5.0\n)\n```\n\n## Requirements\n\n- Python 3.11+\n- websockets >= 15.0.1\n- tenacity >= 9.1.2\n\n## License\n\nThis project is licensed under the MIT License.\n\n## Development\n\n### Setup\n\n```bash\npoetry install\n```\n\n### Running Tests\n\n```bash\npoetry run pytest\n```\n\n### Code Quality\n\n```bash\npoetry run black .\npoetry run isort .\npoetry run flake8 .\npoetry run mypy .\n```\n\n### Building\n\n```bash\npoetry build\n```",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Asynchronous WebSocket connection pool for Python with connection reuse, health monitoring, and automatic cleanup",
    "version": "0.1.0",
    "project_urls": {
        "Documentation": "https://github.com/your-username/aio-websocket-pool",
        "Homepage": "https://github.com/your-username/aio-websocket-pool",
        "Repository": "https://github.com/your-username/aio-websocket-pool"
    },
    "split_keywords": [
        "websocket",
        " connection-pool",
        " async",
        " asyncio",
        " websockets",
        " pool"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "1de22ecb72406d86ed8f877cac0ff7a4f8edad354f9ceef51e13222f4b9446ca",
                "md5": "2464bad42103f77aa35977ec61f54ea5",
                "sha256": "d0c568bbce86f7518a66edb328c4a579e95d49070c1797dc1aa482ee940c4217"
            },
            "downloads": -1,
            "filename": "aio_websocket_pool-0.1.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "2464bad42103f77aa35977ec61f54ea5",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<3.13,>=3.11",
            "size": 14189,
            "upload_time": "2025-07-13T20:16:06",
            "upload_time_iso_8601": "2025-07-13T20:16:06.496950Z",
            "url": "https://files.pythonhosted.org/packages/1d/e2/2ecb72406d86ed8f877cac0ff7a4f8edad354f9ceef51e13222f4b9446ca/aio_websocket_pool-0.1.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "46518ed5a08c1f58aa2b4bdec8fbf30d07d95a079e576e3412d84eb77a77e4e0",
                "md5": "cb80c2ec3d61c42cd1a32983108d1186",
                "sha256": "beed68cf4f08babd84d25648914a3dcf82378b20c82228f41129bbaf4a32a95d"
            },
            "downloads": -1,
            "filename": "aio_websocket_pool-0.1.0.tar.gz",
            "has_sig": false,
            "md5_digest": "cb80c2ec3d61c42cd1a32983108d1186",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<3.13,>=3.11",
            "size": 14160,
            "upload_time": "2025-07-13T20:16:07",
            "upload_time_iso_8601": "2025-07-13T20:16:07.758631Z",
            "url": "https://files.pythonhosted.org/packages/46/51/8ed5a08c1f58aa2b4bdec8fbf30d07d95a079e576e3412d84eb77a77e4e0/aio_websocket_pool-0.1.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-07-13 20:16:07",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "your-username",
    "github_project": "aio-websocket-pool",
    "github_not_found": true,
    "lcname": "aio-websocket-pool"
}
        
Elapsed time: 0.53455s