rabbitmq-json-sender

Name: rabbitmq-json-sender
Version: 0.1.4
Home page: https://github.com/yourusername/rabbitmq-json-sender
Summary: Universal ClickHouse to RabbitMQ data synchronization system with configurable field mappings
Upload time: 2025-10-20 12:24:22
Author: Vladimir Muzychenko
Requires Python: <4.0,>=3.9
License: MIT
Keywords: rabbitmq, clickhouse, data-sync, etl
# Universal ClickHouse to RabbitMQ Data Sync

This application provides a **universal data synchronization system** that syncs data from any ClickHouse table to RabbitMQ queues, with configurable field mappings and transformations. No code changes are required: just configure through JSON files!

## 🚀 Key Features

- **Universal Configuration**: Support any ClickHouse table through JSON configuration
- **Dynamic Field Mapping**: Map source fields to target JSON fields with validation
- **Data Transformation**: Built-in transformations for dates, strings, and data types
- **Comprehensive Validation**: JSON schema validation and database compatibility checks
- **Error Handling**: Robust error handling with detailed reporting
- **Backward Compatibility**: Works with existing configurations
- **Functional API**: Can be used as a library in other applications
- **Flexible Execution**: Command-line tool or programmatic function calls

## Prerequisites

- Python 3.9+ (<4.0)
- ClickHouse server
- RabbitMQ server

## Installation

### Option 1: Install as a Package (Recommended)

Using Poetry (recommended for development):

```bash
# Clone the repository
git clone <repository-url>
cd rabbitmq-json-sender

# Install with Poetry
poetry install

# Or install in existing virtual environment
source .venv/bin/activate
poetry install
```

Using pip:

```bash
# Install from source
pip install .

# Or install in development mode
pip install -e .
```

After installation, the package can be used in two ways:

**As a CLI tool:**
```bash
rabbitmq-sync --help
rabbitmq-sync --env-file .env.company
```

**As a Python library:**
```python
from rabbitmq_json_sender import DataSyncAPI, SyncConfig

config = SyncConfig(
    clickhouse_host="localhost",
    rabbitmq_host="localhost"
)
api = DataSyncAPI(config)
result = api.sync_data()
```

### Option 2: Manual Installation

1. Clone the repository:
   ```bash
   git clone <repository-url>
   cd rabbitmq-json-sender
   ```

2. Create and activate a virtual environment:
   ```bash
   python -m venv .venv
   source .venv/bin/activate  # On Windows: .venv\Scripts\activate
   ```

3. Install dependencies:
   ```bash
   pip install -r requirements.txt
   ```

4. Copy the example environment file and update it with your configuration:
   ```bash
   cp .env.example .env
   ```
   Edit the `.env` file with your ClickHouse and RabbitMQ connection details.

## Configuration

### Environment Configuration

Edit the `.env` file with your connection details:

```env
# ClickHouse Configuration
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=9000
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=your_password
CLICKHOUSE_DATABASE=mart

# RabbitMQ Configuration
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_QUEUE=legal_entities_queue
RABBITMQ_EXCHANGE=data_exchange
RABBITMQ_EXCHANGE_TYPE=direct
RABBITMQ_ROUTING_KEY=data.processed

# Application Configuration
BATCH_SIZE=1000
MAX_RETRIES=3
# -1 means no limit (process all records)
PROCESS_LIMIT=-1

# Field Mapping Configuration
FIELD_MAPPING_CONFIG_PATH=field_mapping.json
```

### Field Mapping Configuration

The heart of the universal system is the `field_mapping.json` file that defines:
- Source table and fields
- Target JSON field mappings
- Data validation rules
- Transformation settings

#### Basic Structure

```json
{
  "metadata": {
    "version": "1.0",
    "description": "Field mapping for legal entities"
  },
  "source": {
    "table": "datamart.legal_entities_view",
    "primary_key": "id",
    "batch_column": "id",
    "order_by": "id"
  },
  "field_mappings": {
    "source_field": {
      "target": "TARGET_FIELD",
      "type": "string",
      "required": true,
      "validation": {
        "min_length": 1
      }
    }
  },
  "transformations": {
    "date_format": "iso",
    "remove_null_values": true,
    "encoding": "utf-8"
  },
  "validation_rules": {
    "required_fields": ["TARGET_FIELD"],
    "skip_record_if_missing": ["source_field"]
  }
}
```

#### Field Mapping Options

Each field mapping supports:

- **target**: Target field name in output JSON (null to exclude)
- **type**: Data type (`string`, `integer`, `date`, `float`, `boolean`)
- **required**: Whether field is required
- **alias**: SQL alias for the field
- **validation**: Validation rules
  - `type`: `digits_only`, `email`, `url`, `regex`
  - `min_length`, `max_length`: Length constraints
  - `pattern`: Regex pattern
- **transformation**: Data transformation (`iso_format`, `uppercase`, `lowercase`)

#### Example Field Mappings

```json
{
  "bin": {
    "target": "UF_BIN",
    "type": "string",
    "required": true,
    "validation": {
      "type": "digits_only",
      "min_length": 12,
      "max_length": 12
    }
  },
  "company_name": {
    "target": "UF_COMPANY_NAME",
    "type": "string",
    "required": true
  },
  "register_date": {
    "target": "UF_REGISTER_DATE",
    "type": "date",
    "transformation": "iso_format"
  },
  "internal_id": {
    "target": null,
    "type": "integer",
    "description": "Internal ID, not included in output"
  }
}
```
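
To make the mapping semantics concrete, here is a minimal, hypothetical sketch of how one record could be transformed under such a mapping. It illustrates the rules above (target renaming, `target: null` exclusion, `iso_format`, required fields); it is not the package's actual transformer.

```python
from datetime import date
from typing import Any, Dict, Optional

# Hypothetical illustration of the mapping rules above, not the package's
# actual transformer.
FIELD_MAPPINGS: Dict[str, Dict[str, Any]] = {
    "bin": {"target": "UF_BIN", "type": "string", "required": True},
    "company_name": {"target": "UF_COMPANY_NAME", "type": "string", "required": True},
    "register_date": {"target": "UF_REGISTER_DATE", "type": "date", "transformation": "iso_format"},
    "internal_id": {"target": None, "type": "integer"},  # excluded from output
}

def transform_record(row: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Rename fields to their targets, apply transformations, drop excluded fields."""
    out: Dict[str, Any] = {}
    for source, rule in FIELD_MAPPINGS.items():
        value = row.get(source)
        if rule.get("required") and value is None:
            return None  # skip record: a required source field is missing
        if rule["target"] is None:
            continue  # "target": null means "exclude from output"
        if rule.get("transformation") == "iso_format" and isinstance(value, date):
            value = value.isoformat()
        out[rule["target"]] = value
    return out

row = {"bin": "123456789012", "company_name": "Acme LLP",
       "register_date": date(2020, 1, 15), "internal_id": 42}
print(transform_record(row))
# {'UF_BIN': '123456789012', 'UF_COMPANY_NAME': 'Acme LLP', 'UF_REGISTER_DATE': '2020-01-15'}
```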

## Usage

### Command-Line Usage

#### Basic Execution

1. **Validate Configuration** (recommended):
   ```bash
   python main.py --validate-only
   ```

2. **Start Data Synchronization**:
   ```bash
   python main.py
   ```

3. **Using Different Environment Files**:
   ```bash
   # Use preset configurations
   python main.py --env-preset company
   python main.py --env-preset products
   
   # Use custom .env file
   python main.py --env-file /path/to/custom.env
   ```

4. **Process Specific Range**:
   ```bash
   # Process 1000 records starting from offset 500
   python main.py --offset 500 --limit 1000
   ```

5. **Use Legacy Mode**:
   ```bash
   # Use the legacy DataSyncApp class
   python main.py --use-legacy
   ```

#### Command-Line Options

```bash
python main.py [OPTIONS]

Options:
  --env-file PATH           Path to .env file (default: .env)
  --env-preset PRESET       Use preset: default, company, or products
  --offset N                Starting offset for data retrieval (default: 0)
  --limit N                 Maximum records to process (overrides PROCESS_LIMIT)
  --use-legacy              Use legacy DataSyncApp class
  --validate-only           Only validate configuration without running sync
  -h, --help                Show help message
```

### Programmatic Usage (Functional API)

The application can be imported and used as a library in other Python applications. The examples below import from the `data_sync_api` module directly, which matches running from a source checkout (Option 2); with the installed package (Option 1), the same names are exposed via `from rabbitmq_json_sender import ...`, as shown above.

#### Example 1: Basic Usage

```python
from data_sync_api import DataSyncAPI, SyncConfig

# Create configuration
config = SyncConfig(
    clickhouse_host="localhost",
    clickhouse_port=9000,
    clickhouse_user="default",
    clickhouse_password="",
    clickhouse_database="mart",
    rabbitmq_host="localhost",
    rabbitmq_port=5672,
    rabbitmq_user="guest",
    rabbitmq_password="guest",
    batch_size=100,
    process_limit=1000
)

# Create API instance and sync data
api = DataSyncAPI(config)
result = api.sync_data()

# Check results
if result.success:
    print(f"Processed: {result.total_processed}")
    print(f"Published: {result.total_published}")
    print(f"Success rate: {result.details.get('success_rate', 0):.1f}%")
else:
    print(f"Failed: {result.error_message}")
```

#### Example 2: Using Environment Files

```python
from data_sync_api import create_config_from_env, DataSyncAPI

# Load configuration from .env file
config = create_config_from_env(".env.company")

# Customize settings
config.process_limit = 500
config.enable_logging = False  # Disable logging for library use

# Sync data with offset and limit
api = DataSyncAPI(config)
result = api.sync_data(offset=100, limit=200)

print(f"Result: {'Success' if result.success else 'Failed'}")
```

#### Example 3: Simple Function Call

```python
from data_sync_api import sync_data_simple

# Define configurations
clickhouse_config = {
    "clickhouse_host": "localhost",
    "clickhouse_port": 9000,
    "clickhouse_user": "default",
    "clickhouse_password": "",
    "clickhouse_database": "mart"
}

rabbitmq_config = {
    "rabbitmq_host": "localhost",
    "rabbitmq_port": 5672,
    "rabbitmq_user": "guest",
    "rabbitmq_password": "guest"
}

# Sync data
result = sync_data_simple(
    clickhouse_config=clickhouse_config,
    rabbitmq_config=rabbitmq_config,
    batch_size=200,
    process_limit=100
)
```

#### Example 4: Integration in Application

```python
from typing import Optional

from data_sync_api import DataSyncAPI, create_config_from_env

class DataProcessor:
    def __init__(self, env_file: Optional[str] = None):
        self.config = create_config_from_env(env_file)
        self.config.enable_logging = False
        self.api = DataSyncAPI(self.config)
    
    def sync_companies(self, max_records: Optional[int] = None) -> dict:
        """Sync company data and return summary."""
        if max_records:
            self.config.process_limit = max_records
        
        result = self.api.sync_data()
        
        return {
            'success': result.success,
            'processed': result.total_processed,
            'published': result.total_published,
            'failed': result.total_failed,
            'execution_time': result.execution_time,
            'error': result.error_message
        }
    
    def health_check(self) -> bool:
        """Check if connections are working."""
        validation = self.api.validate_configuration()
        return validation.success

# Use in application
processor = DataProcessor(".env.company")
if processor.health_check():
    summary = processor.sync_companies(max_records=100)
    print(f"Sync summary: {summary}")
```

#### Example 5: Configuration Validation

```python
from data_sync_api import DataSyncAPI, SyncConfig

config = SyncConfig(
    clickhouse_host="localhost",
    rabbitmq_host="localhost"
)

api = DataSyncAPI(config)

# Validate configuration
validation_result = api.validate_configuration()

if validation_result.success:
    print("✅ Configuration is valid")
    print(f"ClickHouse records: {validation_result.details.get('clickhouse_records')}")
    print(f"RabbitMQ connected: {validation_result.details.get('rabbitmq_connected')}")
else:
    print(f"❌ Validation failed: {validation_result.error_message}")
```

#### SyncConfig Parameters

```python
SyncConfig(
    # ClickHouse configuration
    clickhouse_host: str = "localhost",
    clickhouse_port: int = 9000,
    clickhouse_user: str = "default",
    clickhouse_password: str = "",
    clickhouse_database: str = "mart",
    clickhouse_table: str = "datamart.statgov_snap_legal_entities_v1_2_view_latest",
    
    # RabbitMQ configuration
    rabbitmq_host: str = "localhost",
    rabbitmq_port: int = 5672,
    rabbitmq_user: str = "guest",
    rabbitmq_password: str = "guest",
    rabbitmq_queue: str = "legal_entities_queue",
    rabbitmq_exchange: str = "data_exchange",
    rabbitmq_exchange_type: str = "direct",
    rabbitmq_routing_key: str = "data.processed",
    
    # Processing configuration
    batch_size: int = 1000,
    max_retries: int = 3,
    process_limit: int = -1,  # -1 means no limit
    
    # Field mapping configuration
    field_mapping_config_path: Optional[str] = "field_mapping.json",
    
    # Logging configuration
    enable_logging: bool = True,
    log_level: str = "INFO"
)
```

#### SyncResult Structure

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

@dataclass
class SyncResult:
    success: bool                          # Whether sync was successful
    total_processed: int = 0               # Total records processed
    total_published: int = 0               # Total records published to RabbitMQ
    total_failed: int = 0                  # Total records that failed
    batches_processed: int = 0             # Number of batches processed
    error_message: Optional[str] = None    # Error message if failed
    execution_time: float = 0.0            # Execution time in seconds
    details: Dict[str, Any] = field(default_factory=dict)  # Additional details (success_rate, etc.)
```

### More Examples

See `usage_examples.py` for comprehensive examples including:
- Batch processing with custom logic (one pattern is sketched below)
- Error recovery and retry mechanisms
- Integration patterns for larger applications
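
For instance, here is a hedged sketch of the batch pattern, built on the `sync_data(offset=..., limit=...)` call from Example 2; the chunk size and stop condition are illustrative assumptions, not package behavior.

```python
from data_sync_api import DataSyncAPI, create_config_from_env

# Illustrative batch loop over the offset/limit API shown in Example 2.
config = create_config_from_env(".env.company")
config.enable_logging = False
api = DataSyncAPI(config)

chunk = 500
offset = 0
while True:
    result = api.sync_data(offset=offset, limit=chunk)
    if not result.success:
        print(f"Batch at offset {offset} failed: {result.error_message}")
        break
    if result.total_processed == 0:
        break  # assumed stop condition: no more records to fetch
    offset += result.total_processed
```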

## Data Flow

1. **Configuration Loading**: Load field mapping from JSON file
2. **Validation**: Validate configuration and database compatibility  
3. **Dynamic SQL Generation**: Build a SELECT query based on the field mappings (sketched after this list)
4. **Data Extraction**: Fetch data from configured ClickHouse table
5. **Universal Transformation**: Transform data using configurable rules
6. **Validation & Filtering**: Apply validation rules and filters
7. **Publishing**: Send transformed data to RabbitMQ queue
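
Step 3 is the configurable core. Here is a minimal sketch of how a SELECT statement could be derived from the `source` and `field_mappings` sections of `field_mapping.json`; it assumes mappings translate one-to-one into selected columns with optional SQL aliases, which may not match the package's exact query builder.

```python
from typing import Any, Dict

# Illustration only: derive a SELECT from the "source" and "field_mappings"
# sections of field_mapping.json. Assumes one column per mapping entry.
def build_select(config: Dict[str, Any], offset: int = 0, limit: int = 1000) -> str:
    source = config["source"]
    columns = []
    for name, rule in config["field_mappings"].items():
        alias = rule.get("alias")
        columns.append(f"{name} AS {alias}" if alias else name)
    return (
        f"SELECT {', '.join(columns)} FROM {source['table']} "
        f"ORDER BY {source['order_by']} LIMIT {limit} OFFSET {offset}"
    )

config = {
    "source": {"table": "datamart.legal_entities_view", "order_by": "id"},
    "field_mappings": {"bin": {}, "company_name": {"alias": "name"}},
}
print(build_select(config))
# SELECT bin, company_name AS name FROM datamart.legal_entities_view ORDER BY id LIMIT 1000 OFFSET 0
```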

## Validation System

The application includes a comprehensive validation system:

### Configuration Validation
- **JSON Schema Validation**: Validates structure against schema
- **Business Rules**: Checks field mappings and cross-references
- **Type Compatibility**: Ensures data types are compatible

### Database Validation
- **Schema Compatibility**: Verifies fields exist in database
- **Type Mapping**: Checks ClickHouse to config type compatibility
- **Sample Query Testing**: Tests actual data retrieval

### Runtime Validation
- **Field Validation**: Validates individual field values (see the sketch below)
- **Record Validation**: Checks required fields and constraints
- **Error Reporting**: Detailed logging of validation issues
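
As a concrete illustration of the field-level checks (the `digits_only`, `min_length`, `max_length`, and `pattern` rules from the mapping options), here is a minimal sketch; the real validator's behavior may differ in details.

```python
import re
from typing import Any, Dict

# Minimal illustration of the field-level rules from the mapping options
# (digits_only / min_length / max_length / pattern); not the real validator.
def validate_field(value: Any, rules: Dict[str, Any]) -> bool:
    text = str(value)
    if rules.get("type") == "digits_only" and not text.isdigit():
        return False
    if "min_length" in rules and len(text) < rules["min_length"]:
        return False
    if "max_length" in rules and len(text) > rules["max_length"]:
        return False
    if "pattern" in rules and not re.fullmatch(rules["pattern"], text):
        return False
    return True

bin_rules = {"type": "digits_only", "min_length": 12, "max_length": 12}
print(validate_field("123456789012", bin_rules))  # True
print(validate_field("12345", bin_rules))         # False: too short
```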

## Error Handling

Enhanced error handling includes:
- **Configuration Errors**: Invalid JSON, missing fields, type mismatches
- **Database Errors**: Connection issues, missing tables/fields, query failures
- **Transformation Errors**: Data type conversion, validation failures
- **Publishing Errors**: RabbitMQ connection issues, message failures
- **Retry Logic**: Automatic retries with exponential backoff (sketched below)
- **Graceful Degradation**: Fallback to hardcoded transformation if needed
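
The retry behavior can be pictured as follows. This generic sketch (a publish callable, `max_retries`, a base delay) illustrates the shape of exponential backoff; it is an assumption, not the package's exact implementation.

```python
import time
from typing import Callable

# Generic exponential-backoff retry; shape only, not the package's exact code.
def publish_with_retry(publish: Callable[[], None], max_retries: int = 3,
                       base_delay: float = 1.0) -> bool:
    for attempt in range(max_retries + 1):
        try:
            publish()
            return True
        except Exception as exc:  # e.g. a RabbitMQ connection error
            if attempt == max_retries:
                print(f"Giving up after {attempt + 1} attempts: {exc}")
                return False
            delay = base_delay * (2 ** attempt)  # 1s, 2s, 4s, ...
            print(f"Publish failed ({exc}); retrying in {delay:.0f}s")
            time.sleep(delay)
    return False
```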

## Troubleshooting

### Common Issues

1. **Configuration Validation Fails**
   ```bash
   # Validate using the new API
   python main.py --validate-only
   
   # Check configuration syntax
   python -c "import json; json.load(open('field_mapping.json'))"
   ```

2. **Database Connection Issues**
   ```bash
   # Test ClickHouse connection
   python check_connection.py
   ```

3. **Field Mapping Errors**
   ```bash
   # Test field mappings
   python test_field_mapping.py
   ```

4. **Transformation Issues**
   ```bash
   # Test transformer
   python test_universal_transform.py
   ```

### Debug Mode

Enable debug logging:

```python
# In your script
from data_sync_api import DataSyncAPI, SyncConfig

config = SyncConfig(
    # ... other config ...
    enable_logging=True,
    log_level="DEBUG"
)

api = DataSyncAPI(config)
```

Or set it globally:

```python
import logging
logging.basicConfig(level=logging.DEBUG)
```

## Migration Guide

### From Hardcoded to Universal Configuration

1. **Create Field Mapping**: Use existing `field_mapping.json` as template
2. **Update Environment**: Add `FIELD_MAPPING_CONFIG_PATH` to `.env`
3. **Validate Configuration**: Run validation tests
4. **Test with Sample Data**: Verify transformations work correctly
5. **Deploy**: The system automatically uses the universal transformer

### Backward Compatibility

The system maintains backward compatibility:
- If no field mapping config is found, the system falls back to the hardcoded transformation (see the sketch below)
- Existing `.env` configurations continue to work
- No changes required to existing deployment scripts
- Legacy mode available with `--use-legacy` flag
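
The fallback decision can be pictured like this; a hedged sketch only, with the "legacy" branch standing in for the hardcoded transformation path.

```python
import json
import os
from typing import Any, Dict, Optional

# Hedged sketch of the fallback decision; the "legacy" branch stands in
# for the hardcoded transformation path.
def load_field_mapping(path: str = "field_mapping.json") -> Optional[Dict[str, Any]]:
    if not os.path.exists(path):
        return None  # triggers the hardcoded fallback
    with open(path, encoding="utf-8") as f:
        return json.load(f)

mapping = load_field_mapping(os.getenv("FIELD_MAPPING_CONFIG_PATH", "field_mapping.json"))
if mapping is None:
    print("No field mapping found; using hardcoded (legacy) transformation")
else:
    print(f"Using universal transformer with {len(mapping['field_mappings'])} mapped fields")
```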

## Examples

See the following files for examples:
- `examples/` directory: Different table configurations and field mapping patterns
- `usage_examples.py`: Comprehensive functional API usage examples

## License

This project is licensed under the MIT License.

            
