# Universal ClickHouse to RabbitMQ Data Sync
This application provides a **universal data synchronization system** that can sync data from any ClickHouse table to RabbitMQ queues with configurable field mappings and transformations. No code changes are required: everything is configured through JSON files.
## 🚀 Key Features
- **Universal Configuration**: Support any ClickHouse table through JSON configuration
- **Dynamic Field Mapping**: Map source fields to target JSON fields with validation
- **Data Transformation**: Built-in transformations for dates, strings, and data types
- **Comprehensive Validation**: JSON schema validation and database compatibility checks
- **Error Handling**: Robust error handling with detailed reporting
- **Backward Compatibility**: Works with existing configurations
- **Functional API**: Can be used as a library in other applications
- **Flexible Execution**: Command-line tool or programmatic function calls
## Prerequisites
- Python 3.9+
- ClickHouse server
- RabbitMQ server
## Installation
### Option 1: Install as a Package (Recommended)
Using Poetry (recommended for development):
```bash
# Clone the repository
git clone <repository-url>
cd rabbitmq-json-sender
# Install with Poetry
poetry install
# Or install in existing virtual environment
source .venv/bin/activate
poetry install
```
Using pip:
```bash
# Install from source
pip install .
# Or install in development mode
pip install -e .
```
After installation, the package can be used in two ways:
**As a CLI tool:**
```bash
rabbitmq-sync --help
rabbitmq-sync --env-file .env.company
```
**As a Python library:**
```python
from rabbitmq_json_sender import DataSyncAPI, SyncConfig
config = SyncConfig(
    clickhouse_host="localhost",
    rabbitmq_host="localhost"
)
api = DataSyncAPI(config)
result = api.sync_data()
```
### Option 2: Manual Installation
1. Clone the repository:
   ```bash
   git clone <repository-url>
   cd rabbitmq-json-sender
   ```
2. Create and activate a virtual environment:
   ```bash
   python -m venv .venv
   source .venv/bin/activate  # On Windows: .venv\Scripts\activate
   ```
3. Install dependencies:
   ```bash
   pip install -r requirements.txt
   ```
4. Copy the example environment file and update it with your configuration:
   ```bash
   cp .env.example .env
   ```
   Edit the `.env` file with your ClickHouse and RabbitMQ connection details.
## Configuration
### Environment Configuration
Edit the `.env` file with your connection details:
```env
# ClickHouse Configuration
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=9000
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=your_password
CLICKHOUSE_DATABASE=mart
# RabbitMQ Configuration
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_QUEUE=legal_entities_queue
RABBITMQ_EXCHANGE=data_exchange
RABBITMQ_EXCHANGE_TYPE=direct
RABBITMQ_ROUTING_KEY=data.processed
# Application Configuration
BATCH_SIZE=1000
MAX_RETRIES=3
PROCESS_LIMIT=-1
# Field Mapping Configuration
FIELD_MAPPING_CONFIG_PATH=field_mapping.json
```
### Field Mapping Configuration
The heart of the universal system is the `field_mapping.json` file that defines:
- Source table and fields
- Target JSON field mappings
- Data validation rules
- Transformation settings
#### Basic Structure
```json
{
  "metadata": {
    "version": "1.0",
    "description": "Field mapping for legal entities"
  },
  "source": {
    "table": "datamart.legal_entities_view",
    "primary_key": "id",
    "batch_column": "id",
    "order_by": "id"
  },
  "field_mappings": {
    "source_field": {
      "target": "TARGET_FIELD",
      "type": "string",
      "required": true,
      "validation": {
        "min_length": 1
      }
    }
  },
  "transformations": {
    "date_format": "iso",
    "remove_null_values": true,
    "encoding": "utf-8"
  },
  "validation_rules": {
    "required_fields": ["TARGET_FIELD"],
    "skip_record_if_missing": ["source_field"]
  }
}
```
#### Field Mapping Options
Each field mapping supports:
- **target**: Target field name in output JSON (null to exclude)
- **type**: Data type (`string`, `integer`, `date`, `float`, `boolean`)
- **required**: Whether field is required
- **alias**: SQL alias for the field
- **validation**: Validation rules
- `type`: `digits_only`, `email`, `url`, `regex`
- `min_length`, `max_length`: Length constraints
- `pattern`: Regex pattern
- **transformation**: Data transformation (`iso_format`, `uppercase`, `lowercase`)
#### Example Field Mappings
```json
{
  "bin": {
    "target": "UF_BIN",
    "type": "string",
    "required": true,
    "validation": {
      "type": "digits_only",
      "min_length": 12,
      "max_length": 12
    }
  },
  "company_name": {
    "target": "UF_COMPANY_NAME",
    "type": "string",
    "required": true
  },
  "register_date": {
    "target": "UF_REGISTER_DATE",
    "type": "date",
    "transformation": "iso_format"
  },
  "internal_id": {
    "target": null,
    "type": "integer",
    "description": "Internal ID, not included in output"
  }
}
```
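To make the mapping semantics concrete, here is a minimal sketch of how a source row might be transformed under the mappings above. `transform_record` and `FIELD_MAPPINGS` are illustrative names for this sketch, not part of the package's API:

```python
from datetime import date

# Condensed version of the field mappings above (validation rules omitted).
FIELD_MAPPINGS = {
    "bin": {"target": "UF_BIN", "type": "string"},
    "company_name": {"target": "UF_COMPANY_NAME", "type": "string"},
    "register_date": {"target": "UF_REGISTER_DATE", "type": "date",
                      "transformation": "iso_format"},
    "internal_id": {"target": None, "type": "integer"},  # excluded from output
}

def transform_record(row: dict) -> dict:
    """Apply the mappings to one source row (illustrative sketch)."""
    out = {}
    for source_field, mapping in FIELD_MAPPINGS.items():
        if mapping["target"] is None:  # "target": null -> drop the field
            continue
        value = row.get(source_field)
        if mapping.get("transformation") == "iso_format" and value is not None:
            value = value.isoformat()  # date -> "YYYY-MM-DD"
        out[mapping["target"]] = value
    return out

row = {"bin": "123456789012", "company_name": "Acme LLP",
       "register_date": date(2020, 5, 1), "internal_id": 42}
print(transform_record(row))
# {'UF_BIN': '123456789012', 'UF_COMPANY_NAME': 'Acme LLP', 'UF_REGISTER_DATE': '2020-05-01'}
```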
## Usage
### Command-Line Usage
#### Basic Execution
1. **Validate Configuration** (recommended):
   ```bash
   python main.py --validate-only
   ```
2. **Start Data Synchronization**:
   ```bash
   python main.py
   ```
3. **Using Different Environment Files**:
   ```bash
   # Use preset configurations
   python main.py --env-preset company
   python main.py --env-preset products

   # Use custom .env file
   python main.py --env-file /path/to/custom.env
   ```
4. **Process Specific Range**:
   ```bash
   # Process 1000 records starting from offset 500
   python main.py --offset 500 --limit 1000
   ```
5. **Use Legacy Mode**:
   ```bash
   # Use the legacy DataSyncApp class
   python main.py --use-legacy
   ```
#### Command-Line Options
```bash
python main.py [OPTIONS]

Options:
  --env-file PATH       Path to .env file (default: .env)
  --env-preset PRESET   Use preset: default, company, or products
  --offset N            Starting offset for data retrieval (default: 0)
  --limit N             Maximum records to process (overrides PROCESS_LIMIT)
  --use-legacy          Use legacy DataSyncApp class
  --validate-only       Only validate configuration without running sync
  -h, --help            Show help message
```
### Programmatic Usage (Functional API)
The application can be imported and used as a library in other Python applications.
#### Example 1: Basic Usage
```python
from data_sync_api import DataSyncAPI, SyncConfig
# Create configuration
config = SyncConfig(
    clickhouse_host="localhost",
    clickhouse_port=9000,
    clickhouse_user="default",
    clickhouse_password="",
    clickhouse_database="mart",
    rabbitmq_host="localhost",
    rabbitmq_port=5672,
    rabbitmq_user="guest",
    rabbitmq_password="guest",
    batch_size=100,
    process_limit=1000
)

# Create API instance and sync data
api = DataSyncAPI(config)
result = api.sync_data()

# Check results
if result.success:
    print(f"Processed: {result.total_processed}")
    print(f"Published: {result.total_published}")
    print(f"Success rate: {result.details.get('success_rate', 0):.1f}%")
else:
    print(f"Failed: {result.error_message}")
```
#### Example 2: Using Environment Files
```python
from data_sync_api import create_config_from_env, DataSyncAPI
# Load configuration from .env file
config = create_config_from_env(".env.company")
# Customize settings
config.process_limit = 500
config.enable_logging = False # Disable logging for library use
# Sync data with offset and limit
api = DataSyncAPI(config)
result = api.sync_data(offset=100, limit=200)
print(f"Result: {'Success' if result.success else 'Failed'}")
```
#### Example 3: Simple Function Call
```python
from data_sync_api import sync_data_simple
# Define configurations
clickhouse_config = {
    "clickhouse_host": "localhost",
    "clickhouse_port": 9000,
    "clickhouse_user": "default",
    "clickhouse_password": "",
    "clickhouse_database": "mart"
}

rabbitmq_config = {
    "rabbitmq_host": "localhost",
    "rabbitmq_port": 5672,
    "rabbitmq_user": "guest",
    "rabbitmq_password": "guest"
}

# Sync data
result = sync_data_simple(
    clickhouse_config=clickhouse_config,
    rabbitmq_config=rabbitmq_config,
    batch_size=200,
    process_limit=100
)
```
#### Example 4: Integration in Application
```python
from typing import Optional

from data_sync_api import DataSyncAPI, create_config_from_env

class DataProcessor:
    def __init__(self, env_file: Optional[str] = None):
        self.config = create_config_from_env(env_file)
        self.config.enable_logging = False
        self.api = DataSyncAPI(self.config)

    def sync_companies(self, max_records: Optional[int] = None) -> dict:
        """Sync company data and return a summary."""
        if max_records:
            self.config.process_limit = max_records
        result = self.api.sync_data()
        return {
            'success': result.success,
            'processed': result.total_processed,
            'published': result.total_published,
            'failed': result.total_failed,
            'execution_time': result.execution_time,
            'error': result.error_message
        }

    def health_check(self) -> bool:
        """Check if connections are working."""
        validation = self.api.validate_configuration()
        return validation.success

# Use in application
processor = DataProcessor(".env.company")
if processor.health_check():
    summary = processor.sync_companies(max_records=100)
    print(f"Sync summary: {summary}")
```
#### Example 5: Configuration Validation
```python
from data_sync_api import DataSyncAPI, SyncConfig
config = SyncConfig(
    clickhouse_host="localhost",
    rabbitmq_host="localhost"
)
api = DataSyncAPI(config)

# Validate configuration
validation_result = api.validate_configuration()

if validation_result.success:
    print("✅ Configuration is valid")
    print(f"ClickHouse records: {validation_result.details.get('clickhouse_records')}")
    print(f"RabbitMQ connected: {validation_result.details.get('rabbitmq_connected')}")
else:
    print(f"❌ Validation failed: {validation_result.error_message}")
```
#### SyncConfig Parameters
```python
SyncConfig(
    # ClickHouse configuration
    clickhouse_host: str = "localhost",
    clickhouse_port: int = 9000,
    clickhouse_user: str = "default",
    clickhouse_password: str = "",
    clickhouse_database: str = "mart",
    clickhouse_table: str = "datamart.statgov_snap_legal_entities_v1_2_view_latest",

    # RabbitMQ configuration
    rabbitmq_host: str = "localhost",
    rabbitmq_port: int = 5672,
    rabbitmq_user: str = "guest",
    rabbitmq_password: str = "guest",
    rabbitmq_queue: str = "legal_entities_queue",
    rabbitmq_exchange: str = "data_exchange",
    rabbitmq_exchange_type: str = "direct",
    rabbitmq_routing_key: str = "data.processed",

    # Processing configuration
    batch_size: int = 1000,
    max_retries: int = 3,
    process_limit: int = -1,  # -1 means no limit

    # Field mapping configuration
    field_mapping_config_path: Optional[str] = "field_mapping.json",

    # Logging configuration
    enable_logging: bool = True,
    log_level: str = "INFO"
)
```
#### SyncResult Structure
```python
@dataclass
class SyncResult:
    success: bool                        # Whether sync was successful
    total_processed: int = 0             # Total records processed
    total_published: int = 0             # Total records published to RabbitMQ
    total_failed: int = 0                # Total records that failed
    batches_processed: int = 0           # Number of batches processed
    error_message: Optional[str] = None  # Error message if failed
    execution_time: float = 0.0          # Execution time in seconds
    details: Dict[str, Any] = field(default_factory=dict)  # Additional details (success_rate, etc.)
```
### More Examples
See `usage_examples.py` for comprehensive examples including:
- Batch processing with custom logic
- Error recovery and retry mechanisms
- Integration patterns for larger applications
## Data Flow
1. **Configuration Loading**: Load field mapping from JSON file
2. **Validation**: Validate configuration and database compatibility
3. **Dynamic SQL Generation**: Build SELECT query based on field mappings
4. **Data Extraction**: Fetch data from configured ClickHouse table
5. **Universal Transformation**: Transform data using configurable rules
6. **Validation & Filtering**: Apply validation rules and filters
7. **Publishing**: Send transformed data to RabbitMQ queue
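Step 3 (dynamic SQL generation) can be illustrated with a minimal sketch; `build_select` is a hypothetical helper written for this example, not part of the package:

```python
# Illustrative sketch of building a SELECT from a field mapping config;
# the config keys mirror the "source" and "field_mappings" sections above.
def build_select(config: dict) -> str:
    """Assemble a SELECT statement from the configured fields and table."""
    fields = ", ".join(config["field_mappings"])
    src = config["source"]
    return f"SELECT {fields} FROM {src['table']} ORDER BY {src['order_by']}"

config = {
    "source": {"table": "datamart.legal_entities_view", "order_by": "id"},
    "field_mappings": {"bin": {}, "company_name": {}},
}
print(build_select(config))
# SELECT bin, company_name FROM datamart.legal_entities_view ORDER BY id
```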
## Validation System
The application includes a comprehensive validation system:
### Configuration Validation
- **JSON Schema Validation**: Validates structure against schema
- **Business Rules**: Checks field mappings and cross-references
- **Type Compatibility**: Ensures data types are compatible
### Database Validation
- **Schema Compatibility**: Verifies fields exist in database
- **Type Mapping**: Checks ClickHouse to config type compatibility
- **Sample Query Testing**: Tests actual data retrieval
### Runtime Validation
- **Field Validation**: Validates individual field values
- **Record Validation**: Checks required fields and constraints
- **Error Reporting**: Detailed logging of validation issues
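As a rough illustration of the runtime field validation described above, the config's validation options (`digits_only`, `min_length`, `max_length`, `pattern`) could be checked like this; `validate_field` is a sketch, not the package's actual code:

```python
import re

# Sketch of per-field validation against the rule names used in field_mapping.json.
def validate_field(value: str, rules: dict) -> bool:
    """Return True if `value` satisfies every configured rule."""
    if rules.get("type") == "digits_only" and not value.isdigit():
        return False
    if "min_length" in rules and len(value) < rules["min_length"]:
        return False
    if "max_length" in rules and len(value) > rules["max_length"]:
        return False
    if "pattern" in rules and not re.fullmatch(rules["pattern"], value):
        return False
    return True

bin_rules = {"type": "digits_only", "min_length": 12, "max_length": 12}
print(validate_field("123456789012", bin_rules))  # True  (12 digits)
print(validate_field("12345", bin_rules))         # False (too short)
```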
## Error Handling
Enhanced error handling includes:
- **Configuration Errors**: Invalid JSON, missing fields, type mismatches
- **Database Errors**: Connection issues, missing tables/fields, query failures
- **Transformation Errors**: Data type conversion, validation failures
- **Publishing Errors**: RabbitMQ connection issues, message failures
- **Retry Logic**: Automatic retries with exponential backoff
- **Graceful Degradation**: Fallback to hardcoded transformation if needed
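The retry-with-exponential-backoff idea can be sketched as follows; `publish_with_retry` and `flaky_publish` are hypothetical names for illustration, not the package's implementation:

```python
import time

# Illustrative retry helper in the spirit of the MAX_RETRIES setting:
# wait base_delay, then 2x, then 4x, ... between attempts.
def publish_with_retry(publish, message, max_retries=3, base_delay=0.5):
    for attempt in range(max_retries + 1):
        try:
            return publish(message)
        except ConnectionError:
            if attempt == max_retries:
                raise  # out of retries, surface the error
            time.sleep(base_delay * (2 ** attempt))

# A fake publisher that fails twice, then succeeds.
calls = {"n": 0}
def flaky_publish(msg):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("broker unavailable")
    return "ok"

print(publish_with_retry(flaky_publish, {"UF_BIN": "1"}, base_delay=0.01))
# ok
```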
## Troubleshooting
### Common Issues
1. **Configuration Validation Fails**
   ```bash
   # Validate using the new API
   python main.py --validate-only

   # Check configuration syntax
   python -c "import json; json.load(open('field_mapping.json'))"
   ```
2. **Database Connection Issues**
   ```bash
   # Test ClickHouse connection
   python check_connection.py
   ```
3. **Field Mapping Errors**
   ```bash
   # Test field mappings
   python test_field_mapping.py
   ```
4. **Transformation Issues**
   ```bash
   # Test transformer
   python test_universal_transform.py
   ```
### Debug Mode
Enable debug logging:
```python
# In your script
from data_sync_api import DataSyncAPI, SyncConfig
config = SyncConfig(
    # ... other config ...
    enable_logging=True,
    log_level="DEBUG"
)
api = DataSyncAPI(config)
```
Or set it globally:
```python
import logging
logging.basicConfig(level=logging.DEBUG)
```
## Migration Guide
### From Hardcoded to Universal Configuration
1. **Create Field Mapping**: Use existing `field_mapping.json` as template
2. **Update Environment**: Add `FIELD_MAPPING_CONFIG_PATH` to `.env`
3. **Validate Configuration**: Run validation tests
4. **Test with Sample Data**: Verify transformations work correctly
5. **Deploy**: The system automatically uses universal transformer
### Backward Compatibility
The system maintains backward compatibility:
- If no field mapping config is found, falls back to hardcoded transformation
- Existing `.env` configurations continue to work
- No changes required to existing deployment scripts
- Legacy mode available with `--use-legacy` flag
## Examples
See the following files for examples:
- `examples/` directory: Different table configurations and field mapping patterns
- `usage_examples.py`: Comprehensive functional API usage examples
## License
This project is licensed under the MIT License.