| Name | traffik |
| Version | 1.0.0b1 |
| download | |
| home_page | None |
| Summary | Distributed rate limiting for Starlette applications. |
| upload_time | 2025-10-19 13:44:30 |
| maintainer | None |
| docs_url | None |
| author | None |
| requires_python | >=3.9 |
| License | MIT License Copyright (c) 2025 Daniel T. Afolayan Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
| keywords | api, distributed-rate-limiting, fastapi, rate-limiting, starlette, throttling |
| VCS | |
| bugtrack_url | |
| requirements | No requirements were recorded. |
| Travis-CI | No Travis. |
| coveralls test coverage | No coveralls. |
# Traffik - A Starlette throttling library
[Tests](https://github.com/ti-oluwa/traffik/actions/workflows/test.yaml)
[Code Quality](https://github.com/ti-oluwa/traffik/actions/workflows/code-quality.yaml)
[Coverage](https://codecov.io/gh/ti-oluwa/traffik)
<!-- [](https://badge.fury.io/py/traffik)
[](https://pypi.org/project/traffik/) -->
Traffik provides flexible rate limiting for Starlette and FastAPI applications with support for both HTTP and WebSocket connections. It offers multiple rate limiting strategies including Fixed Window, Sliding Window, Token Bucket, and Leaky Bucket algorithms, allowing you to choose the approach that best fits your use case.
The library features pluggable backends (in-memory, Redis, Memcached) and dynamic backend resolution for advanced use cases such as multi-tenant applications. Whether you need simple per-endpoint limits or complex distributed rate limiting, Traffik provides the flexibility and robustness to handle your requirements.
Traffik was inspired by [fastapi-limiter](https://github.com/long2ice/fastapi-limiter) but has evolved into a more comprehensive solution with more advanced features.
## Features
- 🚀 **Easy Integration**: Decorator, dependency, and middleware-based throttling
- 🎯 **Multiple Strategies**: Fixed Window (default), Sliding Window, Token Bucket, Leaky Bucket
- 🔄 **Multiple Backends**: In-memory, Redis, Memcached with atomic operation support
- 🌐 **Protocol Support**: Both HTTP and WebSocket throttling
- 🏢 **Dynamic Backend Resolution**: Support for runtime backend switching
- 🔧 **Dependency Injection Friendly**: Works well with FastAPI's dependency injection system
- 📊 **Smart Client Identification**: IP-based by default, fully customizable
- ⚙️ **Flexible and Extensible API**: Easily extend base functionality
- 🧪 **Thoroughly Tested**: Comprehensive test suite covering concurrency and multithreading
## Installation
We recommend using `uv`, but it is not a strict requirement.
### Install `uv` (optional)
Visit the [uv documentation](https://docs.astral.sh/uv/getting-started/installation/) for installation instructions.
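For reference, the install one-liners from the uv documentation at the time of writing (verify against the linked page, as they may change):

```bash
# macOS / Linux
curl -LsSf https://astral.sh/uv/install.sh | sh

# Windows (PowerShell)
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
```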
### Basic Installation
```bash
uv add traffik
# or using pip
pip install traffik
```
Install with FastAPI support:
```bash
uv add "traffik[fastapi]"
# or using pip
pip install "traffik[fastapi]"
```
### With Redis Backend
```bash
uv add "traffik[redis]"
# or using pip
pip install "traffik[redis]"
```
### With Memcached Backend
```bash
uv add "traffik[memcached]"
# or using pip
pip install "traffik[memcached]"
```
### With All Features
```bash
uv add "traffik[all]"
# or using pip
pip install "traffik[all]"
```
### Development Installation
```bash
git clone https://github.com/ti-oluwa/traffik.git
cd traffik
uv sync --extra dev
# or using pip
pip install -e .[dev]
```
## Quick Testing with Docker
For quick testing across different platforms and Python versions:
```bash
# Run fast tests
./docker-test.sh test-fast
# Run full test suite
./docker-test.sh test
# Start development environment
./docker-test.sh dev
# Test across Python versions
./docker-test.sh test-matrix
```
**Testing Documentation:**
- [DOCKER.md](DOCKER.md) - Complete Docker testing guide
- [TESTING.md](TESTING.md) - Quick testing guide
## Quick Start Guide
### 1. Basic HTTP Throttling with Starlette
```python
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette.responses import JSONResponse
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend
# Create backend
backend = InMemoryBackend(namespace="myapp", persistent=False)
# Create throttle with string rate format
throttle = HTTPThrottle(
    uid="basic_limit",
    rate="5/10s",  # 5 requests per 10 seconds
)

async def throttled_endpoint(request: Request):
    await throttle(request)
    return JSONResponse({"message": "Success"})

app = Starlette(
    routes=[
        Route("/api/data", throttled_endpoint, methods=["GET"]),
    ],
    lifespan=backend.lifespan,
)
```
### 2. FastAPI with Dependency Injection
```python
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
from traffik.backends.inmemory import InMemoryBackend
from traffik.throttles import HTTPThrottle
# Create backend
backend = InMemoryBackend(namespace="api")
# Setup lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    async with backend(app, persistent=True, close_on_exit=True):
        yield

app = FastAPI(lifespan=lifespan)

# Create throttle
throttle = HTTPThrottle(uid="endpoint_limit", rate="10/m")

@app.get("/api/hello", dependencies=[Depends(throttle)])
async def say_hello():
    return {"message": "Hello World"}
```
### 3. Using Decorators (FastAPI Only)
```python
from fastapi import FastAPI
from contextlib import asynccontextmanager
from traffik.decorators import throttled
from traffik.throttles import HTTPThrottle
from traffik.backends.redis import RedisBackend
backend = RedisBackend(
    connection="redis://localhost:6379/0",
    namespace="api",
    persistent=True,
)

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with backend(app):
        yield

app = FastAPI(lifespan=lifespan)

@app.get("/api/limited")
@throttled(HTTPThrottle(uid="limited", rate="5/m"))
async def limited_endpoint():
    return {"data": "Limited access"}
```
### 4. WebSocket Throttling
```python
from traffik.throttles import WebSocketThrottle
from starlette.websockets import WebSocket
from starlette.exceptions import HTTPException
ws_throttle = WebSocketThrottle(uid="ws_messages", rate="3/10s")
async def ws_endpoint(websocket: WebSocket) -> None:
    await websocket.accept()

    while True:
        try:
            data = await websocket.receive_json()
            await ws_throttle(websocket)  # Rate limit per message

            await websocket.send_json({
                "status": "success",
                "data": data,
            })
        except HTTPException as exc:
            await websocket.send_json({
                "status": "error",
                "status_code": exc.status_code,
                "detail": exc.detail,
            })
            break

    await websocket.close()
```
## Rate Limiting Strategies
Traffik supports multiple rate limiting strategies, each with different trade-offs. The **Fixed Window strategy is used by default** for its simplicity and performance.
### Fixed Window (Default)
Divides time into fixed windows and counts requests within each window. Simple, fast, and memory-efficient.
**Pros:** Simple, constant memory, fast
**Cons:** Can allow bursts at window boundaries (up to 2x the limit; e.g., a `100/m` limit can admit 100 requests at the end of one window and 100 more at the start of the next)
```python
from traffik.strategies import FixedWindowStrategy
throttle = HTTPThrottle(
    uid="api_limit",
    rate="100/m",
    strategy=FixedWindowStrategy(),  # Default, can be omitted
)
```
### Sliding Window
Most accurate rate limiting with continuous sliding window evaluation. Prevents boundary exploitation.
**Pros:** Most accurate, no boundary issues
**Cons:** Higher memory usage (O(limit) per key)
```python
from traffik.strategies import SlidingWindowLogStrategy
throttle = HTTPThrottle(
    uid="payment_api",
    rate="10/m",
    strategy=SlidingWindowLogStrategy(),
)
```
### Token Bucket
Allows controlled bursts while maintaining average rate over time. Tokens refill continuously.
**Pros:** Allows bursts, smooth distribution, self-recovering
**Cons:** Slightly more complex
```python
from traffik.strategies import TokenBucketStrategy
throttle = HTTPThrottle(
    uid="user_api",
    rate="100/m",
    strategy=TokenBucketStrategy(burst_size=150),  # Allow bursts up to 150
)
```
### Leaky Bucket
Enforces perfectly smooth traffic output. No bursts allowed.
**Pros:** Smooth output, protects downstream services
**Cons:** Less forgiving, may reject legitimate bursts
```python
from traffik.strategies import LeakyBucketStrategy
throttle = HTTPThrottle(
    uid="third_party_api",
    rate="50/m",
    strategy=LeakyBucketStrategy(),
)
```
## Writing Custom Strategies
You can create custom rate limiting strategies by implementing a callable that follows the strategy protocol. This allows you to implement specialized rate limiting logic tailored to your specific needs.
### Strategy Protocol
A strategy is a callable (function or class with `__call__`) that takes three parameters and returns a wait period:
```python
from traffik.backends.base import ThrottleBackend
from traffik.rates import Rate
from traffik.types import Stringable, WaitPeriod
async def my_strategy(
    key: Stringable,
    rate: Rate,
    backend: ThrottleBackend,
) -> WaitPeriod:
    """
    :param key: The throttling key (e.g., "user:123", "ip:192.168.1.1")
    :param rate: Rate limit definition with limit and expire properties
    :param backend: Backend instance for storage operations
    :return: Wait time in milliseconds (0.0 if request allowed)
    """
    # Your rate limiting logic here
    return 0.0  # Allow request
```
### Example: Simple Rate Strategy
Here's a basic example that implements a simple counter-based strategy:
```python
from traffik.backends.base import ThrottleBackend
from traffik.rates import Rate
from traffik.throttles import HTTPThrottle
from traffik.types import Stringable, WaitPeriod
from traffik.utils import time

async def simple_counter_strategy(
    key: Stringable, rate: Rate, backend: ThrottleBackend
) -> WaitPeriod:
    """
    Simple counter-based rate limiting.

    Counts requests and resets the counter when the window expires.
    """
    if rate.unlimited:
        return 0.0

    now = time() * 1000  # Current time in milliseconds
    full_key = await backend.get_key(str(key))
    counter_key = f"{full_key}:counter"
    timestamp_key = f"{full_key}:timestamp"
    ttl_seconds = int(rate.expire // 1000) + 1

    async with await backend.lock(f"lock:{counter_key}", blocking=True, blocking_timeout=1):
        # Get current count and timestamp
        count_str = await backend.get(counter_key)
        timestamp_str = await backend.get(timestamp_key)

        if count_str and timestamp_str:
            count = int(count_str)
            timestamp = float(timestamp_str)

            # Check if window has expired
            if now - timestamp > rate.expire:
                # Reset counter for new window
                count = 1
                await backend.set(counter_key, "1", expire=ttl_seconds)
                await backend.set(timestamp_key, str(now), expire=ttl_seconds)
            else:
                # Increment counter
                count = await backend.increment(counter_key)
        else:
            # First request
            count = 1
            await backend.set(counter_key, "1", expire=ttl_seconds)
            await backend.set(timestamp_key, str(now), expire=ttl_seconds)

        # Check if limit exceeded
        if count > rate.limit:
            # Calculate wait time until window expires
            timestamp = float(await backend.get(timestamp_key))
            elapsed = now - timestamp
            wait_ms = rate.expire - elapsed
            return max(wait_ms, 0.0)

    return 0.0

# Usage
throttle = HTTPThrottle(
    uid="simple",
    rate="10/m",
    strategy=simple_counter_strategy,
)
```
### Example: Adaptive Rate Strategy
A more advanced example that adapts the rate limit based on backend load:
```python
from dataclasses import dataclass
from traffik.backends.base import ThrottleBackend
from traffik.rates import Rate
from traffik.throttles import HTTPThrottle
from traffik.types import Stringable, WaitPeriod
from traffik.utils import time

@dataclass(frozen=True)
class AdaptiveRateStrategy:
    """
    Adaptive rate limiting that adjusts based on system load.

    Reduces limits during high load, restores the full limit during normal load.
    """

    load_threshold: float = 0.8  # 80% of limit triggers adaptation
    reduction_factor: float = 0.5  # Reduce to 50% during high load

    async def __call__(
        self, key: Stringable, rate: Rate, backend: ThrottleBackend
    ) -> WaitPeriod:
        if rate.unlimited:
            return 0.0

        now = time() * 1000
        window_duration_ms = rate.expire
        current_window = int(now // window_duration_ms)

        full_key = await backend.get_key(str(key))
        counter_key = f"{full_key}:adaptive:{current_window}"
        load_key = f"{full_key}:load"
        ttl_seconds = int(window_duration_ms // 1000) + 1

        async with await backend.lock(f"lock:{counter_key}", blocking=True, blocking_timeout=1):
            # Increment request counter
            count = await backend.increment_with_ttl(counter_key, amount=1, ttl=ttl_seconds)

            # Calculate current load percentage
            load_percentage = count / rate.limit

            # Determine effective limit based on load
            if load_percentage > self.load_threshold:
                # High load - reduce effective limit
                effective_limit = int(rate.limit * self.reduction_factor)
                await backend.set(load_key, "high", expire=ttl_seconds)
            else:
                # Normal load - use full limit
                effective_limit = rate.limit
                await backend.set(load_key, "normal", expire=ttl_seconds)

            # Check against effective limit
            if count > effective_limit:
                # Wait until the current window ends
                time_in_window = now % window_duration_ms
                wait_ms = window_duration_ms - time_in_window
                return wait_ms

        return 0.0

# Usage
throttle = HTTPThrottle(
    uid="adaptive_api",
    rate="1000/h",
    strategy=AdaptiveRateStrategy(load_threshold=0.7, reduction_factor=0.6),
)
```
### Example: Priority-Based Strategy
A strategy that implements priority queuing:
```python
from dataclasses import dataclass
from enum import IntEnum
from traffik.backends.base import ThrottleBackend
from traffik.rates import Rate
from traffik.types import Stringable, WaitPeriod
from traffik.utils import time, dump_json, load_json, JSONDecodeError

class Priority(IntEnum):
    """Request priority levels"""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4

@dataclass(frozen=True)
class PriorityQueueStrategy:
    """
    Rate limiting with a priority queue.

    Higher-priority requests are processed first when at capacity.
    """

    default_priority: Priority = Priority.NORMAL

    def _extract_priority(self, key: str) -> Priority:
        """Extract priority from key (format: "priority:<level>:user:123")"""
        if key.startswith("priority:"):
            try:
                level = int(key.split(":")[1])
                return Priority(level)
            except (ValueError, IndexError):
                pass
        return self.default_priority

    async def __call__(
        self, key: Stringable, rate: Rate, backend: ThrottleBackend
    ) -> WaitPeriod:
        if rate.unlimited:
            return 0.0

        now = time() * 1000
        priority = self._extract_priority(str(key))

        full_key = await backend.get_key(str(key))
        queue_key = f"{full_key}:priority_queue"
        ttl_seconds = int(rate.expire // 1000) + 1

        async with await backend.lock(f"lock:{queue_key}", blocking=True, blocking_timeout=1):
            # Get current queue
            queue_json = await backend.get(queue_key)
            if queue_json:
                try:
                    queue = load_json(queue_json)
                except JSONDecodeError:
                    queue = []
            else:
                queue = []

            # Remove expired entries
            queue = [
                entry for entry in queue
                if now - entry["timestamp"] < rate.expire
            ]

            # Count requests with higher or equal priority
            higher_priority_count = sum(
                1 for entry in queue
                if entry["priority"] >= priority
            )

            # Check if request can be processed
            if higher_priority_count >= rate.limit:
                # Calculate wait time based on the oldest entry at or above this priority
                oldest_high_priority = min(
                    (entry["timestamp"] for entry in queue if entry["priority"] >= priority),
                    default=now,
                )
                wait_ms = rate.expire - (now - oldest_high_priority)
                return max(wait_ms, 0.0)

            # Add current request to queue
            queue.append({
                "timestamp": now,
                "priority": priority,
                "key": str(key),
            })

            # Sort by priority (descending) and timestamp (ascending)
            queue.sort(key=lambda x: (-x["priority"], x["timestamp"]))

            # Store updated queue
            await backend.set(queue_key, dump_json(queue), expire=ttl_seconds)

        return 0.0

# Usage
from traffik.throttles import HTTPThrottle

async def priority_identifier(connection):
    """Extract priority from request headers"""
    priority = connection.headers.get("X-Priority", "2")
    user_id = extract_user_id(connection)  # application-defined helper
    return f"priority:{priority}:user:{user_id}"

throttle = HTTPThrottle(
    uid="priority_api",
    rate="100/m",
    strategy=PriorityQueueStrategy(),
    identifier=priority_identifier,
)
```
### Best Practices for Custom Strategies
1. **Always use locks**: Wrap critical sections with `backend.lock()` to ensure atomicity

   ```python
   async with await backend.lock(f"lock:{key}", blocking=True, blocking_timeout=1):
       ...  # Critical section
   ```

2. **Set appropriate TTLs**: Always set expiration times to prevent memory leaks

   ```python
   ttl_seconds = int(rate.expire // 1000) + 1  # +1 second buffer
   await backend.set(key, value, expire=ttl_seconds)
   ```

3. **Handle unlimited rates**: Check for unlimited rates early

   ```python
   if rate.unlimited:
       return 0.0
   ```

4. **Use proper key prefixes**: Namespace your strategy's keys to avoid conflicts

   ```python
   full_key = await backend.get_key(str(key))
   strategy_key = f"{full_key}:mystrategy:data"
   ```

5. **Return milliseconds**: Always return the wait time in milliseconds

   ```python
   return wait_ms  # Milliseconds, not seconds
   ```

6. **Use dataclasses**: Make strategies immutable and configurable

   ```python
   @dataclass(frozen=True)
   class MyStrategy:
       param1: int = 10
       param2: float = 0.5
   ```

7. **Handle errors gracefully**: Catch and handle JSON decode errors, type errors, etc.

   ```python
   try:
       data = load_json(json_str)
   except JSONDecodeError:
       data = default_value
   ```
### Testing Custom Strategies
```python
import pytest
from traffik.backends.inmemory import InMemoryBackend
from traffik.rates import Rate

@pytest.mark.anyio
async def test_custom_strategy():
    backend = InMemoryBackend(namespace="test")
    async with backend(close_on_exit=True):
        strategy = MyCustomStrategy()
        rate = Rate.parse("5/s")

        # Test allowing requests
        for i in range(5):
            wait = await strategy("user:123", rate, backend)
            assert wait == 0.0, f"Request {i + 1} should be allowed"

        # Test throttling
        wait = await strategy("user:123", rate, backend)
        assert wait > 0, "Request 6 should be throttled"

        # Verify wait time is reasonable
        assert wait <= rate.expire, "Wait time should not exceed window"
```
## Backends
Traffik provides three backend implementations with full atomic operation support and distributed locking.
### In-Memory Backend
Perfect for development, testing, and single-instance applications:
```python
from traffik.backends.inmemory import InMemoryBackend
backend = InMemoryBackend(
    namespace="myapp",
    persistent=False,  # Don't persist across restarts
)
```
**Pros:**
- No external dependencies
- Fast and simple
- Great for development and testing
**Cons:**
- Not suitable for multi-process/distributed systems
- Data lost on restart (even with `persistent=True`)
### Redis Backend
Recommended for production with distributed systems:
```python
from traffik.backends.redis import RedisBackend
# From connection string
backend = RedisBackend(
    connection="redis://localhost:6379/0",
    namespace="myapp",
    persistent=True,
)

# From a Redis client factory
import redis.asyncio as redis

def get_client() -> redis.Redis:
    return redis.Redis(host="localhost", port=6379, db=0)

backend = RedisBackend(
    connection=get_client,
    namespace="myapp",
)
```
**Features:**
- Distributed locks using Redlock algorithm
- Production-ready persistence
**Pros:**
- Multi-process and distributed support
- Persistence across restarts
- Battle-tested for production
**Cons:**
- Requires Redis server
- Additional infrastructure
### Memcached Backend
Lightweight distributed caching solution:
```python
from traffik.backends.memcached import MemcachedBackend
backend = MemcachedBackend(
    host="localhost",
    port=11211,
    namespace="myapp",
    pool_size=10,
)
```
**Features:**
- Lightweight distributed locks
- Atomic operations via CAS (Compare-And-Swap)
- Connection pooling
- Fast in-memory storage
**Pros:**
- Lightweight and fast
- Good for high-throughput scenarios
- Simple deployment
**Cons:**
- Less feature-rich than Redis
### Custom Backends
Create custom backends by subclassing `ThrottleBackend`:
```python
import typing

from traffik.backends.base import ThrottleBackend
from traffik.types import HTTPConnectionT, AsyncLock

class CustomBackend(ThrottleBackend[typing.Dict, HTTPConnectionT]):
    """Custom backend with your storage solution"""

    async def initialize(self) -> None:
        """Setup connection/resources"""
        pass

    async def get(self, key: str) -> typing.Optional[str]:
        """Get value for key"""
        pass

    async def set(self, key: str, value: str, expire: typing.Optional[int] = None) -> None:
        """Set value with optional TTL"""
        pass

    async def delete(self, key: str) -> None:
        """Delete key"""
        pass

    async def increment(self, key: str, amount: int = 1) -> int:
        """Atomically increment counter"""
        pass

    async def decrement(self, key: str, amount: int = 1) -> int:
        """Atomically decrement counter"""
        pass

    async def increment_with_ttl(self, key: str, amount: int, ttl: int) -> int:
        """Atomically increment with TTL set"""
        pass

    async def get_lock(self, name: str) -> AsyncLock:
        """Get distributed lock"""
        pass

    async def reset(self) -> None:
        """Clear all data"""
        pass

    async def close(self) -> None:
        """Cleanup resources"""
        pass
```
### Throttle Backend Selection
You can specify the backend for each throttle individually or allow them to share a common backend. The shared backend is usually the one set up in your application lifespan.
```python
from fastapi import FastAPI, Depends, Request
from traffik.backends.redis import RedisBackend
from traffik.throttles import HTTPThrottle

shared_redis_backend = RedisBackend(
    connection="redis://localhost:6379/0",
    namespace="shared",
)
app = FastAPI(lifespan=shared_redis_backend.lifespan)

throttle_with_own_backend = HTTPThrottle(
    uid="custom_backend",
    rate="100/m",
    backend=MyCustomBackend(),  # Uses its own backend
)

# Uses backend from app lifespan
throttle_with_shared_backend = HTTPThrottle(
    uid="shared_backend",
    rate="50/m",
)

@app.get("/api/custom", dependencies=[Depends(throttle_with_shared_backend)])
async def endpoint_custom(request: Request = Depends(throttle_with_own_backend)):
    return {"message": "Uses its own backend"}
```
## Configuration Options
### Rate Format
Traffik supports flexible rate specification using string format or `Rate` objects:
```python
from traffik import Rate
from traffik.throttles import HTTPThrottle
# String format (recommended)
throttle = HTTPThrottle(uid="api_v1", rate="100/m") # 100 per minute
throttle = HTTPThrottle(uid="api_v2", rate="5/10s") # 5 per 10 seconds
throttle = HTTPThrottle(uid="api_v3", rate="1000/500ms") # 1000 per 500ms
# Supported units: ms, s/sec/second, m/min/minute, h/hr/hour, d/day
throttle = HTTPThrottle(uid="daily", rate="10000/d") # 10000 per day
# Rate object for complex configurations
rate = Rate(limit=100, minutes=5, seconds=30) # 100 per 5.5 minutes
throttle = HTTPThrottle(uid="complex", rate=rate)
```
### Custom Client Identification
Customize how clients are identified for rate limiting:
```python
from starlette.requests import HTTPConnection
from traffik.throttles import HTTPThrottle
async def user_based_identifier(connection: HTTPConnection) -> str:
    """Identify by user ID from JWT token"""
    user_id = extract_user_id(connection.headers.get("authorization"))
    return f"user:{user_id}"

throttle = HTTPThrottle(
    uid="user_limit",
    rate="100/h",
    identifier=user_based_identifier,
)
```
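The `extract_user_id` helper above is application-defined, not part of Traffik. A minimal sketch, assuming Bearer tokens signed with a shared secret and the PyJWT package (both assumptions, for illustration only):

```python
import typing

import jwt  # PyJWT, assumed installed separately

def extract_user_id(authorization: typing.Optional[str]) -> str:
    """Hypothetical helper: pull a user id from a Bearer token."""
    if not authorization or not authorization.lower().startswith("bearer "):
        return "anonymous"
    token = authorization.split(" ", 1)[1]
    try:
        payload = jwt.decode(token, "secret", algorithms=["HS256"])
    except jwt.InvalidTokenError:
        return "anonymous"
    return str(payload.get("sub", "anonymous"))
```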
### Exempting Connections
Skip throttling for specific clients by returning `UNLIMITED`:
```python
import typing
from starlette.requests import HTTPConnection
from traffik import UNLIMITED
from traffik.throttles import HTTPThrottle
async def admin_bypass_identifier(connection: HTTPConnection) -> typing.Any:
    """Bypass throttling for admin users"""
    user_role = extract_role(connection.headers.get("authorization"))
    if user_role == "admin":
        return UNLIMITED  # Admins bypass throttling

    # Regular users get normal identification
    user_id = extract_user_id(connection.headers.get("authorization"))
    return f"user:{user_id}"

throttle = HTTPThrottle(
    uid="api_limit",
    rate="50/m",
    identifier=admin_bypass_identifier,
)
```
### Custom Throttled Response
Customize the response when rate limits are exceeded:
```python
from starlette.requests import HTTPConnection
from starlette.exceptions import HTTPException
async def custom_throttled_handler(
    connection: HTTPConnection,
    wait_ms: int,
    *args,
    **kwargs,
):
    wait_seconds = wait_ms // 1000
    raise HTTPException(
        status_code=429,
        detail={
            "error": "rate_limit_exceeded",
            "message": f"Too many requests. Retry in {wait_seconds}s",
            "retry_after": wait_seconds,
        },
        headers={"Retry-After": str(wait_seconds)},
    )

throttle = HTTPThrottle(
    uid="api",
    rate="100/m",
    handle_throttled=custom_throttled_handler,
)
```
## More on Usage
### Multiple Rate Limits per Endpoint
Apply both burst and sustained limits:
```python
from fastapi import FastAPI, Depends
from traffik.throttles import HTTPThrottle
# Burst limit: 10 per minute
burst_limit = HTTPThrottle(uid="burst", rate="10/m")
# Sustained limit: 100 per hour
sustained_limit = HTTPThrottle(uid="sustained", rate="100/h")
@app.get(
    "/api/data",
    dependencies=[Depends(burst_limit), Depends(sustained_limit)],
)
async def get_data():
    return {"data": "value"}
```
### Per-User Rate Limiting
```python
from traffik.throttles import HTTPThrottle
from starlette.requests import Request
async def user_identifier(request: Request) -> str:
    user_id = extract_user_from_token(request.headers.get("authorization"))
    return f"user:{user_id}"

user_throttle = HTTPThrottle(
    uid="user_quota",
    rate="1000/h",
    identifier=user_identifier,
)
```
### Strategy Selection Based on Use Case
```python
from traffik.throttles import HTTPThrottle
from traffik.strategies import (
    FixedWindowStrategy,
    SlidingWindowLogStrategy,
    TokenBucketStrategy,
    LeakyBucketStrategy,
)

# Public API - allow bursts
public_api = HTTPThrottle(
    uid="public",
    rate="100/m",
    strategy=TokenBucketStrategy(burst_size=150),
)

# Payment API - strict enforcement
payment_api = HTTPThrottle(
    uid="payments",
    rate="10/m",
    strategy=SlidingWindowLogStrategy(),
)

# Third-party API - smooth output
external_api = HTTPThrottle(
    uid="external",
    rate="50/m",
    strategy=LeakyBucketStrategy(),
)

# Simple rate limiting - default
simple_api = HTTPThrottle(
    uid="simple",
    rate="200/m",
    # Uses `FixedWindowStrategy` by default
)
```
### Dynamic Backend Resolution for Multi-Tenant Applications
Enable runtime backend switching for multi-tenant SaaS applications:
```python
from fastapi import FastAPI, Request, Depends
from traffik.throttles import HTTPThrottle
from traffik.backends.redis import RedisBackend
from traffik.backends.inmemory import InMemoryBackend
import jwt
# Shared throttle with dynamic backend resolution
api_throttle = HTTPThrottle(
    uid="api_quota",
    rate="1000/h",
    dynamic_backend=True,  # Resolve backend at runtime
)

TENANT_CONFIG = {
    "enterprise": {"redis_url": "redis://enterprise:6379/0", "multiplier": 5.0},
    "premium": {"redis_url": "redis://premium:6379/0", "multiplier": 2.0},
    "free": {"redis_url": None, "multiplier": 1.0},
}

async def tenant_middleware(request: Request, call_next):
    """Set up a tenant-specific backend based on the JWT"""
    authorization = request.headers.get("authorization")
    token = authorization.split(" ")[1] if authorization else None
    if token:
        payload = jwt.decode(token, "secret", algorithms=["HS256"])
        tier = payload.get("tenant_tier", "free")
        tenant_id = payload.get("tenant_id", "default")
    else:
        tier, tenant_id = "free", "anonymous"

    config = TENANT_CONFIG[tier]
    # Select backend based on tenant tier
    if config["redis_url"]:
        backend = RedisBackend(
            connection=config["redis_url"],
            namespace=f"tenant_{tenant_id}",
            persistent=True,
        )
    else:
        backend = InMemoryBackend(namespace=f"tenant_{tenant_id}")

    # Execute within the tenant's backend context
    async with backend(request.app):
        return await call_next(request)

app = FastAPI()
app.middleware("http")(tenant_middleware)

@app.get("/api/data", dependencies=[Depends(api_throttle)])
async def get_data():
    return {"data": "tenant-specific data"}
```
**When to use dynamic backends:**
- ✅ Multi-tenant SaaS with per-tenant storage
- ✅ A/B testing different strategies
- ✅ Environment-specific backends
- ❌ Simple shared storage (use the explicit `backend` parameter instead)
**Important:** Dynamic backend resolution adds slight overhead and complexity. Only use when you need runtime backend switching.
### Application Lifespan Management
Properly manage backend lifecycle:
#### Starlette
```python
from starlette.applications import Starlette
from traffik.backends.inmemory import InMemoryBackend
backend = InMemoryBackend(namespace="app")
app = Starlette(
    routes=[...],
    lifespan=backend.lifespan,  # Automatic cleanup
)
```
#### FastAPI
```python
from fastapi import FastAPI
from contextlib import asynccontextmanager
from traffik.backends.redis import RedisBackend
backend = RedisBackend(connection="redis://localhost:6379/0", namespace="app")
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    async with backend(app, persistent=True, close_on_exit=True):
        yield
    # Shutdown - backend cleanup is automatic

app = FastAPI(lifespan=lifespan)
```
## Throttle Middleware
Apply rate limiting across multiple endpoints with sophisticated filtering and routing logic.
### Basic Middleware Setup
#### FastAPI
```python
from fastapi import FastAPI
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend
app = FastAPI()
backend = InMemoryBackend(namespace="api")
# Create throttle
api_throttle = HTTPThrottle(uid="api_global", rate="100/m")
# Wrap in middleware throttle
middleware_throttle = MiddlewareThrottle(api_throttle)
# Add middleware
app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[middleware_throttle],
    backend=backend,
)
```
#### Starlette
```python
from starlette.applications import Starlette
from starlette.middleware import Middleware
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend(namespace="api")
api_throttle = HTTPThrottle(uid="api_throttle", rate="50/m")
middleware_throttle = MiddlewareThrottle(api_throttle)

app = Starlette(
    routes=[...],
    middleware=[
        Middleware(
            ThrottleMiddleware,
            middleware_throttles=[middleware_throttle],
            backend=backend,
        )
    ],
)
```
### Advanced Filtering
#### Method-Based Filtering
```python
# Strict limits for write operations
write_throttle = HTTPThrottle(uid="writes", rate="10/m")
read_throttle = HTTPThrottle(uid="reads", rate="1000/m")
write_middleware_throttle = MiddlewareThrottle(
    write_throttle,
    methods={"POST", "PUT", "DELETE"},
)
read_middleware_throttle = MiddlewareThrottle(
    read_throttle,
    methods={"GET", "HEAD"},
)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[write_middleware_throttle, read_middleware_throttle],
    backend=backend,  # If not set, uses the app lifespan backend
)
```
#### Path Pattern Filtering
```python
# String patterns and regex
api_throttle = HTTPThrottle(uid="api", rate="100/m")
admin_throttle = HTTPThrottle(uid="admin", rate="5/m")
api_middleware_throttle = MiddlewareThrottle(
    api_throttle,
    path="/api/",  # Starts with /api/
)
admin_middleware_throttle = MiddlewareThrottle(
    admin_throttle,
    path=r"^/admin/.*",  # Regex pattern
)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[admin_middleware_throttle, api_middleware_throttle],
    backend=backend,
)
```
#### Custom Hook-Based Filtering
```python
from starlette.requests import HTTPConnection
async def authenticated_only(connection: HTTPConnection) -> bool:
    """Apply throttle only to authenticated users"""
    return connection.headers.get("authorization") is not None

async def intensive_operations(connection: HTTPConnection) -> bool:
    """Throttle resource-intensive endpoints"""
    intensive_paths = ["/api/reports/", "/api/analytics/", "/api/exports/"]
    return any(connection.scope["path"].startswith(p) for p in intensive_paths)

auth_throttle = HTTPThrottle(uid="auth", rate="200/m")
intensive_throttle = HTTPThrottle(uid="intensive", rate="20/m")

auth_middleware_throttle = MiddlewareThrottle(auth_throttle, hook=authenticated_only)
intensive_middleware_throttle = MiddlewareThrottle(intensive_throttle, hook=intensive_operations)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[intensive_middleware_throttle, auth_middleware_throttle],
    backend=backend,
)
```
#### Combined Filtering
```python
async def authenticated_only(connection: HTTPConnection) -> bool:
    return connection.headers.get("authorization") is not None

complex_throttle = HTTPThrottle(uid="complex", rate="25/m")

# Combines ALL criteria: path AND method AND hook
complex_middleware_throttle = MiddlewareThrottle(
    complex_throttle,
    path="/api/",
    methods={"POST"},
    hook=authenticated_only,
)
```
### Best Practices
1. **Order Matters**: Place more specific throttles before general ones
2. **Early Placement**: Add throttle middleware early in the stack to reject requests before expensive processing
3. **Production Backends**: Use Redis for multi-instance deployments
4. **Monitoring**: Log throttle hits for capacity planning (see the logging sketch after the ordering example below)
5. **Graceful Responses**: Provide clear error messages with retry information
```python
# Example: Optimal middleware ordering
app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[
        admin_middleware_throttle,      # Most specific
        intensive_middleware_throttle,  # Specific paths
        api_middleware_throttle,        # General API
    ],
    backend=redis_backend,
)
```
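For the monitoring point above, one option is a custom `handle_throttled` hook (see "Custom Throttled Response" earlier) that logs each hit before raising; the logger name and log fields here are illustrative, not prescribed by Traffik:

```python
import logging

from starlette.exceptions import HTTPException
from starlette.requests import HTTPConnection

logger = logging.getLogger("throttle.hits")

async def logging_throttled_handler(connection: HTTPConnection, wait_ms: int, *args, **kwargs):
    # Record who was throttled, where, and for how long - useful for capacity planning
    client = connection.client.host if connection.client else "unknown"
    logger.warning("throttled path=%s client=%s wait_ms=%d", connection.scope["path"], client, wait_ms)
    raise HTTPException(status_code=429, headers={"Retry-After": str(max(wait_ms // 1000, 1))})
```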
## Error Handling
Traffik provides specific exceptions for different scenarios:
### Exception Types
```python
from traffik.exceptions import (
    TraffikException,     # Base exception
    ConfigurationError,   # Invalid configuration
    AnonymousConnection,  # Cannot identify client
    ConnectionThrottled,  # Rate limit exceeded (HTTP 429)
    BackendError,         # Backend operation failed
)
```
### Handling Rate Limit Exceptions
The `ConnectionThrottled` exception is raised when a client exceeds their rate limit. Since it is an `HTTPException`, it is handled automatically by FastAPI/Starlette, returning a `429 Too Many Requests` response. You can still register a custom exception handler to customize the response further.
```python
from fastapi import FastAPI, Request
from traffik.exceptions import ConnectionThrottled
from starlette.responses import JSONResponse
@app.exception_handler(ConnectionThrottled)
async def throttle_handler(request: Request, exc: ConnectionThrottled):
    return JSONResponse(
        status_code=429,
        content={
            "error": "rate_limit_exceeded",
            "retry_after_seconds": exc.retry_after,
            "message": exc.detail,
        },
        headers={"Retry-After": str(exc.retry_after)},
    )
```
### Handling Configuration Errors
```python
from traffik.exceptions import ConfigurationError, BackendConnectionError
try:
    backend = RedisBackend(connection="invalid://url")
    await backend.initialize()
except BackendConnectionError as exc:
    logger.error(f"Failed to connect to Redis: {exc}")
except ConfigurationError as exc:
    logger.error(f"Invalid configuration: {exc}")
```
### Handling Anonymous Connections
The default identifier raises `AnonymousConnection` if it cannot identify the client. You can catch this exception to provide a fallback identifier.
```python
from traffik.exceptions import AnonymousConnection
from traffik.backends.base import connection_identifier
async def safe_identifier(connection: HTTPConnection) -> str:
    try:
        return await connection_identifier(connection)
    except AnonymousConnection:
        # Fall back to a default identifier
        return f"anonymous:{connection.scope['path']}"
```
## Testing
Comprehensive testing example using `InMemoryBackend`:
```python
import pytest
from httpx import AsyncClient, ASGITransport
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse
from traffik.backends.inmemory import InMemoryBackend
from traffik.throttles import HTTPThrottle
@pytest.fixture
async def backend():
    backend = InMemoryBackend(namespace="test", persistent=False)
    async with backend():
        yield backend

@pytest.mark.anyio
async def test_throttling(backend):
    throttle = HTTPThrottle(uid="test", rate="2/s")

    async def endpoint(request):
        await throttle(request)
        return JSONResponse({"status": "ok"})

    app = Starlette(
        routes=[Route("/test", endpoint, methods=["GET"])],
        lifespan=backend.lifespan,
    )

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://testserver",
    ) as client:
        # First 2 requests succeed
        r1 = await client.get("/test")
        r2 = await client.get("/test")
        assert r1.status_code == 200
        assert r2.status_code == 200

        # Third request is throttled
        r3 = await client.get("/test")
        assert r3.status_code == 429
```
### Testing Different Strategies
```python
from traffik.strategies import (
    FixedWindowStrategy,
    SlidingWindowLogStrategy,
    TokenBucketStrategy,
)

@pytest.mark.anyio
@pytest.mark.parametrize("strategy", [
    FixedWindowStrategy(),
    SlidingWindowLogStrategy(),
    TokenBucketStrategy(),
])
async def test_strategy(backend, strategy):
    throttle = HTTPThrottle(uid="test", rate="5/s", strategy=strategy)
    # Test throttle behavior with different strategies
```
## API Reference
### Throttle Classes
#### `HTTPThrottle`
HTTP request rate limiting with flexible configuration.
```python
HTTPThrottle(
    uid: str,                                  # Unique identifier
    rate: Union[Rate, str],                    # "100/m" or Rate object
    identifier: Optional[Callable],            # Client ID function
    handle_throttled: Optional[Callable],      # Custom handler
    strategy: Optional[Strategy],              # Rate limiting strategy
    backend: Optional[ThrottleBackend],        # Storage backend
    dynamic_backend: bool = False,             # Runtime backend resolution
    min_wait_period: Optional[int] = None,     # Minimum wait (ms)
    headers: Optional[Dict[str, str]] = None,  # Extra headers for throttled responses
)
```
#### `WebSocketThrottle`
WebSocket message rate limiting.
```python
WebSocketThrottle(
    uid: str,
    rate: Union[Rate, str],
    identifier: Optional[Callable],
    handle_throttled: Optional[Callable],
    strategy: Optional[Strategy],
    backend: Optional[ThrottleBackend],
    dynamic_backend: bool = False,
    min_wait_period: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
)
```
#### `BaseThrottle`
Base class for custom throttle implementations. Override `get_key()` for custom key generation.
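A minimal sketch of such a subclass, assuming `get_key()` receives the connection and that the throttle's `uid` is available on the instance (check the `BaseThrottle` source for the exact signature before relying on this):

```python
from traffik.throttles import BaseThrottle

class TenantKeyedThrottle(BaseThrottle):
    """Hypothetical throttle that scopes limits per tenant header instead of client IP."""

    async def get_key(self, connection, *args, **kwargs) -> str:
        # Illustrative: key limits by an X-Tenant-ID header
        tenant = connection.headers.get("X-Tenant-ID", "public")
        return f"{self.uid}:tenant:{tenant}"
```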
### Strategy Classes
All strategies are frozen dataclasses (`@dataclass(frozen=True)`) for immutability.
#### `FixedWindowStrategy()` (Default)
Simple fixed-window counting. Fast and memory-efficient.
#### `SlidingWindowLogStrategy()`
Most accurate, maintains request log. Memory: O(limit).
#### `SlidingWindowCounterStrategy()`
Sliding window with counters. Balance between accuracy and memory.
#### `TokenBucketStrategy(burst_size: Optional[int] = None)`
Allows controlled bursts. `burst_size` defaults to rate limit.
#### `TokenBucketWithDebtStrategy(max_debt: Optional[int] = None)`
Token bucket with debt tracking for smoother recovery.
#### `LeakyBucketStrategy()`
Perfectly smooth traffic output. No bursts allowed.
#### `LeakyBucketWithQueueStrategy(queue_size: Optional[int] = None)`
Leaky bucket with request queuing.
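For quick reference, instantiations of the parameterized strategies listed above (constructor names and parameters as documented; the tuning values are illustrative):

```python
from traffik.strategies import (
    SlidingWindowCounterStrategy,
    TokenBucketStrategy,
    TokenBucketWithDebtStrategy,
    LeakyBucketWithQueueStrategy,
)

balanced = SlidingWindowCounterStrategy()
bursty = TokenBucketStrategy(burst_size=200)           # bursts up to 200 requests
forgiving = TokenBucketWithDebtStrategy(max_debt=50)   # tolerate limited "debt"
queued = LeakyBucketWithQueueStrategy(queue_size=100)  # queue up to 100 requests
```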
### Backend Classes
#### `InMemoryBackend`
```python
InMemoryBackend(
    namespace: str = "inmemory",
    persistent: bool = False,
)
```
#### `RedisBackend`
```python
RedisBackend(
    connection: Union[str, Redis],
    namespace: str,
    persistent: bool = True,
)
```
#### `MemcachedBackend`
```python
MemcachedBackend(
    host: str = "localhost",
    port: int = 11211,
    namespace: str = "memcached",
    pool_size: int = 2,
    persistent: bool = False,
)
```
### Rate Class
```python
Rate(
    limit: int,
    milliseconds: int = 0,
    seconds: int = 0,
    minutes: int = 0,
    hours: int = 0,
)

# Or use the string format
Rate.parse("100/m")  # 100 per minute
Rate.parse("5/10s")  # 5 per 10 seconds
```
### Middleware Classes
#### `MiddlewareThrottle`
```python
MiddlewareThrottle(
    throttle: BaseThrottle,
    path: Optional[Union[str, Pattern]] = None,
    methods: Optional[Set[str]] = None,
    hook: Optional[Callable] = None,
)
```
#### `ThrottleMiddleware`
```python
ThrottleMiddleware(
    app: ASGIApp,
    middleware_throttles: List[MiddlewareThrottle],
    backend: Optional[ThrottleBackend] = None,
)
```
## Contributing
We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.
### Development Setup
```bash
git clone https://github.com/ti-oluwa/traffik.git
cd traffik
uv sync --extra dev
# Run tests
uv run pytest
# Run linting
uv run ruff check src/ tests/ --fix
# Run formatting
uv run ruff format src/ tests/
```
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Changelog
See [CHANGELOG.md](CHANGELOG.md) for version history and changes.
Raw data
{
"_id": null,
"home_page": null,
"name": "traffik",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": "tioluwa <tioluwa.dev@gmail.com>",
"keywords": "api, distributed-rate-limiting, fastapi, rate-limiting, starlette, throttling",
"author": null,
"author_email": "tioluwa <tioluwa.dev@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/85/30/3789ab0dfee538a7f984456cf55fdfe2b805aa305164e7a71e2fd219e252/traffik-1.0.0b1.tar.gz",
"platform": null,
"description": "# Traffik - A Starlette throttling library\n\n[](https://github.com/ti-oluwa/traffik/actions/workflows/test.yaml)\n[](https://github.com/ti-oluwa/traffik/actions/workflows/code-quality.yaml)\n[](https://codecov.io/gh/ti-oluwa/traffik)\n<!-- [](https://badge.fury.io/py/traffik)\n[](https://pypi.org/project/traffik/) -->\n\nTraffik provides flexible rate limiting for Starlette and FastAPI applications with support for both HTTP and WebSocket connections. It offers multiple rate limiting strategies including Fixed Window, Sliding Window, Token Bucket, and Leaky Bucket algorithms, allowing you to choose the approach that best fits your use case.\n\nThe library features pluggable backends (in-memory, Redis, Memcached), dynamic backend resolution for special applications. Whether you need simple per-endpoint limits or complex distributed rate limiting, Traffik provides the flexibility and robustness to handle your requirements.\n\nTraffik was inspired by [fastapi-limiter](https://github.com/long2ice/fastapi-limiter) but has evolved into a more comprehensive solution with more advanced features.\n\n## Features\n\n- \ud83d\ude80 **Easy Integration**: Decorator, dependency, and middleware-based throttling\n- \ud83c\udfaf **Multiple Strategies**: Fixed Window (default), Sliding Window, Token Bucket, Leaky Bucket\n- \ud83d\udd04 **Multiple Backends**: In-memory, Redis, Memcached with atomic operation support\n- \ud83c\udf10 **Protocol Support**: Both HTTP and WebSocket throttling\n- \ud83c\udfe2 **Dynamic Backend Resolution**: Support for runtime backend switching\n- \ud83d\udd27 **Dependency Injection Friendly**: Works well with FastAPI's dependency injection system.\n- \ud83d\udcca **Smart Client Identification**: IP-based by default, fully customizable\n- \u2699\ufe0f **Flexible and Extensible API**: Easily extend base functionality\n- \ud83e\uddea **Thoroughly Tested**: Comprehensive test suite covering concurrency and multithreading\n\n## Installation\n\nWe recommend using `uv`, however, it is not a strict requirement.\n\n### Install `uv` (optional)\n\nVisit the [uv documentation](https://docs.astral.sh/uv/getting-started/installation/) for installation instructions.\n\n### Basic Installation\n\n```bash\nuv add traffik\n\n# or using pip\npip install traffik\n```\n\nInstall with FastAPI support:\n\n```bash\nuv add \"traffik[fastapi]\"\n\n# or using pip\npip install \"traffik[fastapi]\"\n```\n\n### With Redis Backend\n\n```bash\nuv add \"traffik[redis]\"\n\n# or using pip\npip install \"traffik[redis]\"\n```\n\n### With Memcached Backend\n\n```bash\nuv add \"traffik[memcached]\"\n\n# or using pip\npip install \"traffik[memcached]\"\n```\n\n### With All Features\n\n```bash\nuv add \"traffik[all]\"\n\n# or using pip\npip install \"traffik[all]\"\n```\n\n### Development Installation\n\n```bash\ngit clone https://github.com/your-username/traffik.git\ncd traffik\n\nuv sync --extra dev\n\n# or using pip\npip install -e .[dev]\n```\n\n## Quick Testing with Docker\n\nFor quick testing across different platforms and Python versions:\n\n```bash\n# Run fast tests\n./docker-test.sh test-fast\n\n# Run full test suite\n./docker-test.sh test\n\n# Start development environment\n./docker-test.sh dev\n\n# Test across Python versions\n./docker-test.sh test-matrix\n```\n\n**Testing Documentation:**\n\n- [DOCKER.md](DOCKER.md) - Complete Docker testing guide\n- [TESTING.md](TESTING.md) - Quick testing guide \n\n## Quick Start Guide\n\n### 1. 
Basic HTTP Throttling with Starlette\n\n```python\nfrom starlette.applications import Starlette\nfrom starlette.routing import Route\nfrom starlette.requests import Request\nfrom starlette.responses import JSONResponse\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.backends.inmemory import InMemoryBackend\n\n# Create backend\nbackend = InMemoryBackend(namespace=\"myapp\", persistent=False)\n\n# Create throttle with string rate format\nthrottle = HTTPThrottle(\n uid=\"basic_limit\",\n rate=\"5/10s\", # 5 requests per 10 seconds\n)\n\nasync def throttled_endpoint(request: Request):\n await throttle(request)\n return JSONResponse({\"message\": \"Success\"})\n\napp = Starlette(\n routes=[\n Route(\"/api/data\", throttled_endpoint, methods=[\"GET\"]),\n ],\n lifespan=backend.lifespan,\n)\n```\n\n### 2. FastAPI with Dependency Injection\n\n```python\nfrom fastapi import FastAPI, Depends\nfrom contextlib import asynccontextmanager\nfrom traffik.backends.inmemory import InMemoryBackend\nfrom traffik.throttles import HTTPThrottle\n\n# Create backend\nbackend = InMemoryBackend(namespace=\"api\")\n\n# Setup lifespan\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n async with backend(app, persistent=True, close_on_exit=True):\n yield\n\napp = FastAPI(lifespan=lifespan)\n\n# Create throttle\nthrottle = HTTPThrottle(uid=\"endpoint_limit\", rate=\"10/m\")\n\n@app.get(\"/api/hello\", dependencies=[Depends(throttle)])\nasync def say_hello():\n return {\"message\": \"Hello World\"}\n```\n\n### 3. Using Decorators (FastAPI Only)\n\n```python\nfrom fastapi import FastAPI\nfrom contextlib import asynccontextmanager\nfrom traffik.decorators import throttled\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.backends.redis import RedisBackend\n\nbackend = RedisBackend(\n connection=\"redis://localhost:6379/0\",\n namespace=\"api\",\n persistent=True,\n)\n\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n async with backend(app):\n yield\n\napp = FastAPI(lifespan=lifespan)\n\n@app.get(\"/api/limited\")\n@throttled(HTTPThrottle(uid=\"limited\", rate=\"5/m\"))\nasync def limited_endpoint():\n return {\"data\": \"Limited access\"}\n```\n\n### 4. WebSocket Throttling\n\n```python\nfrom traffik.throttles import WebSocketThrottle\nfrom starlette.websockets import WebSocket\nfrom starlette.exceptions import HTTPException\n\nws_throttle = WebSocketThrottle(uid=\"ws_messages\", rate=\"3/10s\")\n\nasync def ws_endpoint(websocket: WebSocket) -> None:\n await websocket.accept()\n \n while True:\n try:\n data = await websocket.receive_json()\n await ws_throttle(websocket) # Rate limit per message\n \n await websocket.send_json({\n \"status\": \"success\",\n \"data\": data,\n })\n except HTTPException as exc:\n await websocket.send_json({\n \"status\": \"error\", \n \"status_code\": exc.status_code,\n \"detail\": exc.detail,\n })\n break\n \n await websocket.close()\n```\n\n## Rate Limiting Strategies\n\nTraffik supports multiple rate limiting strategies, each with different trade-offs. The **Fixed Window strategy is used by default** for its simplicity and performance.\n\n### Fixed Window (Default)\n\nDivides time into fixed windows and counts requests within each window. 
Simple, fast, and memory-efficient.\n\n**Pros:** Simple, constant memory, fast \n**Cons:** Can allow bursts at window boundaries (up to 2x limit)\n\n```python\nfrom traffik.strategies import FixedWindowStrategy\n\nthrottle = HTTPThrottle(\n uid=\"api_limit\",\n rate=\"100/m\",\n strategy=FixedWindowStrategy() # Default, can be omitted\n)\n```\n\n### Sliding Window\n\nMost accurate rate limiting with continuous sliding window evaluation. Prevents boundary exploitation.\n\n**Pros:** Most accurate, no boundary issues \n**Cons:** Higher memory usage (O(limit) per key)\n\n```python\nfrom traffik.strategies import SlidingWindowLogStrategy\n\nthrottle = HTTPThrottle(\n uid=\"payment_api\",\n rate=\"10/m\",\n strategy=SlidingWindowLogStrategy()\n)\n```\n\n### Token Bucket\n\nAllows controlled bursts while maintaining average rate over time. Tokens refill continuously.\n\n**Pros:** Allows bursts, smooth distribution, self-recovering \n**Cons:** Slightly more complex\n\n```python\nfrom traffik.strategies import TokenBucketStrategy\n\nthrottle = HTTPThrottle(\n uid=\"user_api\",\n rate=\"100/m\",\n strategy=TokenBucketStrategy(burst_size=150) # Allow bursts up to 150\n)\n```\n\n### Leaky Bucket\n\nEnforces perfectly smooth traffic output. No bursts allowed.\n\n**Pros:** Smooth output, protects downstream services \n**Cons:** Less forgiving, may reject legitimate bursts\n\n```python\nfrom traffik.strategies import LeakyBucketStrategy\n\nthrottle = HTTPThrottle(\n uid=\"third_party_api\",\n rate=\"50/m\",\n strategy=LeakyBucketStrategy()\n)\n```\n\n## Writing Custom Strategies\n\nYou can create custom rate limiting strategies by implementing a callable that follows the strategy protocol. This allows you to implement specialized rate limiting logic tailored to your specific needs.\n\n### Strategy Protocol\n\nA strategy is a callable (function or class with `__call__`) that takes three parameters and returns a wait period:\n\n```python\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.rates import Rate\nfrom traffik.types import Stringable, WaitPeriod\n\nasync def my_strategy(\n key: Stringable, \n rate: Rate, \n backend: ThrottleBackend\n) -> WaitPeriod:\n \"\"\"\n :param key: The throttling key (e.g., \"user:123\", \"ip:192.168.1.1\")\n :param rate: Rate limit definition with limit and expire properties\n :param backend: Backend instance for storage operations\n :return: Wait time in milliseconds (0.0 if request allowed)\n \"\"\"\n # Your rate limiting logic here\n return 0.0 # Allow request\n```\n\n### Example: Simple Rate Strategy\n\nHere's a basic example that implements a simple counter-based strategy:\n\n```python\nfrom dataclasses import dataclass\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.rates import Rate\nfrom traffik.types import Stringable, WaitPeriod\nfrom traffik.utils import time\n\n\nasync def simple_counter_strategy(\n self, key: Stringable, rate: Rate, backend: ThrottleBackend\n) -> WaitPeriod:\n \"\"\"\n Simple counter-based rate limiting.\n \n Counts requests and resets counter when expired.\n \"\"\"\n if rate.unlimited:\n return 0.0\n \n now = time() * 1000 # Current time in milliseconds\n full_key = await backend.get_key(str(key))\n counter_key = f\"{full_key}:counter\"\n timestamp_key = f\"{full_key}:timestamp\"\n \n ttl_seconds = int(rate.expire // 1000) + 1\n \n async with await backend.lock(f\"lock:{counter_key}\", blocking=True, blocking_timeout=1):\n # Get current count and timestamp\n count_str = await backend.get(counter_key)\n 
timestamp_str = await backend.get(timestamp_key)\n \n if count_str and timestamp_str:\n count = int(count_str)\n timestamp = float(timestamp_str)\n \n # Check if window has expired\n if now - timestamp > rate.expire:\n # Reset counter for new window\n count = 1\n await backend.set(counter_key, \"1\", expire=ttl_seconds)\n await backend.set(timestamp_key, str(now), expire=ttl_seconds)\n else:\n # Increment counter\n count = await backend.increment(counter_key)\n else:\n # First request\n count = 1\n await backend.set(counter_key, \"1\", expire=ttl_seconds)\n await backend.set(timestamp_key, str(now), expire=ttl_seconds)\n \n # Check if limit exceeded\n if count > rate.limit:\n # Calculate wait time until window expires\n timestamp = float(await backend.get(timestamp_key))\n elapsed = now - timestamp\n wait_ms = rate.expire - elapsed\n return max(wait_ms, 0.0)\n \n return 0.0\n\n# Usage\nthrottle = HTTPThrottle(\n uid=\"simple\",\n rate=\"10/m\",\n strategy=simple_counter_strategy,\n)\n```\n\n### Example: Adaptive Rate Strategy\n\nA more advanced example that adapts the rate limit based on backend load:\n\n```python\nfrom dataclasses import dataclass\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.rates import Rate\nfrom traffik.types import Stringable, WaitPeriod\nfrom traffik.utils import time\n\n@dataclass(frozen=True)\nclass AdaptiveRateStrategy:\n \"\"\"\n Adaptive rate limiting that adjusts based on system load.\n \n Reduces limits during high load, increases during low load.\n \"\"\"\n \n load_threshold: float = 0.8 # 80% of limit triggers adaptation\n reduction_factor: float = 0.5 # Reduce to 50% during high load\n \n async def __call__(\n self, key: Stringable, rate: Rate, backend: ThrottleBackend\n ) -> WaitPeriod:\n if rate.unlimited:\n return 0.0\n \n now = time() * 1000\n window_duration_ms = rate.expire\n current_window = int(now // window_duration_ms)\n \n full_key = await backend.get_key(str(key))\n counter_key = f\"{full_key}:adaptive:{current_window}\"\n load_key = f\"{full_key}:load\"\n ttl_seconds = int(window_duration_ms // 1000) + 1\n \n async with await backend.lock(f\"lock:{counter_key}\", blocking=True, blocking_timeout=1):\n # Increment request counter\n count = await backend.increment_with_ttl(counter_key, amount=1, ttl=ttl_seconds)\n \n # Calculate current load percentage\n load_percentage = count / rate.limit\n \n # Determine effective limit based on load\n if load_percentage > self.load_threshold:\n # High load - reduce effective limit\n effective_limit = int(rate.limit * self.reduction_factor)\n await backend.set(load_key, \"high\", expire=ttl_seconds)\n else:\n # Normal load - use full limit\n effective_limit = rate.limit\n await backend.set(load_key, \"normal\", expire=ttl_seconds)\n \n # Check against effective limit\n if count > effective_limit:\n # Calculate wait time\n time_in_window = now % window_duration_ms\n wait_ms = window_duration_ms - time_in_window\n return wait_ms\n \n return 0.0\n\n# Usage\nthrottle = HTTPThrottle(\n uid=\"adaptive_api\",\n rate=\"1000/h\",\n strategy=AdaptiveRateStrategy(load_threshold=0.7, reduction_factor=0.6)\n)\n```\n\n### Example: Priority-Based Strategy\n\nA strategy that implements priority queuing:\n\n```python\nfrom dataclasses import dataclass\nfrom enum import IntEnum\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.rates import Rate\nfrom traffik.types import Stringable, WaitPeriod\nfrom traffik.utils import time, dump_json, load_json, JSONDecodeError\n\nclass Priority(IntEnum):\n 
\"\"\"Request priority levels\"\"\"\n LOW = 1\n NORMAL = 2\n HIGH = 3\n CRITICAL = 4\n\n@dataclass(frozen=True)\nclass PriorityQueueStrategy:\n \"\"\"\n Rate limiting with priority queue.\n \n Higher priority requests are processed first when at capacity.\n \"\"\"\n \n default_priority: Priority = Priority.NORMAL\n \n def _extract_priority(self, key: str) -> Priority:\n \"\"\"Extract priority from key (format: \"priority:<level>:user:123\")\"\"\"\n if key.startswith(\"priority:\"):\n try:\n level = int(key.split(\":\")[1])\n return Priority(level)\n except (ValueError, IndexError):\n pass\n return self.default_priority\n \n async def __call__(\n self, key: Stringable, rate: Rate, backend: ThrottleBackend\n ) -> WaitPeriod:\n if rate.unlimited:\n return 0.0\n \n now = time() * 1000\n priority = self._extract_priority(str(key))\n \n full_key = await backend.get_key(str(key))\n queue_key = f\"{full_key}:priority_queue\"\n ttl_seconds = int(rate.expire // 1000) + 1\n \n async with await backend.lock(f\"lock:{queue_key}\", blocking=True, blocking_timeout=1):\n # Get current queue\n queue_json = await backend.get(queue_key)\n if queue_json:\n try:\n queue = load_json(queue_json)\n except JSONDecodeError:\n queue = []\n else:\n queue = []\n \n # Remove expired entries\n queue = [\n entry for entry in queue \n if now - entry[\"timestamp\"] < rate.expire\n ]\n \n # Count requests with higher or equal priority\n higher_priority_count = sum(\n 1 for entry in queue \n if entry[\"priority\"] >= priority\n )\n \n # Check if request can be processed\n if higher_priority_count >= rate.limit:\n # Calculate wait time based on oldest high-priority entry\n oldest_high_priority = min(\n (entry[\"timestamp\"] for entry in queue if entry[\"priority\"] >= priority),\n default=now\n )\n wait_ms = rate.expire - (now - oldest_high_priority)\n return max(wait_ms, 0.0)\n \n # Add current request to queue\n queue.append({\n \"timestamp\": now,\n \"priority\": priority,\n \"key\": str(key)\n })\n \n # Sort by priority (descending) and timestamp (ascending)\n queue.sort(key=lambda x: (-x[\"priority\"], x[\"timestamp\"]))\n \n # Store updated queue\n await backend.set(queue_key, dump_json(queue), expire=ttl_seconds)\n return 0.0\n\n# Usage\nfrom traffik.throttles import HTTPThrottle\n\nasync def priority_identifier(connection):\n \"\"\"Extract priority from request headers\"\"\"\n priority = connection.headers.get(\"X-Priority\", \"2\")\n user_id = extract_user_id(connection)\n return f\"priority:{priority}:user:{user_id}\"\n\nthrottle = HTTPThrottle(\n uid=\"priority_api\",\n rate=\"100/m\",\n strategy=PriorityQueueStrategy(),\n identifier=priority_identifier\n)\n```\n\n### Best Practices for Custom Strategies\n\n1. **Always use locks**: Wrap critical sections with `backend.lock()` to ensure atomicity\n\n ```python\n async with await backend.lock(f\"lock:{key}\", blocking=True, blocking_timeout=1):\n # Critical section\n ```\n\n2. **Set appropriate TTLs**: Always set expiration times to prevent memory leaks\n\n ```python\n ttl_seconds = int(rate.expire // 1000) + 1 # +1 second buffer\n await backend.set(key, value, expire=ttl_seconds)\n ```\n\n3. **Handle unlimited rates**: Check for unlimited rates early\n\n ```python\n if rate.unlimited:\n return 0.0\n ```\n\n4. **Use proper key prefixes**: Namespace your strategy's keys to avoid conflicts\n\n ```python\n full_key = await backend.get_key(str(key))\n strategy_key = f\"{full_key}:mystrategy:data\"\n ```\n\n5. 

### Testing Custom Strategies

```python
import pytest
from traffik.backends.inmemory import InMemoryBackend
from traffik.rates import Rate

@pytest.mark.anyio
async def test_custom_strategy():
    backend = InMemoryBackend(namespace="test")
    async with backend(close_on_exit=True):
        strategy = MyCustomStrategy()
        rate = Rate.parse("5/s")

        # Requests within the limit are allowed
        for i in range(5):
            wait = await strategy("user:123", rate, backend)
            assert wait == 0.0, f"Request {i + 1} should be allowed"

        # The next request is throttled
        wait = await strategy("user:123", rate, backend)
        assert wait > 0, "Request 6 should be throttled"

        # Verify the wait time is reasonable
        assert wait <= rate.expire, "Wait time should not exceed the window"
```

## Backends

Traffik provides three backend implementations with full atomic operation support and distributed locking.

### In-Memory Backend

Perfect for development, testing, and single-instance applications:

```python
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend(
    namespace="myapp",
    persistent=False,  # Don't persist across restarts
)
```

**Pros:**

- No external dependencies
- Fast and simple
- Great for development and testing

**Cons:**

- Not suitable for multi-process/distributed systems
- Data lost on restart (even with `persistent=True`)

### Redis Backend

Recommended for production and distributed systems:

```python
from traffik.backends.redis import RedisBackend

# From a connection string
backend = RedisBackend(
    connection="redis://localhost:6379/0",
    namespace="myapp",
    persistent=True,
)

# From a Redis client factory
import redis.asyncio as redis

def get_client() -> redis.Redis:
    return redis.Redis(host="localhost", port=6379, db=0)

backend = RedisBackend(
    connection=get_client,
    namespace="myapp",
)
```

**Features:**

- Distributed locks using the Redlock algorithm
- Production-ready persistence

**Pros:**

- Multi-process and distributed support
- Persistence across restarts
- Battle-tested in production

**Cons:**

- Requires a Redis server
- Additional infrastructure

### Memcached Backend

Lightweight distributed caching solution:

```python
from traffik.backends.memcached import MemcachedBackend

backend = MemcachedBackend(
    host="localhost",
    port=11211,
    namespace="myapp",
    pool_size=10,
)
```

**Features:**

- Lightweight distributed locks
- Atomic operations via CAS (Compare-And-Swap)
- Connection pooling
- Fast in-memory storage

**Pros:**

- Lightweight and fast
- Good for high-throughput scenarios
- Simple deployment

**Cons:**

- Less feature-rich than Redis

### Custom Backends

Create custom backends by subclassing `ThrottleBackend`:

```python
import typing

from traffik.backends.base import ThrottleBackend
from traffik.types import HTTPConnectionT, AsyncLock

class CustomBackend(ThrottleBackend[typing.Dict, HTTPConnectionT]):
    """Custom backend with your storage solution"""

    async def initialize(self) -> None:
        """Set up connections/resources"""
        pass

    async def get(self, key: str) -> typing.Optional[str]:
        """Get the value for a key"""
        pass

    async def set(self, key: str, value: str, expire: typing.Optional[int] = None) -> None:
        """Set a value with an optional TTL"""
        pass

    async def delete(self, key: str) -> None:
        """Delete a key"""
        pass

    async def increment(self, key: str, amount: int = 1) -> int:
        """Atomically increment a counter"""
        pass

    async def decrement(self, key: str, amount: int = 1) -> int:
        """Atomically decrement a counter"""
        pass

    async def increment_with_ttl(self, key: str, amount: int, ttl: int) -> int:
        """Atomically increment a counter and set its TTL"""
        pass

    async def get_lock(self, name: str) -> AsyncLock:
        """Get a distributed lock"""
        pass

    async def reset(self) -> None:
        """Clear all data"""
        pass

    async def close(self) -> None:
        """Clean up resources"""
        pass
```
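
To make the contract concrete, here is a hedged sketch of a dict-backed implementation of three of these methods, guarded by a single `asyncio.Lock`. It is single-process only (the built-in `InMemoryBackend` already covers that niche), the `(value, expires_at)` layout is an illustrative choice rather than Traffik's internal representation, and the remaining methods are omitted for brevity:

```python
import asyncio
import time
import typing

from traffik.backends.base import ThrottleBackend
from traffik.types import HTTPConnectionT


class DictBackend(ThrottleBackend[typing.Dict, HTTPConnectionT]):
    """Hypothetical single-process backend storing (value, expires_at) pairs."""

    def __init__(self, *args: typing.Any, **kwargs: typing.Any) -> None:
        super().__init__(*args, **kwargs)
        self._data: typing.Dict[str, typing.Tuple[str, typing.Optional[float]]] = {}
        self._guard = asyncio.Lock()

    def _live(self, key: str) -> typing.Optional[typing.Tuple[str, typing.Optional[float]]]:
        entry = self._data.get(key)
        if entry and entry[1] is not None and time.monotonic() >= entry[1]:
            del self._data[key]  # Lazily evict expired keys
            return None
        return entry

    async def get(self, key: str) -> typing.Optional[str]:
        async with self._guard:
            entry = self._live(key)
            return entry[0] if entry else None

    async def set(self, key: str, value: str, expire: typing.Optional[int] = None) -> None:
        async with self._guard:
            expires_at = time.monotonic() + expire if expire is not None else None
            self._data[key] = (value, expires_at)

    async def increment_with_ttl(self, key: str, amount: int, ttl: int) -> int:
        async with self._guard:
            entry = self._live(key)
            if entry is None:
                count = amount  # New window: start counting and arm the TTL
                self._data[key] = (str(count), time.monotonic() + ttl)
            else:
                count = int(entry[0]) + amount
                self._data[key] = (str(count), entry[1])  # Keep the original expiry
            return count

    # delete, increment, decrement, get_lock, reset, close, and initialize
    # would follow the same pattern and must be filled in before use.
```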
your storage solution\"\"\"\n \n async def initialize(self) -> None:\n \"\"\"Setup connection/resources\"\"\"\n pass\n \n async def get(self, key: str) -> typing.Optional[str]:\n \"\"\"Get value for key\"\"\"\n pass\n \n async def set(self, key: str, value: str, expire: typing.Optional[int] = None) -> None:\n \"\"\"Set value with optional TTL\"\"\"\n pass\n \n async def delete(self, key: str) -> None:\n \"\"\"Delete key\"\"\"\n pass\n \n async def increment(self, key: str, amount: int = 1) -> int:\n \"\"\"Atomically increment counter\"\"\"\n pass\n \n async def decrement(self, key: str, amount: int = 1) -> int:\n \"\"\"Atomically decrement counter\"\"\"\n pass\n \n async def increment_with_ttl(self, key: str, amount: int, ttl: int) -> int:\n \"\"\"Atomically increment with TTL set\"\"\"\n pass\n \n async def get_lock(self, name: str) -> AsyncLock:\n \"\"\"Get distributed lock\"\"\"\n pass\n \n async def reset(self) -> None:\n \"\"\"Clear all data\"\"\"\n pass\n \n async def close(self) -> None:\n \"\"\"Cleanup resources\"\"\"\n pass\n```\n\n### Throttle Backend Selection\n\nYou can specify the backend for each throttle individually or allow them to share a common backend. The shared backend is usually the one set up in your application lifespan.\n\n```python\nfrom fastapi import FastAPI, Depends, Request\n\nfrom traffik.backends.redis import RedisBackend\nfrom traffik.throttles import HTTPThrottle\n\nshared_redis_backend = RedisBackend(\n connection=\"redis://localhost:6379/0\",\n namespace=\"shared\",\n)\napp = FastAPI(lifespan=shared_redis_backend.lifespan)\n\nthrottle_with_own_backend = HTTPThrottle(\n uid=\"custom_backend\",\n rate=\"100/m\",\n backend=MyCustomBackend(), # Uses its own backend\n)\n# Uses backend from app lifespan\nthrottle_with_shared_backend = HTTPThrottle(\n uid=\"shared_backend\",\n rate=\"50/m\",\n)\n\n@app.get(\"/api/custom\", dependencies=[Depends(throttle_with_shared_backend)])\nasync def endpoint_custom(request: Request = Depends(throttle_with_own_backend)):\n return {\"message\": \"Uses its own backend\"}\n\n```\n\n## Configuration Options\n\n### Rate Format\n\nTraffik supports flexible rate specification using string format or `Rate` objects:\n\n```python\nfrom traffik import Rate\nfrom traffik.throttles import HTTPThrottle\n\n# String format (recommended)\nthrottle = HTTPThrottle(uid=\"api_v1\", rate=\"100/m\") # 100 per minute\nthrottle = HTTPThrottle(uid=\"api_v2\", rate=\"5/10s\") # 5 per 10 seconds\nthrottle = HTTPThrottle(uid=\"api_v3\", rate=\"1000/500ms\") # 1000 per 500ms\n\n# Supported units: ms, s/sec/second, m/min/minute, h/hr/hour, d/day\nthrottle = HTTPThrottle(uid=\"daily\", rate=\"10000/d\") # 10000 per day\n\n# Rate object for complex configurations\nrate = Rate(limit=100, minutes=5, seconds=30) # 100 per 5.5 minutes\nthrottle = HTTPThrottle(uid=\"complex\", rate=rate)\n```\n\n### Custom Client Identification\n\nCustomize how clients are identified for rate limiting:\n\n```python\nfrom starlette.requests import HTTPConnection\nfrom traffik.throttles import HTTPThrottle\n\nasync def user_based_identifier(connection: HTTPConnection) -> str:\n \"\"\"Identify by user ID from JWT token\"\"\"\n user_id = extract_user_id(connection.headers.get(\"authorization\"))\n return f\"user:{user_id}\"\n\nthrottle = HTTPThrottle(\n uid=\"user_limit\",\n rate=\"100/h\",\n identifier=user_based_identifier,\n)\n```\n\n### Exempting Connections\n\nSkip throttling for specific clients by returning `UNLIMITED`:\n\n```python\nimport typing\nfrom starlette.requests 

### Custom Throttled Response

Customize the response when rate limits are exceeded:

```python
from starlette.requests import HTTPConnection
from starlette.exceptions import HTTPException

async def custom_throttled_handler(
    connection: HTTPConnection,
    wait_ms: int,
    *args,
    **kwargs,
):
    wait_seconds = max(wait_ms // 1000, 1)  # Never tell clients to retry in 0s
    raise HTTPException(
        status_code=429,
        detail={
            "error": "rate_limit_exceeded",
            "message": f"Too many requests. Retry in {wait_seconds}s",
            "retry_after": wait_seconds,
        },
        headers={"Retry-After": str(wait_seconds)},
    )

throttle = HTTPThrottle(
    uid="api",
    rate="100/m",
    handle_throttled=custom_throttled_handler,
)
```

## More on Usage

### Multiple Rate Limits per Endpoint

Apply both burst and sustained limits:

```python
from fastapi import FastAPI, Depends
from traffik.throttles import HTTPThrottle

app = FastAPI()

# Burst limit: 10 per minute
burst_limit = HTTPThrottle(uid="burst", rate="10/m")

# Sustained limit: 100 per hour
sustained_limit = HTTPThrottle(uid="sustained", rate="100/h")

@app.get(
    "/api/data",
    dependencies=[Depends(burst_limit), Depends(sustained_limit)],
)
async def get_data():
    return {"data": "value"}
```

### Per-User Rate Limiting

```python
from starlette.requests import Request
from traffik.throttles import HTTPThrottle

async def user_identifier(request: Request) -> str:
    user_id = extract_user_from_token(request.headers.get("authorization"))
    return f"user:{user_id}"

user_throttle = HTTPThrottle(
    uid="user_quota",
    rate="1000/h",
    identifier=user_identifier,
)
```

### Strategy Selection Based on Use Case

```python
from traffik.throttles import HTTPThrottle
from traffik.strategies import (
    FixedWindowStrategy,
    SlidingWindowLogStrategy,
    TokenBucketStrategy,
    LeakyBucketStrategy,
)

# Public API - allow bursts
public_api = HTTPThrottle(
    uid="public",
    rate="100/m",
    strategy=TokenBucketStrategy(burst_size=150),
)

# Payment API - strict enforcement
payment_api = HTTPThrottle(
    uid="payments",
    rate="10/m",
    strategy=SlidingWindowLogStrategy(),
)

# Third-party API - smooth output
external_api = HTTPThrottle(
    uid="external",
    rate="50/m",
    strategy=LeakyBucketStrategy(),
)

# Simple rate limiting - default
simple_api = HTTPThrottle(
    uid="simple",
    rate="200/m",
    # Uses `FixedWindowStrategy` by default
)
```
uid=\"api_quota\",\n rate=\"1000/h\",\n dynamic_backend=True, # Resolve backend at runtime\n)\n\nTENANT_CONFIG = {\n \"enterprise\": {\"redis_url\": \"redis://enterprise:6379/0\", \"multiplier\": 5.0},\n \"premium\": {\"redis_url\": \"redis://premium:6379/0\", \"multiplier\": 2.0},\n \"free\": {\"redis_url\": None, \"multiplier\": 1.0},\n}\n\nasync def tenant_middleware(request: Request, call_next):\n \"\"\"Set up tenant-specific backend based on JWT\"\"\"\n token = request.headers.get(\"authorization\", \"\").split(\" \")[1] if request.headers.get(\"authorization\") else None\n \n if token:\n payload = jwt.decode(token, \"secret\", algorithms=[\"HS256\"])\n tier = payload.get(\"tenant_tier\", \"free\")\n tenant_id = payload.get(\"tenant_id\", \"default\")\n else:\n tier, tenant_id = \"free\", \"anonymous\"\n \n config = TENANT_CONFIG[tier]\n \n # Select backend based on tenant tier\n if config[\"redis_url\"]:\n backend = RedisBackend(\n connection=config[\"redis_url\"],\n namespace=f\"tenant_{tenant_id}\",\n persistent=True\n )\n else:\n backend = InMemoryBackend(namespace=f\"tenant_{tenant_id}\")\n \n # Execute within tenant's backend context\n async with backend(request.app):\n return await call_next(request)\n\napp = FastAPI()\napp.middleware(\"http\")(tenant_middleware)\n\n@app.get(\"/api/data\")\nasync def get_data(request: Depends(api_throttle)):\n return {\"data\": \"tenant-specific data\"}\n```\n\n**When to use dynamic backends:**\n\n- \u2705 Multi-tenant SaaS with per-tenant storage\n- \u2705 A/B testing different strategies\n- \u2705 Environment-specific backends\n- \u274c Simple shared storage (use explicit `backend` parameter instead)\n\n**Important:** Dynamic backend resolution adds slight overhead and complexity. Only use when you need runtime backend switching.\n\n### Application Lifespan Management\n\nProperly manage backend lifecycle:\n\n#### Starlette\n\n```python\nfrom starlette.applications import Starlette\nfrom traffik.backends.inmemory import InMemoryBackend\n\nbackend = InMemoryBackend(namespace=\"app\")\n\napp = Starlette(\n routes=[...],\n lifespan=backend.lifespan, # Automatic cleanup\n)\n```\n\n#### FastAPI\n\n```python\nfrom fastapi import FastAPI\nfrom contextlib import asynccontextmanager\nfrom traffik.backends.redis import RedisBackend\n\nbackend = RedisBackend(connection=\"redis://localhost:6379/0\", namespace=\"app\")\n\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n # Startup\n async with backend(app, persistent=True, close_on_exit=True):\n yield\n # Shutdown - backend cleanup automatic\n\napp = FastAPI(lifespan=lifespan)\n```\n\n## Throttle Middleware\n\nApply rate limiting across multiple endpoints with sophisticated filtering and routing logic.\n\n### Basic Middleware Setup\n\n#### FastAPI\n\n```python\nfrom fastapi import FastAPI\nfrom traffik.middleware import ThrottleMiddleware, MiddlewareThrottle\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.backends.inmemory import InMemoryBackend\n\napp = FastAPI()\nbackend = InMemoryBackend(namespace=\"api\")\n\n# Create throttle\napi_throttle = HTTPThrottle(uid=\"api_global\", rate=\"100/m\")\n\n# Wrap in middleware throttle\nmiddleware_throttle = MiddlewareThrottle(api_throttle)\n\n# Add middleware\napp.add_middleware(\n ThrottleMiddleware,\n middleware_throttles=[middleware_throttle],\n backend=backend\n)\n```\n\n#### Starlette\n\n```python\nfrom starlette.applications import Starlette\nfrom traffik.middleware import ThrottleMiddleware, MiddlewareThrottle\nfrom traffik.throttles import 

### Application Lifespan Management

Properly manage the backend lifecycle:

#### Starlette

```python
from starlette.applications import Starlette
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend(namespace="app")

app = Starlette(
    routes=[...],
    lifespan=backend.lifespan,  # Automatic cleanup
)
```

#### FastAPI

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI
from traffik.backends.redis import RedisBackend

backend = RedisBackend(connection="redis://localhost:6379/0", namespace="app")

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    async with backend(app, persistent=True, close_on_exit=True):
        yield
    # Shutdown - backend cleanup is automatic

app = FastAPI(lifespan=lifespan)
```

## Throttle Middleware

Apply rate limiting across multiple endpoints with sophisticated filtering and routing logic.

### Basic Middleware Setup

#### FastAPI

```python
from fastapi import FastAPI
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

app = FastAPI()
backend = InMemoryBackend(namespace="api")

# Create a throttle
api_throttle = HTTPThrottle(uid="api_global", rate="100/m")

# Wrap it in a middleware throttle
middleware_throttle = MiddlewareThrottle(api_throttle)

# Add the middleware
app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[middleware_throttle],
    backend=backend,
)
```

#### Starlette

```python
from starlette.applications import Starlette
from starlette.middleware import Middleware
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend(namespace="api")

api_throttle = HTTPThrottle(uid="api_throttle", rate="50/m")
middleware_throttle = MiddlewareThrottle(api_throttle)

app = Starlette(
    routes=[...],
    middleware=[
        Middleware(
            ThrottleMiddleware,
            middleware_throttles=[middleware_throttle],
            backend=backend,
        )
    ],
)
```

### Advanced Filtering

#### Method-Based Filtering

```python
# Strict limits for write operations
write_throttle = HTTPThrottle(uid="writes", rate="10/m")
read_throttle = HTTPThrottle(uid="reads", rate="1000/m")

write_middleware_throttle = MiddlewareThrottle(
    write_throttle,
    methods={"POST", "PUT", "DELETE"},
)

read_middleware_throttle = MiddlewareThrottle(
    read_throttle,
    methods={"GET", "HEAD"},
)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[write_middleware_throttle, read_middleware_throttle],
    backend=backend,  # If not set, uses the app lifespan backend
)
```

#### Path Pattern Filtering

```python
# String prefixes and regex patterns
api_throttle = HTTPThrottle(uid="api", rate="100/m")
admin_throttle = HTTPThrottle(uid="admin", rate="5/m")

api_middleware_throttle = MiddlewareThrottle(
    api_throttle,
    path="/api/",  # Starts with /api/
)
admin_middleware_throttle = MiddlewareThrottle(
    admin_throttle,
    path=r"^/admin/.*",  # Regex pattern
)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[admin_middleware_throttle, api_middleware_throttle],
    backend=backend,
)
```

#### Custom Hook-Based Filtering

```python
from starlette.requests import HTTPConnection

async def authenticated_only(connection: HTTPConnection) -> bool:
    """Apply the throttle only to authenticated users"""
    return connection.headers.get("authorization") is not None

async def intensive_operations(connection: HTTPConnection) -> bool:
    """Throttle resource-intensive endpoints"""
    intensive_paths = ["/api/reports/", "/api/analytics/", "/api/exports/"]
    return any(connection.scope["path"].startswith(p) for p in intensive_paths)

auth_throttle = HTTPThrottle(uid="auth", rate="200/m")
intensive_throttle = HTTPThrottle(uid="intensive", rate="20/m")

auth_middleware_throttle = MiddlewareThrottle(auth_throttle, hook=authenticated_only)
intensive_middleware_throttle = MiddlewareThrottle(intensive_throttle, hook=intensive_operations)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[intensive_middleware_throttle, auth_middleware_throttle],
    backend=backend,
)
```

#### Combined Filtering

```python
async def authenticated_only(connection: HTTPConnection) -> bool:
    return connection.headers.get("authorization") is not None

complex_throttle = HTTPThrottle(uid="complex", rate="25/m")

# Combines ALL criteria: path AND method AND hook
complex_middleware_throttle = MiddlewareThrottle(
    complex_throttle,
    path="/api/",
    methods={"POST"},
    hook=authenticated_only,
)
```

### Best Practices

1. **Order Matters**: Place more specific throttles before general ones
2. **Early Placement**: Add throttle middleware early in the stack to reject requests before expensive processing
3. **Production Backends**: Use Redis for multi-instance deployments
4. **Monitoring**: Log throttle hits for capacity planning (a logging handler sketch follows the example below)
5. **Graceful Responses**: Provide clear error messages with retry information

```python
# Example: Optimal middleware ordering
app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[
        admin_middleware_throttle,      # Most specific
        intensive_middleware_throttle,  # Specific paths
        api_middleware_throttle,        # General API
    ],
    backend=redis_backend,
)
```
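
For the monitoring point above, one lightweight option is a custom `handle_throttled` callback that logs each throttle hit before raising. This is a sketch that reuses the handler signature from the Custom Throttled Response section together with standard-library logging; the logger name and `X-Forwarded-For` fallback are illustrative choices:

```python
import logging

from starlette.exceptions import HTTPException
from starlette.requests import HTTPConnection
from traffik.throttles import HTTPThrottle

logger = logging.getLogger("throttle")

async def logging_throttled_handler(connection: HTTPConnection, wait_ms: int, *args, **kwargs):
    # Record which client/path hit the limit, for capacity planning
    logger.warning(
        "throttled path=%s client=%s wait_ms=%d",
        connection.scope.get("path"),
        connection.headers.get("x-forwarded-for", "unknown"),
        wait_ms,
    )
    retry_after = max(wait_ms // 1000, 1)
    raise HTTPException(
        status_code=429,
        detail=f"Too many requests. Retry in {retry_after}s",
        headers={"Retry-After": str(retry_after)},
    )

monitored_throttle = HTTPThrottle(
    uid="monitored",
    rate="100/m",
    handle_throttled=logging_throttled_handler,
)
```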

## Error Handling

Traffik provides specific exceptions for different scenarios:

### Exception Types

```python
from traffik.exceptions import (
    TraffikException,     # Base exception
    ConfigurationError,   # Invalid configuration
    AnonymousConnection,  # Cannot identify the client
    ConnectionThrottled,  # Rate limit exceeded (HTTP 429)
    BackendError,         # Backend operation failed
)
```

### Handling Rate Limit Exceptions

The `ConnectionThrottled` exception is raised when a client exceeds their rate limit. It is an `HTTPException`, so it is handled automatically by FastAPI/Starlette, returning a `429 Too Many Requests` response. However, you can still register a custom exception handler if you want to customize the response further.

```python
from fastapi import FastAPI, Request
from starlette.responses import JSONResponse
from traffik.exceptions import ConnectionThrottled

app = FastAPI()

@app.exception_handler(ConnectionThrottled)
async def throttle_handler(request: Request, exc: ConnectionThrottled):
    return JSONResponse(
        status_code=429,
        content={
            "error": "rate_limit_exceeded",
            "retry_after_seconds": exc.retry_after,
            "message": exc.detail,
        },
        headers={"Retry-After": str(exc.retry_after)},
    )
```

### Handling Configuration Errors

```python
from traffik.exceptions import ConfigurationError, BackendConnectionError

try:
    backend = RedisBackend(connection="invalid://url")
    await backend.initialize()
except BackendConnectionError as exc:
    logger.error(f"Failed to connect to Redis: {exc}")
except ConfigurationError as exc:
    logger.error(f"Invalid configuration: {exc}")
```

### Handling Anonymous Connections

The default identifier raises `AnonymousConnection` if it cannot identify the client. You can catch this exception to provide a fallback identifier.

```python
from starlette.requests import HTTPConnection
from traffik.backends.base import connection_identifier
from traffik.exceptions import AnonymousConnection

async def safe_identifier(connection: HTTPConnection) -> str:
    try:
        return await connection_identifier(connection)
    except AnonymousConnection:
        # Fall back to a default identifier
        return f"anonymous:{connection.scope['path']}"
```

## Testing

A comprehensive testing example using `InMemoryBackend`:

```python
import pytest
from httpx import AsyncClient, ASGITransport
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
from traffik.backends.inmemory import InMemoryBackend
from traffik.throttles import HTTPThrottle

@pytest.fixture
async def backend():
    backend = InMemoryBackend(namespace="test", persistent=False)
    async with backend():
        yield backend

@pytest.mark.anyio
async def test_throttling(backend):
    throttle = HTTPThrottle(uid="test", rate="2/s")

    async def endpoint(request):
        await throttle(request)
        return JSONResponse({"status": "ok"})

    app = Starlette(
        routes=[Route("/test", endpoint, methods=["GET"])],
        lifespan=backend.lifespan,
    )

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://testserver",
    ) as client:
        # The first 2 requests succeed
        r1 = await client.get("/test")
        r2 = await client.get("/test")
        assert r1.status_code == 200
        assert r2.status_code == 200

        # The third request is throttled
        r3 = await client.get("/test")
        assert r3.status_code == 429
```

### Testing Different Strategies

```python
import pytest
from traffik.strategies import (
    FixedWindowStrategy,
    SlidingWindowLogStrategy,
    TokenBucketStrategy,
)

@pytest.mark.anyio
@pytest.mark.parametrize("strategy", [
    FixedWindowStrategy(),
    SlidingWindowLogStrategy(),
    TokenBucketStrategy(),
])
async def test_strategy(backend, strategy):
    throttle = HTTPThrottle(uid="test", rate="5/s", strategy=strategy)
    # Exercise the throttle behavior under each strategy
```

## API Reference

### Throttle Classes

#### `HTTPThrottle`

HTTP request rate limiting with flexible configuration.

```python
HTTPThrottle(
    uid: str,                                  # Unique identifier
    rate: Union[Rate, str],                    # "100/m" or a Rate object
    identifier: Optional[Callable],            # Client ID function
    handle_throttled: Optional[Callable],      # Custom throttled handler
    strategy: Optional[Strategy],              # Rate limiting strategy
    backend: Optional[ThrottleBackend],        # Storage backend
    dynamic_backend: bool = False,             # Runtime backend resolution
    min_wait_period: Optional[int] = None,     # Minimum wait (ms)
    headers: Optional[Dict[str, str]] = None,  # Extra headers for throttled responses
)
```

#### `WebSocketThrottle`

WebSocket message rate limiting.

```python
WebSocketThrottle(
    uid: str,
    rate: Union[Rate, str],
    identifier: Optional[Callable],
    handle_throttled: Optional[Callable],
    strategy: Optional[Strategy],
    backend: Optional[ThrottleBackend],
    dynamic_backend: bool = False,
    min_wait_period: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
)
```
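
WebSocket throttling is not demonstrated elsewhere in this README, so here is a hedged sketch of how a `WebSocketThrottle` could be applied inside a FastAPI WebSocket endpoint. It assumes the throttle is awaited with the WebSocket connection once per incoming message, mirroring the `await throttle(request)` pattern used for `HTTPThrottle` in the Testing section:

```python
from fastapi import FastAPI, WebSocket

from traffik.throttles import WebSocketThrottle

app = FastAPI()
ws_throttle = WebSocketThrottle(uid="ws_messages", rate="5/s")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        message = await websocket.receive_text()
        # Assumed usage: throttle each incoming message before handling it
        await ws_throttle(websocket)
        await websocket.send_text(f"echo: {message}")
```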

#### `BaseThrottle`

Base class for custom throttle implementations. Override `get_key()` for custom key generation.

### Strategy Classes

All strategies are dataclasses with `frozen=True` for immutability.

#### `FixedWindowStrategy()` (Default)

Simple fixed-window counting. Fast and memory-efficient.

#### `SlidingWindowLogStrategy()`

The most accurate option; maintains a request log. Memory: O(limit).

#### `SlidingWindowCounterStrategy()`

Sliding window with counters. A balance between accuracy and memory.

#### `TokenBucketStrategy(burst_size: Optional[int] = None)`

Allows controlled bursts. `burst_size` defaults to the rate limit.

#### `TokenBucketWithDebtStrategy(max_debt: Optional[int] = None)`

Token bucket with debt tracking for smoother recovery.

#### `LeakyBucketStrategy()`

Perfectly smooth traffic output. No bursts allowed.

#### `LeakyBucketWithQueueStrategy(queue_size: Optional[int] = None)`

Leaky bucket with request queuing.

### Backend Classes

#### `InMemoryBackend`

```python
InMemoryBackend(
    namespace: str = "inmemory",
    persistent: bool = False,
)
```

#### `RedisBackend`

```python
RedisBackend(
    connection: Union[str, Redis],
    namespace: str,
    persistent: bool = True,
)
```

#### `MemcachedBackend`

```python
MemcachedBackend(
    host: str = "localhost",
    port: int = 11211,
    namespace: str = "memcached",
    pool_size: int = 2,
    persistent: bool = False,
)
```

### Rate Class

```python
Rate(
    limit: int,
    milliseconds: int = 0,
    seconds: int = 0,
    minutes: int = 0,
    hours: int = 0,
)

# Or use the string format
Rate.parse("100/m")  # 100 per minute
Rate.parse("5/10s")  # 5 per 10 seconds
```

### Middleware Classes

#### `MiddlewareThrottle`

```python
MiddlewareThrottle(
    throttle: BaseThrottle,
    path: Optional[Union[str, Pattern]] = None,
    methods: Optional[Set[str]] = None,
    hook: Optional[Callable] = None,
)
```

#### `ThrottleMiddleware`

```python
ThrottleMiddleware(
    app: ASGIApp,
    middleware_throttles: List[MiddlewareThrottle],
    backend: Optional[ThrottleBackend] = None,
)
```

## Contributing

We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.

### Development Setup

```bash
git clone https://github.com/ti-oluwa/traffik.git
cd traffik
uv sync --extra dev

# Run tests
uv run pytest

# Run linting
uv run ruff check src/ tests/ --fix

# Run formatting
uv run ruff format src/ tests/
```

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.

## Changelog

See [CHANGELOG.md](CHANGELOG.md) for version history and changes.
"bugtrack_url": null,
"license": "MIT License Copyright (c) 2025 Daniel T. Afolayan Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the \"Software\"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.",
"summary": "Distributed rate limiting for Starlette applications.",
"version": "1.0.0b1",
"project_urls": {
"Bug Tracker": "https://github.com/ti-oluwa/traffik/issues",
"Changelog": "https://github.com/ti-oluwa/traffik/blob/main/CHANGELOG.md",
"Documentation": "https://github.com/ti-oluwa/traffik#readme",
"Homepage": "https://github.com/ti-oluwa/traffik",
"Repository": "https://github.com/ti-oluwa/traffik.git"
},
"split_keywords": [
"api",
" distributed-rate-limiting",
" fastapi",
" rate-limiting",
" starlette",
" throttling"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "d67a98726fb4cc45a9625daf9fdf2d95241d07a046b8ab33d45c07d68ff76bbc",
"md5": "e7b1e1c307f30ab919b7b8b27d44b240",
"sha256": "ec5bbe35b3abe55df5f35ed93a688fda247325fe39aef7ece8b8a017f44c0d36"
},
"downloads": -1,
"filename": "traffik-1.0.0b1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "e7b1e1c307f30ab919b7b8b27d44b240",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 62181,
"upload_time": "2025-10-19T13:44:29",
"upload_time_iso_8601": "2025-10-19T13:44:29.727456Z",
"url": "https://files.pythonhosted.org/packages/d6/7a/98726fb4cc45a9625daf9fdf2d95241d07a046b8ab33d45c07d68ff76bbc/traffik-1.0.0b1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "85303789ab0dfee538a7f984456cf55fdfe2b805aa305164e7a71e2fd219e252",
"md5": "7cbd1f8fe60d628966167112f69aacf5",
"sha256": "f0f8ec5bcb77e340c1f65add0597291d906846678fc827b137555c56ab16bee6"
},
"downloads": -1,
"filename": "traffik-1.0.0b1.tar.gz",
"has_sig": false,
"md5_digest": "7cbd1f8fe60d628966167112f69aacf5",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 197384,
"upload_time": "2025-10-19T13:44:30",
"upload_time_iso_8601": "2025-10-19T13:44:30.946717Z",
"url": "https://files.pythonhosted.org/packages/85/30/3789ab0dfee538a7f984456cf55fdfe2b805aa305164e7a71e2fd219e252/traffik-1.0.0b1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-19 13:44:30",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "ti-oluwa",
"github_project": "traffik",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "traffik"
}