# aio-websocket-pool
A flexible, async-friendly WebSocket connection pool implementation for Python with connection reuse, health monitoring,
and automatic cleanup.
## Features
- **Connection Reuse**: Efficient connection pooling with automatic reuse of healthy connections
- **Health Monitoring**: Automatic connection health checks and cleanup of disconnected connections
- **Thread-safe**: Uses asyncio locks for concurrent access in async environments
- **Configurable Limits**: Set maximum connection limits and timeouts
- **Retry Logic**: Built-in exponential backoff retry mechanism for connection failures
- **Server Data Draining**: Optional server data detection and draining during connection release
- **Context Manager Support**: Clean, intuitive API with async context managers
- **Idle Connection Cleanup**: Automatic cleanup of idle connections to prevent resource leaks
- **Type-safe**: Full typing support with comprehensive type hints
## Installation
```bash
pip install aio-websocket-pool
```
Or using Poetry:
```bash
poetry add aio-websocket-pool
```
## Quick Start
```python
import asyncio
from aio_websocket_pool import WebsocketConnectionPool
async def main():
# Create a connection pool
async with WebsocketConnectionPool(uri="ws://localhost:8080") as pool:
# Get a connection from the pool
async with pool.get_connection() as conn:
await conn.send("Hello, World!")
response = await conn.recv()
print(response)
asyncio.run(main())
```
## Advanced Usage
### Basic Pool Configuration
```python
from aio_websocket_pool import WebsocketConnectionPool
# Create a pool with custom configuration
pool = WebsocketConnectionPool(
uri="ws://localhost:8080",
max_connections=10,
idle_timeout=60.0,
connection_timeout=10.0,
max_retries=3,
warmup_connections=2, # Pre-create 2 connections
headers={"Authorization": "Bearer token123"}
)
async with pool:
# Pool is ready to use
conn = await pool.acquire()
try:
await conn.send("Hello")
response = await conn.recv()
print(response)
finally:
await pool.release(conn)
```
### Connection Health Monitoring
```python
# Enable server data checking during connection release
pool = WebsocketConnectionPool(
uri="ws://localhost:8080",
check_server_data_on_release=True,
drain_timeout=5.0,
drain_quiet_period=1.0
)
async with pool:
async with pool.get_connection() as conn:
await conn.send("REQUEST")
response = await conn.recv()
print(response)
# Connection will be checked for additional server data
# before being returned to the pool
```
### Custom Drain Condition
```python
def custom_drain_condition(message):
# Only drain messages that look like server notifications
return message.startswith(b'{"type":"notification"')
pool = WebsocketConnectionPool(
uri="ws://localhost:8080",
check_server_data_on_release=True,
drain_condition=custom_drain_condition
)
```
### Force New Connection
```python
async with pool:
# Force create a new connection instead of reusing existing ones
async with pool.get_new_connection() as conn:
await conn.send("Important message")
response = await conn.recv()
print(response)
```
### Connection Pool Monitoring
```python
async with pool:
print(f"Total connections: {pool.total_connections}")
print(f"Available connections: {pool.available_connections}")
print(f"Busy connections: {pool.busy_connections}")
print(f"Pending connections: {pool.pending_connections}")
```
### Manual Connection Management
```python
async with pool:
# Manually acquire and release connections
conn = await pool.acquire()
try:
await conn.send("Hello")
response = await conn.recv()
print(response)
finally:
# Release connection back to pool
await pool.release(conn)
# Force remove a connection from the pool
conn = await pool.acquire()
try:
await conn.send("Hello")
response = await conn.recv()
print(response)
finally:
# Remove connection instead of returning to pool
await pool.release(conn, force_remove=True)
```
## API Reference
### WebsocketConnectionPool
#### Constructor
```
WebsocketConnectionPool(
uri: str,
headers: Dict[str, str] | None = None,
idle_timeout: float = 60.0,
max_connections: int = 50,
max_retries: int = 3,
cleanup_interval: float = 5.0,
connection_timeout: float = 10.0,
warmup_connections: int = 0,
check_server_data_on_release: bool = False,
drain_timeout: float = 10.0,
drain_quiet_period: float = 2.0,
drain_condition: DrainConditionCallback | None = None,
**kwargs
)
```
Creates a new WebSocket connection pool.
#### Properties
- `total_connections: int` - Total number of connections in the pool
- `available_connections: int` - Number of available connections
- `busy_connections: int` - Number of connections currently in use
- `pending_connections: int` - Number of connections being drained
- `is_closed: bool` - Whether the pool is closed
- `is_started: bool` - Whether the pool has been started
#### Methods
- `async start()` - Start the connection pool
- `async acquire() -> WebsocketConnection` - Acquire a connection from the pool
- `async acquire_new() -> WebsocketConnection` - Force acquire a new connection
- `async release(connection, *, force_remove=False)` - Release a connection back to the pool
- `async get_connection()` - Context manager for acquiring/releasing connections
- `async get_new_connection()` - Context manager for acquiring/releasing new connections
- `async close_all()` - Close all connections and shut down the pool
### WebsocketConnection
#### Properties
- `is_busy: bool` - Whether the connection is currently in use
- `is_draining: bool` - Whether the connection is being drained
- `is_connected: bool` - Whether the connection is active
- `last_activity: float` - Timestamp of last activity
#### Methods
- `async send(message: str | bytes)` - Send a message through the connection
- `async recv() -> str | bytes` - Receive a message from the connection
- `async ping()` - Send a ping to check connection health
- `async connect()` - Establish the WebSocket connection
- `async close()` - Close the connection
## Error Handling
The library provides several specific exception types:
```python
from aio_websocket_pool import (
WebsocketError,
ConnectionBusyError,
ConnectionUnavailableError,
ConnectionClosedError,
ConnectionPoolExhaustedError,
ConnectionPoolUnavailableError,
)
try:
async with pool.get_connection() as conn:
await conn.send("Hello")
response = await conn.recv()
except ConnectionPoolExhaustedError:
print("No connections available in pool")
except ConnectionClosedError:
print("Connection was closed unexpectedly")
except WebsocketError as e:
print(f"WebSocket error: {e}")
```
## Configuration Best Practices
### For High-Traffic Applications
```python
pool = WebsocketConnectionPool(
uri="ws://your-server.com/ws",
max_connections=100,
warmup_connections=10,
idle_timeout=120.0,
connection_timeout=5.0,
max_retries=5,
cleanup_interval=10.0
)
```
### For Low-Latency Applications
```python
pool = WebsocketConnectionPool(
uri="ws://your-server.com/ws",
max_connections=50,
warmup_connections=5,
idle_timeout=30.0,
connection_timeout=3.0,
check_server_data_on_release=True,
drain_timeout=2.0,
drain_quiet_period=0.5
)
```
### For Resource-Constrained Environments
```python
pool = WebsocketConnectionPool(
uri="ws://your-server.com/ws",
max_connections=5,
warmup_connections=1,
idle_timeout=30.0,
connection_timeout=10.0,
max_retries=2,
cleanup_interval=5.0
)
```
## Requirements
- Python 3.11+
- websockets >= 15.0.1
- tenacity >= 9.1.2
## License
This project is licensed under the MIT License.
## Development
### Setup
```bash
poetry install
```
### Running Tests
```bash
poetry run pytest
```
### Code Quality
```bash
poetry run black .
poetry run isort .
poetry run flake8 .
poetry run mypy .
```
### Building
```bash
poetry build
```
Raw data
{
"_id": null,
"home_page": "https://github.com/your-username/aio-websocket-pool",
"name": "aio-websocket-pool",
"maintainer": null,
"docs_url": null,
"requires_python": "<3.13,>=3.11",
"maintainer_email": null,
"keywords": "websocket, connection-pool, async, asyncio, websockets, pool",
"author": "BoChen SHEN",
"author_email": "6goddddddd@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/46/51/8ed5a08c1f58aa2b4bdec8fbf30d07d95a079e576e3412d84eb77a77e4e0/aio_websocket_pool-0.1.0.tar.gz",
"platform": null,
"description": "# aio-websocket-pool\n\nA flexible, async-friendly WebSocket connection pool implementation for Python with connection reuse, health monitoring,\nand automatic cleanup.\n\n## Features\n\n- **Connection Reuse**: Efficient connection pooling with automatic reuse of healthy connections\n- **Health Monitoring**: Automatic connection health checks and cleanup of disconnected connections\n- **Thread-safe**: Uses asyncio locks for concurrent access in async environments\n- **Configurable Limits**: Set maximum connection limits and timeouts\n- **Retry Logic**: Built-in exponential backoff retry mechanism for connection failures\n- **Server Data Draining**: Optional server data detection and draining during connection release\n- **Context Manager Support**: Clean, intuitive API with async context managers\n- **Idle Connection Cleanup**: Automatic cleanup of idle connections to prevent resource leaks\n- **Type-safe**: Full typing support with comprehensive type hints\n\n## Installation\n\n```bash\npip install aio-websocket-pool\n```\n\nOr using Poetry:\n\n```bash\npoetry add aio-websocket-pool\n```\n\n## Quick Start\n\n```python\nimport asyncio\nfrom aio_websocket_pool import WebsocketConnectionPool\n\n\nasync def main():\n # Create a connection pool\n async with WebsocketConnectionPool(uri=\"ws://localhost:8080\") as pool:\n # Get a connection from the pool\n async with pool.get_connection() as conn:\n await conn.send(\"Hello, World!\")\n response = await conn.recv()\n print(response)\n\n\nasyncio.run(main())\n```\n\n## Advanced Usage\n\n### Basic Pool Configuration\n\n```python\nfrom aio_websocket_pool import WebsocketConnectionPool\n\n# Create a pool with custom configuration\npool = WebsocketConnectionPool(\n uri=\"ws://localhost:8080\",\n max_connections=10,\n idle_timeout=60.0,\n connection_timeout=10.0,\n max_retries=3,\n warmup_connections=2, # Pre-create 2 connections\n headers={\"Authorization\": \"Bearer token123\"}\n)\n\nasync with pool:\n # Pool is ready to use\n conn = await pool.acquire()\n try:\n await conn.send(\"Hello\")\n response = await conn.recv()\n print(response)\n finally:\n await pool.release(conn)\n```\n\n### Connection Health Monitoring\n\n```python\n# Enable server data checking during connection release\npool = WebsocketConnectionPool(\n uri=\"ws://localhost:8080\",\n check_server_data_on_release=True,\n drain_timeout=5.0,\n drain_quiet_period=1.0\n)\n\nasync with pool:\n async with pool.get_connection() as conn:\n await conn.send(\"REQUEST\")\n response = await conn.recv()\n print(response)\n # Connection will be checked for additional server data\n # before being returned to the pool\n```\n\n### Custom Drain Condition\n\n```python\ndef custom_drain_condition(message):\n # Only drain messages that look like server notifications\n return message.startswith(b'{\"type\":\"notification\"')\n\n\npool = WebsocketConnectionPool(\n uri=\"ws://localhost:8080\",\n check_server_data_on_release=True,\n drain_condition=custom_drain_condition\n)\n```\n\n### Force New Connection\n\n```python\nasync with pool:\n # Force create a new connection instead of reusing existing ones\n async with pool.get_new_connection() as conn:\n await conn.send(\"Important message\")\n response = await conn.recv()\n print(response)\n```\n\n### Connection Pool Monitoring\n\n```python\nasync with pool:\n print(f\"Total connections: {pool.total_connections}\")\n print(f\"Available connections: {pool.available_connections}\")\n print(f\"Busy connections: {pool.busy_connections}\")\n print(f\"Pending connections: {pool.pending_connections}\")\n```\n\n### Manual Connection Management\n\n```python\nasync with pool:\n # Manually acquire and release connections\n conn = await pool.acquire()\n try:\n await conn.send(\"Hello\")\n response = await conn.recv()\n print(response)\n finally:\n # Release connection back to pool\n await pool.release(conn)\n\n # Force remove a connection from the pool\n conn = await pool.acquire()\n try:\n await conn.send(\"Hello\")\n response = await conn.recv()\n print(response)\n finally:\n # Remove connection instead of returning to pool\n await pool.release(conn, force_remove=True)\n```\n\n## API Reference\n\n### WebsocketConnectionPool\n\n#### Constructor\n\n```\nWebsocketConnectionPool(\n uri: str,\n headers: Dict[str, str] | None = None,\n idle_timeout: float = 60.0,\n max_connections: int = 50,\n max_retries: int = 3,\n cleanup_interval: float = 5.0,\n connection_timeout: float = 10.0,\n warmup_connections: int = 0,\n check_server_data_on_release: bool = False,\n drain_timeout: float = 10.0,\n drain_quiet_period: float = 2.0,\n drain_condition: DrainConditionCallback | None = None,\n **kwargs\n)\n```\n\nCreates a new WebSocket connection pool.\n\n#### Properties\n\n- `total_connections: int` - Total number of connections in the pool\n- `available_connections: int` - Number of available connections\n- `busy_connections: int` - Number of connections currently in use\n- `pending_connections: int` - Number of connections being drained\n- `is_closed: bool` - Whether the pool is closed\n- `is_started: bool` - Whether the pool has been started\n\n#### Methods\n\n- `async start()` - Start the connection pool\n- `async acquire() -> WebsocketConnection` - Acquire a connection from the pool\n- `async acquire_new() -> WebsocketConnection` - Force acquire a new connection\n- `async release(connection, *, force_remove=False)` - Release a connection back to the pool\n- `async get_connection()` - Context manager for acquiring/releasing connections\n- `async get_new_connection()` - Context manager for acquiring/releasing new connections\n- `async close_all()` - Close all connections and shut down the pool\n\n### WebsocketConnection\n\n#### Properties\n\n- `is_busy: bool` - Whether the connection is currently in use\n- `is_draining: bool` - Whether the connection is being drained\n- `is_connected: bool` - Whether the connection is active\n- `last_activity: float` - Timestamp of last activity\n\n#### Methods\n\n- `async send(message: str | bytes)` - Send a message through the connection\n- `async recv() -> str | bytes` - Receive a message from the connection\n- `async ping()` - Send a ping to check connection health\n- `async connect()` - Establish the WebSocket connection\n- `async close()` - Close the connection\n\n## Error Handling\n\nThe library provides several specific exception types:\n\n```python\nfrom aio_websocket_pool import (\n WebsocketError,\n ConnectionBusyError,\n ConnectionUnavailableError,\n ConnectionClosedError,\n ConnectionPoolExhaustedError,\n ConnectionPoolUnavailableError,\n)\n\ntry:\n async with pool.get_connection() as conn:\n await conn.send(\"Hello\")\n response = await conn.recv()\nexcept ConnectionPoolExhaustedError:\n print(\"No connections available in pool\")\nexcept ConnectionClosedError:\n print(\"Connection was closed unexpectedly\")\nexcept WebsocketError as e:\n print(f\"WebSocket error: {e}\")\n```\n\n## Configuration Best Practices\n\n### For High-Traffic Applications\n\n```python\npool = WebsocketConnectionPool(\n uri=\"ws://your-server.com/ws\",\n max_connections=100,\n warmup_connections=10,\n idle_timeout=120.0,\n connection_timeout=5.0,\n max_retries=5,\n cleanup_interval=10.0\n)\n```\n\n### For Low-Latency Applications\n\n```python\npool = WebsocketConnectionPool(\n uri=\"ws://your-server.com/ws\",\n max_connections=50,\n warmup_connections=5,\n idle_timeout=30.0,\n connection_timeout=3.0,\n check_server_data_on_release=True,\n drain_timeout=2.0,\n drain_quiet_period=0.5\n)\n```\n\n### For Resource-Constrained Environments\n\n```python\npool = WebsocketConnectionPool(\n uri=\"ws://your-server.com/ws\",\n max_connections=5,\n warmup_connections=1,\n idle_timeout=30.0,\n connection_timeout=10.0,\n max_retries=2,\n cleanup_interval=5.0\n)\n```\n\n## Requirements\n\n- Python 3.11+\n- websockets >= 15.0.1\n- tenacity >= 9.1.2\n\n## License\n\nThis project is licensed under the MIT License.\n\n## Development\n\n### Setup\n\n```bash\npoetry install\n```\n\n### Running Tests\n\n```bash\npoetry run pytest\n```\n\n### Code Quality\n\n```bash\npoetry run black .\npoetry run isort .\npoetry run flake8 .\npoetry run mypy .\n```\n\n### Building\n\n```bash\npoetry build\n```",
"bugtrack_url": null,
"license": "MIT",
"summary": "Asynchronous WebSocket connection pool for Python with connection reuse, health monitoring, and automatic cleanup",
"version": "0.1.0",
"project_urls": {
"Documentation": "https://github.com/your-username/aio-websocket-pool",
"Homepage": "https://github.com/your-username/aio-websocket-pool",
"Repository": "https://github.com/your-username/aio-websocket-pool"
},
"split_keywords": [
"websocket",
" connection-pool",
" async",
" asyncio",
" websockets",
" pool"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "1de22ecb72406d86ed8f877cac0ff7a4f8edad354f9ceef51e13222f4b9446ca",
"md5": "2464bad42103f77aa35977ec61f54ea5",
"sha256": "d0c568bbce86f7518a66edb328c4a579e95d49070c1797dc1aa482ee940c4217"
},
"downloads": -1,
"filename": "aio_websocket_pool-0.1.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "2464bad42103f77aa35977ec61f54ea5",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<3.13,>=3.11",
"size": 14189,
"upload_time": "2025-07-13T20:16:06",
"upload_time_iso_8601": "2025-07-13T20:16:06.496950Z",
"url": "https://files.pythonhosted.org/packages/1d/e2/2ecb72406d86ed8f877cac0ff7a4f8edad354f9ceef51e13222f4b9446ca/aio_websocket_pool-0.1.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "46518ed5a08c1f58aa2b4bdec8fbf30d07d95a079e576e3412d84eb77a77e4e0",
"md5": "cb80c2ec3d61c42cd1a32983108d1186",
"sha256": "beed68cf4f08babd84d25648914a3dcf82378b20c82228f41129bbaf4a32a95d"
},
"downloads": -1,
"filename": "aio_websocket_pool-0.1.0.tar.gz",
"has_sig": false,
"md5_digest": "cb80c2ec3d61c42cd1a32983108d1186",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<3.13,>=3.11",
"size": 14160,
"upload_time": "2025-07-13T20:16:07",
"upload_time_iso_8601": "2025-07-13T20:16:07.758631Z",
"url": "https://files.pythonhosted.org/packages/46/51/8ed5a08c1f58aa2b4bdec8fbf30d07d95a079e576e3412d84eb77a77e4e0/aio_websocket_pool-0.1.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-13 20:16:07",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "your-username",
"github_project": "aio-websocket-pool",
"github_not_found": true,
"lcname": "aio-websocket-pool"
}