traffik


Nametraffik JSON
Version 1.0.0b1 PyPI version JSON
download
home_pageNone
SummaryDistributed rate limiting for Starlette applications.
upload_time2025-10-19 13:44:30
maintainerNone
docs_urlNone
authorNone
requires_python>=3.9
licenseMIT License Copyright (c) 2025 Daniel T. Afolayan Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
keywords api distributed-rate-limiting fastapi rate-limiting starlette throttling
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Traffik - A Starlette throttling library

[![Test](https://github.com/ti-oluwa/traffik/actions/workflows/test.yaml/badge.svg)](https://github.com/ti-oluwa/traffik/actions/workflows/test.yaml)
[![Code Quality](https://github.com/ti-oluwa/traffik/actions/workflows/code-quality.yaml/badge.svg)](https://github.com/ti-oluwa/traffik/actions/workflows/code-quality.yaml)
[![codecov](https://codecov.io/gh/ti-oluwa/traffik/branch/main/graph/badge.svg)](https://codecov.io/gh/ti-oluwa/traffik)
<!-- [![PyPI version](https://badge.fury.io/py/traffik.svg)](https://badge.fury.io/py/traffik)
[![Python versions](https://img.shields.io/pypi/pyversions/traffik.svg)](https://pypi.org/project/traffik/) -->

Traffik provides flexible rate limiting for Starlette and FastAPI applications with support for both HTTP and WebSocket connections. It offers multiple rate limiting strategies including Fixed Window, Sliding Window, Token Bucket, and Leaky Bucket algorithms, allowing you to choose the approach that best fits your use case.

The library features pluggable backends (in-memory, Redis, Memcached), dynamic backend resolution for special applications. Whether you need simple per-endpoint limits or complex distributed rate limiting, Traffik provides the flexibility and robustness to handle your requirements.

Traffik was inspired by [fastapi-limiter](https://github.com/long2ice/fastapi-limiter) but has evolved into a more comprehensive solution with more advanced features.

## Features

- ๐Ÿš€ **Easy Integration**: Decorator, dependency, and middleware-based throttling
- ๐ŸŽฏ **Multiple Strategies**: Fixed Window (default), Sliding Window, Token Bucket, Leaky Bucket
- ๐Ÿ”„ **Multiple Backends**: In-memory, Redis, Memcached with atomic operation support
- ๐ŸŒ **Protocol Support**: Both HTTP and WebSocket throttling
- ๐Ÿข **Dynamic Backend Resolution**: Support for runtime backend switching
- ๐Ÿ”ง **Dependency Injection Friendly**: Works well with FastAPI's dependency injection system.
- ๐Ÿ“Š **Smart Client Identification**: IP-based by default, fully customizable
- โš™๏ธ **Flexible and Extensible API**: Easily extend base functionality
- ๐Ÿงช **Thoroughly Tested**: Comprehensive test suite covering concurrency and multithreading

## Installation

We recommend using `uv`, however, it is not a strict requirement.

### Install `uv` (optional)

Visit the [uv documentation](https://docs.astral.sh/uv/getting-started/installation/) for installation instructions.

### Basic Installation

```bash
uv add traffik

# or using pip
pip install traffik
```

Install with FastAPI support:

```bash
uv add "traffik[fastapi]"

# or using pip
pip install "traffik[fastapi]"
```

### With Redis Backend

```bash
uv add "traffik[redis]"

# or using pip
pip install "traffik[redis]"
```

### With Memcached Backend

```bash
uv add "traffik[memcached]"

# or using pip
pip install "traffik[memcached]"
```

### With All Features

```bash
uv add "traffik[all]"

# or using pip
pip install "traffik[all]"
```

### Development Installation

```bash
git clone https://github.com/your-username/traffik.git
cd traffik

uv sync --extra dev

# or using pip
pip install -e .[dev]
```

## Quick Testing with Docker

For quick testing across different platforms and Python versions:

```bash
# Run fast tests
./docker-test.sh test-fast

# Run full test suite
./docker-test.sh test

# Start development environment
./docker-test.sh dev

# Test across Python versions
./docker-test.sh test-matrix
```

**Testing Documentation:**

- [DOCKER.md](DOCKER.md) - Complete Docker testing guide
- [TESTING.md](TESTING.md) - Quick testing guide  

## Quick Start Guide

### 1. Basic HTTP Throttling with Starlette

```python
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette.responses import JSONResponse
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

# Create backend
backend = InMemoryBackend(namespace="myapp", persistent=False)

# Create throttle with string rate format
throttle = HTTPThrottle(
    uid="basic_limit",
    rate="5/10s",  # 5 requests per 10 seconds
)

async def throttled_endpoint(request: Request):
    await throttle(request)
    return JSONResponse({"message": "Success"})

app = Starlette(
    routes=[
        Route("/api/data", throttled_endpoint, methods=["GET"]),
    ],
    lifespan=backend.lifespan,
)
```

### 2. FastAPI with Dependency Injection

```python
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
from traffik.backends.inmemory import InMemoryBackend
from traffik.throttles import HTTPThrottle

# Create backend
backend = InMemoryBackend(namespace="api")

# Setup lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    async with backend(app, persistent=True, close_on_exit=True):
        yield

app = FastAPI(lifespan=lifespan)

# Create throttle
throttle = HTTPThrottle(uid="endpoint_limit", rate="10/m")

@app.get("/api/hello", dependencies=[Depends(throttle)])
async def say_hello():
    return {"message": "Hello World"}
```

### 3. Using Decorators (FastAPI Only)

```python
from fastapi import FastAPI
from contextlib import asynccontextmanager
from traffik.decorators import throttled
from traffik.throttles import HTTPThrottle
from traffik.backends.redis import RedisBackend

backend = RedisBackend(
    connection="redis://localhost:6379/0",
    namespace="api",
    persistent=True,
)

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with backend(app):
        yield

app = FastAPI(lifespan=lifespan)

@app.get("/api/limited")
@throttled(HTTPThrottle(uid="limited", rate="5/m"))
async def limited_endpoint():
    return {"data": "Limited access"}
```

### 4. WebSocket Throttling

```python
from traffik.throttles import WebSocketThrottle
from starlette.websockets import WebSocket
from starlette.exceptions import HTTPException

ws_throttle = WebSocketThrottle(uid="ws_messages", rate="3/10s")

async def ws_endpoint(websocket: WebSocket) -> None:
    await websocket.accept()
    
    while True:
        try:
            data = await websocket.receive_json()
            await ws_throttle(websocket)  # Rate limit per message
            
            await websocket.send_json({
                "status": "success",
                "data": data,
            })
        except HTTPException as exc:
            await websocket.send_json({
                "status": "error", 
                "status_code": exc.status_code,
                "detail": exc.detail,
            })
            break
    
    await websocket.close()
```

## Rate Limiting Strategies

Traffik supports multiple rate limiting strategies, each with different trade-offs. The **Fixed Window strategy is used by default** for its simplicity and performance.

### Fixed Window (Default)

Divides time into fixed windows and counts requests within each window. Simple, fast, and memory-efficient.

**Pros:** Simple, constant memory, fast  
**Cons:** Can allow bursts at window boundaries (up to 2x limit)

```python
from traffik.strategies import FixedWindowStrategy

throttle = HTTPThrottle(
    uid="api_limit",
    rate="100/m",
    strategy=FixedWindowStrategy()  # Default, can be omitted
)
```

### Sliding Window

Most accurate rate limiting with continuous sliding window evaluation. Prevents boundary exploitation.

**Pros:** Most accurate, no boundary issues  
**Cons:** Higher memory usage (O(limit) per key)

```python
from traffik.strategies import SlidingWindowLogStrategy

throttle = HTTPThrottle(
    uid="payment_api",
    rate="10/m",
    strategy=SlidingWindowLogStrategy()
)
```

### Token Bucket

Allows controlled bursts while maintaining average rate over time. Tokens refill continuously.

**Pros:** Allows bursts, smooth distribution, self-recovering  
**Cons:** Slightly more complex

```python
from traffik.strategies import TokenBucketStrategy

throttle = HTTPThrottle(
    uid="user_api",
    rate="100/m",
    strategy=TokenBucketStrategy(burst_size=150)  # Allow bursts up to 150
)
```

### Leaky Bucket

Enforces perfectly smooth traffic output. No bursts allowed.

**Pros:** Smooth output, protects downstream services  
**Cons:** Less forgiving, may reject legitimate bursts

```python
from traffik.strategies import LeakyBucketStrategy

throttle = HTTPThrottle(
    uid="third_party_api",
    rate="50/m",
    strategy=LeakyBucketStrategy()
)
```

## Writing Custom Strategies

You can create custom rate limiting strategies by implementing a callable that follows the strategy protocol. This allows you to implement specialized rate limiting logic tailored to your specific needs.

### Strategy Protocol

A strategy is a callable (function or class with `__call__`) that takes three parameters and returns a wait period:

```python
from traffik.backends.base import ThrottleBackend
from traffik.rates import Rate
from traffik.types import Stringable, WaitPeriod

async def my_strategy(
    key: Stringable, 
    rate: Rate, 
    backend: ThrottleBackend
) -> WaitPeriod:
    """
    :param key: The throttling key (e.g., "user:123", "ip:192.168.1.1")
    :param rate: Rate limit definition with limit and expire properties
    :param backend: Backend instance for storage operations
    :return: Wait time in milliseconds (0.0 if request allowed)
    """
    # Your rate limiting logic here
    return 0.0  # Allow request
```

### Example: Simple Rate Strategy

Here's a basic example that implements a simple counter-based strategy:

```python
from dataclasses import dataclass
from traffik.backends.base import ThrottleBackend
from traffik.rates import Rate
from traffik.types import Stringable, WaitPeriod
from traffik.utils import time


async def simple_counter_strategy(
    self, key: Stringable, rate: Rate, backend: ThrottleBackend
) -> WaitPeriod:
    """
    Simple counter-based rate limiting.
    
    Counts requests and resets counter when expired.
    """
    if rate.unlimited:
        return 0.0
    
    now = time() * 1000  # Current time in milliseconds
    full_key = await backend.get_key(str(key))
    counter_key = f"{full_key}:counter"
    timestamp_key = f"{full_key}:timestamp"
    
    ttl_seconds = int(rate.expire // 1000) + 1
    
    async with await backend.lock(f"lock:{counter_key}", blocking=True, blocking_timeout=1):
        # Get current count and timestamp
        count_str = await backend.get(counter_key)
        timestamp_str = await backend.get(timestamp_key)
        
        if count_str and timestamp_str:
            count = int(count_str)
            timestamp = float(timestamp_str)
            
            # Check if window has expired
            if now - timestamp > rate.expire:
                # Reset counter for new window
                count = 1
                await backend.set(counter_key, "1", expire=ttl_seconds)
                await backend.set(timestamp_key, str(now), expire=ttl_seconds)
            else:
                # Increment counter
                count = await backend.increment(counter_key)
        else:
            # First request
            count = 1
            await backend.set(counter_key, "1", expire=ttl_seconds)
            await backend.set(timestamp_key, str(now), expire=ttl_seconds)
        
        # Check if limit exceeded
        if count > rate.limit:
            # Calculate wait time until window expires
            timestamp = float(await backend.get(timestamp_key))
            elapsed = now - timestamp
            wait_ms = rate.expire - elapsed
            return max(wait_ms, 0.0)
        
        return 0.0

# Usage
throttle = HTTPThrottle(
    uid="simple",
    rate="10/m",
    strategy=simple_counter_strategy,
)
```

### Example: Adaptive Rate Strategy

A more advanced example that adapts the rate limit based on backend load:

```python
from dataclasses import dataclass
from traffik.backends.base import ThrottleBackend
from traffik.rates import Rate
from traffik.types import Stringable, WaitPeriod
from traffik.utils import time

@dataclass(frozen=True)
class AdaptiveRateStrategy:
    """
    Adaptive rate limiting that adjusts based on system load.
    
    Reduces limits during high load, increases during low load.
    """
    
    load_threshold: float = 0.8  # 80% of limit triggers adaptation
    reduction_factor: float = 0.5  # Reduce to 50% during high load
    
    async def __call__(
        self, key: Stringable, rate: Rate, backend: ThrottleBackend
    ) -> WaitPeriod:
        if rate.unlimited:
            return 0.0
        
        now = time() * 1000
        window_duration_ms = rate.expire
        current_window = int(now // window_duration_ms)
        
        full_key = await backend.get_key(str(key))
        counter_key = f"{full_key}:adaptive:{current_window}"
        load_key = f"{full_key}:load"
        ttl_seconds = int(window_duration_ms // 1000) + 1
        
        async with await backend.lock(f"lock:{counter_key}", blocking=True, blocking_timeout=1):
            # Increment request counter
            count = await backend.increment_with_ttl(counter_key, amount=1, ttl=ttl_seconds)
            
            # Calculate current load percentage
            load_percentage = count / rate.limit
            
            # Determine effective limit based on load
            if load_percentage > self.load_threshold:
                # High load - reduce effective limit
                effective_limit = int(rate.limit * self.reduction_factor)
                await backend.set(load_key, "high", expire=ttl_seconds)
            else:
                # Normal load - use full limit
                effective_limit = rate.limit
                await backend.set(load_key, "normal", expire=ttl_seconds)
            
            # Check against effective limit
            if count > effective_limit:
                # Calculate wait time
                time_in_window = now % window_duration_ms
                wait_ms = window_duration_ms - time_in_window
                return wait_ms
            
            return 0.0

# Usage
throttle = HTTPThrottle(
    uid="adaptive_api",
    rate="1000/h",
    strategy=AdaptiveRateStrategy(load_threshold=0.7, reduction_factor=0.6)
)
```

### Example: Priority-Based Strategy

A strategy that implements priority queuing:

```python
from dataclasses import dataclass
from enum import IntEnum
from traffik.backends.base import ThrottleBackend
from traffik.rates import Rate
from traffik.types import Stringable, WaitPeriod
from traffik.utils import time, dump_json, load_json, JSONDecodeError

class Priority(IntEnum):
    """Request priority levels"""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4

@dataclass(frozen=True)
class PriorityQueueStrategy:
    """
    Rate limiting with priority queue.
    
    Higher priority requests are processed first when at capacity.
    """
    
    default_priority: Priority = Priority.NORMAL
    
    def _extract_priority(self, key: str) -> Priority:
        """Extract priority from key (format: "priority:<level>:user:123")"""
        if key.startswith("priority:"):
            try:
                level = int(key.split(":")[1])
                return Priority(level)
            except (ValueError, IndexError):
                pass
        return self.default_priority
    
    async def __call__(
        self, key: Stringable, rate: Rate, backend: ThrottleBackend
    ) -> WaitPeriod:
        if rate.unlimited:
            return 0.0
        
        now = time() * 1000
        priority = self._extract_priority(str(key))
        
        full_key = await backend.get_key(str(key))
        queue_key = f"{full_key}:priority_queue"
        ttl_seconds = int(rate.expire // 1000) + 1
        
        async with await backend.lock(f"lock:{queue_key}", blocking=True, blocking_timeout=1):
            # Get current queue
            queue_json = await backend.get(queue_key)
            if queue_json:
                try:
                    queue = load_json(queue_json)
                except JSONDecodeError:
                    queue = []
            else:
                queue = []
            
            # Remove expired entries
            queue = [
                entry for entry in queue 
                if now - entry["timestamp"] < rate.expire
            ]
            
            # Count requests with higher or equal priority
            higher_priority_count = sum(
                1 for entry in queue 
                if entry["priority"] >= priority
            )
            
            # Check if request can be processed
            if higher_priority_count >= rate.limit:
                # Calculate wait time based on oldest high-priority entry
                oldest_high_priority = min(
                    (entry["timestamp"] for entry in queue if entry["priority"] >= priority),
                    default=now
                )
                wait_ms = rate.expire - (now - oldest_high_priority)
                return max(wait_ms, 0.0)
            
            # Add current request to queue
            queue.append({
                "timestamp": now,
                "priority": priority,
                "key": str(key)
            })
            
            # Sort by priority (descending) and timestamp (ascending)
            queue.sort(key=lambda x: (-x["priority"], x["timestamp"]))
            
            # Store updated queue
            await backend.set(queue_key, dump_json(queue), expire=ttl_seconds)
            return 0.0

# Usage
from traffik.throttles import HTTPThrottle

async def priority_identifier(connection):
    """Extract priority from request headers"""
    priority = connection.headers.get("X-Priority", "2")
    user_id = extract_user_id(connection)
    return f"priority:{priority}:user:{user_id}"

throttle = HTTPThrottle(
    uid="priority_api",
    rate="100/m",
    strategy=PriorityQueueStrategy(),
    identifier=priority_identifier
)
```

### Best Practices for Custom Strategies

1. **Always use locks**: Wrap critical sections with `backend.lock()` to ensure atomicity

   ```python
   async with await backend.lock(f"lock:{key}", blocking=True, blocking_timeout=1):
       # Critical section
   ```

2. **Set appropriate TTLs**: Always set expiration times to prevent memory leaks

   ```python
   ttl_seconds = int(rate.expire // 1000) + 1  # +1 second buffer
   await backend.set(key, value, expire=ttl_seconds)
   ```

3. **Handle unlimited rates**: Check for unlimited rates early

   ```python
   if rate.unlimited:
       return 0.0
   ```

4. **Use proper key prefixes**: Namespace your strategy's keys to avoid conflicts

   ```python
   full_key = await backend.get_key(str(key))
   strategy_key = f"{full_key}:mystrategy:data"
   ```

5. **Return milliseconds**: Always return wait time in milliseconds

   ```python
   return wait_ms  # Not wait_seconds * 1000
   ```

6. **Use dataclasses**: Make strategies immutable and configurable

   ```python
   @dataclass(frozen=True)
   class MyStrategy:
       param1: int = 10
       param2: float = 0.5
   ```

7. **Handle errors gracefully**: Catch and handle JSON decode errors, type errors, etc.

   ```python
   try:
       data = load_json(json_str)
   except JSONDecodeError:
       data = default_value
   ```

### Testing Custom Strategies

```python
import pytest
from traffik.backends.inmemory import InMemoryBackend
from traffik.rates import Rate

@pytest.mark.anyio
async def test_custom_strategy():
    backend = InMemoryBackend(namespace="test")
    async with backend(close_on_exit=True):
        strategy = MyCustomStrategy()
        rate = Rate.parse("5/s")
        
        # Test allowing requests
        for i in range(5):
            wait = await strategy(f"user:123", rate, backend)
            assert wait == 0.0, f"Request {i+1} should be allowed"
        
        # Test throttling
        wait = await strategy(f"user:123", rate, backend)
        assert wait > 0, "Request 6 should be throttled"
        
        # Verify wait time is reasonable
        assert wait <= rate.expire, "Wait time should not exceed window"
```

## Backends

Traffik provides three backend implementations with full atomic operation support and distributed locking.

### In-Memory Backend

Perfect for development, testing, and single-instance applications:

```python
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend(
    namespace="myapp",
    persistent=False,  # Don't persist across restarts
)
```

**Pros:**

- No external dependencies
- Fast and simple
- Great for development and testing

**Cons:**

- Not suitable for multi-process/distributed systems
- Data lost on restart (even with `persistent=True`)

### Redis Backend

Recommended for production with distributed systems:

```python
from traffik.backends.redis import RedisBackend

# From connection string
backend = RedisBackend(
    connection="redis://localhost:6379/0",
    namespace="myapp",
    persistent=True,
)

# From Redis client factory
import redis.asyncio as redis

def get_client() -> redis.Redis:
    return redis.Redis(host='localhost', port=6379, db=0)

backend = RedisBackend(
    connection=get_client,
    namespace="myapp",
)
```

**Features:**

- Distributed locks using Redlock algorithm
- Production-ready persistence

**Pros:**

- Multi-process and distributed support
- Persistence across restarts
- Battle-tested for production

**Cons:**

- Requires Redis server
- Additional infrastructure

### Memcached Backend

Lightweight distributed caching solution:

```python
from traffik.backends.memcached import MemcachedBackend

backend = MemcachedBackend(
    host="localhost",
    port=11211,
    namespace="myapp",
    pool_size=10,
)
```

**Features:**

- Lightweight distributed locks
- Atomic operations via CAS (Compare-And-Swap)
- Connection pooling
- Fast in-memory storage

**Pros:**

- Lightweight and fast
- Good for high-throughput scenarios
- Simple deployment

**Cons:**

- Less feature-rich than Redis

### Custom Backends

Create custom backends by subclassing `ThrottleBackend`:

```python
import typing

from traffik.backends.base import ThrottleBackend
from traffik.types import HTTPConnectionT, AsyncLock

class CustomBackend(ThrottleBackend[typing.Dict, HTTPConnectionT]):
    """Custom backend with your storage solution"""
    
    async def initialize(self) -> None:
        """Setup connection/resources"""
        pass
    
    async def get(self, key: str) -> typing.Optional[str]:
        """Get value for key"""
        pass
    
    async def set(self, key: str, value: str, expire: typing.Optional[int] = None) -> None:
        """Set value with optional TTL"""
        pass
    
    async def delete(self, key: str) -> None:
        """Delete key"""
        pass
    
    async def increment(self, key: str, amount: int = 1) -> int:
        """Atomically increment counter"""
        pass
    
    async def decrement(self, key: str, amount: int = 1) -> int:
        """Atomically decrement counter"""
        pass
    
    async def increment_with_ttl(self, key: str, amount: int, ttl: int) -> int:
        """Atomically increment with TTL set"""
        pass
    
    async def get_lock(self, name: str) -> AsyncLock:
        """Get distributed lock"""
        pass
    
    async def reset(self) -> None:
        """Clear all data"""
        pass
    
    async def close(self) -> None:
        """Cleanup resources"""
        pass
```

### Throttle Backend Selection

You can specify the backend for each throttle individually or allow them to share a common backend. The shared backend is usually the one set up in your application lifespan.

```python
from fastapi import FastAPI, Depends, Request

from traffik.backends.redis import RedisBackend
from traffik.throttles import HTTPThrottle

shared_redis_backend = RedisBackend(
    connection="redis://localhost:6379/0",
    namespace="shared",
)
app = FastAPI(lifespan=shared_redis_backend.lifespan)

throttle_with_own_backend = HTTPThrottle(
    uid="custom_backend",
    rate="100/m",
    backend=MyCustomBackend(),  # Uses its own backend
)
# Uses backend from app lifespan
throttle_with_shared_backend = HTTPThrottle(
    uid="shared_backend",
    rate="50/m",
)

@app.get("/api/custom", dependencies=[Depends(throttle_with_shared_backend)])
async def endpoint_custom(request: Request = Depends(throttle_with_own_backend)):
    return {"message": "Uses its own backend"}

```

## Configuration Options

### Rate Format

Traffik supports flexible rate specification using string format or `Rate` objects:

```python
from traffik import Rate
from traffik.throttles import HTTPThrottle

# String format (recommended)
throttle = HTTPThrottle(uid="api_v1", rate="100/m")  # 100 per minute
throttle = HTTPThrottle(uid="api_v2", rate="5/10s")  # 5 per 10 seconds
throttle = HTTPThrottle(uid="api_v3", rate="1000/500ms")  # 1000 per 500ms

# Supported units: ms, s/sec/second, m/min/minute, h/hr/hour, d/day
throttle = HTTPThrottle(uid="daily", rate="10000/d")  # 10000 per day

# Rate object for complex configurations
rate = Rate(limit=100, minutes=5, seconds=30)  # 100 per 5.5 minutes
throttle = HTTPThrottle(uid="complex", rate=rate)
```

### Custom Client Identification

Customize how clients are identified for rate limiting:

```python
from starlette.requests import HTTPConnection
from traffik.throttles import HTTPThrottle

async def user_based_identifier(connection: HTTPConnection) -> str:
    """Identify by user ID from JWT token"""
    user_id = extract_user_id(connection.headers.get("authorization"))
    return f"user:{user_id}"

throttle = HTTPThrottle(
    uid="user_limit",
    rate="100/h",
    identifier=user_based_identifier,
)
```

### Exempting Connections

Skip throttling for specific clients by returning `UNLIMITED`:

```python
import typing
from starlette.requests import HTTPConnection
from traffik import UNLIMITED
from traffik.throttles import HTTPThrottle


async def admin_bypass_identifier(connection: HTTPConnection) -> typing.Any:
    """Bypass throttling for admin users"""
    user_role = extract_role(connection.headers.get("authorization"))
    if user_role == "admin":
        return UNLIMITED  # Admins bypass throttling
    
    # Regular users get normal identification
    user_id = extract_user_id(connection.headers.get("authorization"))
    return f"user:{user_id}"

throttle = HTTPThrottle(
    uid="api_limit",
    rate="50/m",
    identifier=admin_bypass_identifier,
)
```

### Custom Throttled Response

Customize the response when rate limits are exceeded:

```python
from starlette.requests import HTTPConnection
from starlette.exceptions import HTTPException

async def custom_throttled_handler(
    connection: HTTPConnection, 
    wait_ms: int,
    *args,
    **kwargs
):
    wait_seconds = wait_ms // 1000
    raise HTTPException(
        status_code=429,
        detail={
            "error": "rate_limit_exceeded",
            "message": f"Too many requests. Retry in {wait_seconds}s",
            "retry_after": wait_seconds
        },
        headers={"Retry-After": str(wait_seconds)},
    )

throttle = HTTPThrottle(
    uid="api",
    rate="100/m",
    handle_throttled=custom_throttled_handler,
)
```

## More on Usage

### Multiple Rate Limits per Endpoint

Apply both burst and sustained limits:

```python
from fastapi import FastAPI, Depends
from traffik.throttles import HTTPThrottle

# Burst limit: 10 per minute
burst_limit = HTTPThrottle(uid="burst", rate="10/m")

# Sustained limit: 100 per hour
sustained_limit = HTTPThrottle(uid="sustained", rate="100/h")

@app.get(
    "/api/data",
    dependencies=[Depends(burst_limit), Depends(sustained_limit)]
)
async def get_data():
    return {"data": "value"}
```

### Per-User Rate Limiting

```python
from traffik.throttles import HTTPThrottle
from starlette.requests import Request

async def user_identifier(request: Request) -> str:
    user_id = extract_user_from_token(request.headers.get("authorization"))
    return f"user:{user_id}"

user_throttle = HTTPThrottle(
    uid="user_quota",
    rate="1000/h",
    identifier=user_identifier,
)
```

### Strategy Selection Based on Use Case

```python
from traffik.throttles import HTTPThrottle
from traffik.strategies import (
    FixedWindowStrategy,
    SlidingWindowLogStrategy,
    TokenBucketStrategy,
    LeakyBucketStrategy,
)

# Public API - allow bursts
public_api = HTTPThrottle(
    uid="public",
    rate="100/m",
    strategy=TokenBucketStrategy(burst_size=150),
)

# Payment API - strict enforcement
payment_api = HTTPThrottle(
    uid="payments",
    rate="10/m",
    strategy=SlidingWindowLogStrategy(),
)

# Third-party API - smooth output
external_api = HTTPThrottle(
    uid="external",
    rate="50/m",
    strategy=LeakyBucketStrategy(),
)

# Simple rate limiting - default
simple_api = HTTPThrottle(
    uid="simple",
    rate="200/m",
    # Uses `FixedWindowStrategy` by default
)
```

### Dynamic Backend Resolution for Multi-Tenant Applications

Enable runtime backend switching for multi-tenant SaaS applications:

```python
from fastapi import FastAPI, Request, Depends
from traffik.throttles import HTTPThrottle
from traffik.backends.redis import RedisBackend
from traffik.backends.inmemory import InMemoryBackend
import jwt

# Shared throttle with dynamic backend resolution
api_throttle = HTTPThrottle(
    uid="api_quota",
    rate="1000/h",
    dynamic_backend=True,  # Resolve backend at runtime
)

TENANT_CONFIG = {
    "enterprise": {"redis_url": "redis://enterprise:6379/0", "multiplier": 5.0},
    "premium": {"redis_url": "redis://premium:6379/0", "multiplier": 2.0},
    "free": {"redis_url": None, "multiplier": 1.0},
}

async def tenant_middleware(request: Request, call_next):
    """Set up tenant-specific backend based on JWT"""
    token = request.headers.get("authorization", "").split(" ")[1] if request.headers.get("authorization") else None
    
    if token:
        payload = jwt.decode(token, "secret", algorithms=["HS256"])
        tier = payload.get("tenant_tier", "free")
        tenant_id = payload.get("tenant_id", "default")
    else:
        tier, tenant_id = "free", "anonymous"
    
    config = TENANT_CONFIG[tier]
    
    # Select backend based on tenant tier
    if config["redis_url"]:
        backend = RedisBackend(
            connection=config["redis_url"],
            namespace=f"tenant_{tenant_id}",
            persistent=True
        )
    else:
        backend = InMemoryBackend(namespace=f"tenant_{tenant_id}")
    
    # Execute within tenant's backend context
    async with backend(request.app):
        return await call_next(request)

app = FastAPI()
app.middleware("http")(tenant_middleware)

@app.get("/api/data")
async def get_data(request: Depends(api_throttle)):
    return {"data": "tenant-specific data"}
```

**When to use dynamic backends:**

- โœ… Multi-tenant SaaS with per-tenant storage
- โœ… A/B testing different strategies
- โœ… Environment-specific backends
- โŒ Simple shared storage (use explicit `backend` parameter instead)

**Important:** Dynamic backend resolution adds slight overhead and complexity. Only use when you need runtime backend switching.

### Application Lifespan Management

Properly manage backend lifecycle:

#### Starlette

```python
from starlette.applications import Starlette
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend(namespace="app")

app = Starlette(
    routes=[...],
    lifespan=backend.lifespan,  # Automatic cleanup
)
```

#### FastAPI

```python
from fastapi import FastAPI
from contextlib import asynccontextmanager
from traffik.backends.redis import RedisBackend

backend = RedisBackend(connection="redis://localhost:6379/0", namespace="app")

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    async with backend(app, persistent=True, close_on_exit=True):
        yield
    # Shutdown - backend cleanup automatic

app = FastAPI(lifespan=lifespan)
```

## Throttle Middleware

Apply rate limiting across multiple endpoints with sophisticated filtering and routing logic.

### Basic Middleware Setup

#### FastAPI

```python
from fastapi import FastAPI
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

app = FastAPI()
backend = InMemoryBackend(namespace="api")

# Create throttle
api_throttle = HTTPThrottle(uid="api_global", rate="100/m")

# Wrap in middleware throttle
middleware_throttle = MiddlewareThrottle(api_throttle)

# Add middleware
app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[middleware_throttle],
    backend=backend
)
```

#### Starlette

```python
from starlette.applications import Starlette
from traffik.middleware import ThrottleMiddleware, MiddlewareThrottle
from traffik.throttles import HTTPThrottle
from traffik.backends.inmemory import InMemoryBackend

backend = InMemoryBackend(namespace="api")

api_throttle = HTTPThrottle(uid="api_throttle", rate="50/m")
middleware_throttle = MiddlewareThrottle(api_throttle)

app = Starlette(
    routes=[...],
    middleware=[
        (ThrottleMiddleware, {
            "middleware_throttles": [middleware_throttle],
            "backend": backend
        })
    ]
)
```

### Advanced Filtering

#### Method-Based Filtering

```python
# Strict limits for write operations
write_throttle = HTTPThrottle(uid="writes", rate="10/m")
read_throttle = HTTPThrottle(uid="reads", rate="1000/m")

write_middleware_throttle = MiddlewareThrottle(
    write_throttle,
    methods={"POST", "PUT", "DELETE"}
)

read_middleware_throttle = MiddlewareThrottle(
    read_throttle,
    methods={"GET", "HEAD"}
)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[write_middleware_throttle, read_middleware_throttle],
    backend=backend # If not set, uses app lifespan backend
)
```

#### Path Pattern Filtering

```python
# String patterns and regex
api_throttle = HTTPThrottle(uid="api", rate="100/m")
admin_throttle = HTTPThrottle(uid="admin", rate="5/m")

api_middleware_throttle = MiddlewareThrottle(
    api_throttle,
    path="/api/"  # Starts with /api/
)
admin_middleware_throttle = MiddlewareThrottle(
    admin_throttle,
    path=r"^/admin/.*"  # Regex pattern
)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[admin_middleware_throttle, api_middleware_throttle],
    backend=backend
)
```

#### Custom Hook-Based Filtering

```python
from starlette.requests import HTTPConnection

async def authenticated_only(connection: HTTPConnection) -> bool:
    """Apply throttle only to authenticated users"""
    return connection.headers.get("authorization") is not None

async def intensive_operations(connection: HTTPConnection) -> bool:
    """Throttle resource-intensive endpoints"""
    intensive_paths = ["/api/reports/", "/api/analytics/", "/api/exports/"]
    return any(connection.scope["path"].startswith(p) for p in intensive_paths)

auth_throttle = HTTPThrottle(uid="auth", rate="200/m")
intensive_throttle = HTTPThrottle(uid="intensive", rate="20/m")

auth_middleware_throttle = MiddlewareThrottle(auth_throttle, hook=authenticated_only)
intensive_middleware_throttle = MiddlewareThrottle(intensive_throttle, hook=intensive_operations)

app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[intensive_middleware_throttle, auth_middleware_throttle],
    backend=backend
)
```

#### Combined Filtering

```python
async def authenticated_only(connection: HTTPConnection) -> bool:
    return connection.headers.get("authorization") is not None

complex_throttle = HTTPThrottle(uid="complex", rate="25/m")

# Combines ALL criteria: path AND method AND hook
complex_middleware_throttle = MiddlewareThrottle(
    complex_throttle,
    path="/api/",
    methods={"POST"},
    hook=authenticated_only
)
```

### Best Practices

1. **Order Matters**: Place more specific throttles before general ones
2. **Early Placement**: Add throttle middleware early in the stack to reject requests before expensive processing
3. **Production Backends**: Use Redis for multi-instance deployments
4. **Monitoring**: Log throttle hits for capacity planning
5. **Graceful Responses**: Provide clear error messages with retry information

```python
# Example: Optimal middleware ordering
app.add_middleware(
    ThrottleMiddleware,
    middleware_throttles=[
        admin_middleware_throttle,      # Most specific
        intensive_middleware_throttle,  # Specific paths
        api_middleware_throttle,        # General API
    ],
    backend=redis_backend
)
```

## Error Handling

Traffik provides specific exceptions for different scenarios:

### Exception Types

```python
from traffik.exceptions import (
    TraffikException,           # Base exception
    ConfigurationError,         # Invalid configuration
    AnonymousConnection,        # Cannot identify client
    ConnectionThrottled,        # Rate limit exceeded (HTTP 429)
    BackendError,               # Backend operation failed
)
```

### Handling Rate Limit Exceptions

The `ConnectionThrottled` exception is raised when a client exceeds their rate limit. It is an `HTTPException` so its is handled automatically by FastAPI/Starlette, returning a `429 Too Many Requests` response. However, you can still register a custom exception handler if you want to customize the response further.

```python
from fastapi import FastAPI, Request
from traffik.exceptions import ConnectionThrottled
from starlette.responses import JSONResponse

@app.exception_handler(ConnectionThrottled)
async def throttle_handler(request: Request, exc: ConnectionThrottled):
    return JSONResponse(
        status_code=429,
        content={
            "error": "rate_limit_exceeded",
            "retry_after_seconds": exc.retry_after,
            "message": exc.detail
        },
        headers={"Retry-After": str(exc.retry_after)}
    )
```

### Handling Configuration Errors

```python
from traffik.exceptions import ConfigurationError, BackendConnectionError

try:
    backend = RedisBackend(connection="invalid://url")
    await backend.initialize()
except BackendConnectionError as exc:
    logger.error(f"Failed to connect to Redis: {exc}")
except ConfigurationError as exc:
    logger.error(f"Invalid configuration: {exc}")
```

### Handling Anonymous Connections

The default identifier raises `AnonymousConnection` if it cannot identify the client. You can catch this exception to provide a fallback identifier.

```python
from traffik.exceptions import AnonymousConnection
from traffik.backends.base import connection_identifier

async def safe_identifier(connection: HTTPConnection) -> str:
    try:
        return await connection_identifier(connection)
    except AnonymousConnection:
        # Fall back to a default identifier
        return f"anonymous:{connection.scope['path']}"
```

## Testing

Comprehensive testing example using `InMemoryBackend`:

```python
import pytest
from httpx import AsyncClient, ASGITransport
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.responses import JSONResponse
from traffik.backends.inmemory import InMemoryBackend
from traffik.throttles import HTTPThrottle

@pytest.fixture
async def backend():
    backend = InMemoryBackend(namespace="test", persistent=False)
    async with backend():
        yield backend

@pytest.mark.anyio
async def test_throttling(backend):
    throttle = HTTPThrottle(uid="test", rate="2/s")

    async def endpoint(request):
        await throttle(request)
        return JSONResponse({"status": "ok"})
    
    app = Starlette(
        routes=[Route("/test", endpoint, methods=["GET"])],
        lifespan=backend.lifespan,
    )

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://testserver",
    ) as client:
        # First 2 requests succeed
        r1 = await client.get("/test")
        r2 = await client.get("/test")
        assert r1.status_code == 200
        assert r2.status_code == 200
        
        # Third request throttled
        r3 = await client.get("/test")
        assert r3.status_code == 429
```

### Testing Different Strategies

```python
from traffik.strategies import (
    FixedWindowStrategy,
    SlidingWindowLogStrategy,
    TokenBucketStrategy,
)

@pytest.mark.parametrize("strategy", [
    FixedWindowStrategy(),
    SlidingWindowLogStrategy(),
    TokenBucketStrategy(),
])
async def test_strategy(backend, strategy):
    throttle = HTTPThrottle(uid="test", rate="5/s", strategy=strategy)
    # Test throttle behavior with different strategies
```

## API Reference

### Throttle Classes

#### `HTTPThrottle`

HTTP request rate limiting with flexible configuration.

```python
HTTPThrottle(
    uid: str,                          # Unique identifier
    rate: Union[Rate, str],           # "100/m" or Rate object
    identifier: Optional[Callable],    # Client ID function
    handle_throttled: Optional[Callable],  # Custom handler
    strategy: Optional[Strategy],      # Rate limiting strategy
    backend: Optional[ThrottleBackend],  # Storage backend
    dynamic_backend: bool = False,     # Runtime backend resolution
    min_wait_period: Optional[int] = None,  # Minimum wait (ms)
    headers: Optional[Dict[str, str]] = None,  # Extra headers to include in throttled response
)
```

#### `WebSocketThrottle`

WebSocket message rate limiting.

```python
WebSocketThrottle(
    uid: str,
    rate: Union[Rate, str],
    identifier: Optional[Callable],
    handle_throttled: Optional[Callable],
    strategy: Optional[Strategy],
    backend: Optional[ThrottleBackend],
    dynamic_backend: bool = False,
    min_wait_period: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
)
```

#### `BaseThrottle`

Base class for custom throttle implementations. Override `get_key()` for custom key generation.

### Strategy Classes

All strategies are dataclasses with frozen=True for immutability.

#### `FixedWindowStrategy()` (Default)

Simple fixed-window counting. Fast and memory-efficient.

#### `SlidingWindowLogStrategy()`

Most accurate, maintains request log. Memory: O(limit).

#### `SlidingWindowCounterStrategy()`

Sliding window with counters. Balance between accuracy and memory.

#### `TokenBucketStrategy(burst_size: Optional[int] = None)`

Allows controlled bursts. `burst_size` defaults to rate limit.

#### `TokenBucketWithDebtStrategy(max_debt: Optional[int] = None)`

Token bucket with debt tracking for smoother recovery.

#### `LeakyBucketStrategy()`

Perfectly smooth traffic output. No bursts allowed.

#### `LeakyBucketWithQueueStrategy(queue_size: Optional[int] = None)`

Leaky bucket with request queuing.

### Backend Classes

#### `InMemoryBackend`

```python
InMemoryBackend(
    namespace: str = "inmemory",
    persistent: bool = False,
)
```

#### `RedisBackend`

```python
RedisBackend(
    connection: Union[str, Redis],
    namespace: str,
    persistent: bool = True,
)
```

#### `MemcachedBackend`

```python
MemcachedBackend(
    host: str = "localhost",
    port: int = 11211,
    namespace: str = "memcached",
    pool_size: int = 2,
    persistent: bool = False,
)
```

### Rate Class

```python
Rate(
    limit: int,
    milliseconds: int = 0,
    seconds: int = 0,
    minutes: int = 0,
    hours: int = 0,
)

# Or use string format
Rate.parse("100/m")  # 100 per minute
Rate.parse("5/10s")  # 5 per 10 seconds
```

### Middleware Classes

#### `MiddlewareThrottle`

```python
MiddlewareThrottle(
    throttle: BaseThrottle,
    path: Optional[Union[str, Pattern]] = None,
    methods: Optional[Set[str]] = None,
    hook: Optional[Callable] = None,
)
```

#### `ThrottleMiddleware`

```python
ThrottleMiddleware(
    app: ASGIApp,
    middleware_throttles: List[MiddlewareThrottle],
    backend: Optional[ThrottleBackend] = None,
)
```

## Contributing

We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.

### Development Setup

```bash
git clone https://github.com/ti-oluwa/traffik.git
cd traffik
uv sync --extra dev

# Run tests
uv run pytest

# Run linting
uv run ruff check src/ tests/ --fix

# Run formatting  
uv run ruff format src/ tests/
```

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Changelog

See [CHANGELOG.md](CHANGELOG.md) for version history and changes.

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "traffik",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": "tioluwa <tioluwa.dev@gmail.com>",
    "keywords": "api, distributed-rate-limiting, fastapi, rate-limiting, starlette, throttling",
    "author": null,
    "author_email": "tioluwa <tioluwa.dev@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/85/30/3789ab0dfee538a7f984456cf55fdfe2b805aa305164e7a71e2fd219e252/traffik-1.0.0b1.tar.gz",
    "platform": null,
    "description": "# Traffik - A Starlette throttling library\n\n[![Test](https://github.com/ti-oluwa/traffik/actions/workflows/test.yaml/badge.svg)](https://github.com/ti-oluwa/traffik/actions/workflows/test.yaml)\n[![Code Quality](https://github.com/ti-oluwa/traffik/actions/workflows/code-quality.yaml/badge.svg)](https://github.com/ti-oluwa/traffik/actions/workflows/code-quality.yaml)\n[![codecov](https://codecov.io/gh/ti-oluwa/traffik/branch/main/graph/badge.svg)](https://codecov.io/gh/ti-oluwa/traffik)\n<!-- [![PyPI version](https://badge.fury.io/py/traffik.svg)](https://badge.fury.io/py/traffik)\n[![Python versions](https://img.shields.io/pypi/pyversions/traffik.svg)](https://pypi.org/project/traffik/) -->\n\nTraffik provides flexible rate limiting for Starlette and FastAPI applications with support for both HTTP and WebSocket connections. It offers multiple rate limiting strategies including Fixed Window, Sliding Window, Token Bucket, and Leaky Bucket algorithms, allowing you to choose the approach that best fits your use case.\n\nThe library features pluggable backends (in-memory, Redis, Memcached), dynamic backend resolution for special applications. Whether you need simple per-endpoint limits or complex distributed rate limiting, Traffik provides the flexibility and robustness to handle your requirements.\n\nTraffik was inspired by [fastapi-limiter](https://github.com/long2ice/fastapi-limiter) but has evolved into a more comprehensive solution with more advanced features.\n\n## Features\n\n- \ud83d\ude80 **Easy Integration**: Decorator, dependency, and middleware-based throttling\n- \ud83c\udfaf **Multiple Strategies**: Fixed Window (default), Sliding Window, Token Bucket, Leaky Bucket\n- \ud83d\udd04 **Multiple Backends**: In-memory, Redis, Memcached with atomic operation support\n- \ud83c\udf10 **Protocol Support**: Both HTTP and WebSocket throttling\n- \ud83c\udfe2 **Dynamic Backend Resolution**: Support for runtime backend switching\n- \ud83d\udd27 **Dependency Injection Friendly**: Works well with FastAPI's dependency injection system.\n- \ud83d\udcca **Smart Client Identification**: IP-based by default, fully customizable\n- \u2699\ufe0f **Flexible and Extensible API**: Easily extend base functionality\n- \ud83e\uddea **Thoroughly Tested**: Comprehensive test suite covering concurrency and multithreading\n\n## Installation\n\nWe recommend using `uv`, however, it is not a strict requirement.\n\n### Install `uv` (optional)\n\nVisit the [uv documentation](https://docs.astral.sh/uv/getting-started/installation/) for installation instructions.\n\n### Basic Installation\n\n```bash\nuv add traffik\n\n# or using pip\npip install traffik\n```\n\nInstall with FastAPI support:\n\n```bash\nuv add \"traffik[fastapi]\"\n\n# or using pip\npip install \"traffik[fastapi]\"\n```\n\n### With Redis Backend\n\n```bash\nuv add \"traffik[redis]\"\n\n# or using pip\npip install \"traffik[redis]\"\n```\n\n### With Memcached Backend\n\n```bash\nuv add \"traffik[memcached]\"\n\n# or using pip\npip install \"traffik[memcached]\"\n```\n\n### With All Features\n\n```bash\nuv add \"traffik[all]\"\n\n# or using pip\npip install \"traffik[all]\"\n```\n\n### Development Installation\n\n```bash\ngit clone https://github.com/your-username/traffik.git\ncd traffik\n\nuv sync --extra dev\n\n# or using pip\npip install -e .[dev]\n```\n\n## Quick Testing with Docker\n\nFor quick testing across different platforms and Python versions:\n\n```bash\n# Run fast tests\n./docker-test.sh test-fast\n\n# Run full test suite\n./docker-test.sh test\n\n# Start development environment\n./docker-test.sh dev\n\n# Test across Python versions\n./docker-test.sh test-matrix\n```\n\n**Testing Documentation:**\n\n- [DOCKER.md](DOCKER.md) - Complete Docker testing guide\n- [TESTING.md](TESTING.md) - Quick testing guide  \n\n## Quick Start Guide\n\n### 1. Basic HTTP Throttling with Starlette\n\n```python\nfrom starlette.applications import Starlette\nfrom starlette.routing import Route\nfrom starlette.requests import Request\nfrom starlette.responses import JSONResponse\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.backends.inmemory import InMemoryBackend\n\n# Create backend\nbackend = InMemoryBackend(namespace=\"myapp\", persistent=False)\n\n# Create throttle with string rate format\nthrottle = HTTPThrottle(\n    uid=\"basic_limit\",\n    rate=\"5/10s\",  # 5 requests per 10 seconds\n)\n\nasync def throttled_endpoint(request: Request):\n    await throttle(request)\n    return JSONResponse({\"message\": \"Success\"})\n\napp = Starlette(\n    routes=[\n        Route(\"/api/data\", throttled_endpoint, methods=[\"GET\"]),\n    ],\n    lifespan=backend.lifespan,\n)\n```\n\n### 2. FastAPI with Dependency Injection\n\n```python\nfrom fastapi import FastAPI, Depends\nfrom contextlib import asynccontextmanager\nfrom traffik.backends.inmemory import InMemoryBackend\nfrom traffik.throttles import HTTPThrottle\n\n# Create backend\nbackend = InMemoryBackend(namespace=\"api\")\n\n# Setup lifespan\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n    async with backend(app, persistent=True, close_on_exit=True):\n        yield\n\napp = FastAPI(lifespan=lifespan)\n\n# Create throttle\nthrottle = HTTPThrottle(uid=\"endpoint_limit\", rate=\"10/m\")\n\n@app.get(\"/api/hello\", dependencies=[Depends(throttle)])\nasync def say_hello():\n    return {\"message\": \"Hello World\"}\n```\n\n### 3. Using Decorators (FastAPI Only)\n\n```python\nfrom fastapi import FastAPI\nfrom contextlib import asynccontextmanager\nfrom traffik.decorators import throttled\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.backends.redis import RedisBackend\n\nbackend = RedisBackend(\n    connection=\"redis://localhost:6379/0\",\n    namespace=\"api\",\n    persistent=True,\n)\n\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n    async with backend(app):\n        yield\n\napp = FastAPI(lifespan=lifespan)\n\n@app.get(\"/api/limited\")\n@throttled(HTTPThrottle(uid=\"limited\", rate=\"5/m\"))\nasync def limited_endpoint():\n    return {\"data\": \"Limited access\"}\n```\n\n### 4. WebSocket Throttling\n\n```python\nfrom traffik.throttles import WebSocketThrottle\nfrom starlette.websockets import WebSocket\nfrom starlette.exceptions import HTTPException\n\nws_throttle = WebSocketThrottle(uid=\"ws_messages\", rate=\"3/10s\")\n\nasync def ws_endpoint(websocket: WebSocket) -> None:\n    await websocket.accept()\n    \n    while True:\n        try:\n            data = await websocket.receive_json()\n            await ws_throttle(websocket)  # Rate limit per message\n            \n            await websocket.send_json({\n                \"status\": \"success\",\n                \"data\": data,\n            })\n        except HTTPException as exc:\n            await websocket.send_json({\n                \"status\": \"error\", \n                \"status_code\": exc.status_code,\n                \"detail\": exc.detail,\n            })\n            break\n    \n    await websocket.close()\n```\n\n## Rate Limiting Strategies\n\nTraffik supports multiple rate limiting strategies, each with different trade-offs. The **Fixed Window strategy is used by default** for its simplicity and performance.\n\n### Fixed Window (Default)\n\nDivides time into fixed windows and counts requests within each window. Simple, fast, and memory-efficient.\n\n**Pros:** Simple, constant memory, fast  \n**Cons:** Can allow bursts at window boundaries (up to 2x limit)\n\n```python\nfrom traffik.strategies import FixedWindowStrategy\n\nthrottle = HTTPThrottle(\n    uid=\"api_limit\",\n    rate=\"100/m\",\n    strategy=FixedWindowStrategy()  # Default, can be omitted\n)\n```\n\n### Sliding Window\n\nMost accurate rate limiting with continuous sliding window evaluation. Prevents boundary exploitation.\n\n**Pros:** Most accurate, no boundary issues  \n**Cons:** Higher memory usage (O(limit) per key)\n\n```python\nfrom traffik.strategies import SlidingWindowLogStrategy\n\nthrottle = HTTPThrottle(\n    uid=\"payment_api\",\n    rate=\"10/m\",\n    strategy=SlidingWindowLogStrategy()\n)\n```\n\n### Token Bucket\n\nAllows controlled bursts while maintaining average rate over time. Tokens refill continuously.\n\n**Pros:** Allows bursts, smooth distribution, self-recovering  \n**Cons:** Slightly more complex\n\n```python\nfrom traffik.strategies import TokenBucketStrategy\n\nthrottle = HTTPThrottle(\n    uid=\"user_api\",\n    rate=\"100/m\",\n    strategy=TokenBucketStrategy(burst_size=150)  # Allow bursts up to 150\n)\n```\n\n### Leaky Bucket\n\nEnforces perfectly smooth traffic output. No bursts allowed.\n\n**Pros:** Smooth output, protects downstream services  \n**Cons:** Less forgiving, may reject legitimate bursts\n\n```python\nfrom traffik.strategies import LeakyBucketStrategy\n\nthrottle = HTTPThrottle(\n    uid=\"third_party_api\",\n    rate=\"50/m\",\n    strategy=LeakyBucketStrategy()\n)\n```\n\n## Writing Custom Strategies\n\nYou can create custom rate limiting strategies by implementing a callable that follows the strategy protocol. This allows you to implement specialized rate limiting logic tailored to your specific needs.\n\n### Strategy Protocol\n\nA strategy is a callable (function or class with `__call__`) that takes three parameters and returns a wait period:\n\n```python\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.rates import Rate\nfrom traffik.types import Stringable, WaitPeriod\n\nasync def my_strategy(\n    key: Stringable, \n    rate: Rate, \n    backend: ThrottleBackend\n) -> WaitPeriod:\n    \"\"\"\n    :param key: The throttling key (e.g., \"user:123\", \"ip:192.168.1.1\")\n    :param rate: Rate limit definition with limit and expire properties\n    :param backend: Backend instance for storage operations\n    :return: Wait time in milliseconds (0.0 if request allowed)\n    \"\"\"\n    # Your rate limiting logic here\n    return 0.0  # Allow request\n```\n\n### Example: Simple Rate Strategy\n\nHere's a basic example that implements a simple counter-based strategy:\n\n```python\nfrom dataclasses import dataclass\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.rates import Rate\nfrom traffik.types import Stringable, WaitPeriod\nfrom traffik.utils import time\n\n\nasync def simple_counter_strategy(\n    self, key: Stringable, rate: Rate, backend: ThrottleBackend\n) -> WaitPeriod:\n    \"\"\"\n    Simple counter-based rate limiting.\n    \n    Counts requests and resets counter when expired.\n    \"\"\"\n    if rate.unlimited:\n        return 0.0\n    \n    now = time() * 1000  # Current time in milliseconds\n    full_key = await backend.get_key(str(key))\n    counter_key = f\"{full_key}:counter\"\n    timestamp_key = f\"{full_key}:timestamp\"\n    \n    ttl_seconds = int(rate.expire // 1000) + 1\n    \n    async with await backend.lock(f\"lock:{counter_key}\", blocking=True, blocking_timeout=1):\n        # Get current count and timestamp\n        count_str = await backend.get(counter_key)\n        timestamp_str = await backend.get(timestamp_key)\n        \n        if count_str and timestamp_str:\n            count = int(count_str)\n            timestamp = float(timestamp_str)\n            \n            # Check if window has expired\n            if now - timestamp > rate.expire:\n                # Reset counter for new window\n                count = 1\n                await backend.set(counter_key, \"1\", expire=ttl_seconds)\n                await backend.set(timestamp_key, str(now), expire=ttl_seconds)\n            else:\n                # Increment counter\n                count = await backend.increment(counter_key)\n        else:\n            # First request\n            count = 1\n            await backend.set(counter_key, \"1\", expire=ttl_seconds)\n            await backend.set(timestamp_key, str(now), expire=ttl_seconds)\n        \n        # Check if limit exceeded\n        if count > rate.limit:\n            # Calculate wait time until window expires\n            timestamp = float(await backend.get(timestamp_key))\n            elapsed = now - timestamp\n            wait_ms = rate.expire - elapsed\n            return max(wait_ms, 0.0)\n        \n        return 0.0\n\n# Usage\nthrottle = HTTPThrottle(\n    uid=\"simple\",\n    rate=\"10/m\",\n    strategy=simple_counter_strategy,\n)\n```\n\n### Example: Adaptive Rate Strategy\n\nA more advanced example that adapts the rate limit based on backend load:\n\n```python\nfrom dataclasses import dataclass\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.rates import Rate\nfrom traffik.types import Stringable, WaitPeriod\nfrom traffik.utils import time\n\n@dataclass(frozen=True)\nclass AdaptiveRateStrategy:\n    \"\"\"\n    Adaptive rate limiting that adjusts based on system load.\n    \n    Reduces limits during high load, increases during low load.\n    \"\"\"\n    \n    load_threshold: float = 0.8  # 80% of limit triggers adaptation\n    reduction_factor: float = 0.5  # Reduce to 50% during high load\n    \n    async def __call__(\n        self, key: Stringable, rate: Rate, backend: ThrottleBackend\n    ) -> WaitPeriod:\n        if rate.unlimited:\n            return 0.0\n        \n        now = time() * 1000\n        window_duration_ms = rate.expire\n        current_window = int(now // window_duration_ms)\n        \n        full_key = await backend.get_key(str(key))\n        counter_key = f\"{full_key}:adaptive:{current_window}\"\n        load_key = f\"{full_key}:load\"\n        ttl_seconds = int(window_duration_ms // 1000) + 1\n        \n        async with await backend.lock(f\"lock:{counter_key}\", blocking=True, blocking_timeout=1):\n            # Increment request counter\n            count = await backend.increment_with_ttl(counter_key, amount=1, ttl=ttl_seconds)\n            \n            # Calculate current load percentage\n            load_percentage = count / rate.limit\n            \n            # Determine effective limit based on load\n            if load_percentage > self.load_threshold:\n                # High load - reduce effective limit\n                effective_limit = int(rate.limit * self.reduction_factor)\n                await backend.set(load_key, \"high\", expire=ttl_seconds)\n            else:\n                # Normal load - use full limit\n                effective_limit = rate.limit\n                await backend.set(load_key, \"normal\", expire=ttl_seconds)\n            \n            # Check against effective limit\n            if count > effective_limit:\n                # Calculate wait time\n                time_in_window = now % window_duration_ms\n                wait_ms = window_duration_ms - time_in_window\n                return wait_ms\n            \n            return 0.0\n\n# Usage\nthrottle = HTTPThrottle(\n    uid=\"adaptive_api\",\n    rate=\"1000/h\",\n    strategy=AdaptiveRateStrategy(load_threshold=0.7, reduction_factor=0.6)\n)\n```\n\n### Example: Priority-Based Strategy\n\nA strategy that implements priority queuing:\n\n```python\nfrom dataclasses import dataclass\nfrom enum import IntEnum\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.rates import Rate\nfrom traffik.types import Stringable, WaitPeriod\nfrom traffik.utils import time, dump_json, load_json, JSONDecodeError\n\nclass Priority(IntEnum):\n    \"\"\"Request priority levels\"\"\"\n    LOW = 1\n    NORMAL = 2\n    HIGH = 3\n    CRITICAL = 4\n\n@dataclass(frozen=True)\nclass PriorityQueueStrategy:\n    \"\"\"\n    Rate limiting with priority queue.\n    \n    Higher priority requests are processed first when at capacity.\n    \"\"\"\n    \n    default_priority: Priority = Priority.NORMAL\n    \n    def _extract_priority(self, key: str) -> Priority:\n        \"\"\"Extract priority from key (format: \"priority:<level>:user:123\")\"\"\"\n        if key.startswith(\"priority:\"):\n            try:\n                level = int(key.split(\":\")[1])\n                return Priority(level)\n            except (ValueError, IndexError):\n                pass\n        return self.default_priority\n    \n    async def __call__(\n        self, key: Stringable, rate: Rate, backend: ThrottleBackend\n    ) -> WaitPeriod:\n        if rate.unlimited:\n            return 0.0\n        \n        now = time() * 1000\n        priority = self._extract_priority(str(key))\n        \n        full_key = await backend.get_key(str(key))\n        queue_key = f\"{full_key}:priority_queue\"\n        ttl_seconds = int(rate.expire // 1000) + 1\n        \n        async with await backend.lock(f\"lock:{queue_key}\", blocking=True, blocking_timeout=1):\n            # Get current queue\n            queue_json = await backend.get(queue_key)\n            if queue_json:\n                try:\n                    queue = load_json(queue_json)\n                except JSONDecodeError:\n                    queue = []\n            else:\n                queue = []\n            \n            # Remove expired entries\n            queue = [\n                entry for entry in queue \n                if now - entry[\"timestamp\"] < rate.expire\n            ]\n            \n            # Count requests with higher or equal priority\n            higher_priority_count = sum(\n                1 for entry in queue \n                if entry[\"priority\"] >= priority\n            )\n            \n            # Check if request can be processed\n            if higher_priority_count >= rate.limit:\n                # Calculate wait time based on oldest high-priority entry\n                oldest_high_priority = min(\n                    (entry[\"timestamp\"] for entry in queue if entry[\"priority\"] >= priority),\n                    default=now\n                )\n                wait_ms = rate.expire - (now - oldest_high_priority)\n                return max(wait_ms, 0.0)\n            \n            # Add current request to queue\n            queue.append({\n                \"timestamp\": now,\n                \"priority\": priority,\n                \"key\": str(key)\n            })\n            \n            # Sort by priority (descending) and timestamp (ascending)\n            queue.sort(key=lambda x: (-x[\"priority\"], x[\"timestamp\"]))\n            \n            # Store updated queue\n            await backend.set(queue_key, dump_json(queue), expire=ttl_seconds)\n            return 0.0\n\n# Usage\nfrom traffik.throttles import HTTPThrottle\n\nasync def priority_identifier(connection):\n    \"\"\"Extract priority from request headers\"\"\"\n    priority = connection.headers.get(\"X-Priority\", \"2\")\n    user_id = extract_user_id(connection)\n    return f\"priority:{priority}:user:{user_id}\"\n\nthrottle = HTTPThrottle(\n    uid=\"priority_api\",\n    rate=\"100/m\",\n    strategy=PriorityQueueStrategy(),\n    identifier=priority_identifier\n)\n```\n\n### Best Practices for Custom Strategies\n\n1. **Always use locks**: Wrap critical sections with `backend.lock()` to ensure atomicity\n\n   ```python\n   async with await backend.lock(f\"lock:{key}\", blocking=True, blocking_timeout=1):\n       # Critical section\n   ```\n\n2. **Set appropriate TTLs**: Always set expiration times to prevent memory leaks\n\n   ```python\n   ttl_seconds = int(rate.expire // 1000) + 1  # +1 second buffer\n   await backend.set(key, value, expire=ttl_seconds)\n   ```\n\n3. **Handle unlimited rates**: Check for unlimited rates early\n\n   ```python\n   if rate.unlimited:\n       return 0.0\n   ```\n\n4. **Use proper key prefixes**: Namespace your strategy's keys to avoid conflicts\n\n   ```python\n   full_key = await backend.get_key(str(key))\n   strategy_key = f\"{full_key}:mystrategy:data\"\n   ```\n\n5. **Return milliseconds**: Always return wait time in milliseconds\n\n   ```python\n   return wait_ms  # Not wait_seconds * 1000\n   ```\n\n6. **Use dataclasses**: Make strategies immutable and configurable\n\n   ```python\n   @dataclass(frozen=True)\n   class MyStrategy:\n       param1: int = 10\n       param2: float = 0.5\n   ```\n\n7. **Handle errors gracefully**: Catch and handle JSON decode errors, type errors, etc.\n\n   ```python\n   try:\n       data = load_json(json_str)\n   except JSONDecodeError:\n       data = default_value\n   ```\n\n### Testing Custom Strategies\n\n```python\nimport pytest\nfrom traffik.backends.inmemory import InMemoryBackend\nfrom traffik.rates import Rate\n\n@pytest.mark.anyio\nasync def test_custom_strategy():\n    backend = InMemoryBackend(namespace=\"test\")\n    async with backend(close_on_exit=True):\n        strategy = MyCustomStrategy()\n        rate = Rate.parse(\"5/s\")\n        \n        # Test allowing requests\n        for i in range(5):\n            wait = await strategy(f\"user:123\", rate, backend)\n            assert wait == 0.0, f\"Request {i+1} should be allowed\"\n        \n        # Test throttling\n        wait = await strategy(f\"user:123\", rate, backend)\n        assert wait > 0, \"Request 6 should be throttled\"\n        \n        # Verify wait time is reasonable\n        assert wait <= rate.expire, \"Wait time should not exceed window\"\n```\n\n## Backends\n\nTraffik provides three backend implementations with full atomic operation support and distributed locking.\n\n### In-Memory Backend\n\nPerfect for development, testing, and single-instance applications:\n\n```python\nfrom traffik.backends.inmemory import InMemoryBackend\n\nbackend = InMemoryBackend(\n    namespace=\"myapp\",\n    persistent=False,  # Don't persist across restarts\n)\n```\n\n**Pros:**\n\n- No external dependencies\n- Fast and simple\n- Great for development and testing\n\n**Cons:**\n\n- Not suitable for multi-process/distributed systems\n- Data lost on restart (even with `persistent=True`)\n\n### Redis Backend\n\nRecommended for production with distributed systems:\n\n```python\nfrom traffik.backends.redis import RedisBackend\n\n# From connection string\nbackend = RedisBackend(\n    connection=\"redis://localhost:6379/0\",\n    namespace=\"myapp\",\n    persistent=True,\n)\n\n# From Redis client factory\nimport redis.asyncio as redis\n\ndef get_client() -> redis.Redis:\n    return redis.Redis(host='localhost', port=6379, db=0)\n\nbackend = RedisBackend(\n    connection=get_client,\n    namespace=\"myapp\",\n)\n```\n\n**Features:**\n\n- Distributed locks using Redlock algorithm\n- Production-ready persistence\n\n**Pros:**\n\n- Multi-process and distributed support\n- Persistence across restarts\n- Battle-tested for production\n\n**Cons:**\n\n- Requires Redis server\n- Additional infrastructure\n\n### Memcached Backend\n\nLightweight distributed caching solution:\n\n```python\nfrom traffik.backends.memcached import MemcachedBackend\n\nbackend = MemcachedBackend(\n    host=\"localhost\",\n    port=11211,\n    namespace=\"myapp\",\n    pool_size=10,\n)\n```\n\n**Features:**\n\n- Lightweight distributed locks\n- Atomic operations via CAS (Compare-And-Swap)\n- Connection pooling\n- Fast in-memory storage\n\n**Pros:**\n\n- Lightweight and fast\n- Good for high-throughput scenarios\n- Simple deployment\n\n**Cons:**\n\n- Less feature-rich than Redis\n\n### Custom Backends\n\nCreate custom backends by subclassing `ThrottleBackend`:\n\n```python\nimport typing\n\nfrom traffik.backends.base import ThrottleBackend\nfrom traffik.types import HTTPConnectionT, AsyncLock\n\nclass CustomBackend(ThrottleBackend[typing.Dict, HTTPConnectionT]):\n    \"\"\"Custom backend with your storage solution\"\"\"\n    \n    async def initialize(self) -> None:\n        \"\"\"Setup connection/resources\"\"\"\n        pass\n    \n    async def get(self, key: str) -> typing.Optional[str]:\n        \"\"\"Get value for key\"\"\"\n        pass\n    \n    async def set(self, key: str, value: str, expire: typing.Optional[int] = None) -> None:\n        \"\"\"Set value with optional TTL\"\"\"\n        pass\n    \n    async def delete(self, key: str) -> None:\n        \"\"\"Delete key\"\"\"\n        pass\n    \n    async def increment(self, key: str, amount: int = 1) -> int:\n        \"\"\"Atomically increment counter\"\"\"\n        pass\n    \n    async def decrement(self, key: str, amount: int = 1) -> int:\n        \"\"\"Atomically decrement counter\"\"\"\n        pass\n    \n    async def increment_with_ttl(self, key: str, amount: int, ttl: int) -> int:\n        \"\"\"Atomically increment with TTL set\"\"\"\n        pass\n    \n    async def get_lock(self, name: str) -> AsyncLock:\n        \"\"\"Get distributed lock\"\"\"\n        pass\n    \n    async def reset(self) -> None:\n        \"\"\"Clear all data\"\"\"\n        pass\n    \n    async def close(self) -> None:\n        \"\"\"Cleanup resources\"\"\"\n        pass\n```\n\n### Throttle Backend Selection\n\nYou can specify the backend for each throttle individually or allow them to share a common backend. The shared backend is usually the one set up in your application lifespan.\n\n```python\nfrom fastapi import FastAPI, Depends, Request\n\nfrom traffik.backends.redis import RedisBackend\nfrom traffik.throttles import HTTPThrottle\n\nshared_redis_backend = RedisBackend(\n    connection=\"redis://localhost:6379/0\",\n    namespace=\"shared\",\n)\napp = FastAPI(lifespan=shared_redis_backend.lifespan)\n\nthrottle_with_own_backend = HTTPThrottle(\n    uid=\"custom_backend\",\n    rate=\"100/m\",\n    backend=MyCustomBackend(),  # Uses its own backend\n)\n# Uses backend from app lifespan\nthrottle_with_shared_backend = HTTPThrottle(\n    uid=\"shared_backend\",\n    rate=\"50/m\",\n)\n\n@app.get(\"/api/custom\", dependencies=[Depends(throttle_with_shared_backend)])\nasync def endpoint_custom(request: Request = Depends(throttle_with_own_backend)):\n    return {\"message\": \"Uses its own backend\"}\n\n```\n\n## Configuration Options\n\n### Rate Format\n\nTraffik supports flexible rate specification using string format or `Rate` objects:\n\n```python\nfrom traffik import Rate\nfrom traffik.throttles import HTTPThrottle\n\n# String format (recommended)\nthrottle = HTTPThrottle(uid=\"api_v1\", rate=\"100/m\")  # 100 per minute\nthrottle = HTTPThrottle(uid=\"api_v2\", rate=\"5/10s\")  # 5 per 10 seconds\nthrottle = HTTPThrottle(uid=\"api_v3\", rate=\"1000/500ms\")  # 1000 per 500ms\n\n# Supported units: ms, s/sec/second, m/min/minute, h/hr/hour, d/day\nthrottle = HTTPThrottle(uid=\"daily\", rate=\"10000/d\")  # 10000 per day\n\n# Rate object for complex configurations\nrate = Rate(limit=100, minutes=5, seconds=30)  # 100 per 5.5 minutes\nthrottle = HTTPThrottle(uid=\"complex\", rate=rate)\n```\n\n### Custom Client Identification\n\nCustomize how clients are identified for rate limiting:\n\n```python\nfrom starlette.requests import HTTPConnection\nfrom traffik.throttles import HTTPThrottle\n\nasync def user_based_identifier(connection: HTTPConnection) -> str:\n    \"\"\"Identify by user ID from JWT token\"\"\"\n    user_id = extract_user_id(connection.headers.get(\"authorization\"))\n    return f\"user:{user_id}\"\n\nthrottle = HTTPThrottle(\n    uid=\"user_limit\",\n    rate=\"100/h\",\n    identifier=user_based_identifier,\n)\n```\n\n### Exempting Connections\n\nSkip throttling for specific clients by returning `UNLIMITED`:\n\n```python\nimport typing\nfrom starlette.requests import HTTPConnection\nfrom traffik import UNLIMITED\nfrom traffik.throttles import HTTPThrottle\n\n\nasync def admin_bypass_identifier(connection: HTTPConnection) -> typing.Any:\n    \"\"\"Bypass throttling for admin users\"\"\"\n    user_role = extract_role(connection.headers.get(\"authorization\"))\n    if user_role == \"admin\":\n        return UNLIMITED  # Admins bypass throttling\n    \n    # Regular users get normal identification\n    user_id = extract_user_id(connection.headers.get(\"authorization\"))\n    return f\"user:{user_id}\"\n\nthrottle = HTTPThrottle(\n    uid=\"api_limit\",\n    rate=\"50/m\",\n    identifier=admin_bypass_identifier,\n)\n```\n\n### Custom Throttled Response\n\nCustomize the response when rate limits are exceeded:\n\n```python\nfrom starlette.requests import HTTPConnection\nfrom starlette.exceptions import HTTPException\n\nasync def custom_throttled_handler(\n    connection: HTTPConnection, \n    wait_ms: int,\n    *args,\n    **kwargs\n):\n    wait_seconds = wait_ms // 1000\n    raise HTTPException(\n        status_code=429,\n        detail={\n            \"error\": \"rate_limit_exceeded\",\n            \"message\": f\"Too many requests. Retry in {wait_seconds}s\",\n            \"retry_after\": wait_seconds\n        },\n        headers={\"Retry-After\": str(wait_seconds)},\n    )\n\nthrottle = HTTPThrottle(\n    uid=\"api\",\n    rate=\"100/m\",\n    handle_throttled=custom_throttled_handler,\n)\n```\n\n## More on Usage\n\n### Multiple Rate Limits per Endpoint\n\nApply both burst and sustained limits:\n\n```python\nfrom fastapi import FastAPI, Depends\nfrom traffik.throttles import HTTPThrottle\n\n# Burst limit: 10 per minute\nburst_limit = HTTPThrottle(uid=\"burst\", rate=\"10/m\")\n\n# Sustained limit: 100 per hour\nsustained_limit = HTTPThrottle(uid=\"sustained\", rate=\"100/h\")\n\n@app.get(\n    \"/api/data\",\n    dependencies=[Depends(burst_limit), Depends(sustained_limit)]\n)\nasync def get_data():\n    return {\"data\": \"value\"}\n```\n\n### Per-User Rate Limiting\n\n```python\nfrom traffik.throttles import HTTPThrottle\nfrom starlette.requests import Request\n\nasync def user_identifier(request: Request) -> str:\n    user_id = extract_user_from_token(request.headers.get(\"authorization\"))\n    return f\"user:{user_id}\"\n\nuser_throttle = HTTPThrottle(\n    uid=\"user_quota\",\n    rate=\"1000/h\",\n    identifier=user_identifier,\n)\n```\n\n### Strategy Selection Based on Use Case\n\n```python\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.strategies import (\n    FixedWindowStrategy,\n    SlidingWindowLogStrategy,\n    TokenBucketStrategy,\n    LeakyBucketStrategy,\n)\n\n# Public API - allow bursts\npublic_api = HTTPThrottle(\n    uid=\"public\",\n    rate=\"100/m\",\n    strategy=TokenBucketStrategy(burst_size=150),\n)\n\n# Payment API - strict enforcement\npayment_api = HTTPThrottle(\n    uid=\"payments\",\n    rate=\"10/m\",\n    strategy=SlidingWindowLogStrategy(),\n)\n\n# Third-party API - smooth output\nexternal_api = HTTPThrottle(\n    uid=\"external\",\n    rate=\"50/m\",\n    strategy=LeakyBucketStrategy(),\n)\n\n# Simple rate limiting - default\nsimple_api = HTTPThrottle(\n    uid=\"simple\",\n    rate=\"200/m\",\n    # Uses `FixedWindowStrategy` by default\n)\n```\n\n### Dynamic Backend Resolution for Multi-Tenant Applications\n\nEnable runtime backend switching for multi-tenant SaaS applications:\n\n```python\nfrom fastapi import FastAPI, Request, Depends\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.backends.redis import RedisBackend\nfrom traffik.backends.inmemory import InMemoryBackend\nimport jwt\n\n# Shared throttle with dynamic backend resolution\napi_throttle = HTTPThrottle(\n    uid=\"api_quota\",\n    rate=\"1000/h\",\n    dynamic_backend=True,  # Resolve backend at runtime\n)\n\nTENANT_CONFIG = {\n    \"enterprise\": {\"redis_url\": \"redis://enterprise:6379/0\", \"multiplier\": 5.0},\n    \"premium\": {\"redis_url\": \"redis://premium:6379/0\", \"multiplier\": 2.0},\n    \"free\": {\"redis_url\": None, \"multiplier\": 1.0},\n}\n\nasync def tenant_middleware(request: Request, call_next):\n    \"\"\"Set up tenant-specific backend based on JWT\"\"\"\n    token = request.headers.get(\"authorization\", \"\").split(\" \")[1] if request.headers.get(\"authorization\") else None\n    \n    if token:\n        payload = jwt.decode(token, \"secret\", algorithms=[\"HS256\"])\n        tier = payload.get(\"tenant_tier\", \"free\")\n        tenant_id = payload.get(\"tenant_id\", \"default\")\n    else:\n        tier, tenant_id = \"free\", \"anonymous\"\n    \n    config = TENANT_CONFIG[tier]\n    \n    # Select backend based on tenant tier\n    if config[\"redis_url\"]:\n        backend = RedisBackend(\n            connection=config[\"redis_url\"],\n            namespace=f\"tenant_{tenant_id}\",\n            persistent=True\n        )\n    else:\n        backend = InMemoryBackend(namespace=f\"tenant_{tenant_id}\")\n    \n    # Execute within tenant's backend context\n    async with backend(request.app):\n        return await call_next(request)\n\napp = FastAPI()\napp.middleware(\"http\")(tenant_middleware)\n\n@app.get(\"/api/data\")\nasync def get_data(request: Depends(api_throttle)):\n    return {\"data\": \"tenant-specific data\"}\n```\n\n**When to use dynamic backends:**\n\n- \u2705 Multi-tenant SaaS with per-tenant storage\n- \u2705 A/B testing different strategies\n- \u2705 Environment-specific backends\n- \u274c Simple shared storage (use explicit `backend` parameter instead)\n\n**Important:** Dynamic backend resolution adds slight overhead and complexity. Only use when you need runtime backend switching.\n\n### Application Lifespan Management\n\nProperly manage backend lifecycle:\n\n#### Starlette\n\n```python\nfrom starlette.applications import Starlette\nfrom traffik.backends.inmemory import InMemoryBackend\n\nbackend = InMemoryBackend(namespace=\"app\")\n\napp = Starlette(\n    routes=[...],\n    lifespan=backend.lifespan,  # Automatic cleanup\n)\n```\n\n#### FastAPI\n\n```python\nfrom fastapi import FastAPI\nfrom contextlib import asynccontextmanager\nfrom traffik.backends.redis import RedisBackend\n\nbackend = RedisBackend(connection=\"redis://localhost:6379/0\", namespace=\"app\")\n\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n    # Startup\n    async with backend(app, persistent=True, close_on_exit=True):\n        yield\n    # Shutdown - backend cleanup automatic\n\napp = FastAPI(lifespan=lifespan)\n```\n\n## Throttle Middleware\n\nApply rate limiting across multiple endpoints with sophisticated filtering and routing logic.\n\n### Basic Middleware Setup\n\n#### FastAPI\n\n```python\nfrom fastapi import FastAPI\nfrom traffik.middleware import ThrottleMiddleware, MiddlewareThrottle\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.backends.inmemory import InMemoryBackend\n\napp = FastAPI()\nbackend = InMemoryBackend(namespace=\"api\")\n\n# Create throttle\napi_throttle = HTTPThrottle(uid=\"api_global\", rate=\"100/m\")\n\n# Wrap in middleware throttle\nmiddleware_throttle = MiddlewareThrottle(api_throttle)\n\n# Add middleware\napp.add_middleware(\n    ThrottleMiddleware,\n    middleware_throttles=[middleware_throttle],\n    backend=backend\n)\n```\n\n#### Starlette\n\n```python\nfrom starlette.applications import Starlette\nfrom traffik.middleware import ThrottleMiddleware, MiddlewareThrottle\nfrom traffik.throttles import HTTPThrottle\nfrom traffik.backends.inmemory import InMemoryBackend\n\nbackend = InMemoryBackend(namespace=\"api\")\n\napi_throttle = HTTPThrottle(uid=\"api_throttle\", rate=\"50/m\")\nmiddleware_throttle = MiddlewareThrottle(api_throttle)\n\napp = Starlette(\n    routes=[...],\n    middleware=[\n        (ThrottleMiddleware, {\n            \"middleware_throttles\": [middleware_throttle],\n            \"backend\": backend\n        })\n    ]\n)\n```\n\n### Advanced Filtering\n\n#### Method-Based Filtering\n\n```python\n# Strict limits for write operations\nwrite_throttle = HTTPThrottle(uid=\"writes\", rate=\"10/m\")\nread_throttle = HTTPThrottle(uid=\"reads\", rate=\"1000/m\")\n\nwrite_middleware_throttle = MiddlewareThrottle(\n    write_throttle,\n    methods={\"POST\", \"PUT\", \"DELETE\"}\n)\n\nread_middleware_throttle = MiddlewareThrottle(\n    read_throttle,\n    methods={\"GET\", \"HEAD\"}\n)\n\napp.add_middleware(\n    ThrottleMiddleware,\n    middleware_throttles=[write_middleware_throttle, read_middleware_throttle],\n    backend=backend # If not set, uses app lifespan backend\n)\n```\n\n#### Path Pattern Filtering\n\n```python\n# String patterns and regex\napi_throttle = HTTPThrottle(uid=\"api\", rate=\"100/m\")\nadmin_throttle = HTTPThrottle(uid=\"admin\", rate=\"5/m\")\n\napi_middleware_throttle = MiddlewareThrottle(\n    api_throttle,\n    path=\"/api/\"  # Starts with /api/\n)\nadmin_middleware_throttle = MiddlewareThrottle(\n    admin_throttle,\n    path=r\"^/admin/.*\"  # Regex pattern\n)\n\napp.add_middleware(\n    ThrottleMiddleware,\n    middleware_throttles=[admin_middleware_throttle, api_middleware_throttle],\n    backend=backend\n)\n```\n\n#### Custom Hook-Based Filtering\n\n```python\nfrom starlette.requests import HTTPConnection\n\nasync def authenticated_only(connection: HTTPConnection) -> bool:\n    \"\"\"Apply throttle only to authenticated users\"\"\"\n    return connection.headers.get(\"authorization\") is not None\n\nasync def intensive_operations(connection: HTTPConnection) -> bool:\n    \"\"\"Throttle resource-intensive endpoints\"\"\"\n    intensive_paths = [\"/api/reports/\", \"/api/analytics/\", \"/api/exports/\"]\n    return any(connection.scope[\"path\"].startswith(p) for p in intensive_paths)\n\nauth_throttle = HTTPThrottle(uid=\"auth\", rate=\"200/m\")\nintensive_throttle = HTTPThrottle(uid=\"intensive\", rate=\"20/m\")\n\nauth_middleware_throttle = MiddlewareThrottle(auth_throttle, hook=authenticated_only)\nintensive_middleware_throttle = MiddlewareThrottle(intensive_throttle, hook=intensive_operations)\n\napp.add_middleware(\n    ThrottleMiddleware,\n    middleware_throttles=[intensive_middleware_throttle, auth_middleware_throttle],\n    backend=backend\n)\n```\n\n#### Combined Filtering\n\n```python\nasync def authenticated_only(connection: HTTPConnection) -> bool:\n    return connection.headers.get(\"authorization\") is not None\n\ncomplex_throttle = HTTPThrottle(uid=\"complex\", rate=\"25/m\")\n\n# Combines ALL criteria: path AND method AND hook\ncomplex_middleware_throttle = MiddlewareThrottle(\n    complex_throttle,\n    path=\"/api/\",\n    methods={\"POST\"},\n    hook=authenticated_only\n)\n```\n\n### Best Practices\n\n1. **Order Matters**: Place more specific throttles before general ones\n2. **Early Placement**: Add throttle middleware early in the stack to reject requests before expensive processing\n3. **Production Backends**: Use Redis for multi-instance deployments\n4. **Monitoring**: Log throttle hits for capacity planning\n5. **Graceful Responses**: Provide clear error messages with retry information\n\n```python\n# Example: Optimal middleware ordering\napp.add_middleware(\n    ThrottleMiddleware,\n    middleware_throttles=[\n        admin_middleware_throttle,      # Most specific\n        intensive_middleware_throttle,  # Specific paths\n        api_middleware_throttle,        # General API\n    ],\n    backend=redis_backend\n)\n```\n\n## Error Handling\n\nTraffik provides specific exceptions for different scenarios:\n\n### Exception Types\n\n```python\nfrom traffik.exceptions import (\n    TraffikException,           # Base exception\n    ConfigurationError,         # Invalid configuration\n    AnonymousConnection,        # Cannot identify client\n    ConnectionThrottled,        # Rate limit exceeded (HTTP 429)\n    BackendError,               # Backend operation failed\n)\n```\n\n### Handling Rate Limit Exceptions\n\nThe `ConnectionThrottled` exception is raised when a client exceeds their rate limit. It is an `HTTPException` so its is handled automatically by FastAPI/Starlette, returning a `429 Too Many Requests` response. However, you can still register a custom exception handler if you want to customize the response further.\n\n```python\nfrom fastapi import FastAPI, Request\nfrom traffik.exceptions import ConnectionThrottled\nfrom starlette.responses import JSONResponse\n\n@app.exception_handler(ConnectionThrottled)\nasync def throttle_handler(request: Request, exc: ConnectionThrottled):\n    return JSONResponse(\n        status_code=429,\n        content={\n            \"error\": \"rate_limit_exceeded\",\n            \"retry_after_seconds\": exc.retry_after,\n            \"message\": exc.detail\n        },\n        headers={\"Retry-After\": str(exc.retry_after)}\n    )\n```\n\n### Handling Configuration Errors\n\n```python\nfrom traffik.exceptions import ConfigurationError, BackendConnectionError\n\ntry:\n    backend = RedisBackend(connection=\"invalid://url\")\n    await backend.initialize()\nexcept BackendConnectionError as exc:\n    logger.error(f\"Failed to connect to Redis: {exc}\")\nexcept ConfigurationError as exc:\n    logger.error(f\"Invalid configuration: {exc}\")\n```\n\n### Handling Anonymous Connections\n\nThe default identifier raises `AnonymousConnection` if it cannot identify the client. You can catch this exception to provide a fallback identifier.\n\n```python\nfrom traffik.exceptions import AnonymousConnection\nfrom traffik.backends.base import connection_identifier\n\nasync def safe_identifier(connection: HTTPConnection) -> str:\n    try:\n        return await connection_identifier(connection)\n    except AnonymousConnection:\n        # Fall back to a default identifier\n        return f\"anonymous:{connection.scope['path']}\"\n```\n\n## Testing\n\nComprehensive testing example using `InMemoryBackend`:\n\n```python\nimport pytest\nfrom httpx import AsyncClient, ASGITransport\nfrom starlette.applications import Starlette\nfrom starlette.routing import Route\nfrom starlette.responses import JSONResponse\nfrom traffik.backends.inmemory import InMemoryBackend\nfrom traffik.throttles import HTTPThrottle\n\n@pytest.fixture\nasync def backend():\n    backend = InMemoryBackend(namespace=\"test\", persistent=False)\n    async with backend():\n        yield backend\n\n@pytest.mark.anyio\nasync def test_throttling(backend):\n    throttle = HTTPThrottle(uid=\"test\", rate=\"2/s\")\n\n    async def endpoint(request):\n        await throttle(request)\n        return JSONResponse({\"status\": \"ok\"})\n    \n    app = Starlette(\n        routes=[Route(\"/test\", endpoint, methods=[\"GET\"])],\n        lifespan=backend.lifespan,\n    )\n\n    async with AsyncClient(\n        transport=ASGITransport(app=app),\n        base_url=\"http://testserver\",\n    ) as client:\n        # First 2 requests succeed\n        r1 = await client.get(\"/test\")\n        r2 = await client.get(\"/test\")\n        assert r1.status_code == 200\n        assert r2.status_code == 200\n        \n        # Third request throttled\n        r3 = await client.get(\"/test\")\n        assert r3.status_code == 429\n```\n\n### Testing Different Strategies\n\n```python\nfrom traffik.strategies import (\n    FixedWindowStrategy,\n    SlidingWindowLogStrategy,\n    TokenBucketStrategy,\n)\n\n@pytest.mark.parametrize(\"strategy\", [\n    FixedWindowStrategy(),\n    SlidingWindowLogStrategy(),\n    TokenBucketStrategy(),\n])\nasync def test_strategy(backend, strategy):\n    throttle = HTTPThrottle(uid=\"test\", rate=\"5/s\", strategy=strategy)\n    # Test throttle behavior with different strategies\n```\n\n## API Reference\n\n### Throttle Classes\n\n#### `HTTPThrottle`\n\nHTTP request rate limiting with flexible configuration.\n\n```python\nHTTPThrottle(\n    uid: str,                          # Unique identifier\n    rate: Union[Rate, str],           # \"100/m\" or Rate object\n    identifier: Optional[Callable],    # Client ID function\n    handle_throttled: Optional[Callable],  # Custom handler\n    strategy: Optional[Strategy],      # Rate limiting strategy\n    backend: Optional[ThrottleBackend],  # Storage backend\n    dynamic_backend: bool = False,     # Runtime backend resolution\n    min_wait_period: Optional[int] = None,  # Minimum wait (ms)\n    headers: Optional[Dict[str, str]] = None,  # Extra headers to include in throttled response\n)\n```\n\n#### `WebSocketThrottle`\n\nWebSocket message rate limiting.\n\n```python\nWebSocketThrottle(\n    uid: str,\n    rate: Union[Rate, str],\n    identifier: Optional[Callable],\n    handle_throttled: Optional[Callable],\n    strategy: Optional[Strategy],\n    backend: Optional[ThrottleBackend],\n    dynamic_backend: bool = False,\n    min_wait_period: Optional[int] = None,\n    headers: Optional[Dict[str, str]] = None,\n)\n```\n\n#### `BaseThrottle`\n\nBase class for custom throttle implementations. Override `get_key()` for custom key generation.\n\n### Strategy Classes\n\nAll strategies are dataclasses with frozen=True for immutability.\n\n#### `FixedWindowStrategy()` (Default)\n\nSimple fixed-window counting. Fast and memory-efficient.\n\n#### `SlidingWindowLogStrategy()`\n\nMost accurate, maintains request log. Memory: O(limit).\n\n#### `SlidingWindowCounterStrategy()`\n\nSliding window with counters. Balance between accuracy and memory.\n\n#### `TokenBucketStrategy(burst_size: Optional[int] = None)`\n\nAllows controlled bursts. `burst_size` defaults to rate limit.\n\n#### `TokenBucketWithDebtStrategy(max_debt: Optional[int] = None)`\n\nToken bucket with debt tracking for smoother recovery.\n\n#### `LeakyBucketStrategy()`\n\nPerfectly smooth traffic output. No bursts allowed.\n\n#### `LeakyBucketWithQueueStrategy(queue_size: Optional[int] = None)`\n\nLeaky bucket with request queuing.\n\n### Backend Classes\n\n#### `InMemoryBackend`\n\n```python\nInMemoryBackend(\n    namespace: str = \"inmemory\",\n    persistent: bool = False,\n)\n```\n\n#### `RedisBackend`\n\n```python\nRedisBackend(\n    connection: Union[str, Redis],\n    namespace: str,\n    persistent: bool = True,\n)\n```\n\n#### `MemcachedBackend`\n\n```python\nMemcachedBackend(\n    host: str = \"localhost\",\n    port: int = 11211,\n    namespace: str = \"memcached\",\n    pool_size: int = 2,\n    persistent: bool = False,\n)\n```\n\n### Rate Class\n\n```python\nRate(\n    limit: int,\n    milliseconds: int = 0,\n    seconds: int = 0,\n    minutes: int = 0,\n    hours: int = 0,\n)\n\n# Or use string format\nRate.parse(\"100/m\")  # 100 per minute\nRate.parse(\"5/10s\")  # 5 per 10 seconds\n```\n\n### Middleware Classes\n\n#### `MiddlewareThrottle`\n\n```python\nMiddlewareThrottle(\n    throttle: BaseThrottle,\n    path: Optional[Union[str, Pattern]] = None,\n    methods: Optional[Set[str]] = None,\n    hook: Optional[Callable] = None,\n)\n```\n\n#### `ThrottleMiddleware`\n\n```python\nThrottleMiddleware(\n    app: ASGIApp,\n    middleware_throttles: List[MiddlewareThrottle],\n    backend: Optional[ThrottleBackend] = None,\n)\n```\n\n## Contributing\n\nWe welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.\n\n### Development Setup\n\n```bash\ngit clone https://github.com/ti-oluwa/traffik.git\ncd traffik\nuv sync --extra dev\n\n# Run tests\nuv run pytest\n\n# Run linting\nuv run ruff check src/ tests/ --fix\n\n# Run formatting  \nuv run ruff format src/ tests/\n```\n\n## License\n\nThis project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.\n\n## Changelog\n\nSee [CHANGELOG.md](CHANGELOG.md) for version history and changes.\n",
    "bugtrack_url": null,
    "license": "MIT License  Copyright (c) 2025 Daniel T. Afolayan   Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the \"Software\"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:  The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.  THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.",
    "summary": "Distributed rate limiting for Starlette applications.",
    "version": "1.0.0b1",
    "project_urls": {
        "Bug Tracker": "https://github.com/ti-oluwa/traffik/issues",
        "Changelog": "https://github.com/ti-oluwa/traffik/blob/main/CHANGELOG.md",
        "Documentation": "https://github.com/ti-oluwa/traffik#readme",
        "Homepage": "https://github.com/ti-oluwa/traffik",
        "Repository": "https://github.com/ti-oluwa/traffik.git"
    },
    "split_keywords": [
        "api",
        " distributed-rate-limiting",
        " fastapi",
        " rate-limiting",
        " starlette",
        " throttling"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "d67a98726fb4cc45a9625daf9fdf2d95241d07a046b8ab33d45c07d68ff76bbc",
                "md5": "e7b1e1c307f30ab919b7b8b27d44b240",
                "sha256": "ec5bbe35b3abe55df5f35ed93a688fda247325fe39aef7ece8b8a017f44c0d36"
            },
            "downloads": -1,
            "filename": "traffik-1.0.0b1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "e7b1e1c307f30ab919b7b8b27d44b240",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 62181,
            "upload_time": "2025-10-19T13:44:29",
            "upload_time_iso_8601": "2025-10-19T13:44:29.727456Z",
            "url": "https://files.pythonhosted.org/packages/d6/7a/98726fb4cc45a9625daf9fdf2d95241d07a046b8ab33d45c07d68ff76bbc/traffik-1.0.0b1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "85303789ab0dfee538a7f984456cf55fdfe2b805aa305164e7a71e2fd219e252",
                "md5": "7cbd1f8fe60d628966167112f69aacf5",
                "sha256": "f0f8ec5bcb77e340c1f65add0597291d906846678fc827b137555c56ab16bee6"
            },
            "downloads": -1,
            "filename": "traffik-1.0.0b1.tar.gz",
            "has_sig": false,
            "md5_digest": "7cbd1f8fe60d628966167112f69aacf5",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 197384,
            "upload_time": "2025-10-19T13:44:30",
            "upload_time_iso_8601": "2025-10-19T13:44:30.946717Z",
            "url": "https://files.pythonhosted.org/packages/85/30/3789ab0dfee538a7f984456cf55fdfe2b805aa305164e7a71e2fd219e252/traffik-1.0.0b1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-19 13:44:30",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "ti-oluwa",
    "github_project": "traffik",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "traffik"
}
        
Elapsed time: 0.59229s