apigatewaybuilder


- Name: apigatewaybuilder
- Version: 1.4.1
- Summary: Framework-agnostic API Gateway with validation, auth, and rate limiting
- Uploaded: 2025-10-09 11:17:16
- Author: PrabhBirJ
- Requires Python: >=3.11
- License: MIT
- Keywords: api, gateway, validation, authorization, rate-limiting
- Requirements: none recorded
- Download: https://files.pythonhosted.org/packages/db/07/4043890f83252d75c60abe7415cf4cbf7479e36fc9be4fea7d64581e7416/apigatewaybuilder-1.4.1.tar.gz

# API Gateway


**API Gateway** is a modular, developer-friendly Python project designed to become a **full-featured API Gateway framework**.  
As of v1.4.0, it ships with **request validation utilities, authorization, rate limiting, and logging**, powered by [Pydantic](https://docs.pydantic.dev).  


---

## Vision
The goal of **API Gateway** is to provide:
- **Validation**: Ensure only clean, schema-compliant data enters your services. *(available today)*  
- **Authentication & Authorization**: Pluggable security layers. *(available today)*  
- **Observability**: Metrics, logging, tracing. *(logging available today; metrics and tracing coming soon)*  
- **Routing**: Intelligent request routing and proxying. *(coming soon)*  
- **Rate Limiting & QoS**: Keep traffic fair and resilient. *(rate limiting available today)*  
 

---

## Installation for Contribution

To get started you need [`uv`](https://docs.astral.sh/uv/), a fast Python package manager. Install it first with:

```bash
# On Linux / macOS
curl -LsSf https://astral.sh/uv/install.sh | sh

# On Windows (PowerShell)
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"


# Clone the repository and install all dependencies, including dev extras
git clone https://github.com/PrabhbirJ/apigateway.git
cd apigateway
uv sync --all-extras --dev

# Run tests
uv run pytest
```

---

## Installation to Use in Your Project

To get started you need [`uv`](https://docs.astral.sh/uv/), a fast Python package manager. Install it first with:

```bash
# On Linux / macOS
curl -LsSf https://astral.sh/uv/install.sh | sh

# On Windows (PowerShell)
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"

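# Switch to your project directory
cd {Project_Directory}

# Initialize a uv project (skip this if a pyproject.toml already exists)
uv init
uv add git+https://github.com/PrabhbirJ/apigateway.git

# Add only the web framework(s) your project uses
uv add flask django fastapi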

```

---

## Project Structure

```text
apigateway
├── CHANGELOG.md
├── LICENSE
├── README.md
├── pyproject.toml
├── src
│   ├── apigateway
│   │   ├── core
│   │   │   ├── __init__.py
│   │   │   ├── adapters
│   │   │   │   ├── __init__.py
│   │   │   │   ├── base_adapter.py
│   │   │   │   ├── django.py
│   │   │   │   ├── fastapi.py
│   │   │   │   ├── flask.py
│   │   │   │   └── generic.py
│   │   │   ├── auth
│   │   │   │   ├── __init__.py
│   │   │   │   └── auth.py
│   │   │   ├── enums
│   │   │   │   ├── __init__.py
│   │   │   │   └── validation_modes.py
│   │   │   ├── errors
│   │   │   │   ├── __init.py
│   │   │   │   └── formatters.py
│   │   │   └── validation
│   │   │       ├── __init__.py
│   │   │       ├── file_validation.py
│   │   │       └── validation.py
│   │   └── exceptions
│   │       ├── AuthError.py
│   │       ├── GatewayValidationError.py
│   │       └── __init__.py
│   └── apigateway.egg-info
│       ├── PKG-INFO
│       ├── SOURCES.txt
│       ├── dependency_links.txt
│       ├── requires.txt
│       └── top_level.txt
├── tests
│   ├── __init__.py
│   └── validation
│       ├── __init__.py
│       ├── test_adapters
│       │   ├── __init__.py
│       │   ├── conftest.py
│       │   ├── test_django.py
│       │   ├── test_fastapi.py
│       │   └── test_flask.py
│       └── test_generic
│           ├── __init__.py
│           ├── test_error_handling.py
│           ├── test_pre_post_validators.py
│           └── test_strict_vs_lax.py
└── uv.lock

```
---

## Error Handling

All validation errors are raised as `GatewayValidationError` with this schema:

```json
{
  "error": "Validation Failed",
  "code": "validation_error",
  "details": [
    {
      "field": "id",
      "message": "value is not a valid integer",
      "type": "type_error.integer"
    }
  ]
}
```
You can customize the formatting by supplying your own `error_formatter`.
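
As a rough illustration of what a custom formatter might look like, the sketch below maps a list of Pydantic-style error entries (dicts with `loc`, `msg`, and `type` keys, the shape returned by Pydantic's `ValidationError.errors()`) onto the schema above. The function name `compact_error_formatter` and its signature are assumptions made for this example; check `formatters.py` in the package for the interface the library actually expects.

```python
from typing import Any


def compact_error_formatter(errors: list[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical formatter: convert Pydantic-style errors to the gateway schema."""
    details = []
    for err in errors:
        # `loc` is a tuple such as ("address", "zip"); join it into a dotted path.
        field = ".".join(str(part) for part in err.get("loc", ()))
        details.append({
            "field": field,
            "message": err.get("msg", ""),
            "type": err.get("type", "unknown"),
        })
    return {
        "error": "Validation Failed",
        "code": "validation_error",
        "details": details,
    }


# Sample input shaped like one entry from Pydantic's ValidationError.errors()
sample = [
    {"loc": ("id",), "msg": "value is not a valid integer", "type": "type_error.integer"}
]
formatted = compact_error_formatter(sample)
```

Nested fields come out as dotted paths (for example `address.zip`), which keeps the `field` value a plain string for API clients.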

---

## Flask Example

```python
import time
import secrets
import jwt  # PyJWT (install with `uv add pyjwt`)
from datetime import datetime, timedelta
from flask import Flask, jsonify, request
from pydantic import BaseModel, ConfigDict
from typing import Optional, List, Dict, Any

# API Gateway imports
from apigateway.core.validation.validation import validate_flask, PreValidators
from apigateway.core.enums.validation_modes import ValidationMode
from apigateway.core.auth.auth import authorize_flask
from apigateway.core.rate_limit.RateLimitEngine import configure_rate_limiting, KeyGenerators
from apigateway.core.rate_limit.RateLimiting import rate_limit_flask
from apigateway.core.rate_limit.MemoryBackend import MemoryBackend

# Logging system imports
from apigateway.core.logging import configure_logging, JsonLogger, LogLevel, get_logger
from apigateway.core.logging.logger import log_request_flask

# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================

# Configure structured JSON logging
logger_instance = JsonLogger(
    log_level=LogLevel.INFO,
    enable_sampling=False,  # Disable sampling for demo (log everything)
    masked_fields={'authorization', 'cookie', 'x-api-key', 'token', 'password'}
)
configure_logging(logger_instance)

# Get logger for manual logging
app_logger = get_logger()

# =============================================================================
# APPLICATION SETUP
# =============================================================================

# Configure rate limiting with memory backend
configure_rate_limiting(MemoryBackend())

app = Flask(__name__)

# =============================================================================
# MOCK USER DATABASE
# =============================================================================

users_db = {
    "testuser": {
        "user_id": "1", 
        "username": "testuser",
        "email": "test@example.com",
        "roles": ["user", "admin"]
    },
    "user1": {
        "user_id": "2", 
        "username": "user1",
        "email": "user1@example.com",
        "roles": ["user"]
    },
    "premium": {
        "user_id": "3", 
        "username": "premium",
        "email": "premium@example.com",
        "roles": ["user", "premium"]
    },
    "moderator": {
        "user_id": "4", 
        "username": "moderator", 
        "email": "mod@example.com",
        "roles": ["user", "moderator"]
    }
}


# Demo-only secret: in production, load it from the environment or a secret manager
JWT_SECRET_KEY = "demo-secret-key-32-characters-long-for-development-only!"
JWT_ALGORITHM = "HS256"


def my_jwt_decoder(token: str) -> Dict[str, Any]:
    """Our JWT decoder - we handle the secret and decoding logic."""
    try:
        payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
        return {
            'user_id': payload.get('sub'),
            'username': payload.get('username'),
            'email': payload.get('email'),
            'roles': payload.get('roles', []),
            'permissions': payload.get('permissions', []),
            'token_payload': payload
        }
    except jwt.ExpiredSignatureError as e:
        raise Exception("Token has expired") from e
    except jwt.InvalidSignatureError as e:
        raise Exception("Invalid token signature") from e
    except jwt.InvalidTokenError as e:
        # Catch-all: the two exceptions above are subclasses of InvalidTokenError
        raise Exception(f"Invalid token: {e}") from e


class TokenRequestSchema(BaseModel):
    username: str
    model_config = ConfigDict(extra='forbid')

class ProtectedDataSchema(BaseModel):
    important_data: str
    sensitive_info: Optional[str] = None
    model_config = ConfigDict(extra='forbid')

class UserSchema(BaseModel):
    username: str
    age: int
    email: str
    model_config = ConfigDict(extra='forbid')

class ContactSchema(BaseModel):
    name: str
    email: str
    message: str
    model_config = ConfigDict(extra='ignore')

class SearchSchema(BaseModel):
    query: str
    limit: int = 10
    category: str = "all"
    model_config = ConfigDict(extra='forbid')

class PostSchema(BaseModel):
    title: str
    content: str
    tags: List[str] = []
    model_config = ConfigDict(extra='forbid')

class ApiKeySchema(BaseModel):
    name: str
    permissions: List[str]
    model_config = ConfigDict(extra='forbid')



def create_jwt_token(user_data: dict) -> str:
    """Create a properly signed JWT token."""
    now = int(time.time())
    payload = {
        "sub": str(user_data["user_id"]),
        "username": user_data["username"],
        "email": user_data["email"],
        "roles": user_data["roles"],
        "permissions": ["read", "write"],
        "iat": now,
        "exp": now + 3600,  # 1 hour
        "jti": f"token_{user_data['user_id']}_{now}"
    }
    
    return jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)

def create_custom_role_token(roles: List[str], username: str = "demo_user") -> str:
    """Create JWT token with custom roles."""
    user_data = {
        "user_id": f"demo_{int(time.time())}",
        "username": username,
        "email": f"{username}@demo.com",
        "roles": roles
    }
    return create_jwt_token(user_data)

# Post-validators with logging
def audit_user_creation(user: UserSchema) -> UserSchema:
    """Post-validator: Log user creation for audit."""
    app_logger.log(LogLevel.INFO, "User creation audit", {
        'audit_action': 'user_creation',
        'new_username': user.username,
        'user_age': user.age,
        'user_email': user.email
    })
    return user

def uppercase_username(user: UserSchema) -> UserSchema:
    """Post-validator: Transform username to uppercase."""
    original_username = user.username
    user.username = user.username.upper()
    
    app_logger.log(LogLevel.INFO, "Username transformed", {
        'transformation': 'uppercase',
        'original_username': original_username,
        'new_username': user.username
    })
    return user

def validate_admin_email(user: UserSchema) -> UserSchema:
    """Post-validator: Ensure admin users have company email."""
    if "@company.com" not in user.email:
        app_logger.log(LogLevel.WARNING, "Admin email validation failed", {
            'validation_rule': 'company_email_required',
            'provided_email': user.email,
            'username': user.username
        })
        raise ValueError("Admin users must have @company.com email address")
    
    app_logger.log(LogLevel.INFO, "Admin email validation passed", {
        'validation_rule': 'company_email_required',
        'email': user.email,
        'username': user.username
    })
    return user



@app.route('/', methods=['GET'])
@log_request_flask()
def home():
    """API documentation showing all available endpoints."""
    app_logger.log(LogLevel.INFO, "API documentation requested", {
        'endpoint': 'home',
        'documentation_type': 'api_overview'
    })
    
    return jsonify({
        "message": "API Gateway Demo - JWT + Logging",
        "version": "3.0-LOGGING",
        "features": ["JWT Verification", "Validation", "RBAC", "Rate Limiting", "Structured Logging"],
        "logging": {
            "format": "structured_json",
            "correlation_tracking": "enabled",
            "sensitive_masking": "enabled",
            "log_level": "INFO"
        },
        "endpoints": {
            "token_generation": {
                "POST /get-token": "Get JWT token (rate limited: 10/min)",
                "GET /get-custom-token/<role>": "Get JWT token with a specific role (rate limited: 5/min)",
                "GET /whoami": "Get current user info (requires valid JWT)"
            },
            "public": {
                "POST /contact": "Submit contact form (rate limited: 10/min)",
                "GET /search": "Search with query params (rate limited: 20/min)"
            },
            "user_protected": {
                "GET /profile": "View profile (user role required)",
                "POST /posts": "Create post (user role + validation + rate limit: 5/min)"
            },
            "admin_only": {
                "POST /users": "Create user (admin role + rate limit: 2/min)",
                "POST /admin/users": "Create admin user (admin role + strict validation)",
                "GET /admin/stats": "View admin stats (admin role)"
            },
            "premium_features": {
                "GET /premium/data": "Premium data access (premium role)",
                "POST /premium/api-keys": "Create API keys (premium role + rate limit: 1/min)"
            }
        }
    })



@app.route("/get-token", methods=["POST"])
@log_request_flask()                           # OUTERMOST - logs everything
@rate_limit_flask(requests=10, window=60)     # Rate limiting
@validate_flask(TokenRequestSchema)          # Validation
def get_token(validated: TokenRequestSchema, _rate_limit_info=None):
    """Generate JWT token for testing."""
    user = users_db.get(validated.username)
    if not user:
        app_logger.log(LogLevel.WARNING, "Token request for unknown user", {
            'requested_username': validated.username,
            'available_users': list(users_db.keys())
        })
        return jsonify({"error": "User not found"}), 404
    
    # Create properly signed JWT token
    access_token = create_jwt_token(user)
    
    app_logger.log(LogLevel.INFO, "JWT token generated successfully", {
        'token_action': 'generation',
        'username': user["username"],
        'user_id': user["user_id"],
        'roles': user["roles"],
        'token_expiry': datetime.fromtimestamp(int(time.time()) + 3600).isoformat()
    })
    
    return jsonify({
        "access_token": access_token,
        "token_type": "bearer",
        "expires_in": 3600,
        "user": {
            "username": user["username"],
            "roles": user["roles"]
        },
        "note": "Properly signed JWT token with verification"
    })

@app.route("/get-custom-token/<role>", methods=["GET"])
@log_request_flask()
@rate_limit_flask(requests=5, window=60, scope="custom_token")
def get_custom_token(role, _rate_limit_info=None):
    """Generate JWT token with specific role for testing."""
    valid_roles = ["user", "admin", "moderator", "premium"]
    
    if role not in valid_roles:
        app_logger.log(LogLevel.WARNING, "Invalid role requested for custom token", {
            'requested_role': role,
            'valid_roles': valid_roles
        })
        return jsonify({"error": f"Invalid role. Valid roles: {valid_roles}"}), 400
    
    token = create_custom_role_token([role], f"demo_{role}")
    
    app_logger.log(LogLevel.INFO, "Custom role token generated", {
        'token_action': 'custom_generation',
        'role': role,
        'demo_user': f"demo_{role}"
    })
    
    return jsonify({
        "access_token": token,
        "role": role,
        "expires_in": 3600,
        "note": f"JWT token with {role} role"
    })

@app.route("/whoami", methods=["GET"])
@log_request_flask()
@authorize_flask(token_decoder=my_jwt_decoder) 
def whoami(user):
    """Get current user information from verified JWT token."""
    app_logger.log(LogLevel.INFO, "User identity verified", {
        'identity_check': 'whoami',
        'user_id': user['user_id'],
        'username': user.get('username'),
        'roles': user['roles'],
        'token_id': user['token_payload'].get('jti')
    })
    
    return jsonify({
        "message": "JWT token successfully verified",
        "user": {
            "user_id": user["user_id"],
            "username": user.get("username"),
            "email": user.get("email"),
            "roles": user["roles"],
            "permissions": user.get("permissions", [])
        },
        "token_info": {
            "expires_at": datetime.fromtimestamp(user["token_payload"]["exp"]).isoformat(),
            "issued_at": datetime.fromtimestamp(user["token_payload"]["iat"]).isoformat(),
            "token_id": user["token_payload"].get("jti")
        }
    })



@app.route('/contact', methods=['POST'])
@log_request_flask()
@rate_limit_flask(requests=10, window=60, scope="contact")
@validate_flask(
    ContactSchema, 
    mode=ValidationMode.PERMISSIVE,
    pre_validators=[PreValidators.normalize_email, PreValidators.sanitize_strings]
)
def submit_contact(validated: ContactSchema, _rate_limit_info=None):
    """Submit contact form - public endpoint."""
    app_logger.log(LogLevel.INFO, "Contact form submitted", {
        'form_submission': 'contact',
        'contact_name': validated.name,
        'contact_email': validated.email,
        'message_length': len(validated.message)
    })
    
    return jsonify({
        "success": True,
        "message": "Contact form submitted successfully",
        "data": validated.model_dump()
    })

@app.route('/search', methods=['GET'])
@log_request_flask()
@rate_limit_flask(requests=20, window=60, scope="search")
@validate_flask(SearchSchema, mode=ValidationMode.LAX)
def search(validated: SearchSchema, _rate_limit_info=None):
    """Search endpoint with query parameters."""
    results = [
        f"Result {i} for '{validated.query}'" 
        for i in range(1, min(validated.limit + 1, 6))
    ]
    
    app_logger.log(LogLevel.INFO, "Search performed", {
        'search_action': 'query_executed',
        'query': validated.query,
        'category': validated.category,
        'limit': validated.limit,
        'results_count': len(results)
    })
    
    return jsonify({
        "query": validated.query,
        "category": validated.category,
        "results": results,
        "total": len(results)
    })



@app.route("/profile", methods=["GET"])
@log_request_flask()
@authorize_flask(["user"], token_decoder=my_jwt_decoder) 
def get_profile(user):
    """Get user profile - requires user role."""
    app_logger.log(LogLevel.INFO, "User profile accessed", {
        'profile_access': 'view',
        'user_id': user['user_id'],
        'username': user.get('username'),
        'account_type': "premium" if "premium" in user["roles"] else "standard"
    })
    
    return jsonify({
        "profile": {
            "user_id": user["user_id"],
            "username": user.get("username"),
            "email": user.get("email"),
            "roles": user["roles"],
            "account_type": "premium" if "premium" in user["roles"] else "standard"
        }
    })

@app.route('/posts', methods=['POST'])
@log_request_flask()                         
@rate_limit_flask(requests=5, window=60, key_func=KeyGenerators.user_based)
@authorize_flask(["user"], token_decoder=my_jwt_decoder) 
@validate_flask(PostSchema)
def create_post(validated: PostSchema, user, _rate_limit_info=None):
    """Create a post - full decorator stack with logging."""
    post_id = int(time.time())
    
    app_logger.log(LogLevel.INFO, "Post created successfully", {
        'content_creation': 'post',
        'post_id': post_id,
        'title': validated.title,
        'content_length': len(validated.content),
        'tags_count': len(validated.tags),
        'author_id': user['user_id'],
        'author_username': user.get('username')
    })
    
    return jsonify({
        "success": True,
        "message": "Post created successfully",
        "post": {
            "id": post_id,
            "title": validated.title,
            "content": validated.content,
            "tags": validated.tags,
            "author": user.get("username"),
            "created_at": datetime.now().isoformat()
        }
    })



@app.route('/users', methods=['POST'])
@log_request_flask()
@rate_limit_flask(requests=2, window=60, key_func=KeyGenerators.user_based)
@authorize_flask(["admin"], token_decoder=my_jwt_decoder) 
@validate_flask(UserSchema, mode=ValidationMode.STRICT, post_validators=[audit_user_creation])
def create_user(validated: UserSchema, user, _rate_limit_info=None):
    """Create a new user - admin only with comprehensive logging."""
    new_user_id = str(len(users_db) + 1)
    
    app_logger.log(LogLevel.INFO, "Admin user creation completed", {
        'admin_action': 'user_creation',
        'new_user_id': new_user_id,
        'new_username': validated.username,
        'new_user_email': validated.email,
        'created_by_admin_id': user['user_id'],
        'created_by_admin_username': user.get('username')
    })
    
    return jsonify({
        "success": True,
        "message": f"User {validated.username} created successfully",
        "user": {
            "id": new_user_id,
            **validated.model_dump()
        },
        "created_by": user.get("username")
    })

@app.route('/admin/users', methods=['POST'])
@log_request_flask()
@authorize_flask(["admin"], token_decoder=my_jwt_decoder)  
@validate_flask(
    UserSchema, 
    mode=ValidationMode.STRICT,
    post_validators=[validate_admin_email, uppercase_username, audit_user_creation]
)
def create_admin_user(validated: UserSchema, user):
    """Create admin user with multiple post-validators and logging."""
    app_logger.log(LogLevel.INFO, "Admin user creation with enhanced validation", {
        'admin_action': 'admin_user_creation',
        'new_admin_username': validated.username,
        'email_validated': True,
        'username_transformed': True,
        'created_by': user.get('username')
    })
    
    return jsonify({
        "success": True,
        "message": f"Admin user {validated.username} created",
        "user": validated.model_dump(),
        "created_by": user.get("username")
    })

@app.route('/admin/stats', methods=['GET'])
@log_request_flask()
@authorize_flask(["admin"], token_decoder=my_jwt_decoder)  
def admin_stats(user):
    """Get admin statistics."""
    stats_data = {
        "total_users": len(users_db),
        "admin_users": len([u for u in users_db.values() if "admin" in u["roles"]]),
        "premium_users": len([u for u in users_db.values() if "premium" in u["roles"]]),
        "server_uptime": "demo mode",
        "last_access": datetime.now().isoformat()
    }
    
    app_logger.log(LogLevel.INFO, "Admin statistics accessed", {
        'admin_action': 'stats_view',
        'accessed_by': user.get('username'),
        'stats_summary': {
            'total_users': stats_data["total_users"],
            'admin_users': stats_data["admin_users"],
            'premium_users': stats_data["premium_users"]
        }
    })
    
    return jsonify({
        "stats": stats_data,
        "accessed_by": user.get("username")
    })



@app.route('/premium/data', methods=['GET'])
@log_request_flask()
@authorize_flask(["premium", "admin"], token_decoder=my_jwt_decoder)  
def get_premium_data(user):
    """Get premium data - premium role required."""
    app_logger.log(LogLevel.INFO, "Premium content accessed", {
        'premium_access': 'data_retrieval',
        'user_id': user['user_id'],
        'username': user.get('username'),
        'access_tier': 'premium'
    })
    
    return jsonify({
        "premium_data": {
            "exclusive_content": "This is premium content",
            "analytics": {"views": 12345, "engagement": "high"},
            "api_calls_remaining": 9999,
            "subscription_tier": "premium"
        },
        "user": user.get("username")
    })

@app.route('/premium/api-keys', methods=['POST'])
@log_request_flask()
@rate_limit_flask(requests=1, window=60, key_func=KeyGenerators.user_based)
@authorize_flask(["premium", "admin"], token_decoder=my_jwt_decoder)  
@validate_flask(ApiKeySchema)
def create_api_key(validated: ApiKeySchema, user, _rate_limit_info=None):
    """Create API key - premium feature with strict rate limiting."""
    api_key = f"ak_{secrets.token_urlsafe(32)}"
    
    app_logger.log(LogLevel.INFO, "API key created", {
        'api_key_action': 'creation',
        'key_name': validated.name,
        'permissions': validated.permissions,
        'created_by_user_id': user['user_id'],
        'created_by_username': user.get('username'),
        'key_prefix': api_key[:8] + "..."  
    })
    
    return jsonify({
        "success": True,
        "api_key": {
            "key": api_key,
            "name": validated.name,
            "permissions": validated.permissions,
            "created_at": datetime.now().isoformat(),
            "expires_at": (datetime.now() + timedelta(days=365)).isoformat()
        },
        "owner": user.get("username")
    })



@app.errorhandler(404)
def not_found(error):
    app_logger.log(LogLevel.WARNING, "Endpoint not found", {
        'error_type': '404_not_found',
        'requested_path': request.path,
        'method': request.method
    })
    return jsonify({"error": "Endpoint not found"}), 404

@app.errorhandler(500)
def internal_error(error):
    app_logger.log(LogLevel.ERROR, "Internal server error occurred", {
        'error_type': '500_internal_error',
        'error_message': str(error),
        'path': request.path,
        'method': request.method
    })
    return jsonify({"error": "Internal server error"}), 500



if __name__ == '__main__':
    print("🚀 Starting API Gateway with Comprehensive Logging...")
    print("🌐 Server: http://127.0.0.1:5001")
    print("📖 API Docs: GET http://127.0.0.1:5001/")
    
    print("\n📊 Logging Configuration:")
    print("  • Format: Structured JSON")
    print("  • Level: INFO")
    print("  • Correlation IDs: Enabled") 
    print("  • Sensitive Masking: Enabled")
    print("  • Sampling: Disabled (logs everything)")
    
    print("\n🔑 Demo Users (use POST /get-token):")
    for username, data in users_db.items():
        print(f"  • {username} (roles: {', '.join(data['roles'])})")
    
    print("\n🎭 Quick Test Tokens:")
    print("  • GET /get-custom-token/user")
    print("  • GET /get-custom-token/admin") 
    print("  • GET /get-custom-token/premium")
    print("  • GET /get-custom-token/moderator")
    
    app_logger.log(LogLevel.INFO, "Flask server starting", {
        'server_startup': True,
        'host': '127.0.0.1',
        'port': 5001,
        'environment': 'development',
        'logging_enabled': True,
        'jwt_verification': True
    })
    
    app.run(debug=True, host='127.0.0.1', port=5001)
```
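
The decorator order used throughout the example matters: Python applies decorators bottom-up, so the one listed first wraps all the others and runs first on each request. The sketch below illustrates this with two toy decorators, `log_calls` and `fixed_window_limit`. Both are hypothetical stand-ins written for this illustration, not the library's `log_request_flask` or `rate_limit_flask`, and the fixed-window counter is only one simple strategy that the real backend may not use.

```python
import time
from functools import wraps

events = []  # records the order in which the toy decorators run


def log_calls(func):
    """Toy stand-in for a request logger: records entry and outcome."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        events.append("log:start")
        try:
            result = func(*args, **kwargs)
        except RuntimeError as exc:
            events.append(f"log:rejected ({exc})")
            raise
        events.append("log:done")
        return result
    return wrapper


def fixed_window_limit(requests, window, clock=time.monotonic):
    """Toy fixed-window limiter: allow `requests` calls per `window` seconds."""
    state = {"window_start": None, "count": 0}

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = clock()
            # Start a new window on first use or once the old one has expired.
            if state["window_start"] is None or now - state["window_start"] >= window:
                state["window_start"] = now
                state["count"] = 0
            if state["count"] >= requests:
                raise RuntimeError("rate limit exceeded")
            state["count"] += 1
            events.append("limit:allowed")
            return func(*args, **kwargs)
        return wrapper
    return decorator


@log_calls                                  # outermost: runs first, sees rejections
@fixed_window_limit(requests=2, window=60)  # inner: runs only after logging starts
def handler():
    events.append("handler")
    return "ok"


results = [handler(), handler()]
try:
    handler()  # third call inside the same window is rejected
except RuntimeError:
    results.append("rejected")
```

Because the logger sits outside the limiter, a rejected call still produces a log entry. Reversing the two decorators would let rate-limited requests fail without ever reaching the logger, which is presumably why the example marks the logging decorator as outermost.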

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "apigatewaybuilder",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "api, gateway, validation, authorization, rate-limiting",
    "author": "PrabhBirJ",
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/db/07/4043890f83252d75c60abe7415cf4cbf7479e36fc9be4fea7d64581e7416/apigatewaybuilder-1.4.1.tar.gz",
    "platform": null,
    "description": "# API Gateway\n\n\n**API Gateway** is a modular, developer-friendly Python project designed to become a **full-featured API Gateway framework**.  \n(v1.4.0) it ships with **request validation utilities, authorization, rate limiting and logging** powered by [Pydantic](https://docs.pydantic.dev).  \n\n\n---\n\n##  Vision\nThe goal of **API Gateway** is to provide:\n-  **Validation**: Ensure only clean, schema-compliant data enters your services. *(available today)*  \n-  **Authentication & Authorization**: Pluggable security layers. *(coming soon)*  \n-  **Observability**: Metrics, logging, tracing. *(coming soon)*  \n-  **Routing**: Intelligent request routing and proxying. *(coming soon)*  \n-  **Rate Limiting & QoS**: Keep traffic fair and resilient. *(coming soon)*  \n \n\n---\n\n##  Installation For Contribution\n\nTo get started you need [`uv`](https://docs.astral.sh/uv/), a fast Python package manager. Install it first with:\n\n```bash\n# On Linux / macOS\ncurl -LsSf https://astral.sh/uv/install.sh | sh\n\n# On Windows (PowerShell)\npowershell -c \"irm https://astral.sh/uv/install.ps1 | iex\"\n\n\ngit clone https://github.com/PrabhbirJ/apigateway.git\ncd apigateway\nuv sync --all-extras --dev\n\n# Run tests\nuv run pytest\n```\n\n---\n\n##  Installation To Use in Your Project\n\nTo get started you need [`uv`](https://docs.astral.sh/uv/), a fast Python package manager. 
Install it first with:\n\n```bash\n# On Linux / macOS\ncurl -LsSf https://astral.sh/uv/install.sh | sh\n\n# On Windows (PowerShell)\npowershell -c \"irm https://astral.sh/uv/install.ps1 | iex\"\n\ncd {Project_Directory}\n\nuv init\nuv add git+https://github.com/PrabhbirJ/apigateway.git\n\nuv add flask django fastapi\n\n```\n\n---\n\n##  Project Structure\n\n```bash\napigateway\n\u251c\u2500\u2500 CHANGELOG.md\n\u251c\u2500\u2500 LICENSE\n\u251c\u2500\u2500 README.md\n\u251c\u2500\u2500 pyproject.toml\n\u251c\u2500\u2500 src\n\u2502   \u251c\u2500\u2500 apigateway\n\u2502   \u2502   \u251c\u2500\u2500 core\n\u2502   \u2502   \u2502   \u251c\u2500\u2500 __init__.py\n\u2502   \u2502   \u2502   \u251c\u2500\u2500 adapters\n\u2502   \u2502   \u2502   \u2502   \u251c\u2500\u2500 __init__.py\n\u2502   \u2502   \u2502   \u2502   \u251c\u2500\u2500 base_adapter.py\n\u2502   \u2502   \u2502   \u2502   \u251c\u2500\u2500 django.py\n\u2502   \u2502   \u2502   \u2502   \u251c\u2500\u2500 fastapi.py\n\u2502   \u2502   \u2502   \u2502   \u251c\u2500\u2500 flask.py\n\u2502   \u2502   \u2502   \u2502   \u2514\u2500\u2500 generic.py\n\u2502   \u2502   \u2502   \u251c\u2500\u2500 auth\n\u2502   \u2502   \u2502   \u2502   \u251c\u2500\u2500 __init__.py\n\u2502   \u2502   \u2502   \u2502   \u2514\u2500\u2500 auth.py\n\u2502   \u2502   \u2502   \u251c\u2500\u2500 enums\n\u2502   \u2502   \u2502   \u2502   \u251c\u2500\u2500 __init__.py\n\u2502   \u2502   \u2502   \u2502   \u2514\u2500\u2500 validation_modes.py\n\u2502   \u2502   \u2502   \u251c\u2500\u2500 errors\n\u2502   \u2502   \u2502   \u2502   \u251c\u2500\u2500 __init.py\n\u2502   \u2502   \u2502   \u2502   \u2514\u2500\u2500 formatters.py\n\u2502   \u2502   \u2502   \u2514\u2500\u2500 validation\n\u2502   \u2502   \u2502       \u251c\u2500\u2500 __init__.py\n\u2502   \u2502   \u2502       \u251c\u2500\u2500 file_validation.py\n\u2502   \u2502   \u2502       \u2514\u2500\u2500 validation.py\n\u2502   \u2502   \u2514\u2500\u2500 
exceptions\n\u2502   \u2502       \u251c\u2500\u2500 AuthError.py\n\u2502   \u2502       \u251c\u2500\u2500 GatewayValidationError.py\n\u2502   \u2502       \u251c\u2500\u2500 __init__.py\n\u2502   \u2514\u2500\u2500 apigateway.egg-info\n\u2502       \u251c\u2500\u2500 PKG-INFO\n\u2502       \u251c\u2500\u2500 SOURCES.txt\n\u2502       \u251c\u2500\u2500 dependency_links.txt\n\u2502       \u251c\u2500\u2500 requires.txt\n\u2502       \u2514\u2500\u2500 top_level.txt\n\u251c\u2500\u2500 tests\n\u2502   \u251c\u2500\u2500 __init__.py\n\u2502   \u2514\u2500\u2500 validation\n\u2502       \u251c\u2500\u2500 __init__.py\n\u2502       \u251c\u2500\u2500 test_adapters\n\u2502       \u2502   \u251c\u2500\u2500 __init__.py\n\u2502       \u2502   \u251c\u2500\u2500 conftest.py\n\u2502       \u2502   \u251c\u2500\u2500 test_django.py\n\u2502       \u2502   \u251c\u2500\u2500 test_fastapi.py\n\u2502       \u2502   \u2514\u2500\u2500 test_flask.py\n\u2502       \u2514\u2500\u2500 test_generic\n\u2502           \u251c\u2500\u2500 __init__.py\n\u2502           \u251c\u2500\u2500 test_error_handling.py\n\u2502           \u251c\u2500\u2500 test_pre_post_validators.py\n\u2502           \u2514\u2500\u2500 test_strict_vs_lax.py\n\u2514\u2500\u2500 uv.lock\n\n```\n---\n\n## Error Handling\n\nAll validation errors are raised as GatewayValidationError with this schema:\n```bash\n{\n  \"error\": \"Validation Failed\",\n  \"code\": \"validation_error\",\n  \"details\": [\n    {\n      \"field\": \"id\",\n      \"message\": \"value is not a valid integer\",\n      \"type\": \"type_error.integer\"\n    }\n  ]\n}\n```\nYou can customize formatting by supplying your own error_formatter\n\n---\n\n##  Flask Example\n\n```python\nimport os\nimport json\nimport base64\nimport time\nimport secrets\nimport jwt  # Add this import\nfrom datetime import datetime, timedelta\nfrom flask import Flask, jsonify, request\nfrom pydantic import BaseModel, ConfigDict\nfrom typing import Optional, List, Dict, 
Any\n\n# API Gateway imports\nfrom apigateway.core.validation.validation import validate_flask, PreValidators\nfrom apigateway.core.enums.validation_modes import ValidationMode\nfrom apigateway.core.auth.auth import authorize_flask\nfrom apigateway.core.rate_limit.RateLimitEngine import configure_rate_limiting, KeyGenerators\nfrom apigateway.core.rate_limit.RateLimiting import rate_limit_flask\nfrom apigateway.core.rate_limit.MemoryBackend import MemoryBackend\n\n# Logging system imports\nfrom apigateway.core.logging import configure_logging, JsonLogger, LogLevel, get_logger\nfrom apigateway.core.logging.logger import log_request_flask\n\n# =============================================================================\n# LOGGING CONFIGURATION\n# =============================================================================\n\n# Configure structured JSON logging\nlogger_instance = JsonLogger(\n    log_level=LogLevel.INFO,\n    enable_sampling=False,  # Disable sampling for demo (log everything)\n    masked_fields={'authorization', 'cookie', 'x-api-key', 'token', 'password'}\n)\nconfigure_logging(logger_instance)\n\n# Get logger for manual logging\napp_logger = get_logger()\n\n# =============================================================================\n# APPLICATION SETUP\n# =============================================================================\n\n# Configure rate limiting with memory backend\nconfigure_rate_limiting(MemoryBackend())\n\napp = Flask(__name__)\n\n# =============================================================================\n# MOCK USER DATABASE\n# =============================================================================\n\nusers_db = {\n    \"testuser\": {\n        \"user_id\": \"1\",\n        \"username\": \"testuser\",\n        \"email\": \"test@example.com\",\n        \"roles\": [\"user\", \"admin\"]\n    },\n    \"user1\": {\n        \"user_id\": \"2\",\n        \"username\": \"user1\",\n        \"email\": 
\"user1@example.com\",\n        \"roles\": [\"user\"]\n    },\n    \"premium\": {\n        \"user_id\": \"3\", \n        \"username\": \"premium\",\n        \"email\": \"premium@example.com\",\n        \"roles\": [\"user\", \"premium\"]\n    },\n    \"moderator\": {\n        \"user_id\": \"4\", \n        \"username\": \"moderator\", \n        \"email\": \"mod@example.com\",\n        \"roles\": [\"user\", \"moderator\"]\n    }\n}\n\n\nJWT_SECRET_KEY = \"demo-secret-key-32-characters-long-for-development-only!\"\nJWT_ALGORITHM = \"HS256\"\n\n\ndef my_jwt_decoder(token: str) -> Dict[str, Any]:\n    \"\"\"Our JWT decoder - we handle the secret and decoding logic.\"\"\"\n    try:\n        payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])\n        return {\n            'user_id': payload.get('sub'),\n            'username': payload.get('username'),\n            'email': payload.get('email'),\n            'roles': payload.get('roles', []),\n            'permissions': payload.get('permissions', []),\n            'token_payload': payload\n        }\n    except jwt.ExpiredSignatureError:\n        raise Exception(\"Token has expired\")\n    except jwt.InvalidSignatureError:\n        raise Exception(\"Invalid token signature\")\n    except jwt.InvalidTokenError as e:\n        raise Exception(f\"Invalid token: {str(e)}\")\n\n\nclass TokenRequestSchema(BaseModel):\n    username: str\n    model_config = ConfigDict(extra='forbid')\n\nclass ProtectedDataSchema(BaseModel):\n    important_data: str\n    sensitive_info: Optional[str] = None\n    model_config = ConfigDict(extra='forbid')\n\nclass UserSchema(BaseModel):\n    username: str\n    age: int\n    email: str\n    model_config = ConfigDict(extra='forbid')\n\nclass ContactSchema(BaseModel):\n    name: str\n    email: str\n    message: str\n    model_config = ConfigDict(extra='ignore')\n\nclass SearchSchema(BaseModel):\n    query: str\n    limit: int = 10\n    category: str = \"all\"\n    model_config = 
ConfigDict(extra='forbid')\n\nclass PostSchema(BaseModel):\n    title: str\n    content: str\n    tags: List[str] = []\n    model_config = ConfigDict(extra='forbid')\n\nclass ApiKeySchema(BaseModel):\n    name: str\n    permissions: List[str]\n    model_config = ConfigDict(extra='forbid')\n\n\n\ndef create_jwt_token(user_data: dict) -> str:\n    \"\"\"Create a properly signed JWT token.\"\"\"\n    now = int(time.time())\n    payload = {\n        \"sub\": str(user_data[\"user_id\"]),\n        \"username\": user_data[\"username\"],\n        \"email\": user_data[\"email\"],\n        \"roles\": user_data[\"roles\"],\n        \"permissions\": [\"read\", \"write\"],\n        \"iat\": now,\n        \"exp\": now + 3600,  # 1 hour\n        \"jti\": f\"token_{user_data['user_id']}_{now}\"\n    }\n    \n    return jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)\n\ndef create_custom_role_token(roles: List[str], username: str = \"demo_user\") -> str:\n    \"\"\"Create JWT token with custom roles.\"\"\"\n    user_data = {\n        \"user_id\": f\"demo_{int(time.time())}\",\n        \"username\": username,\n        \"email\": f\"{username}@demo.com\",\n        \"roles\": roles\n    }\n    return create_jwt_token(user_data)\n\n# Post-validators with logging\ndef audit_user_creation(user: UserSchema) -> UserSchema:\n    \"\"\"Post-validator: Log user creation for audit.\"\"\"\n    app_logger.log(LogLevel.INFO, \"User creation audit\", {\n        'audit_action': 'user_creation',\n        'new_username': user.username,\n        'user_age': user.age,\n        'user_email': user.email\n    })\n    return user\n\ndef uppercase_username(user: UserSchema) -> UserSchema:\n    \"\"\"Post-validator: Transform username to uppercase.\"\"\"\n    original_username = user.username\n    user.username = user.username.upper()\n    \n    app_logger.log(LogLevel.INFO, \"Username transformed\", {\n        'transformation': 'uppercase',\n        'original_username': original_username,\n     
   'new_username': user.username\n    })\n    return user\n\ndef validate_admin_email(user: UserSchema) -> UserSchema:\n    \"\"\"Post-validator: Ensure admin users have company email.\"\"\"\n    if \"@company.com\" not in user.email:\n        app_logger.log(LogLevel.WARNING, \"Admin email validation failed\", {\n            'validation_rule': 'company_email_required',\n            'provided_email': user.email,\n            'username': user.username\n        })\n        raise ValueError(\"Admin users must have @company.com email address\")\n    \n    app_logger.log(LogLevel.INFO, \"Admin email validation passed\", {\n        'validation_rule': 'company_email_required',\n        'email': user.email,\n        'username': user.username\n    })\n    return user\n\n\n\n@app.route('/', methods=['GET'])\n@log_request_flask()\ndef home():\n    \"\"\"API documentation showing all available endpoints.\"\"\"\n    app_logger.log(LogLevel.INFO, \"API documentation requested\", {\n        'endpoint': 'home',\n        'documentation_type': 'api_overview'\n    })\n    \n    return jsonify({\n        \"message\": \"API Gateway Demo - JWT + Logging\",\n        \"version\": \"3.0-LOGGING\",\n        \"features\": [\"JWT Verification\", \"Validation\", \"RBAC\", \"Rate Limiting\", \"Structured Logging\"],\n        \"logging\": {\n            \"format\": \"structured_json\",\n            \"correlation_tracking\": \"enabled\",\n            \"sensitive_masking\": \"enabled\",\n            \"log_level\": \"INFO\"\n        },\n        \"endpoints\": {\n            \"token_generation\": {\n                \"POST /get-token\": \"Get JWT token (rate limited: 10/min)\",\n                \"GET /whoami\": \"Get current user info (requires valid JWT)\"\n            },\n            \"public\": {\n                \"POST /contact\": \"Submit contact form (rate limited: 10/min)\",\n                \"GET /search\": \"Search with query params (rate limited: 20/min)\",\n                \"GET 
/public-data\": \"Get public data (rate limited: 100/min)\"\n            },\n            \"user_protected\": {\n                \"GET /profile\": \"View profile (user role required)\",\n                \"POST /posts\": \"Create post (user role + validation + rate limit: 5/min)\",\n                \"POST /submit\": \"Submit protected data (user role + validation)\"\n            },\n            \"admin_only\": {\n                \"POST /users\": \"Create user (admin role + rate limit: 2/min)\",\n                \"POST /admin/users\": \"Create admin user (admin role + strict validation)\",\n                \"GET /admin/stats\": \"View admin stats (admin role)\"\n            },\n            \"moderator_only\": {\n                \"POST /moderate\": \"Moderate content (moderator role + rate limit: 10/min)\"\n            },\n            \"premium_features\": {\n                \"GET /premium/data\": \"Premium data access (premium role)\",\n                \"POST /premium/api-keys\": \"Create API keys (premium role + rate limit: 1/min)\"\n            }\n        }\n    })\n\n\n\n@app.route(\"/get-token\", methods=[\"POST\"])\n@log_request_flask()                           # OUTERMOST - logs everything\n@rate_limit_flask(requests=10, window=60)     # Rate limiting\n@validate_flask(TokenRequestSchema)          # Validation\ndef get_token(validated: TokenRequestSchema, _rate_limit_info=None):\n    \"\"\"Generate JWT token for testing.\"\"\"\n    user = users_db.get(validated.username)\n    if not user:\n        app_logger.log(LogLevel.WARNING, \"Token request for unknown user\", {\n            'requested_username': validated.username,\n            'available_users': list(users_db.keys())\n        })\n        return jsonify({\"error\": \"User not found\"}), 404\n    \n    # Create properly signed JWT token\n    access_token = create_jwt_token(user)\n    \n    app_logger.log(LogLevel.INFO, \"JWT token generated successfully\", {\n        'token_action': 'generation',\n        
'username': user[\"username\"],\n        'user_id': user[\"user_id\"],\n        'roles': user[\"roles\"],\n        'token_expiry': datetime.fromtimestamp(int(time.time()) + 3600).isoformat()\n    })\n    \n    return jsonify({\n        \"access_token\": access_token,\n        \"token_type\": \"bearer\",\n        \"expires_in\": 3600,\n        \"user\": {\n            \"username\": user[\"username\"],\n            \"roles\": user[\"roles\"]\n        },\n        \"note\": \"Properly signed JWT token with verification\"\n    })\n\n@app.route(\"/get-custom-token/<role>\", methods=[\"GET\"])\n@log_request_flask()\n@rate_limit_flask(requests=5, window=60, scope=\"custom_token\")\ndef get_custom_token(role, _rate_limit_info=None):\n    \"\"\"Generate JWT token with specific role for testing.\"\"\"\n    valid_roles = [\"user\", \"admin\", \"moderator\", \"premium\"]\n    \n    if role not in valid_roles:\n        app_logger.log(LogLevel.WARNING, \"Invalid role requested for custom token\", {\n            'requested_role': role,\n            'valid_roles': valid_roles\n        })\n        return jsonify({\"error\": f\"Invalid role. 
Valid roles: {valid_roles}\"}), 400\n    \n    token = create_custom_role_token([role], f\"demo_{role}\")\n    \n    app_logger.log(LogLevel.INFO, \"Custom role token generated\", {\n        'token_action': 'custom_generation',\n        'role': role,\n        'demo_user': f\"demo_{role}\"\n    })\n    \n    return jsonify({\n        \"access_token\": token,\n        \"role\": role,\n        \"expires_in\": 3600,\n        \"note\": f\"JWT token with {role} role\"\n    })\n\n@app.route(\"/whoami\", methods=[\"GET\"])\n@log_request_flask()\n@authorize_flask(token_decoder=my_jwt_decoder) \ndef whoami(user):\n    \"\"\"Get current user information from verified JWT token.\"\"\"\n    app_logger.log(LogLevel.INFO, \"User identity verified\", {\n        'identity_check': 'whoami',\n        'user_id': user['user_id'],\n        'username': user.get('username'),\n        'roles': user['roles'],\n        'token_id': user['token_payload'].get('jti')\n    })\n    \n    return jsonify({\n        \"message\": \"JWT token successfully verified\",\n        \"user\": {\n            \"user_id\": user[\"user_id\"],\n            \"username\": user.get(\"username\"),\n            \"email\": user.get(\"email\"),\n            \"roles\": user[\"roles\"],\n            \"permissions\": user.get(\"permissions\", [])\n        },\n        \"token_info\": {\n            \"expires_at\": datetime.fromtimestamp(user[\"token_payload\"][\"exp\"]).isoformat(),\n            \"issued_at\": datetime.fromtimestamp(user[\"token_payload\"][\"iat\"]).isoformat(),\n            \"token_id\": user[\"token_payload\"].get(\"jti\")\n        }\n    })\n\n\n\n@app.route('/contact', methods=['POST'])\n@log_request_flask()\n@rate_limit_flask(requests=10, window=60, scope=\"contact\")\n@validate_flask(\n    ContactSchema, \n    mode=ValidationMode.PERMISSIVE,\n    pre_validators=[PreValidators.normalize_email, PreValidators.sanitize_strings]\n)\ndef submit_contact(validated: ContactSchema, _rate_limit_info=None):\n    
\"\"\"Submit contact form - public endpoint.\"\"\"\n    app_logger.log(LogLevel.INFO, \"Contact form submitted\", {\n        'form_submission': 'contact',\n        'contact_name': validated.name,\n        'contact_email': validated.email,\n        'message_length': len(validated.message)\n    })\n    \n    return jsonify({\n        \"success\": True,\n        \"message\": \"Contact form submitted successfully\",\n        \"data\": validated.model_dump()\n    })\n\n@app.route('/search', methods=['GET'])\n@log_request_flask()\n@rate_limit_flask(requests=20, window=60, scope=\"search\")\n@validate_flask(SearchSchema, mode=ValidationMode.LAX)\ndef search(validated: SearchSchema, _rate_limit_info=None):\n    \"\"\"Search endpoint with query parameters.\"\"\"\n    results = [\n        f\"Result {i} for '{validated.query}'\" \n        for i in range(1, min(validated.limit + 1, 6))\n    ]\n    \n    app_logger.log(LogLevel.INFO, \"Search performed\", {\n        'search_action': 'query_executed',\n        'query': validated.query,\n        'category': validated.category,\n        'limit': validated.limit,\n        'results_count': len(results)\n    })\n    \n    return jsonify({\n        \"query\": validated.query,\n        \"category\": validated.category,\n        \"results\": results,\n        \"total\": len(results)\n    })\n\n\n\n@app.route(\"/profile\", methods=[\"GET\"])\n@log_request_flask()\n@authorize_flask([\"user\"], token_decoder=my_jwt_decoder) \ndef get_profile(user):\n    \"\"\"Get user profile - requires user role.\"\"\"\n    app_logger.log(LogLevel.INFO, \"User profile accessed\", {\n        'profile_access': 'view',\n        'user_id': user['user_id'],\n        'username': user.get('username'),\n        'account_type': \"premium\" if \"premium\" in user[\"roles\"] else \"standard\"\n    })\n    \n    return jsonify({\n        \"profile\": {\n            \"user_id\": user[\"user_id\"],\n            \"username\": user.get(\"username\"),\n            
\"email\": user.get(\"email\"),\n            \"roles\": user[\"roles\"],\n            \"account_type\": \"premium\" if \"premium\" in user[\"roles\"] else \"standard\"\n        }\n    })\n\n@app.route('/posts', methods=['POST'])\n@log_request_flask()                         \n@rate_limit_flask(requests=5, window=60, key_func=KeyGenerators.user_based)\n@authorize_flask([\"user\"], token_decoder=my_jwt_decoder) \n@validate_flask(PostSchema)\ndef create_post(validated: PostSchema, user, _rate_limit_info=None):\n    \"\"\"Create a post - full decorator stack with logging.\"\"\"\n    post_id = int(time.time())\n    \n    app_logger.log(LogLevel.INFO, \"Post created successfully\", {\n        'content_creation': 'post',\n        'post_id': post_id,\n        'title': validated.title,\n        'content_length': len(validated.content),\n        'tags_count': len(validated.tags),\n        'author_id': user['user_id'],\n        'author_username': user.get('username')\n    })\n    \n    return jsonify({\n        \"success\": True,\n        \"message\": \"Post created successfully\",\n        \"post\": {\n            \"id\": post_id,\n            \"title\": validated.title,\n            \"content\": validated.content,\n            \"tags\": validated.tags,\n            \"author\": user.get(\"username\"),\n            \"created_at\": datetime.now().isoformat()\n        }\n    })\n\n\n\n@app.route('/users', methods=['POST'])\n@log_request_flask()\n@rate_limit_flask(requests=2, window=60, key_func=KeyGenerators.user_based)\n@authorize_flask([\"admin\"], token_decoder=my_jwt_decoder) \n@validate_flask(UserSchema, mode=ValidationMode.STRICT, post_validators=[audit_user_creation])\ndef create_user(validated: UserSchema, user, _rate_limit_info=None):\n    \"\"\"Create a new user - admin only with comprehensive logging.\"\"\"\n    new_user_id = str(len(users_db) + 1)\n    \n    app_logger.log(LogLevel.INFO, \"Admin user creation completed\", {\n        'admin_action': 
'user_creation',\n        'new_user_id': new_user_id,\n        'new_username': validated.username,\n        'new_user_email': validated.email,\n        'created_by_admin_id': user['user_id'],\n        'created_by_admin_username': user.get('username')\n    })\n    \n    return jsonify({\n        \"success\": True,\n        \"message\": f\"User {validated.username} created successfully\",\n        \"user\": {\n            \"id\": new_user_id,\n            **validated.model_dump()\n        },\n        \"created_by\": user.get(\"username\")\n    })\n\n@app.route('/admin/users', methods=['POST'])\n@log_request_flask()\n@authorize_flask([\"admin\"], token_decoder=my_jwt_decoder)  \n@validate_flask(\n    UserSchema, \n    mode=ValidationMode.STRICT,\n    post_validators=[validate_admin_email, uppercase_username, audit_user_creation]\n)\ndef create_admin_user(validated: UserSchema, user):\n    \"\"\"Create admin user with multiple post-validators and logging.\"\"\"\n    app_logger.log(LogLevel.INFO, \"Admin user creation with enhanced validation\", {\n        'admin_action': 'admin_user_creation',\n        'new_admin_username': validated.username,\n        'email_validated': True,\n        'username_transformed': True,\n        'created_by': user.get('username')\n    })\n    \n    return jsonify({\n        \"success\": True,\n        \"message\": f\"Admin user {validated.username} created\",\n        \"user\": validated.model_dump(),\n        \"created_by\": user.get(\"username\")\n    })\n\n@app.route('/admin/stats', methods=['GET'])\n@log_request_flask()\n@authorize_flask([\"admin\"], token_decoder=my_jwt_decoder)  \ndef admin_stats(user):\n    \"\"\"Get admin statistics.\"\"\"\n    stats_data = {\n        \"total_users\": len(users_db),\n        \"admin_users\": len([u for u in users_db.values() if \"admin\" in u[\"roles\"]]),\n        \"premium_users\": len([u for u in users_db.values() if \"premium\" in u[\"roles\"]]),\n        \"server_uptime\": \"demo mode\",\n      
  \"last_access\": datetime.now().isoformat()\n    }\n    \n    app_logger.log(LogLevel.INFO, \"Admin statistics accessed\", {\n        'admin_action': 'stats_view',\n        'accessed_by': user.get('username'),\n        'stats_summary': {\n            'total_users': stats_data[\"total_users\"],\n            'admin_users': stats_data[\"admin_users\"],\n            'premium_users': stats_data[\"premium_users\"]\n        }\n    })\n    \n    return jsonify({\n        \"stats\": stats_data,\n        \"accessed_by\": user.get(\"username\")\n    })\n\n\n\n@app.route('/premium/data', methods=['GET'])\n@log_request_flask()\n@authorize_flask([\"premium\", \"admin\"], token_decoder=my_jwt_decoder)  \ndef get_premium_data(user):\n    \"\"\"Get premium data - premium role required.\"\"\"\n    app_logger.log(LogLevel.INFO, \"Premium content accessed\", {\n        'premium_access': 'data_retrieval',\n        'user_id': user['user_id'],\n        'username': user.get('username'),\n        'access_tier': 'premium'\n    })\n    \n    return jsonify({\n        \"premium_data\": {\n            \"exclusive_content\": \"This is premium content\",\n            \"analytics\": {\"views\": 12345, \"engagement\": \"high\"},\n            \"api_calls_remaining\": 9999,\n            \"subscription_tier\": \"premium\"\n        },\n        \"user\": user.get(\"username\")\n    })\n\n@app.route('/premium/api-keys', methods=['POST'])\n@log_request_flask()\n@rate_limit_flask(requests=1, window=60, key_func=KeyGenerators.user_based)\n@authorize_flask([\"premium\", \"admin\"], token_decoder=my_jwt_decoder)  \n@validate_flask(ApiKeySchema)\ndef create_api_key(validated: ApiKeySchema, user, _rate_limit_info=None):\n    \"\"\"Create API key - premium feature with strict rate limiting.\"\"\"\n    api_key = f\"ak_{secrets.token_urlsafe(32)}\"\n    \n    app_logger.log(LogLevel.INFO, \"API key created\", {\n        'api_key_action': 'creation',\n        'key_name': validated.name,\n        'permissions': 
validated.permissions,\n        'created_by_user_id': user['user_id'],\n        'created_by_username': user.get('username'),\n        'key_prefix': api_key[:8] + \"...\"  \n    })\n    \n    return jsonify({\n        \"success\": True,\n        \"api_key\": {\n            \"key\": api_key,\n            \"name\": validated.name,\n            \"permissions\": validated.permissions,\n            \"created_at\": datetime.now().isoformat(),\n            \"expires_at\": (datetime.now() + timedelta(days=365)).isoformat()\n        },\n        \"owner\": user.get(\"username\")\n    })\n\n\n\n@app.errorhandler(404)\ndef not_found(error):\n    app_logger.log(LogLevel.WARNING, \"Endpoint not found\", {\n        'error_type': '404_not_found',\n        'requested_path': request.path,\n        'method': request.method\n    })\n    return jsonify({\"error\": \"Endpoint not found\"}), 404\n\n@app.errorhandler(500)\ndef internal_error(error):\n    app_logger.log(LogLevel.ERROR, \"Internal server error occurred\", {\n        'error_type': '500_internal_error',\n        'error_message': str(error),\n        'path': request.path,\n        'method': request.method\n    })\n    return jsonify({\"error\": \"Internal server error\"}), 500\n\n\n\nif __name__ == '__main__':\n    print(\"\ud83d\ude80 Starting API Gateway with Comprehensive Logging...\")\n    print(\"\ud83c\udf10 Server: http://127.0.0.1:5001\")\n    print(\"\ud83d\udcd6 API Docs: GET http://127.0.0.1:5001/\")\n    \n    print(\"\\n\ud83d\udcca Logging Configuration:\")\n    print(\"  \u2022 Format: Structured JSON\")\n    print(\"  \u2022 Level: INFO\")\n    print(\"  \u2022 Correlation IDs: Enabled\") \n    print(\"  \u2022 Sensitive Masking: Enabled\")\n    print(\"  \u2022 Sampling: Disabled (logs everything)\")\n    \n    print(\"\\n\ud83d\udd11 Demo Users (use POST /get-token):\")\n    for username, data in users_db.items():\n        print(f\"  \u2022 {username} (roles: {', '.join(data['roles'])})\")\n    \n    
print(\"\\n\ud83c\udfad Quick Test Tokens:\")\n    print(\"  \u2022 GET /get-custom-token/user\")\n    print(\"  \u2022 GET /get-custom-token/admin\") \n    print(\"  \u2022 GET /get-custom-token/premium\")\n    print(\"  \u2022 GET /get-custom-token/moderator\")\n    \n    app_logger.log(LogLevel.INFO, \"Flask server starting\", {\n        'server_startup': True,\n        'host': '127.0.0.1',\n        'port': 5001,\n        'environment': 'development',\n        'logging_enabled': True,\n        'jwt_verification': True\n    })\n    \n    app.run(debug=True, host='127.0.0.1', port=5001)\n```\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Framework-agnostic API Gateway with validation, auth, and rate limiting",
    "version": "1.4.1",
    "project_urls": {
        "Documentation": "https://github.com/PrabhBirJ/apigateway#readme",
        "Homepage": "https://github.com/PrabhBirJ/apigateway",
        "Issues": "https://github.com/PrabhBirJ/apigateway/issues",
        "Repository": "https://github.com/PrabhBirJ/apigateway"
    },
    "split_keywords": [
        "api",
        " gateway",
        " validation",
        " authorization",
        " rate-limiting"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "4f8342d1089de1bc5611429434ef910b9d11b8ca6f76bb1558f14bf200d3d72c",
                "md5": "0bc23f50bcccc4b08eb5d17b98a4f275",
                "sha256": "ee21a304585a699dce7d5fdc523d997e547430fd904d47ca8fc5a4bf33bc2e5a"
            },
            "downloads": -1,
            "filename": "apigatewaybuilder-1.4.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "0bc23f50bcccc4b08eb5d17b98a4f275",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 54100,
            "upload_time": "2025-10-09T11:17:14",
            "upload_time_iso_8601": "2025-10-09T11:17:14.553801Z",
            "url": "https://files.pythonhosted.org/packages/4f/83/42d1089de1bc5611429434ef910b9d11b8ca6f76bb1558f14bf200d3d72c/apigatewaybuilder-1.4.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "db074043890f83252d75c60abe7415cf4cbf7479e36fc9be4fea7d64581e7416",
                "md5": "3afed05bfc06be94a11f47ea5b3b482d",
                "sha256": "3a9d22c691f8778e7167411059739ef66b8f644dd56075dfef2e3baf34b9ef6a"
            },
            "downloads": -1,
            "filename": "apigatewaybuilder-1.4.1.tar.gz",
            "has_sig": false,
            "md5_digest": "3afed05bfc06be94a11f47ea5b3b482d",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 47606,
            "upload_time": "2025-10-09T11:17:16",
            "upload_time_iso_8601": "2025-10-09T11:17:16.236825Z",
            "url": "https://files.pythonhosted.org/packages/db/07/4043890f83252d75c60abe7415cf4cbf7479e36fc9be4fea7d64581e7416/apigatewaybuilder-1.4.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-09 11:17:16",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "PrabhBirJ",
    "github_project": "apigateway#readme",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "apigatewaybuilder"
}
        