# API Gateway
**API Gateway** is a modular, developer-friendly Python project designed to become a **full-featured API Gateway framework**.
(v1.4.0) it ships with **request validation utilities, authorization, rate limiting and logging** powered by [Pydantic](https://docs.pydantic.dev).
---
## Vision
The goal of **API Gateway** is to provide:
- **Validation**: Ensure only clean, schema-compliant data enters your services. *(available today)*
- **Authentication & Authorization**: Pluggable security layers. *(coming soon)*
- **Observability**: Metrics, logging, tracing. *(coming soon)*
- **Routing**: Intelligent request routing and proxying. *(coming soon)*
- **Rate Limiting & QoS**: Keep traffic fair and resilient. *(coming soon)*
---
## Installation For Contribution
To get started you need [`uv`](https://docs.astral.sh/uv/), a fast Python package manager. Install it first with:
```bash
# On Linux / macOS
curl -LsSf https://astral.sh/uv/install.sh | sh
# On Windows (PowerShell)
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"
git clone https://github.com/PrabhbirJ/apigateway.git
cd apigateway
uv sync --all-extras --dev
# Run tests
uv run pytest
```
---
## Installation To Use in Your Project
To get started you need [`uv`](https://docs.astral.sh/uv/), a fast Python package manager. Install it first with:
```bash
# On Linux / macOS
curl -LsSf https://astral.sh/uv/install.sh | sh
# On Windows (PowerShell)
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"
cd {Project_Directory}
uv init
uv add git+https://github.com/PrabhbirJ/apigateway.git
uv add flask django fastapi
```
---
## Project Structure
```bash
apigateway
├── CHANGELOG.md
├── LICENSE
├── README.md
├── pyproject.toml
├── src
│ ├── apigateway
│ │ ├── core
│ │ │ ├── __init__.py
│ │ │ ├── adapters
│ │ │ │ ├── __init__.py
│ │ │ │ ├── base_adapter.py
│ │ │ │ ├── django.py
│ │ │ │ ├── fastapi.py
│ │ │ │ ├── flask.py
│ │ │ │ └── generic.py
│ │ │ ├── auth
│ │ │ │ ├── __init__.py
│ │ │ │ └── auth.py
│ │ │ ├── enums
│ │ │ │ ├── __init__.py
│ │ │ │ └── validation_modes.py
│ │ │ ├── errors
│ │ │ │ ├── __init.py
│ │ │ │ └── formatters.py
│ │ │ └── validation
│ │ │ ├── __init__.py
│ │ │ ├── file_validation.py
│ │ │ └── validation.py
│ │ └── exceptions
│ │ ├── AuthError.py
│ │ ├── GatewayValidationError.py
│ │ ├── __init__.py
│ └── apigateway.egg-info
│ ├── PKG-INFO
│ ├── SOURCES.txt
│ ├── dependency_links.txt
│ ├── requires.txt
│ └── top_level.txt
├── tests
│ ├── __init__.py
│ └── validation
│ ├── __init__.py
│ ├── test_adapters
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_django.py
│ │ ├── test_fastapi.py
│ │ └── test_flask.py
│ └── test_generic
│ ├── __init__.py
│ ├── test_error_handling.py
│ ├── test_pre_post_validators.py
│ └── test_strict_vs_lax.py
└── uv.lock
```
---
## Error Handling
All validation errors are raised as GatewayValidationError with this schema:
```bash
{
"error": "Validation Failed",
"code": "validation_error",
"details": [
{
"field": "id",
"message": "value is not a valid integer",
"type": "type_error.integer"
}
]
}
```
You can customize formatting by supplying your own error_formatter
---
## Flask Example
```python
import os
import json
import base64
import time
import secrets
import jwt # Add this import
from datetime import datetime, timedelta
from flask import Flask, jsonify, request
from pydantic import BaseModel, ConfigDict
from typing import Optional, List, Dict, Any
# API Gateway imports
from apigateway.core.validation.validation import validate_flask, PreValidators
from apigateway.core.enums.validation_modes import ValidationMode
from apigateway.core.auth.auth import authorize_flask # Updated import
from apigateway.core.rate_limit.RateLimitEngine import configure_rate_limiting, KeyGenerators
from apigateway.core.rate_limit.RateLimiting import rate_limit_flask
from apigateway.core.rate_limit.MemoryBackend import MemoryBackend
# NEW: Logging system imports
from apigateway.core.logging import configure_logging, JsonLogger, LogLevel, get_logger
from apigateway.core.logging.logger import log_request_flask
# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
# Configure structured JSON logging
logger_instance = JsonLogger(
log_level=LogLevel.INFO,
enable_sampling=False, # Disable sampling for demo (log everything)
masked_fields={'authorization', 'cookie', 'x-api-key', 'token', 'password'}
)
configure_logging(logger_instance)
# Get logger for manual logging
app_logger = get_logger()
# =============================================================================
# APPLICATION SETUP
# =============================================================================
# Configure rate limiting with memory backend
configure_rate_limiting(MemoryBackend())
app = Flask(__name__)
# =============================================================================
# MOCK USER DATABASE
# =============================================================================
users_db = {
"testuser": {
"user_id": "1",
"username": "testuser",
"email": "test@example.com",
"roles": ["user", "admin"]
},
"user1": {
"user_id": "2",
"username": "user1",
"email": "user1@example.com",
"roles": ["user"]
},
"premium": {
"user_id": "3",
"username": "premium",
"email": "premium@example.com",
"roles": ["user", "premium"]
},
"moderator": {
"user_id": "4",
"username": "moderator",
"email": "mod@example.com",
"roles": ["user", "moderator"]
}
}
JWT_SECRET_KEY = "demo-secret-key-32-characters-long-for-development-only!"
JWT_ALGORITHM = "HS256"
def my_jwt_decoder(token: str) -> Dict[str, Any]:
"""Our JWT decoder - we handle the secret and decoding logic."""
try:
payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])
return {
'user_id': payload.get('sub'),
'username': payload.get('username'),
'email': payload.get('email'),
'roles': payload.get('roles', []),
'permissions': payload.get('permissions', []),
'token_payload': payload
}
except jwt.ExpiredSignatureError:
raise Exception("Token has expired")
except jwt.InvalidSignatureError:
raise Exception("Invalid token signature")
except jwt.InvalidTokenError as e:
raise Exception(f"Invalid token: {str(e)}")
class TokenRequestSchema(BaseModel):
username: str
model_config = ConfigDict(extra='forbid')
class ProtectedDataSchema(BaseModel):
important_data: str
sensitive_info: Optional[str] = None
model_config = ConfigDict(extra='forbid')
class UserSchema(BaseModel):
username: str
age: int
email: str
model_config = ConfigDict(extra='forbid')
class ContactSchema(BaseModel):
name: str
email: str
message: str
model_config = ConfigDict(extra='ignore')
class SearchSchema(BaseModel):
query: str
limit: int = 10
category: str = "all"
model_config = ConfigDict(extra='forbid')
class PostSchema(BaseModel):
title: str
content: str
tags: List[str] = []
model_config = ConfigDict(extra='forbid')
class ApiKeySchema(BaseModel):
name: str
permissions: List[str]
model_config = ConfigDict(extra='forbid')
def create_jwt_token(user_data: dict) -> str:
"""Create a properly signed JWT token."""
now = int(time.time())
payload = {
"sub": str(user_data["user_id"]),
"username": user_data["username"],
"email": user_data["email"],
"roles": user_data["roles"],
"permissions": ["read", "write"],
"iat": now,
"exp": now + 3600, # 1 hour
"jti": f"token_{user_data['user_id']}_{now}"
}
return jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
def create_custom_role_token(roles: List[str], username: str = "demo_user") -> str:
"""Create JWT token with custom roles."""
user_data = {
"user_id": f"demo_{int(time.time())}",
"username": username,
"email": f"{username}@demo.com",
"roles": roles
}
return create_jwt_token(user_data)
# Post-validators with logging
def audit_user_creation(user: UserSchema) -> UserSchema:
"""Post-validator: Log user creation for audit."""
app_logger.log(LogLevel.INFO, "User creation audit", {
'audit_action': 'user_creation',
'new_username': user.username,
'user_age': user.age,
'user_email': user.email
})
return user
def uppercase_username(user: UserSchema) -> UserSchema:
"""Post-validator: Transform username to uppercase."""
original_username = user.username
user.username = user.username.upper()
app_logger.log(LogLevel.INFO, "Username transformed", {
'transformation': 'uppercase',
'original_username': original_username,
'new_username': user.username
})
return user
def validate_admin_email(user: UserSchema) -> UserSchema:
"""Post-validator: Ensure admin users have company email."""
if "@company.com" not in user.email:
app_logger.log(LogLevel.WARNING, "Admin email validation failed", {
'validation_rule': 'company_email_required',
'provided_email': user.email,
'username': user.username
})
raise ValueError("Admin users must have @company.com email address")
app_logger.log(LogLevel.INFO, "Admin email validation passed", {
'validation_rule': 'company_email_required',
'email': user.email,
'username': user.username
})
return user
@app.route('/', methods=['GET'])
@log_request_flask()
def home():
"""API documentation showing all available endpoints."""
app_logger.log(LogLevel.INFO, "API documentation requested", {
'endpoint': 'home',
'documentation_type': 'api_overview'
})
return jsonify({
"message": "API Gateway Demo - JWT + Logging",
"version": "3.0-LOGGING",
"features": ["JWT Verification", "Validation", "RBAC", "Rate Limiting", "Structured Logging"],
"logging": {
"format": "structured_json",
"correlation_tracking": "enabled",
"sensitive_masking": "enabled",
"log_level": "INFO"
},
"endpoints": {
"token_generation": {
"POST /get-token": "Get JWT token (rate limited: 10/min)",
"GET /whoami": "Get current user info (requires valid JWT)"
},
"public": {
"POST /contact": "Submit contact form (rate limited: 10/min)",
"GET /search": "Search with query params (rate limited: 20/min)",
"GET /public-data": "Get public data (rate limited: 100/min)"
},
"user_protected": {
"GET /profile": "View profile (user role required)",
"POST /posts": "Create post (user role + validation + rate limit: 5/min)",
"POST /submit": "Submit protected data (user role + validation)"
},
"admin_only": {
"POST /users": "Create user (admin role + rate limit: 2/min)",
"POST /admin/users": "Create admin user (admin role + strict validation)",
"GET /admin/stats": "View admin stats (admin role)"
},
"moderator_only": {
"POST /moderate": "Moderate content (moderator role + rate limit: 10/min)"
},
"premium_features": {
"GET /premium/data": "Premium data access (premium role)",
"POST /premium/api-keys": "Create API keys (premium role + rate limit: 1/min)"
}
}
})
@app.route("/get-token", methods=["POST"])
@log_request_flask() # OUTERMOST - logs everything
@rate_limit_flask(requests=10, window=60) # Rate limiting
@validate_flask(TokenRequestSchema) # Validation
def get_token(validated: TokenRequestSchema, _rate_limit_info=None):
"""Generate JWT token for testing."""
user = users_db.get(validated.username)
if not user:
app_logger.log(LogLevel.WARNING, "Token request for unknown user", {
'requested_username': validated.username,
'available_users': list(users_db.keys())
})
return jsonify({"error": "User not found"}), 404
# Create properly signed JWT token
access_token = create_jwt_token(user)
app_logger.log(LogLevel.INFO, "JWT token generated successfully", {
'token_action': 'generation',
'username': user["username"],
'user_id': user["user_id"],
'roles': user["roles"],
'token_expiry': datetime.fromtimestamp(int(time.time()) + 3600).isoformat()
})
return jsonify({
"access_token": access_token,
"token_type": "bearer",
"expires_in": 3600,
"user": {
"username": user["username"],
"roles": user["roles"]
},
"note": "Properly signed JWT token with verification"
})
@app.route("/get-custom-token/<role>", methods=["GET"])
@log_request_flask()
@rate_limit_flask(requests=5, window=60, scope="custom_token")
def get_custom_token(role, _rate_limit_info=None):
"""Generate JWT token with specific role for testing."""
valid_roles = ["user", "admin", "moderator", "premium"]
if role not in valid_roles:
app_logger.log(LogLevel.WARNING, "Invalid role requested for custom token", {
'requested_role': role,
'valid_roles': valid_roles
})
return jsonify({"error": f"Invalid role. Valid roles: {valid_roles}"}), 400
token = create_custom_role_token([role], f"demo_{role}")
app_logger.log(LogLevel.INFO, "Custom role token generated", {
'token_action': 'custom_generation',
'role': role,
'demo_user': f"demo_{role}"
})
return jsonify({
"access_token": token,
"role": role,
"expires_in": 3600,
"note": f"JWT token with {role} role"
})
@app.route("/whoami", methods=["GET"])
@log_request_flask()
@authorize_flask(token_decoder=my_jwt_decoder)
def whoami(user):
"""Get current user information from verified JWT token."""
app_logger.log(LogLevel.INFO, "User identity verified", {
'identity_check': 'whoami',
'user_id': user['user_id'],
'username': user.get('username'),
'roles': user['roles'],
'token_id': user['token_payload'].get('jti')
})
return jsonify({
"message": "JWT token successfully verified",
"user": {
"user_id": user["user_id"],
"username": user.get("username"),
"email": user.get("email"),
"roles": user["roles"],
"permissions": user.get("permissions", [])
},
"token_info": {
"expires_at": datetime.fromtimestamp(user["token_payload"]["exp"]).isoformat(),
"issued_at": datetime.fromtimestamp(user["token_payload"]["iat"]).isoformat(),
"token_id": user["token_payload"].get("jti")
}
})
@app.route('/contact', methods=['POST'])
@log_request_flask()
@rate_limit_flask(requests=10, window=60, scope="contact")
@validate_flask(
ContactSchema,
mode=ValidationMode.PERMISSIVE,
pre_validators=[PreValidators.normalize_email, PreValidators.sanitize_strings]
)
def submit_contact(validated: ContactSchema, _rate_limit_info=None):
"""Submit contact form - public endpoint."""
app_logger.log(LogLevel.INFO, "Contact form submitted", {
'form_submission': 'contact',
'contact_name': validated.name,
'contact_email': validated.email,
'message_length': len(validated.message)
})
return jsonify({
"success": True,
"message": "Contact form submitted successfully",
"data": validated.model_dump()
})
@app.route('/search', methods=['GET'])
@log_request_flask()
@rate_limit_flask(requests=20, window=60, scope="search")
@validate_flask(SearchSchema, mode=ValidationMode.LAX)
def search(validated: SearchSchema, _rate_limit_info=None):
"""Search endpoint with query parameters."""
results = [
f"Result {i} for '{validated.query}'"
for i in range(1, min(validated.limit + 1, 6))
]
app_logger.log(LogLevel.INFO, "Search performed", {
'search_action': 'query_executed',
'query': validated.query,
'category': validated.category,
'limit': validated.limit,
'results_count': len(results)
})
return jsonify({
"query": validated.query,
"category": validated.category,
"results": results,
"total": len(results)
})
@app.route("/profile", methods=["GET"])
@log_request_flask()
@authorize_flask(["user"], token_decoder=my_jwt_decoder)
def get_profile(user):
"""Get user profile - requires user role."""
app_logger.log(LogLevel.INFO, "User profile accessed", {
'profile_access': 'view',
'user_id': user['user_id'],
'username': user.get('username'),
'account_type': "premium" if "premium" in user["roles"] else "standard"
})
return jsonify({
"profile": {
"user_id": user["user_id"],
"username": user.get("username"),
"email": user.get("email"),
"roles": user["roles"],
"account_type": "premium" if "premium" in user["roles"] else "standard"
}
})
@app.route('/posts', methods=['POST'])
@log_request_flask()
@rate_limit_flask(requests=5, window=60, key_func=KeyGenerators.user_based)
@authorize_flask(["user"], token_decoder=my_jwt_decoder)
@validate_flask(PostSchema)
def create_post(validated: PostSchema, user, _rate_limit_info=None):
"""Create a post - full decorator stack with logging."""
post_id = int(time.time())
app_logger.log(LogLevel.INFO, "Post created successfully", {
'content_creation': 'post',
'post_id': post_id,
'title': validated.title,
'content_length': len(validated.content),
'tags_count': len(validated.tags),
'author_id': user['user_id'],
'author_username': user.get('username')
})
return jsonify({
"success": True,
"message": "Post created successfully",
"post": {
"id": post_id,
"title": validated.title,
"content": validated.content,
"tags": validated.tags,
"author": user.get("username"),
"created_at": datetime.now().isoformat()
}
})
@app.route('/users', methods=['POST'])
@log_request_flask()
@rate_limit_flask(requests=2, window=60, key_func=KeyGenerators.user_based)
@authorize_flask(["admin"], token_decoder=my_jwt_decoder)
@validate_flask(UserSchema, mode=ValidationMode.STRICT, post_validators=[audit_user_creation])
def create_user(validated: UserSchema, user, _rate_limit_info=None):
"""Create a new user - admin only with comprehensive logging."""
new_user_id = str(len(users_db) + 1)
app_logger.log(LogLevel.INFO, "Admin user creation completed", {
'admin_action': 'user_creation',
'new_user_id': new_user_id,
'new_username': validated.username,
'new_user_email': validated.email,
'created_by_admin_id': user['user_id'],
'created_by_admin_username': user.get('username')
})
return jsonify({
"success": True,
"message": f"User {validated.username} created successfully",
"user": {
"id": new_user_id,
**validated.model_dump()
},
"created_by": user.get("username")
})
@app.route('/admin/users', methods=['POST'])
@log_request_flask()
@authorize_flask(["admin"], token_decoder=my_jwt_decoder)
@validate_flask(
UserSchema,
mode=ValidationMode.STRICT,
post_validators=[validate_admin_email, uppercase_username, audit_user_creation]
)
def create_admin_user(validated: UserSchema, user):
"""Create admin user with multiple post-validators and logging."""
app_logger.log(LogLevel.INFO, "Admin user creation with enhanced validation", {
'admin_action': 'admin_user_creation',
'new_admin_username': validated.username,
'email_validated': True,
'username_transformed': True,
'created_by': user.get('username')
})
return jsonify({
"success": True,
"message": f"Admin user {validated.username} created",
"user": validated.model_dump(),
"created_by": user.get("username")
})
@app.route('/admin/stats', methods=['GET'])
@log_request_flask()
@authorize_flask(["admin"], token_decoder=my_jwt_decoder)
def admin_stats(user):
"""Get admin statistics."""
stats_data = {
"total_users": len(users_db),
"admin_users": len([u for u in users_db.values() if "admin" in u["roles"]]),
"premium_users": len([u for u in users_db.values() if "premium" in u["roles"]]),
"server_uptime": "demo mode",
"last_access": datetime.now().isoformat()
}
app_logger.log(LogLevel.INFO, "Admin statistics accessed", {
'admin_action': 'stats_view',
'accessed_by': user.get('username'),
'stats_summary': {
'total_users': stats_data["total_users"],
'admin_users': stats_data["admin_users"],
'premium_users': stats_data["premium_users"]
}
})
return jsonify({
"stats": stats_data,
"accessed_by": user.get("username")
})
@app.route('/premium/data', methods=['GET'])
@log_request_flask()
@authorize_flask(["premium", "admin"], token_decoder=my_jwt_decoder)
def get_premium_data(user):
"""Get premium data - premium role required."""
app_logger.log(LogLevel.INFO, "Premium content accessed", {
'premium_access': 'data_retrieval',
'user_id': user['user_id'],
'username': user.get('username'),
'access_tier': 'premium'
})
return jsonify({
"premium_data": {
"exclusive_content": "This is premium content",
"analytics": {"views": 12345, "engagement": "high"},
"api_calls_remaining": 9999,
"subscription_tier": "premium"
},
"user": user.get("username")
})
@app.route('/premium/api-keys', methods=['POST'])
@log_request_flask()
@rate_limit_flask(requests=1, window=60, key_func=KeyGenerators.user_based)
@authorize_flask(["premium", "admin"], token_decoder=my_jwt_decoder)
@validate_flask(ApiKeySchema)
def create_api_key(validated: ApiKeySchema, user, _rate_limit_info=None):
"""Create API key - premium feature with strict rate limiting."""
api_key = f"ak_{secrets.token_urlsafe(32)}"
app_logger.log(LogLevel.INFO, "API key created", {
'api_key_action': 'creation',
'key_name': validated.name,
'permissions': validated.permissions,
'created_by_user_id': user['user_id'],
'created_by_username': user.get('username'),
'key_prefix': api_key[:8] + "..."
})
return jsonify({
"success": True,
"api_key": {
"key": api_key,
"name": validated.name,
"permissions": validated.permissions,
"created_at": datetime.now().isoformat(),
"expires_at": (datetime.now() + timedelta(days=365)).isoformat()
},
"owner": user.get("username")
})
@app.errorhandler(404)
def not_found(error):
app_logger.log(LogLevel.WARNING, "Endpoint not found", {
'error_type': '404_not_found',
'requested_path': request.path,
'method': request.method
})
return jsonify({"error": "Endpoint not found"}), 404
@app.errorhandler(500)
def internal_error(error):
app_logger.log(LogLevel.ERROR, "Internal server error occurred", {
'error_type': '500_internal_error',
'error_message': str(error),
'path': request.path,
'method': request.method
})
return jsonify({"error": "Internal server error"}), 500
if __name__ == '__main__':
print("🚀 Starting API Gateway with Comprehensive Logging...")
print("🌐 Server: http://127.0.0.1:5001")
print("📖 API Docs: GET http://127.0.0.1:5001/")
print("\n📊 Logging Configuration:")
print(" • Format: Structured JSON")
print(" • Level: INFO")
print(" • Correlation IDs: Enabled")
print(" • Sensitive Masking: Enabled")
print(" • Sampling: Disabled (logs everything)")
print("\n🔑 Demo Users (use POST /get-token):")
for username, data in users_db.items():
print(f" • {username} (roles: {', '.join(data['roles'])})")
print("\n🎭 Quick Test Tokens:")
print(" • GET /get-custom-token/user")
print(" • GET /get-custom-token/admin")
print(" • GET /get-custom-token/premium")
print(" • GET /get-custom-token/moderator")
app_logger.log(LogLevel.INFO, "Flask server starting", {
'server_startup': True,
'host': '127.0.0.1',
'port': 5001,
'environment': 'development',
'logging_enabled': True,
'jwt_verification': True
})
app.run(debug=True, host='127.0.0.1', port=5001)
```
Raw data
{
"_id": null,
"home_page": null,
"name": "apigatewaybuilder",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": null,
"keywords": "api, gateway, validation, authorization, rate-limiting",
"author": "PrabhBirJ",
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/db/07/4043890f83252d75c60abe7415cf4cbf7479e36fc9be4fea7d64581e7416/apigatewaybuilder-1.4.1.tar.gz",
"platform": null,
"description": "# API Gateway\n\n\n**API Gateway** is a modular, developer-friendly Python project designed to become a **full-featured API Gateway framework**. \n(v1.4.0) it ships with **request validation utilities, authorization, rate limiting and logging** powered by [Pydantic](https://docs.pydantic.dev). \n\n\n---\n\n## Vision\nThe goal of **API Gateway** is to provide:\n- **Validation**: Ensure only clean, schema-compliant data enters your services. *(available today)* \n- **Authentication & Authorization**: Pluggable security layers. *(coming soon)* \n- **Observability**: Metrics, logging, tracing. *(coming soon)* \n- **Routing**: Intelligent request routing and proxying. *(coming soon)* \n- **Rate Limiting & QoS**: Keep traffic fair and resilient. *(coming soon)* \n \n\n---\n\n## Installation For Contribution\n\nTo get started you need [`uv`](https://docs.astral.sh/uv/), a fast Python package manager. Install it first with:\n\n```bash\n# On Linux / macOS\ncurl -LsSf https://astral.sh/uv/install.sh | sh\n\n# On Windows (PowerShell)\npowershell -c \"irm https://astral.sh/uv/install.ps1 | iex\"\n\n\ngit clone https://github.com/PrabhbirJ/apigateway.git\ncd apigateway\nuv sync --all-extras --dev\n\n# Run tests\nuv run pytest\n```\n\n---\n\n## Installation To Use in Your Project\n\nTo get started you need [`uv`](https://docs.astral.sh/uv/), a fast Python package manager. Install it first with:\n\n```bash\n# On Linux / macOS\ncurl -LsSf https://astral.sh/uv/install.sh | sh\n\n# On Windows (PowerShell)\npowershell -c \"irm https://astral.sh/uv/install.ps1 | iex\"\n\ncd {Project_Directory}\n\nuv init\nuv add git+https://github.com/PrabhbirJ/apigateway.git\n\nuv add flask django fastapi\n\n```\n\n---\n\n## Project Structure\n\n```bash\napigateway\n\u251c\u2500\u2500 CHANGELOG.md\n\u251c\u2500\u2500 LICENSE\n\u251c\u2500\u2500 README.md\n\u251c\u2500\u2500 pyproject.toml\n\u251c\u2500\u2500 src\n\u2502 \u251c\u2500\u2500 apigateway\n\u2502 \u2502 \u251c\u2500\u2500 core\n\u2502 \u2502 \u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u2502 \u2502 \u251c\u2500\u2500 adapters\n\u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500 base_adapter.py\n\u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500 django.py\n\u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500 fastapi.py\n\u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500 flask.py\n\u2502 \u2502 \u2502 \u2502 \u2514\u2500\u2500 generic.py\n\u2502 \u2502 \u2502 \u251c\u2500\u2500 auth\n\u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u2502 \u2502 \u2502 \u2514\u2500\u2500 auth.py\n\u2502 \u2502 \u2502 \u251c\u2500\u2500 enums\n\u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u2502 \u2502 \u2502 \u2514\u2500\u2500 validation_modes.py\n\u2502 \u2502 \u2502 \u251c\u2500\u2500 errors\n\u2502 \u2502 \u2502 \u2502 \u251c\u2500\u2500 __init.py\n\u2502 \u2502 \u2502 \u2502 \u2514\u2500\u2500 formatters.py\n\u2502 \u2502 \u2502 \u2514\u2500\u2500 validation\n\u2502 \u2502 \u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u2502 \u2502 \u251c\u2500\u2500 file_validation.py\n\u2502 \u2502 \u2502 \u2514\u2500\u2500 validation.py\n\u2502 \u2502 \u2514\u2500\u2500 exceptions\n\u2502 \u2502 \u251c\u2500\u2500 AuthError.py\n\u2502 \u2502 \u251c\u2500\u2500 GatewayValidationError.py\n\u2502 \u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u2514\u2500\u2500 apigateway.egg-info\n\u2502 \u251c\u2500\u2500 PKG-INFO\n\u2502 \u251c\u2500\u2500 SOURCES.txt\n\u2502 \u251c\u2500\u2500 dependency_links.txt\n\u2502 \u251c\u2500\u2500 requires.txt\n\u2502 \u2514\u2500\u2500 top_level.txt\n\u251c\u2500\u2500 tests\n\u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u2514\u2500\u2500 validation\n\u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u251c\u2500\u2500 test_adapters\n\u2502 \u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u2502 \u251c\u2500\u2500 conftest.py\n\u2502 \u2502 \u251c\u2500\u2500 test_django.py\n\u2502 \u2502 \u251c\u2500\u2500 test_fastapi.py\n\u2502 \u2502 \u2514\u2500\u2500 test_flask.py\n\u2502 \u2514\u2500\u2500 test_generic\n\u2502 \u251c\u2500\u2500 __init__.py\n\u2502 \u251c\u2500\u2500 test_error_handling.py\n\u2502 \u251c\u2500\u2500 test_pre_post_validators.py\n\u2502 \u2514\u2500\u2500 test_strict_vs_lax.py\n\u2514\u2500\u2500 uv.lock\n\n```\n---\n\n## Error Handling\n\nAll validation errors are raised as GatewayValidationError with this schema:\n```bash\n{\n \"error\": \"Validation Failed\",\n \"code\": \"validation_error\",\n \"details\": [\n {\n \"field\": \"id\",\n \"message\": \"value is not a valid integer\",\n \"type\": \"type_error.integer\"\n }\n ]\n}\n```\nYou can customize formatting by supplying your own error_formatter\n\n---\n\n## Flask Example\n\n```python\nimport os\nimport json\nimport base64\nimport time\nimport secrets\nimport jwt # Add this import\nfrom datetime import datetime, timedelta\nfrom flask import Flask, jsonify, request\nfrom pydantic import BaseModel, ConfigDict\nfrom typing import Optional, List, Dict, Any\n\n# API Gateway imports\nfrom apigateway.core.validation.validation import validate_flask, PreValidators\nfrom apigateway.core.enums.validation_modes import ValidationMode\nfrom apigateway.core.auth.auth import authorize_flask # Updated import\nfrom apigateway.core.rate_limit.RateLimitEngine import configure_rate_limiting, KeyGenerators\nfrom apigateway.core.rate_limit.RateLimiting import rate_limit_flask\nfrom apigateway.core.rate_limit.MemoryBackend import MemoryBackend\n\n# NEW: Logging system imports\nfrom apigateway.core.logging import configure_logging, JsonLogger, LogLevel, get_logger\nfrom apigateway.core.logging.logger import log_request_flask\n\n# =============================================================================\n# LOGGING CONFIGURATION\n# =============================================================================\n\n# Configure structured JSON logging\nlogger_instance = JsonLogger(\n log_level=LogLevel.INFO,\n enable_sampling=False, # Disable sampling for demo (log everything)\n masked_fields={'authorization', 'cookie', 'x-api-key', 'token', 'password'}\n)\nconfigure_logging(logger_instance)\n\n# Get logger for manual logging\napp_logger = get_logger()\n\n# =============================================================================\n# APPLICATION SETUP\n# =============================================================================\n\n# Configure rate limiting with memory backend\nconfigure_rate_limiting(MemoryBackend())\n\napp = Flask(__name__)\n\n# =============================================================================\n# MOCK USER DATABASE\n# =============================================================================\n\nusers_db = {\n \"testuser\": {\n \"user_id\": \"1\", \n \"username\": \"testuser\",\n \"email\": \"test@example.com\",\n \"roles\": [\"user\", \"admin\"]\n },\n \"user1\": {\n \"user_id\": \"2\", \n \"username\": \"user1\",\n \"email\": \"user1@example.com\",\n \"roles\": [\"user\"]\n },\n \"premium\": {\n \"user_id\": \"3\", \n \"username\": \"premium\",\n \"email\": \"premium@example.com\",\n \"roles\": [\"user\", \"premium\"]\n },\n \"moderator\": {\n \"user_id\": \"4\", \n \"username\": \"moderator\", \n \"email\": \"mod@example.com\",\n \"roles\": [\"user\", \"moderator\"]\n }\n}\n\n\nJWT_SECRET_KEY = \"demo-secret-key-32-characters-long-for-development-only!\"\nJWT_ALGORITHM = \"HS256\"\n\n\ndef my_jwt_decoder(token: str) -> Dict[str, Any]:\n \"\"\"Our JWT decoder - we handle the secret and decoding logic.\"\"\"\n try:\n payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM])\n return {\n 'user_id': payload.get('sub'),\n 'username': payload.get('username'),\n 'email': payload.get('email'),\n 'roles': payload.get('roles', []),\n 'permissions': payload.get('permissions', []),\n 'token_payload': payload\n }\n except jwt.ExpiredSignatureError:\n raise Exception(\"Token has expired\")\n except jwt.InvalidSignatureError:\n raise Exception(\"Invalid token signature\")\n except jwt.InvalidTokenError as e:\n raise Exception(f\"Invalid token: {str(e)}\")\n\n\nclass TokenRequestSchema(BaseModel):\n username: str\n model_config = ConfigDict(extra='forbid')\n\nclass ProtectedDataSchema(BaseModel):\n important_data: str\n sensitive_info: Optional[str] = None\n model_config = ConfigDict(extra='forbid')\n\nclass UserSchema(BaseModel):\n username: str\n age: int\n email: str\n model_config = ConfigDict(extra='forbid')\n\nclass ContactSchema(BaseModel):\n name: str\n email: str\n message: str\n model_config = ConfigDict(extra='ignore')\n\nclass SearchSchema(BaseModel):\n query: str\n limit: int = 10\n category: str = \"all\"\n model_config = ConfigDict(extra='forbid')\n\nclass PostSchema(BaseModel):\n title: str\n content: str\n tags: List[str] = []\n model_config = ConfigDict(extra='forbid')\n\nclass ApiKeySchema(BaseModel):\n name: str\n permissions: List[str]\n model_config = ConfigDict(extra='forbid')\n\n\n\ndef create_jwt_token(user_data: dict) -> str:\n \"\"\"Create a properly signed JWT token.\"\"\"\n now = int(time.time())\n payload = {\n \"sub\": str(user_data[\"user_id\"]),\n \"username\": user_data[\"username\"],\n \"email\": user_data[\"email\"],\n \"roles\": user_data[\"roles\"],\n \"permissions\": [\"read\", \"write\"],\n \"iat\": now,\n \"exp\": now + 3600, # 1 hour\n \"jti\": f\"token_{user_data['user_id']}_{now}\"\n }\n \n return jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)\n\ndef create_custom_role_token(roles: List[str], username: str = \"demo_user\") -> str:\n \"\"\"Create JWT token with custom roles.\"\"\"\n user_data = {\n \"user_id\": f\"demo_{int(time.time())}\",\n \"username\": username,\n \"email\": f\"{username}@demo.com\",\n \"roles\": roles\n }\n return create_jwt_token(user_data)\n\n# Post-validators with logging\ndef audit_user_creation(user: UserSchema) -> UserSchema:\n \"\"\"Post-validator: Log user creation for audit.\"\"\"\n app_logger.log(LogLevel.INFO, \"User creation audit\", {\n 'audit_action': 'user_creation',\n 'new_username': user.username,\n 'user_age': user.age,\n 'user_email': user.email\n })\n return user\n\ndef uppercase_username(user: UserSchema) -> UserSchema:\n \"\"\"Post-validator: Transform username to uppercase.\"\"\"\n original_username = user.username\n user.username = user.username.upper()\n \n app_logger.log(LogLevel.INFO, \"Username transformed\", {\n 'transformation': 'uppercase',\n 'original_username': original_username,\n 'new_username': user.username\n })\n return user\n\ndef validate_admin_email(user: UserSchema) -> UserSchema:\n \"\"\"Post-validator: Ensure admin users have company email.\"\"\"\n if \"@company.com\" not in user.email:\n app_logger.log(LogLevel.WARNING, \"Admin email validation failed\", {\n 'validation_rule': 'company_email_required',\n 'provided_email': user.email,\n 'username': user.username\n })\n raise ValueError(\"Admin users must have @company.com email address\")\n \n app_logger.log(LogLevel.INFO, \"Admin email validation passed\", {\n 'validation_rule': 'company_email_required',\n 'email': user.email,\n 'username': user.username\n })\n return user\n\n\n\n@app.route('/', methods=['GET'])\n@log_request_flask()\ndef home():\n \"\"\"API documentation showing all available endpoints.\"\"\"\n app_logger.log(LogLevel.INFO, \"API documentation requested\", {\n 'endpoint': 'home',\n 'documentation_type': 'api_overview'\n })\n \n return jsonify({\n \"message\": \"API Gateway Demo - JWT + Logging\",\n \"version\": \"3.0-LOGGING\",\n \"features\": [\"JWT Verification\", \"Validation\", \"RBAC\", \"Rate Limiting\", \"Structured Logging\"],\n \"logging\": {\n \"format\": \"structured_json\",\n \"correlation_tracking\": \"enabled\",\n \"sensitive_masking\": \"enabled\",\n \"log_level\": \"INFO\"\n },\n \"endpoints\": {\n \"token_generation\": {\n \"POST /get-token\": \"Get JWT token (rate limited: 10/min)\",\n \"GET /whoami\": \"Get current user info (requires valid JWT)\"\n },\n \"public\": {\n \"POST /contact\": \"Submit contact form (rate limited: 10/min)\",\n \"GET /search\": \"Search with query params (rate limited: 20/min)\",\n \"GET /public-data\": \"Get public data (rate limited: 100/min)\"\n },\n \"user_protected\": {\n \"GET /profile\": \"View profile (user role required)\",\n \"POST /posts\": \"Create post (user role + validation + rate limit: 5/min)\",\n \"POST /submit\": \"Submit protected data (user role + validation)\"\n },\n \"admin_only\": {\n \"POST /users\": \"Create user (admin role + rate limit: 2/min)\",\n \"POST /admin/users\": \"Create admin user (admin role + strict validation)\",\n \"GET /admin/stats\": \"View admin stats (admin role)\"\n },\n \"moderator_only\": {\n \"POST /moderate\": \"Moderate content (moderator role + rate limit: 10/min)\"\n },\n \"premium_features\": {\n \"GET /premium/data\": \"Premium data access (premium role)\",\n \"POST /premium/api-keys\": \"Create API keys (premium role + rate limit: 1/min)\"\n }\n }\n })\n\n\n\n@app.route(\"/get-token\", methods=[\"POST\"])\n@log_request_flask() # OUTERMOST - logs everything\n@rate_limit_flask(requests=10, window=60) # Rate limiting\n@validate_flask(TokenRequestSchema) # Validation\ndef get_token(validated: TokenRequestSchema, _rate_limit_info=None):\n \"\"\"Generate JWT token for testing.\"\"\"\n user = users_db.get(validated.username)\n if not user:\n app_logger.log(LogLevel.WARNING, \"Token request for unknown user\", {\n 'requested_username': validated.username,\n 'available_users': list(users_db.keys())\n })\n return jsonify({\"error\": \"User not found\"}), 404\n \n # Create properly signed JWT token\n access_token = create_jwt_token(user)\n \n app_logger.log(LogLevel.INFO, \"JWT token generated successfully\", {\n 'token_action': 'generation',\n 'username': user[\"username\"],\n 'user_id': user[\"user_id\"],\n 'roles': user[\"roles\"],\n 'token_expiry': datetime.fromtimestamp(int(time.time()) + 3600).isoformat()\n })\n \n return jsonify({\n \"access_token\": access_token,\n \"token_type\": \"bearer\",\n \"expires_in\": 3600,\n \"user\": {\n \"username\": user[\"username\"],\n \"roles\": user[\"roles\"]\n },\n \"note\": \"Properly signed JWT token with verification\"\n })\n\n@app.route(\"/get-custom-token/<role>\", methods=[\"GET\"])\n@log_request_flask()\n@rate_limit_flask(requests=5, window=60, scope=\"custom_token\")\ndef get_custom_token(role, _rate_limit_info=None):\n \"\"\"Generate JWT token with specific role for testing.\"\"\"\n valid_roles = [\"user\", \"admin\", \"moderator\", \"premium\"]\n \n if role not in valid_roles:\n app_logger.log(LogLevel.WARNING, \"Invalid role requested for custom token\", {\n 'requested_role': role,\n 'valid_roles': valid_roles\n })\n return jsonify({\"error\": f\"Invalid role. Valid roles: {valid_roles}\"}), 400\n \n token = create_custom_role_token([role], f\"demo_{role}\")\n \n app_logger.log(LogLevel.INFO, \"Custom role token generated\", {\n 'token_action': 'custom_generation',\n 'role': role,\n 'demo_user': f\"demo_{role}\"\n })\n \n return jsonify({\n \"access_token\": token,\n \"role\": role,\n \"expires_in\": 3600,\n \"note\": f\"JWT token with {role} role\"\n })\n\n@app.route(\"/whoami\", methods=[\"GET\"])\n@log_request_flask()\n@authorize_flask(token_decoder=my_jwt_decoder) \ndef whoami(user):\n \"\"\"Get current user information from verified JWT token.\"\"\"\n app_logger.log(LogLevel.INFO, \"User identity verified\", {\n 'identity_check': 'whoami',\n 'user_id': user['user_id'],\n 'username': user.get('username'),\n 'roles': user['roles'],\n 'token_id': user['token_payload'].get('jti')\n })\n \n return jsonify({\n \"message\": \"JWT token successfully verified\",\n \"user\": {\n \"user_id\": user[\"user_id\"],\n \"username\": user.get(\"username\"),\n \"email\": user.get(\"email\"),\n \"roles\": user[\"roles\"],\n \"permissions\": user.get(\"permissions\", [])\n },\n \"token_info\": {\n \"expires_at\": datetime.fromtimestamp(user[\"token_payload\"][\"exp\"]).isoformat(),\n \"issued_at\": datetime.fromtimestamp(user[\"token_payload\"][\"iat\"]).isoformat(),\n \"token_id\": user[\"token_payload\"].get(\"jti\")\n }\n })\n\n\n\n@app.route('/contact', methods=['POST'])\n@log_request_flask()\n@rate_limit_flask(requests=10, window=60, scope=\"contact\")\n@validate_flask(\n ContactSchema, \n mode=ValidationMode.PERMISSIVE,\n pre_validators=[PreValidators.normalize_email, PreValidators.sanitize_strings]\n)\ndef submit_contact(validated: ContactSchema, _rate_limit_info=None):\n \"\"\"Submit contact form - public endpoint.\"\"\"\n app_logger.log(LogLevel.INFO, \"Contact form submitted\", {\n 'form_submission': 'contact',\n 'contact_name': validated.name,\n 'contact_email': validated.email,\n 'message_length': len(validated.message)\n })\n \n return jsonify({\n \"success\": True,\n \"message\": \"Contact form submitted successfully\",\n \"data\": validated.model_dump()\n })\n\n@app.route('/search', methods=['GET'])\n@log_request_flask()\n@rate_limit_flask(requests=20, window=60, scope=\"search\")\n@validate_flask(SearchSchema, mode=ValidationMode.LAX)\ndef search(validated: SearchSchema, _rate_limit_info=None):\n \"\"\"Search endpoint with query parameters.\"\"\"\n results = [\n f\"Result {i} for '{validated.query}'\" \n for i in range(1, min(validated.limit + 1, 6))\n ]\n \n app_logger.log(LogLevel.INFO, \"Search performed\", {\n 'search_action': 'query_executed',\n 'query': validated.query,\n 'category': validated.category,\n 'limit': validated.limit,\n 'results_count': len(results)\n })\n \n return jsonify({\n \"query\": validated.query,\n \"category\": validated.category,\n \"results\": results,\n \"total\": len(results)\n })\n\n\n\n@app.route(\"/profile\", methods=[\"GET\"])\n@log_request_flask()\n@authorize_flask([\"user\"], token_decoder=my_jwt_decoder) \ndef get_profile(user):\n \"\"\"Get user profile - requires user role.\"\"\"\n app_logger.log(LogLevel.INFO, \"User profile accessed\", {\n 'profile_access': 'view',\n 'user_id': user['user_id'],\n 'username': user.get('username'),\n 'account_type': \"premium\" if \"premium\" in user[\"roles\"] else \"standard\"\n })\n \n return jsonify({\n \"profile\": {\n \"user_id\": user[\"user_id\"],\n \"username\": user.get(\"username\"),\n \"email\": user.get(\"email\"),\n \"roles\": user[\"roles\"],\n \"account_type\": \"premium\" if \"premium\" in user[\"roles\"] else \"standard\"\n }\n })\n\n@app.route('/posts', methods=['POST'])\n@log_request_flask() \n@rate_limit_flask(requests=5, window=60, key_func=KeyGenerators.user_based)\n@authorize_flask([\"user\"], token_decoder=my_jwt_decoder) \n@validate_flask(PostSchema)\ndef create_post(validated: PostSchema, user, _rate_limit_info=None):\n \"\"\"Create a post - full decorator stack with logging.\"\"\"\n post_id = int(time.time())\n \n app_logger.log(LogLevel.INFO, \"Post created successfully\", {\n 'content_creation': 'post',\n 'post_id': post_id,\n 'title': validated.title,\n 'content_length': len(validated.content),\n 'tags_count': len(validated.tags),\n 'author_id': user['user_id'],\n 'author_username': user.get('username')\n })\n \n return jsonify({\n \"success\": True,\n \"message\": \"Post created successfully\",\n \"post\": {\n \"id\": post_id,\n \"title\": validated.title,\n \"content\": validated.content,\n \"tags\": validated.tags,\n \"author\": user.get(\"username\"),\n \"created_at\": datetime.now().isoformat()\n }\n })\n\n\n\n@app.route('/users', methods=['POST'])\n@log_request_flask()\n@rate_limit_flask(requests=2, window=60, key_func=KeyGenerators.user_based)\n@authorize_flask([\"admin\"], token_decoder=my_jwt_decoder) \n@validate_flask(UserSchema, mode=ValidationMode.STRICT, post_validators=[audit_user_creation])\ndef create_user(validated: UserSchema, user, _rate_limit_info=None):\n \"\"\"Create a new user - admin only with comprehensive logging.\"\"\"\n new_user_id = str(len(users_db) + 1)\n \n app_logger.log(LogLevel.INFO, \"Admin user creation completed\", {\n 'admin_action': 'user_creation',\n 'new_user_id': new_user_id,\n 'new_username': validated.username,\n 'new_user_email': validated.email,\n 'created_by_admin_id': user['user_id'],\n 'created_by_admin_username': user.get('username')\n })\n \n return jsonify({\n \"success\": True,\n \"message\": f\"User {validated.username} created successfully\",\n \"user\": {\n \"id\": new_user_id,\n **validated.model_dump()\n },\n \"created_by\": user.get(\"username\")\n })\n\n@app.route('/admin/users', methods=['POST'])\n@log_request_flask()\n@authorize_flask([\"admin\"], token_decoder=my_jwt_decoder) \n@validate_flask(\n UserSchema, \n mode=ValidationMode.STRICT,\n post_validators=[validate_admin_email, uppercase_username, audit_user_creation]\n)\ndef create_admin_user(validated: UserSchema, user):\n \"\"\"Create admin user with multiple post-validators and logging.\"\"\"\n app_logger.log(LogLevel.INFO, \"Admin user creation with enhanced validation\", {\n 'admin_action': 'admin_user_creation',\n 'new_admin_username': validated.username,\n 'email_validated': True,\n 'username_transformed': True,\n 'created_by': user.get('username')\n })\n \n return jsonify({\n \"success\": True,\n \"message\": f\"Admin user {validated.username} created\",\n \"user\": validated.model_dump(),\n \"created_by\": user.get(\"username\")\n })\n\n@app.route('/admin/stats', methods=['GET'])\n@log_request_flask()\n@authorize_flask([\"admin\"], token_decoder=my_jwt_decoder) \ndef admin_stats(user):\n \"\"\"Get admin statistics.\"\"\"\n stats_data = {\n \"total_users\": len(users_db),\n \"admin_users\": len([u for u in users_db.values() if \"admin\" in u[\"roles\"]]),\n \"premium_users\": len([u for u in users_db.values() if \"premium\" in u[\"roles\"]]),\n \"server_uptime\": \"demo mode\",\n \"last_access\": datetime.now().isoformat()\n }\n \n app_logger.log(LogLevel.INFO, \"Admin statistics accessed\", {\n 'admin_action': 'stats_view',\n 'accessed_by': user.get('username'),\n 'stats_summary': {\n 'total_users': stats_data[\"total_users\"],\n 'admin_users': stats_data[\"admin_users\"],\n 'premium_users': stats_data[\"premium_users\"]\n }\n })\n \n return jsonify({\n \"stats\": stats_data,\n \"accessed_by\": user.get(\"username\")\n })\n\n\n\n@app.route('/premium/data', methods=['GET'])\n@log_request_flask()\n@authorize_flask([\"premium\", \"admin\"], token_decoder=my_jwt_decoder) \ndef get_premium_data(user):\n \"\"\"Get premium data - premium role required.\"\"\"\n app_logger.log(LogLevel.INFO, \"Premium content accessed\", {\n 'premium_access': 'data_retrieval',\n 'user_id': user['user_id'],\n 'username': user.get('username'),\n 'access_tier': 'premium'\n })\n \n return jsonify({\n \"premium_data\": {\n \"exclusive_content\": \"This is premium content\",\n \"analytics\": {\"views\": 12345, \"engagement\": \"high\"},\n \"api_calls_remaining\": 9999,\n \"subscription_tier\": \"premium\"\n },\n \"user\": user.get(\"username\")\n })\n\n@app.route('/premium/api-keys', methods=['POST'])\n@log_request_flask()\n@rate_limit_flask(requests=1, window=60, key_func=KeyGenerators.user_based)\n@authorize_flask([\"premium\", \"admin\"], token_decoder=my_jwt_decoder) \n@validate_flask(ApiKeySchema)\ndef create_api_key(validated: ApiKeySchema, user, _rate_limit_info=None):\n \"\"\"Create API key - premium feature with strict rate limiting.\"\"\"\n api_key = f\"ak_{secrets.token_urlsafe(32)}\"\n \n app_logger.log(LogLevel.INFO, \"API key created\", {\n 'api_key_action': 'creation',\n 'key_name': validated.name,\n 'permissions': validated.permissions,\n 'created_by_user_id': user['user_id'],\n 'created_by_username': user.get('username'),\n 'key_prefix': api_key[:8] + \"...\" \n })\n \n return jsonify({\n \"success\": True,\n \"api_key\": {\n \"key\": api_key,\n \"name\": validated.name,\n \"permissions\": validated.permissions,\n \"created_at\": datetime.now().isoformat(),\n \"expires_at\": (datetime.now() + timedelta(days=365)).isoformat()\n },\n \"owner\": user.get(\"username\")\n })\n\n\n\n@app.errorhandler(404)\ndef not_found(error):\n app_logger.log(LogLevel.WARNING, \"Endpoint not found\", {\n 'error_type': '404_not_found',\n 'requested_path': request.path,\n 'method': request.method\n })\n return jsonify({\"error\": \"Endpoint not found\"}), 404\n\n@app.errorhandler(500)\ndef internal_error(error):\n app_logger.log(LogLevel.ERROR, \"Internal server error occurred\", {\n 'error_type': '500_internal_error',\n 'error_message': str(error),\n 'path': request.path,\n 'method': request.method\n })\n return jsonify({\"error\": \"Internal server error\"}), 500\n\n\n\nif __name__ == '__main__':\n print(\"\ud83d\ude80 Starting API Gateway with Comprehensive Logging...\")\n print(\"\ud83c\udf10 Server: http://127.0.0.1:5001\")\n print(\"\ud83d\udcd6 API Docs: GET http://127.0.0.1:5001/\")\n \n print(\"\\n\ud83d\udcca Logging Configuration:\")\n print(\" \u2022 Format: Structured JSON\")\n print(\" \u2022 Level: INFO\")\n print(\" \u2022 Correlation IDs: Enabled\") \n print(\" \u2022 Sensitive Masking: Enabled\")\n print(\" \u2022 Sampling: Disabled (logs everything)\")\n \n print(\"\\n\ud83d\udd11 Demo Users (use POST /get-token):\")\n for username, data in users_db.items():\n print(f\" \u2022 {username} (roles: {', '.join(data['roles'])})\")\n \n print(\"\\n\ud83c\udfad Quick Test Tokens:\")\n print(\" \u2022 GET /get-custom-token/user\")\n print(\" \u2022 GET /get-custom-token/admin\") \n print(\" \u2022 GET /get-custom-token/premium\")\n print(\" \u2022 GET /get-custom-token/moderator\")\n \n app_logger.log(LogLevel.INFO, \"Flask server starting\", {\n 'server_startup': True,\n 'host': '127.0.0.1',\n 'port': 5001,\n 'environment': 'development',\n 'logging_enabled': True,\n 'jwt_verification': True\n })\n \n app.run(debug=True, host='127.0.0.1', port=5001)\n```\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Framework-agnostic API Gateway with validation, auth, and rate limiting",
"version": "1.4.1",
"project_urls": {
"Documentation": "https://github.com/PrabhBirJ/apigateway#readme",
"Homepage": "https://github.com/PrabhBirJ/apigateway",
"Issues": "https://github.com/PrabhBirJ/apigateway/issues",
"Repository": "https://github.com/PrabhBirJ/apigateway"
},
"split_keywords": [
"api",
" gateway",
" validation",
" authorization",
" rate-limiting"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "4f8342d1089de1bc5611429434ef910b9d11b8ca6f76bb1558f14bf200d3d72c",
"md5": "0bc23f50bcccc4b08eb5d17b98a4f275",
"sha256": "ee21a304585a699dce7d5fdc523d997e547430fd904d47ca8fc5a4bf33bc2e5a"
},
"downloads": -1,
"filename": "apigatewaybuilder-1.4.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "0bc23f50bcccc4b08eb5d17b98a4f275",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 54100,
"upload_time": "2025-10-09T11:17:14",
"upload_time_iso_8601": "2025-10-09T11:17:14.553801Z",
"url": "https://files.pythonhosted.org/packages/4f/83/42d1089de1bc5611429434ef910b9d11b8ca6f76bb1558f14bf200d3d72c/apigatewaybuilder-1.4.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "db074043890f83252d75c60abe7415cf4cbf7479e36fc9be4fea7d64581e7416",
"md5": "3afed05bfc06be94a11f47ea5b3b482d",
"sha256": "3a9d22c691f8778e7167411059739ef66b8f644dd56075dfef2e3baf34b9ef6a"
},
"downloads": -1,
"filename": "apigatewaybuilder-1.4.1.tar.gz",
"has_sig": false,
"md5_digest": "3afed05bfc06be94a11f47ea5b3b482d",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 47606,
"upload_time": "2025-10-09T11:17:16",
"upload_time_iso_8601": "2025-10-09T11:17:16.236825Z",
"url": "https://files.pythonhosted.org/packages/db/07/4043890f83252d75c60abe7415cf4cbf7479e36fc9be4fea7d64581e7416/apigatewaybuilder-1.4.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-09 11:17:16",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "PrabhBirJ",
"github_project": "apigateway#readme",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "apigatewaybuilder"
}