# Earnbase Common
Core library for Earnbase Platform services.
## Overview
Earnbase Common provides shared components, utilities, and standards for building microservices in the Earnbase Platform. It implements common patterns and best practices to ensure consistency across services.
## Features
### Domain Models
Base classes for domain-driven design:
- **BaseModel**: Enhanced Pydantic model with common functionality
```python
from earnbase_common.models import BaseModel
from datetime import datetime
from typing import Optional
class User(BaseModel):
name: str
email: str
created_at: datetime
status: Optional[str] = None
# Models are immutable by default
user = User(
name="John",
email="john@example.com",
created_at=datetime.utcnow()
)
```
- **Entity**: Base class for domain entities
```python
from earnbase_common.models import Entity
from typing import Optional
class Product(Entity):
name: str
price: float
description: Optional[str] = None
product = Product(name="Phone", price=999.99)
print(str(product)) # Product(id=123e4567-e89b-12d3-a456-426614174000)
```
- **AggregateRoot**: Base class for aggregate roots with event management
```python
from earnbase_common.models import AggregateRoot, DomainEvent
class OrderCreated(DomainEvent):
order_id: str
total_amount: float
class Order(AggregateRoot):
customer_id: str
total: float
status: str = "pending"
def place(self) -> None:
"""Place the order."""
self.status = "placed"
# Add domain event
self.add_event(
OrderCreated(
event_type="OrderCreated",
aggregate_id=str(self.id),
aggregate_type="Order",
order_id=str(self.id),
total_amount=self.total
)
)
# Update version
self.increment_version()
# Create and place order
order = Order(customer_id="123", total=100.0)
order.place()
# Access events and version
print(order.events) # [OrderCreated(...)]
print(order.version) # 2
```
Key features:
- Immutable models with Pydantic validation
- Event sourcing with domain events
- Automatic versioning and timestamps
- UUID generation for entities
- Type safety and validation
For detailed documentation, see [Models](docs/models.md).
### Security
Comprehensive security utilities:
```python
from earnbase_common.security import (
JWTConfig,
TokenManager,
PasswordHasher,
SecurityPolicy
)
# JWT token management
config = JWTConfig(secret_key="your-secret-key")
manager = TokenManager(config)
# Create tokens
access_token = manager.create_token(
data={"user_id": "123"},
token_type="access"
)
# Password management
hasher = PasswordHasher()
hash_value = await hasher.hash("StrongP@ssw0rd")
is_valid = await hasher.verify("StrongP@ssw0rd", hash_value.value)
# Security policies
policy = SecurityPolicy()
min_length = policy.PASSWORD_MIN_LENGTH # 8
max_attempts = policy.MAX_LOGIN_ATTEMPTS # 5
token_expire = policy.ACCESS_TOKEN_EXPIRE_MINUTES # 30
```
Key features:
- JWT token creation and verification
- Password hashing with policy validation
- Security policy configuration
- Session management
- Account lockout protection
- Token expiration control
For detailed documentation, see [Security](docs/security.md).
### Value Objects
Immutable value objects for common domain concepts:
```python
from earnbase_common.value_objects import (
Email,
PhoneNumber,
Money,
Address
)
from decimal import Decimal
# Email validation
email = Email(value="user@example.com")
print(str(email)) # user@example.com
# Phone number with country code
phone = PhoneNumber(
value="1234567890",
country_code="84"
)
print(str(phone)) # +841234567890
# Money with currency
price = Money(amount=Decimal("99.99"), currency="USD")
discount = Money(amount=Decimal("10.00"), currency="USD")
final = price - discount
print(str(final)) # 89.99 USD
# Address with optional unit
address = Address(
street="123 Main St",
city="San Francisco",
state="CA",
country="USA",
postal_code="94105",
unit="4B"
)
print(str(address))
# Unit 4B, 123 Main St, San Francisco, CA 94105, USA
```
Key features:
- Immutable value objects
- Format validation
- Pattern matching
- String representation
- Equality comparison
- Type safety
- Arithmetic operations (Money)
For detailed documentation, see [Value Objects](docs/value_objects.md).
### Core Components
- **Database**: MongoDB integration and repository patterns
```python
from earnbase_common.database import MongoDB, BaseRepository
from pydantic import BaseModel
# MongoDB client with automatic retries and connection pooling
mongodb = MongoDB()
await mongodb.connect(
url="mongodb://localhost:27017",
db_name="mydb",
min_pool_size=10,
max_pool_size=100
)
# Type-safe repository pattern
class User(BaseModel):
name: str
email: str
status: str = "active"
class UserRepository(BaseRepository[User]):
def __init__(self, mongodb: MongoDB):
super().__init__(
collection=mongodb.db["users"],
model=User
)
# Use repository
repo = UserRepository(mongodb)
user = await repo.find_one({"email": "user@example.com"})
users = await repo.find_many({"status": "active"})
```
Key features:
- MongoDB client with automatic retries
- Connection pooling and lifecycle management
- Type-safe repository pattern with Pydantic
- Automatic metrics collection
- Built-in error handling
For detailed documentation, see [Database](docs/database.md).
- **Redis**: Caching and session management
```python
from earnbase_common.redis import RedisClient, Cache
# Use Redis for caching
cache = Cache()
await cache.set("key", "value", expire=300) # 5 minutes
value = await cache.get("key")
# Use Redis for session
session = await RedisClient.get_session("session_id")
await session.set("user_id", "123")
user_id = await session.get("user_id")
```
- **HTTP**: HTTP client and request handling
```python
from earnbase_common.http import HTTPClient
# Make HTTP requests
client = HTTPClient()
response = await client.get("https://api.example.com/users")
user = await client.post(
"https://api.example.com/users",
json={"name": "John"}
)
```
- **Metrics**: Performance monitoring and metrics collection
```python
from earnbase_common.metrics import metrics
# Counter metric
request_counter = metrics.counter(
"http_requests_total",
labelnames=["method", "path"]
)
request_counter.labels(method="GET", path="/users").inc()
# Histogram metric
request_duration = metrics.histogram(
"http_request_duration_seconds",
label_names=["method", "path"],
buckets=(0.01, 0.05, 0.1, 0.5, 1.0)
)
with request_duration.time():
# Process request
pass
# Gauge metric
active_users = metrics.gauge(
"active_users",
labelnames=["status"]
)
active_users.labels(status="online").set(42)
# Summary metric
response_size = metrics.summary(
"response_size_bytes",
labelnames=["content_type"]
)
response_size.labels(content_type="json").observe(1024)
```
Key features:
- Multiple metric types (Counter, Histogram, Gauge, Summary)
- Automatic metrics collection for HTTP and Database operations
- Built-in service metrics (uptime, info)
- Prometheus integration
- Decorator-based metrics collection
For detailed documentation, see [Metrics](docs/metrics.md).
- **Logging**: Structured logging and error tracking
```python
from earnbase_common.logging import get_logger, setup_logging
# Configure logging
setup_logging(
service_name="my-service",
log_file="/var/log/my-service/app.log",
log_level="INFO",
debug=False # True for development
)
# Get logger
logger = get_logger(__name__)
# Structured logging with context
logger.info(
"Processing request",
request_id="req-123",
method="POST",
path="/users"
)
# Error logging with details
try:
result = await process_data()
except Exception as e:
logger.error(
"Operation failed",
error=str(e),
operation="process_data",
exc_info=True
)
```
Key features:
- Structured logging with JSON/Console formats
- Automatic log rotation and size limits
- Sensitive data filtering
- Service context enrichment
- Multiple output handlers (console, file, error file)
For detailed documentation, see [Logging](docs/logging.md).
## Database Operations
The MongoDB client now includes a retry mechanism using tenacity. This helps handle temporary connection issues and improves reliability.
### Retry Configuration
You can customize the retry behavior:
```python
from earnbase_common.retry import RetryConfig
from earnbase_common.database import mongodb
# Custom retry config
retry_config = RetryConfig(
max_attempts=5,
max_delay=10.0,
min_delay=1.0,
exceptions=(ConnectionError, TimeoutError)
)
# Apply to MongoDB client
await mongodb.connect(
url="mongodb://localhost:27017",
db_name="earnbase",
retry_config=retry_config
)
```
Default retry configuration:
- Max attempts: 3
- Max delay: 5 seconds
- Min delay: 1 second
- Retried exceptions: ConnectionFailure, ServerSelectionTimeoutError
All database operations (find, insert, update, delete) automatically use the configured retry mechanism.
## Project Structure
```
earnbase_common/
├── config/ # Configuration management
├── database/ # Database integration
├── errors/ # Error handling
├── http/ # HTTP utilities
├── logging/ # Logging configuration
├── metrics/ # Metrics collection
├── middleware/ # HTTP middleware
├── models/ # Domain models
├── redis/ # Redis integration
├── responses/ # API responses
├── security/ # Security utilities
└── value_objects/ # Domain value objects
```
## Installation
```bash
pdm add earnbase-common
```
## Contributing
1. Fork the repository
2. Create your feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Dependency Injection with Containers
The `containers` module provides a standardized way to manage dependencies across services using the dependency-injector package.
### BaseContainer
The `BaseContainer` class serves as a foundation for service-specific containers, providing common functionality:
```python
from earnbase_common.containers import BaseContainer
from dependency_injector import providers
class ServiceContainer(BaseContainer):
"""Service-specific container."""
# Override config with service-specific settings
config = providers.Singleton(ServiceSettings)
# Add service-specific providers
service = providers.Singleton(MyService)
repository = providers.Singleton(MyRepository)
```
### Common Providers
The `BaseContainer` includes several pre-configured providers:
1. **MongoDB**:
```python
# Automatically configured from settings
mongodb = providers.Singleton(MongoDB)
# Access in your code
mongodb_client = container.mongodb()
await mongodb_client.connect(
url=config.MONGODB_URL,
db_name=config.MONGODB_DB_NAME
)
```
2. **Redis**:
```python
# Optional Redis support
redis = providers.Singleton(RedisClient)
# Access in your code if configured
redis_client = container.redis()
if redis_url:
await redis_client.connect(
url=redis_url,
db=redis_db
)
```
3. **Metrics**:
```python
# Metrics collection
metrics = providers.Singleton(
MetricsManager,
enabled=config.METRICS_ENABLED
)
```
### Resource Lifecycle
The `BaseContainer` manages resource lifecycle automatically:
```python
async def lifespan(app: FastAPI):
"""Application lifespan manager."""
container = ServiceContainer()
try:
# Initialize resources (MongoDB, Redis, etc.)
await container.init_resources()
# Wire container
container.wire(packages=["my_service"])
yield
finally:
# Cleanup resources
await container.shutdown_resources()
```
### Configuration Integration
The container works seamlessly with the configuration system:
```python
from earnbase_common.config import BaseSettings
class ServiceSettings(BaseSettings):
"""Service-specific settings."""
def _load_yaml_mappings(self, config: Dict[str, Any]) -> Dict[str, Any]:
"""Load service-specific mappings."""
mappings = super()._load_yaml_mappings(config)
# Add service-specific mappings
service_mappings = {
"SERVICE_NAME": config["service"]["name"],
"MONGODB_URL": config["mongodb"]["url"],
"REDIS_URL": config["redis"].get("url"),
"METRICS_ENABLED": config["metrics"].get("enabled", True),
}
mappings.update(service_mappings)
return mappings
```
### Best Practices
1. **Resource Management**:
- Always use `init_resources()` and `shutdown_resources()`
- Handle optional resources like Redis gracefully
- Implement proper error handling for resource initialization
2. **Configuration**:
- Use type-safe configuration with proper defaults
- Handle optional settings gracefully
- Validate configuration during initialization
3. **Dependency Wiring**:
- Wire containers at application startup
- Use proper package scoping for wiring
- Avoid circular dependencies
4. **Error Handling**:
- Handle resource initialization failures
- Implement proper cleanup in shutdown
- Log resource lifecycle events
### Configuration
The configuration system provides a flexible and type-safe way to manage settings:
```python
from earnbase_common.config import BaseSettings
class ServiceSettings(BaseSettings):
"""Service-specific settings."""
# Default values with type hints
SERVICE_NAME: str
DEBUG: bool = True
HTTP_PORT: int = 8000
# Load from file
settings = ServiceSettings("config.yaml")
# Load with environment variables
# SERVICE_NAME=my-service python app.py
# Load with direct arguments
settings = ServiceSettings(
config_path="config.yaml",
DEBUG=False,
HTTP_PORT=9000
)
```
Key features:
- Multiple configuration sources (YAML, env vars, direct args)
- Type validation and immutability
- Environment-specific settings
- Secure handling of sensitive data
- Service-specific prefixes for env vars
For detailed documentation, see [Configuration](docs/config.md).
### Dependency Injection
The containers module provides a powerful dependency injection system:
```python
from earnbase_common.containers import BaseContainer
from dependency_injector import providers
class ServiceContainer(BaseContainer):
"""Service container."""
# Common providers are pre-configured:
# - config: Settings management
# - mongodb: Database connection
# - redis: Cache client
# - metrics: Metrics collection
# Add service-specific providers
repository = providers.Singleton(
Repository,
mongodb=mongodb
)
service = providers.Singleton(
Service,
repository=repository,
redis=redis
)
# Resource lifecycle management
async def lifespan(app: FastAPI):
container = ServiceContainer()
try:
await container.init_resources()
container.wire(packages=["my_service"])
yield
finally:
await container.shutdown_resources()
```
Key features:
- Pre-configured common providers
- Resource lifecycle management
- Integration with FastAPI
- Testing support with provider overrides
- Async resource providers
- Factory and contextual providers
For detailed documentation, see [Containers](docs/containers.md).
- **Middleware**: HTTP middleware components
```python
from fastapi import FastAPI
from earnbase_common.middleware import (
SecurityHeadersMiddleware,
RequestTrackingMiddleware
)
app = FastAPI()
# Add security headers
app.add_middleware(SecurityHeadersMiddleware) # Adds security headers
# Add request tracking
app.add_middleware(RequestTrackingMiddleware) # Tracks request details
@app.get("/users")
async def get_users(request: Request):
# Access request tracking info
request_id = request.state.request_id
start_time = request.state.start_time
return {"request_id": request_id}
```
Key features:
- Security headers middleware (XSS, CSP, HSTS)
- Request tracking with unique IDs
- Request/Response logging
- Performance monitoring
- Error handling
For detailed documentation, see [Middleware](docs/middleware.md).
### Redis
Redis client with caching and session management:
```python
from earnbase_common.redis import RedisClient
# Connect to Redis
redis = await RedisClient.connect(
url="redis://localhost:6379",
db=0,
prefix="myapp", # Optional key prefix
ttl=3600 # Default TTL in seconds
)
# Basic operations
await redis.set("user:123", "John Doe")
value = await redis.get("user:123") # "John Doe"
# Custom expiration
await redis.set("session:abc", "data", expire=1800) # 30 minutes
# Check existence and TTL
exists = await redis.exists("user:123") # True
ttl = await redis.ttl("session:abc") # Seconds remaining
# Close connection
await redis.close()
```
Key features:
- Connection pooling and management
- Key prefixing and TTL configuration
- Structured error handling and logging
- Support for caching and sessions
- Distributed locks and rate limiting
- Pub/Sub messaging
For detailed documentation, see [Redis](docs/redis.md).
### Responses
Standardized API response models:
```python
from earnbase_common.responses import (
SuccessResponse,
ErrorResponse,
PaginatedResponse,
CustomJSONResponse
)
# Success response
response = SuccessResponse(
message="User created",
data={"id": "123", "name": "John"},
meta={"timestamp": "2024-01-12T00:00:00Z"}
)
# Error response
error = ErrorResponse(
error="Validation failed",
details={"field": "email", "message": "Invalid format"},
errors=[
{"field": "email", "message": "Invalid format"},
{"field": "phone", "message": "Required field"}
]
)
# Paginated response
paginated = PaginatedResponse(
data=[{"id": "1"}, {"id": "2"}],
meta={
"page": 1,
"per_page": 10,
"total": 100,
"total_pages": 10
}
)
# FastAPI integration
app = FastAPI()
@app.get(
"/users/{user_id}",
response_model=SuccessResponse,
response_class=CustomJSONResponse
)
async def get_user(user_id: str):
return {
"id": user_id,
"name": "John Doe"
}
```
Key features:
- Standardized response structure
- Type validation with Pydantic
- Error handling with details
- Pagination support
- Custom JSON formatting
- FastAPI integration
For detailed documentation, see [Responses](docs/responses.md).
```
</rewritten_file>
Raw data
{
"_id": null,
"home_page": null,
"name": "earnbase-common",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "logging, utilities, microservices, fastapi, mongodb, redis",
"author": null,
"author_email": "Earnbase Team <dev@earnbase.io>",
"download_url": "https://files.pythonhosted.org/packages/43/07/a3b28976643d272e5daf638591d2266ab91a129e962dc41db8ad30f6574b/earnbase_common-0.2.7.tar.gz",
"platform": null,
"description": "# Earnbase Common\n\nCore library for Earnbase Platform services.\n\n## Overview\n\nEarnbase Common provides shared components, utilities, and standards for building microservices in the Earnbase Platform. It implements common patterns and best practices to ensure consistency across services.\n\n## Features\n\n### Domain Models\n\nBase classes for domain-driven design:\n\n- **BaseModel**: Enhanced Pydantic model with common functionality\n```python\nfrom earnbase_common.models import BaseModel\nfrom datetime import datetime\nfrom typing import Optional\n\nclass User(BaseModel):\n name: str\n email: str\n created_at: datetime\n status: Optional[str] = None\n\n# Models are immutable by default\nuser = User(\n name=\"John\",\n email=\"john@example.com\",\n created_at=datetime.utcnow()\n)\n```\n\n- **Entity**: Base class for domain entities\n```python\nfrom earnbase_common.models import Entity\nfrom typing import Optional\n\nclass Product(Entity):\n name: str\n price: float\n description: Optional[str] = None\n\nproduct = Product(name=\"Phone\", price=999.99)\nprint(str(product)) # Product(id=123e4567-e89b-12d3-a456-426614174000)\n```\n\n- **AggregateRoot**: Base class for aggregate roots with event management\n```python\nfrom earnbase_common.models import AggregateRoot, DomainEvent\n\nclass OrderCreated(DomainEvent):\n order_id: str\n total_amount: float\n\nclass Order(AggregateRoot):\n customer_id: str\n total: float\n status: str = \"pending\"\n\n def place(self) -> None:\n \"\"\"Place the order.\"\"\"\n self.status = \"placed\"\n \n # Add domain event\n self.add_event(\n OrderCreated(\n event_type=\"OrderCreated\",\n aggregate_id=str(self.id),\n aggregate_type=\"Order\",\n order_id=str(self.id),\n total_amount=self.total\n )\n )\n \n # Update version\n self.increment_version()\n\n# Create and place order\norder = Order(customer_id=\"123\", total=100.0)\norder.place()\n\n# Access events and version\nprint(order.events) # [OrderCreated(...)]\nprint(order.version) # 2\n```\n\nKey features:\n- Immutable models with Pydantic validation\n- Event sourcing with domain events\n- Automatic versioning and timestamps\n- UUID generation for entities\n- Type safety and validation\n\nFor detailed documentation, see [Models](docs/models.md).\n\n### Security\n\nComprehensive security utilities:\n\n```python\nfrom earnbase_common.security import (\n JWTConfig,\n TokenManager,\n PasswordHasher,\n SecurityPolicy\n)\n\n# JWT token management\nconfig = JWTConfig(secret_key=\"your-secret-key\")\nmanager = TokenManager(config)\n\n# Create tokens\naccess_token = manager.create_token(\n data={\"user_id\": \"123\"},\n token_type=\"access\"\n)\n\n# Password management\nhasher = PasswordHasher()\nhash_value = await hasher.hash(\"StrongP@ssw0rd\")\nis_valid = await hasher.verify(\"StrongP@ssw0rd\", hash_value.value)\n\n# Security policies\npolicy = SecurityPolicy()\nmin_length = policy.PASSWORD_MIN_LENGTH # 8\nmax_attempts = policy.MAX_LOGIN_ATTEMPTS # 5\ntoken_expire = policy.ACCESS_TOKEN_EXPIRE_MINUTES # 30\n```\n\nKey features:\n- JWT token creation and verification\n- Password hashing with policy validation\n- Security policy configuration\n- Session management\n- Account lockout protection\n- Token expiration control\n\nFor detailed documentation, see [Security](docs/security.md).\n\n### Value Objects\n\nImmutable value objects for common domain concepts:\n\n```python\nfrom earnbase_common.value_objects import (\n Email,\n PhoneNumber,\n Money,\n Address\n)\nfrom decimal import Decimal\n\n# Email validation\nemail = Email(value=\"user@example.com\")\nprint(str(email)) # user@example.com\n\n# Phone number with country code\nphone = PhoneNumber(\n value=\"1234567890\",\n country_code=\"84\"\n)\nprint(str(phone)) # +841234567890\n\n# Money with currency\nprice = Money(amount=Decimal(\"99.99\"), currency=\"USD\")\ndiscount = Money(amount=Decimal(\"10.00\"), currency=\"USD\")\nfinal = price - discount\nprint(str(final)) # 89.99 USD\n\n# Address with optional unit\naddress = Address(\n street=\"123 Main St\",\n city=\"San Francisco\",\n state=\"CA\",\n country=\"USA\",\n postal_code=\"94105\",\n unit=\"4B\"\n)\nprint(str(address))\n# Unit 4B, 123 Main St, San Francisco, CA 94105, USA\n```\n\nKey features:\n- Immutable value objects\n- Format validation\n- Pattern matching\n- String representation\n- Equality comparison\n- Type safety\n- Arithmetic operations (Money)\n\nFor detailed documentation, see [Value Objects](docs/value_objects.md).\n\n### Core Components\n\n- **Database**: MongoDB integration and repository patterns\n```python\nfrom earnbase_common.database import MongoDB, BaseRepository\nfrom pydantic import BaseModel\n\n# MongoDB client with automatic retries and connection pooling\nmongodb = MongoDB()\nawait mongodb.connect(\n url=\"mongodb://localhost:27017\",\n db_name=\"mydb\",\n min_pool_size=10,\n max_pool_size=100\n)\n\n# Type-safe repository pattern\nclass User(BaseModel):\n name: str\n email: str\n status: str = \"active\"\n\nclass UserRepository(BaseRepository[User]):\n def __init__(self, mongodb: MongoDB):\n super().__init__(\n collection=mongodb.db[\"users\"],\n model=User\n )\n\n# Use repository\nrepo = UserRepository(mongodb)\nuser = await repo.find_one({\"email\": \"user@example.com\"})\nusers = await repo.find_many({\"status\": \"active\"})\n```\n\nKey features:\n- MongoDB client with automatic retries\n- Connection pooling and lifecycle management\n- Type-safe repository pattern with Pydantic\n- Automatic metrics collection\n- Built-in error handling\n\nFor detailed documentation, see [Database](docs/database.md).\n\n- **Redis**: Caching and session management\n```python\nfrom earnbase_common.redis import RedisClient, Cache\n\n# Use Redis for caching\ncache = Cache()\nawait cache.set(\"key\", \"value\", expire=300) # 5 minutes\nvalue = await cache.get(\"key\")\n\n# Use Redis for session\nsession = await RedisClient.get_session(\"session_id\")\nawait session.set(\"user_id\", \"123\")\nuser_id = await session.get(\"user_id\")\n```\n\n- **HTTP**: HTTP client and request handling\n```python\nfrom earnbase_common.http import HTTPClient\n\n# Make HTTP requests\nclient = HTTPClient()\nresponse = await client.get(\"https://api.example.com/users\")\nuser = await client.post(\n \"https://api.example.com/users\",\n json={\"name\": \"John\"}\n)\n```\n\n- **Metrics**: Performance monitoring and metrics collection\n```python\nfrom earnbase_common.metrics import metrics\n\n# Counter metric\nrequest_counter = metrics.counter(\n \"http_requests_total\",\n labelnames=[\"method\", \"path\"]\n)\nrequest_counter.labels(method=\"GET\", path=\"/users\").inc()\n\n# Histogram metric\nrequest_duration = metrics.histogram(\n \"http_request_duration_seconds\",\n label_names=[\"method\", \"path\"],\n buckets=(0.01, 0.05, 0.1, 0.5, 1.0)\n)\nwith request_duration.time():\n # Process request\n pass\n\n# Gauge metric\nactive_users = metrics.gauge(\n \"active_users\",\n labelnames=[\"status\"]\n)\nactive_users.labels(status=\"online\").set(42)\n\n# Summary metric\nresponse_size = metrics.summary(\n \"response_size_bytes\",\n labelnames=[\"content_type\"]\n)\nresponse_size.labels(content_type=\"json\").observe(1024)\n```\n\nKey features:\n- Multiple metric types (Counter, Histogram, Gauge, Summary)\n- Automatic metrics collection for HTTP and Database operations\n- Built-in service metrics (uptime, info)\n- Prometheus integration\n- Decorator-based metrics collection\n\nFor detailed documentation, see [Metrics](docs/metrics.md).\n\n- **Logging**: Structured logging and error tracking\n```python\nfrom earnbase_common.logging import get_logger, setup_logging\n\n# Configure logging\nsetup_logging(\n service_name=\"my-service\",\n log_file=\"/var/log/my-service/app.log\",\n log_level=\"INFO\",\n debug=False # True for development\n)\n\n# Get logger\nlogger = get_logger(__name__)\n\n# Structured logging with context\nlogger.info(\n \"Processing request\",\n request_id=\"req-123\",\n method=\"POST\",\n path=\"/users\"\n)\n\n# Error logging with details\ntry:\n result = await process_data()\nexcept Exception as e:\n logger.error(\n \"Operation failed\",\n error=str(e),\n operation=\"process_data\",\n exc_info=True\n )\n```\n\nKey features:\n- Structured logging with JSON/Console formats\n- Automatic log rotation and size limits\n- Sensitive data filtering\n- Service context enrichment\n- Multiple output handlers (console, file, error file)\n\nFor detailed documentation, see [Logging](docs/logging.md).\n\n## Database Operations\n\nThe MongoDB client now includes a retry mechanism using tenacity. This helps handle temporary connection issues and improves reliability.\n\n### Retry Configuration\n\nYou can customize the retry behavior:\n\n```python\nfrom earnbase_common.retry import RetryConfig\nfrom earnbase_common.database import mongodb\n\n# Custom retry config\nretry_config = RetryConfig(\n max_attempts=5,\n max_delay=10.0,\n min_delay=1.0,\n exceptions=(ConnectionError, TimeoutError)\n)\n\n# Apply to MongoDB client\nawait mongodb.connect(\n url=\"mongodb://localhost:27017\",\n db_name=\"earnbase\",\n retry_config=retry_config\n)\n```\n\nDefault retry configuration:\n- Max attempts: 3\n- Max delay: 5 seconds\n- Min delay: 1 second\n- Retried exceptions: ConnectionFailure, ServerSelectionTimeoutError\n\nAll database operations (find, insert, update, delete) automatically use the configured retry mechanism.\n\n## Project Structure\n\n```\nearnbase_common/\n\u251c\u2500\u2500 config/ # Configuration management\n\u251c\u2500\u2500 database/ # Database integration\n\u251c\u2500\u2500 errors/ # Error handling\n\u251c\u2500\u2500 http/ # HTTP utilities\n\u251c\u2500\u2500 logging/ # Logging configuration\n\u251c\u2500\u2500 metrics/ # Metrics collection\n\u251c\u2500\u2500 middleware/ # HTTP middleware\n\u251c\u2500\u2500 models/ # Domain models\n\u251c\u2500\u2500 redis/ # Redis integration\n\u251c\u2500\u2500 responses/ # API responses\n\u251c\u2500\u2500 security/ # Security utilities\n\u2514\u2500\u2500 value_objects/ # Domain value objects\n```\n\n## Installation\n\n```bash\npdm add earnbase-common\n```\n\n## Contributing\n\n1. Fork the repository\n2. Create your feature branch (`git checkout -b feature/amazing-feature`)\n3. Commit your changes (`git commit -m 'Add amazing feature'`)\n4. Push to the branch (`git push origin feature/amazing-feature`)\n5. Open a Pull Request\n\n## License\n\nThis project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.\n\n## Dependency Injection with Containers\n\nThe `containers` module provides a standardized way to manage dependencies across services using the dependency-injector package.\n\n### BaseContainer\n\nThe `BaseContainer` class serves as a foundation for service-specific containers, providing common functionality:\n\n```python\nfrom earnbase_common.containers import BaseContainer\nfrom dependency_injector import providers\n\nclass ServiceContainer(BaseContainer):\n \"\"\"Service-specific container.\"\"\"\n \n # Override config with service-specific settings\n config = providers.Singleton(ServiceSettings)\n \n # Add service-specific providers\n service = providers.Singleton(MyService)\n repository = providers.Singleton(MyRepository)\n```\n\n### Common Providers\n\nThe `BaseContainer` includes several pre-configured providers:\n\n1. **MongoDB**:\n```python\n# Automatically configured from settings\nmongodb = providers.Singleton(MongoDB)\n\n# Access in your code\nmongodb_client = container.mongodb()\nawait mongodb_client.connect(\n url=config.MONGODB_URL,\n db_name=config.MONGODB_DB_NAME\n)\n```\n\n2. **Redis**:\n```python\n# Optional Redis support\nredis = providers.Singleton(RedisClient)\n\n# Access in your code if configured\nredis_client = container.redis()\nif redis_url:\n await redis_client.connect(\n url=redis_url,\n db=redis_db\n )\n```\n\n3. **Metrics**:\n```python\n# Metrics collection\nmetrics = providers.Singleton(\n MetricsManager,\n enabled=config.METRICS_ENABLED\n)\n```\n\n### Resource Lifecycle\n\nThe `BaseContainer` manages resource lifecycle automatically:\n\n```python\nasync def lifespan(app: FastAPI):\n \"\"\"Application lifespan manager.\"\"\"\n container = ServiceContainer()\n \n try:\n # Initialize resources (MongoDB, Redis, etc.)\n await container.init_resources()\n \n # Wire container\n container.wire(packages=[\"my_service\"])\n \n yield\n \n finally:\n # Cleanup resources\n await container.shutdown_resources()\n```\n\n### Configuration Integration\n\nThe container works seamlessly with the configuration system:\n\n```python\nfrom earnbase_common.config import BaseSettings\n\nclass ServiceSettings(BaseSettings):\n \"\"\"Service-specific settings.\"\"\"\n \n def _load_yaml_mappings(self, config: Dict[str, Any]) -> Dict[str, Any]:\n \"\"\"Load service-specific mappings.\"\"\"\n mappings = super()._load_yaml_mappings(config)\n \n # Add service-specific mappings\n service_mappings = {\n \"SERVICE_NAME\": config[\"service\"][\"name\"],\n \"MONGODB_URL\": config[\"mongodb\"][\"url\"],\n \"REDIS_URL\": config[\"redis\"].get(\"url\"),\n \"METRICS_ENABLED\": config[\"metrics\"].get(\"enabled\", True),\n }\n \n mappings.update(service_mappings)\n return mappings\n```\n\n### Best Practices\n\n1. **Resource Management**:\n - Always use `init_resources()` and `shutdown_resources()`\n - Handle optional resources like Redis gracefully\n - Implement proper error handling for resource initialization\n\n2. **Configuration**:\n - Use type-safe configuration with proper defaults\n - Handle optional settings gracefully\n - Validate configuration during initialization\n\n3. **Dependency Wiring**:\n - Wire containers at application startup\n - Use proper package scoping for wiring\n - Avoid circular dependencies\n\n4. **Error Handling**:\n - Handle resource initialization failures\n - Implement proper cleanup in shutdown\n - Log resource lifecycle events\n\n### Configuration\n\nThe configuration system provides a flexible and type-safe way to manage settings:\n\n```python\nfrom earnbase_common.config import BaseSettings\n\nclass ServiceSettings(BaseSettings):\n \"\"\"Service-specific settings.\"\"\"\n \n # Default values with type hints\n SERVICE_NAME: str\n DEBUG: bool = True\n HTTP_PORT: int = 8000\n\n# Load from file\nsettings = ServiceSettings(\"config.yaml\")\n\n# Load with environment variables\n# SERVICE_NAME=my-service python app.py\n\n# Load with direct arguments\nsettings = ServiceSettings(\n config_path=\"config.yaml\",\n DEBUG=False,\n HTTP_PORT=9000\n)\n```\n\nKey features:\n- Multiple configuration sources (YAML, env vars, direct args)\n- Type validation and immutability\n- Environment-specific settings\n- Secure handling of sensitive data\n- Service-specific prefixes for env vars\n\nFor detailed documentation, see [Configuration](docs/config.md).\n\n### Dependency Injection\n\nThe containers module provides a powerful dependency injection system:\n\n```python\nfrom earnbase_common.containers import BaseContainer\nfrom dependency_injector import providers\n\nclass ServiceContainer(BaseContainer):\n \"\"\"Service container.\"\"\"\n \n # Common providers are pre-configured:\n # - config: Settings management\n # - mongodb: Database connection\n # - redis: Cache client\n # - metrics: Metrics collection\n \n # Add service-specific providers\n repository = providers.Singleton(\n Repository,\n mongodb=mongodb\n )\n \n service = providers.Singleton(\n Service,\n repository=repository,\n redis=redis\n )\n\n# Resource lifecycle management\nasync def lifespan(app: FastAPI):\n container = ServiceContainer()\n try:\n await container.init_resources()\n container.wire(packages=[\"my_service\"])\n yield\n finally:\n await container.shutdown_resources()\n```\n\nKey features:\n- Pre-configured common providers\n- Resource lifecycle management\n- Integration with FastAPI\n- Testing support with provider overrides\n- Async resource providers\n- Factory and contextual providers\n\nFor detailed documentation, see [Containers](docs/containers.md).\n\n- **Middleware**: HTTP middleware components\n```python\nfrom fastapi import FastAPI\nfrom earnbase_common.middleware import (\n SecurityHeadersMiddleware,\n RequestTrackingMiddleware\n)\n\napp = FastAPI()\n\n# Add security headers\napp.add_middleware(SecurityHeadersMiddleware) # Adds security headers\n\n# Add request tracking\napp.add_middleware(RequestTrackingMiddleware) # Tracks request details\n\n@app.get(\"/users\")\nasync def get_users(request: Request):\n # Access request tracking info\n request_id = request.state.request_id\n start_time = request.state.start_time\n \n return {\"request_id\": request_id}\n```\n\nKey features:\n- Security headers middleware (XSS, CSP, HSTS)\n- Request tracking with unique IDs\n- Request/Response logging\n- Performance monitoring\n- Error handling\n\nFor detailed documentation, see [Middleware](docs/middleware.md).\n\n### Redis\n\nRedis client with caching and session management:\n\n```python\nfrom earnbase_common.redis import RedisClient\n\n# Connect to Redis\nredis = await RedisClient.connect(\n url=\"redis://localhost:6379\",\n db=0,\n prefix=\"myapp\", # Optional key prefix\n ttl=3600 # Default TTL in seconds\n)\n\n# Basic operations\nawait redis.set(\"user:123\", \"John Doe\")\nvalue = await redis.get(\"user:123\") # \"John Doe\"\n\n# Custom expiration\nawait redis.set(\"session:abc\", \"data\", expire=1800) # 30 minutes\n\n# Check existence and TTL\nexists = await redis.exists(\"user:123\") # True\nttl = await redis.ttl(\"session:abc\") # Seconds remaining\n\n# Close connection\nawait redis.close()\n```\n\nKey features:\n- Connection pooling and management\n- Key prefixing and TTL configuration\n- Structured error handling and logging\n- Support for caching and sessions\n- Distributed locks and rate limiting\n- Pub/Sub messaging\n\nFor detailed documentation, see [Redis](docs/redis.md).\n\n### Responses\n\nStandardized API response models:\n\n```python\nfrom earnbase_common.responses import (\n SuccessResponse,\n ErrorResponse,\n PaginatedResponse,\n CustomJSONResponse\n)\n\n# Success response\nresponse = SuccessResponse(\n message=\"User created\",\n data={\"id\": \"123\", \"name\": \"John\"},\n meta={\"timestamp\": \"2024-01-12T00:00:00Z\"}\n)\n\n# Error response\nerror = ErrorResponse(\n error=\"Validation failed\",\n details={\"field\": \"email\", \"message\": \"Invalid format\"},\n errors=[\n {\"field\": \"email\", \"message\": \"Invalid format\"},\n {\"field\": \"phone\", \"message\": \"Required field\"}\n ]\n)\n\n# Paginated response\npaginated = PaginatedResponse(\n data=[{\"id\": \"1\"}, {\"id\": \"2\"}],\n meta={\n \"page\": 1,\n \"per_page\": 10,\n \"total\": 100,\n \"total_pages\": 10\n }\n)\n\n# FastAPI integration\napp = FastAPI()\n\n@app.get(\n \"/users/{user_id}\",\n response_model=SuccessResponse,\n response_class=CustomJSONResponse\n)\nasync def get_user(user_id: str):\n return {\n \"id\": user_id,\n \"name\": \"John Doe\"\n }\n```\n\nKey features:\n- Standardized response structure\n- Type validation with Pydantic\n- Error handling with details\n- Pagination support\n- Custom JSON formatting\n- FastAPI integration\n\nFor detailed documentation, see [Responses](docs/responses.md).\n``` \n</rewritten_file>",
"bugtrack_url": null,
"license": "MIT",
"summary": "Common utilities for Earnbase services",
"version": "0.2.7",
"project_urls": {
"Bug Tracker": "https://github.com/earnbaseio/earnbase-common/issues",
"Changelog": "https://github.com/earnbaseio/earnbase-common/blob/main/CHANGELOG.md",
"Documentation": "https://github.com/earnbaseio/earnbase-common#readme",
"Homepage": "https://github.com/earnbaseio/earnbase-common",
"Source Code": "https://github.com/earnbaseio/earnbase-common"
},
"split_keywords": [
"logging",
" utilities",
" microservices",
" fastapi",
" mongodb",
" redis"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "9490a0dfe93bfc3fb9494325e331b1ab1f08854c99e7f43cf6be4ee0e84e9435",
"md5": "80164a2da97814caa8949df1767cd705",
"sha256": "1a4cdded0797b847e7cb0ea299386b858344da31cdf338ff432211066944f968"
},
"downloads": -1,
"filename": "earnbase_common-0.2.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "80164a2da97814caa8949df1767cd705",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 36546,
"upload_time": "2025-01-12T10:07:00",
"upload_time_iso_8601": "2025-01-12T10:07:00.207818Z",
"url": "https://files.pythonhosted.org/packages/94/90/a0dfe93bfc3fb9494325e331b1ab1f08854c99e7f43cf6be4ee0e84e9435/earnbase_common-0.2.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "4307a3b28976643d272e5daf638591d2266ab91a129e962dc41db8ad30f6574b",
"md5": "fd816eb7ba1e279f9847944d5c02142a",
"sha256": "aa2fdbd15bc291b42500ce95693f881ebf670d7530471a70c1cbb774af2f751b"
},
"downloads": -1,
"filename": "earnbase_common-0.2.7.tar.gz",
"has_sig": false,
"md5_digest": "fd816eb7ba1e279f9847944d5c02142a",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 29865,
"upload_time": "2025-01-12T10:07:04",
"upload_time_iso_8601": "2025-01-12T10:07:04.685232Z",
"url": "https://files.pythonhosted.org/packages/43/07/a3b28976643d272e5daf638591d2266ab91a129e962dc41db8ad30f6574b/earnbase_common-0.2.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-01-12 10:07:04",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "earnbaseio",
"github_project": "earnbase-common",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "earnbase-common"
}