# OmniGen 🚀
**Generate synthetic data at scale using an enterprise-ready framework with fully customizable configuration, security, and ease of use**
Built by [Ultrasafe AI](https://us.inc) for production environments.
---
## What is OmniGen?
**OmniGen** is an enterprise-grade framework for generating synthetic datasets at scale, from scratch or from base data. Generate **trillions of tokens** and **billions of samples** across multiple modalities:
### 🎯 Data Types Supported
- 💬 **Conversational Data** - Single-turn to multi-turn dialogues
- 🤖 **Agentic Datasets** - Tool use, function calling, multi-step reasoning
- 🎨 **Multimodal Datasets** - Text, images, audio, video combinations
- 🖼️ **Images** - Synthetic image generation and editing
- 🎵 **Audio** - Speech, music, sound effects
- 🎬 **Video** - Synthetic video sequences
### 🎓 Use Cases
- **Fine-Tuning** - Instruction following, task-specific models
- **Supervised Fine-Tuning (SFT)** - High-quality labeled datasets
- **Offline Reinforcement Learning** - Preference datasets with rewards
- **Online Reinforcement Learning** - Ground truth with reward checking scripts
- **Pre-Training** - Large-scale corpus generation
- **Machine Learning** - Training data for any ML task
### 🏗️ Why OmniGen?
- ✅ **Enterprise-Ready** - Built for production at scale
- ✅ **Fully Customizable** - Configure every aspect of generation
- ✅ **Secure** - Complete isolation, no data mixing
- ✅ **Easy** - Simple API, clear examples
- ✅ **Modular** - Independent pipelines for different data types
---
## 🚀 Currently Available Pipeline
### **conversation_extension** - Extend Single-Turn to Multi-Turn Conversations
Turn your base questions into rich multi-turn dialogues. This is just the first pipeline; more are coming soon!
---
## Why OmniGen?
✅ **Simple** - One command to generate thousands of conversations
✅ **Scalable** - Parallel processing for fast generation
✅ **Flexible** - Mix different AI providers (OpenAI, Anthropic, Ultrasafe AI)
✅ **Production Ready** - Built for SaaS platforms with multi-tenant support
✅ **Fault Tolerant** - Checkpoint/resume system prevents data loss from any interruption
---
## Quick Start
### 1. Install
```bash
pip install omnigen-usf
```
### 2. Prepare Base Data
Create a file `base_data.jsonl` with your starting questions:
```jsonl
{"conversations": [{"role": "user", "content": "How do I learn Python?"}]}
{"conversations": [{"role": "user", "content": "What is machine learning?"}]}
{"conversations": [{"role": "user", "content": "Explain neural networks"}]}
```
### 3. Generate Conversations
You can configure the pipeline in **two ways**:
#### Option A: Using YAML Configuration File
Create a `config.yaml` file:
```yaml
# NEW: Minimal config with smart defaults!
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: base_data.jsonl

storage:
  type: jsonl
  output_file: output.jsonl
```
Then load and run:
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load configuration from YAML file
config = ConversationExtensionConfig.from_yaml('config.yaml')

# Run the pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()
```
#### Option B: Using Programmatic Configuration (Python)
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

# Configure the pipeline programmatically
config = (ConversationExtensionConfigBuilder()
    # User followup generator - minimal config!
    .add_provider(
        role='user_followup',
        name='ultrasafe',
        api_key='your-api-key'
        # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓
    )
    # Assistant response generator
    .add_provider(
        role='assistant_response',
        name='ultrasafe',
        api_key='your-api-key'
        # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓
    )
    # Generation settings
    .set_generation(
        num_conversations=100,
        turn_range=(3, 8)  # 3-8 turns per conversation
    )
    # Input data
    .set_data_source(
        source_type='file',
        file_path='base_data.jsonl'
    )
    # Output
    .set_storage(
        type='jsonl',
        output_file='output.jsonl'
    )
    .build()
)

# Run the pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()
```
### 4. Get Results
Your generated conversations will be in `output.jsonl`:
```jsonl
{
  "id": 0,
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Great choice! Start with the basics..."},
    {"role": "user", "content": "What resources do you recommend?"},
    {"role": "assistant", "content": "I recommend these resources..."},
    {"role": "user", "content": "How long will it take?"},
    {"role": "assistant", "content": "With consistent practice..."}
  ],
  "num_turns": 3,
  "success": true
}
```
---
## Supported AI Providers
**NEW: Smart defaults automatically applied!** Only specify `name` and `api_key` - model, temperature, and max_tokens use optimized defaults.
| Provider | Default Model | Default Temp | Default Tokens |
|----------|---------------|--------------|----------------|
| **Ultrasafe AI** | `usf-mini` | `0.7` | `4096` |
| **OpenAI** | `gpt-4-turbo` | `0.7` | `4096` |
| **Anthropic** | `claude-3-5-sonnet-20241022` | `0.7` | `4096` |
| **OpenRouter** | `openai/gpt-4-turbo` | `0.7` | `4096` |
**Override defaults as needed:**
```yaml
providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    temperature: 0.9  # Override only what you need
```
### Mix Different Providers
```python
config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'openai', api_key, 'gpt-4-turbo')
    .add_provider('assistant_response', 'anthropic', api_key, 'claude-3-5-sonnet')
    # ... rest of config
    .build()
)
```
---
## Advanced Features
### 🔄 Checkpoint & Resume System
**NEW!** Automatic checkpoint/resume functionality prevents data loss and duplication from any interruption.
#### Overview
The checkpoint system automatically saves progress and enables seamless resume from where you left off - handling manual stops (Ctrl+C), errors, rate limits, server failures, and crashes without losing data or creating duplicates.
#### How It Works
**Conversation Tracking:**
- **Position-based ID**: Index in base data file (for ordering)
- **Content Hash**: SHA256 hash of conversation (for deduplication)
- **Combined Key**: `{position}_{hash[:8]}` for unique identification (see the sketch below)
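For illustration, here is how such a hybrid key can be derived (a minimal sketch; `conversation_key` is a hypothetical helper, not OmniGen's API):
```python
import hashlib
import json

def conversation_key(position: int, conversation: list) -> str:
    """Hybrid dedup key: base-data position plus SHA256 content-hash prefix."""
    # Hash a canonical JSON form of the conversation for deduplication
    content = json.dumps(conversation, sort_keys=True).encode("utf-8")
    digest = hashlib.sha256(content).hexdigest()
    # Combined key format from above: {position}_{hash[:8]}
    return f"{position}_{digest[:8]}"

print(conversation_key(0, [{"role": "user", "content": "How do I learn Python?"}]))
```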
**Checkpoint Saves:**
- Automatically after every N conversations (configurable)
- On graceful shutdown (Ctrl+C, SIGTERM)
- Uses atomic file operations (write temp → sync → atomic rename; sketched after this list)
- Validates input file integrity with SHA256 hash
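The atomic save pattern amounts to the following minimal sketch (illustrative, assuming a JSON checkpoint; not the exact implementation):
```python
import json
import os
import tempfile

def save_checkpoint_atomic(state: dict, path: str) -> None:
    """Write the checkpoint to a temp file, fsync it, then atomically rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(state, f)
            f.flush()
            os.fsync(f.fileno())  # sync: force bytes to disk before the rename
        os.replace(tmp_path, path)  # atomic rename on POSIX and Windows
    except BaseException:
        os.unlink(tmp_path)
        raise
```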
#### Configuration
```yaml
checkpoint:
  enabled: true                      # Enable checkpoint/resume
  checkpoint_file: "checkpoint.json" # Path to checkpoint file
  auto_save_frequency: 10            # Save every 10 conversations
  validate_input_hash: true          # Verify input hasn't changed
  resume_mode: "auto"                # auto | manual | fresh
```
**Configuration Options:**
| Option | Description | Default |
|--------|-------------|---------|
| `enabled` | Enable checkpoint/resume | `true` |
| `checkpoint_file` | Path to checkpoint file | `workspaces/{workspace_id}/checkpoint.json` |
| `auto_save_frequency` | Save checkpoint every N conversations | `10` |
| `validate_input_hash` | Verify input file unchanged on resume | `true` |
| `resume_mode` | Resume behavior: `auto`, `manual`, or `fresh` | `auto` |
#### Usage Examples
**YAML Configuration:**
```yaml
workspace_id: "production_run"

checkpoint:
  enabled: true
  checkpoint_file: "checkpoint.json"
  auto_save_frequency: 10
  validate_input_hash: true
  resume_mode: "auto"

providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}

generation:
  num_conversations: 1000
  turn_range: {min: 3, max: 8}
  parallel_workers: 10

base_data:
  source_type: file
  file_path: input.jsonl

storage:
  type: jsonl
  output_file: output.jsonl
```
**Programmatic Configuration:**
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', api_key, 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', api_key, 'usf-mini')
    .set_generation(num_conversations=1000, turn_range=(3, 8))
    .set_data_source('file', file_path='input.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .set_checkpoint(
        enabled=True,
        checkpoint_file='checkpoint.json',
        auto_save_frequency=10,
        validate_input_hash=True,
        resume_mode='auto'
    )
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Can interrupt and resume anytime!
```
#### Resume Behavior
**First Run (No Checkpoint):**
```
==========================================
CONVERSATION EXTENSION PIPELINE
==========================================
Base conversations: 1000
Generating: 1000
Parallel workers: 10
==========================================
Generating: 15%|███░░░░░░░░░| 150/1000 [✓120 ⚠20 ✗10]
^C
⚠️ Shutdown signal received. Saving checkpoint...
✓ Final checkpoint saved
⚠️ Interrupted. Progress saved in checkpoint.
```
**Resume Run (Checkpoint Exists):**
```
==========================================
RESUMING FROM CHECKPOINT
==========================================
Previous Run: 2025-10-04 16:30:00 UTC
Already Processed: 150 (✓120 ⚠20 ✗10 ~0)
Resuming Partials: 20
Remaining: 850
Parallel workers: 10
==========================================
Generating: 100%|████████████| 1000/1000 [✓850 ⚠100 ✗50]

==========================================
GENERATION COMPLETE
==========================================
✓ Complete: 850
⚠ Partial: 100
✗ Failed: 50
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
💾 Saved: 950 (95.0%)
==========================================
```
#### Output Format
Both partial and complete conversations are saved to the **same output file** with status indicators:
```jsonl
{"id": 0, "status": "completed", "is_complete": true, "conversations": [...], "num_turns": 5}
{"id": 1, "status": "partial", "is_complete": false, "conversations": [...], "num_turns": 3}
{"id": 2, "status": "completed", "is_complete": true, "conversations": [...], "num_turns": 7}
```
**Status Values:**
- `"completed"` - Successfully generated all requested turns
- `"partial"` - Interrupted during generation, may be incomplete
- `"failed"` - Error occurred, saved to failed file
**Filtering Results:**
```bash
# Get only completed conversations
jq 'select(.status == "completed")' output.jsonl > completed.jsonl
# Get partial conversations (to review/complete)
jq 'select(.status == "partial")' output.jsonl > partial.jsonl
# Count by status
jq -s 'group_by(.status) | map({status: .[0].status, count: length})' output.jsonl
```
#### Key Features
✅ **Zero Data Loss**
- Progress saved automatically at configurable intervals
- Every conversation saved before moving to next
- Atomic file operations prevent corruption

✅ **No Duplicates**
- Hybrid identification (position + content hash)
- Skips already processed conversations automatically
- Prevents reprocessing on resume

✅ **Partial Resume**
- Continues incomplete multi-turn conversations
- Resumes from exact turn where interrupted
- Preserves all generated content

✅ **Universal Coverage**
- Handles ALL interruption types:
  - Manual stops (Ctrl+C)
  - Server/connection failures
  - Rate limit errors
  - System crashes
  - Any other errors

✅ **Input Validation**
- SHA256 hash verification of input file
- Detects if input changed between runs
- Prevents processing wrong data

✅ **Safety Features**
- Atomic file operations (corruption-proof)
- Automatic backup of corrupted checkpoints
- Graceful shutdown handlers (SIGINT/SIGTERM)
#### Checkpoint Structure
```json
{
  "version": "1.0",
  "run_id": "unique-uuid",
  "started_at": "2025-10-04T20:00:00.000Z",
  "last_checkpoint_at": "2025-10-04T20:15:30.000Z",
  "base_data": {
    "source_type": "file",
    "file_path": "input.jsonl",
    "file_hash": "sha256-hash",
    "total_available": 1000
  },
  "progress": {
    "total_processed": 150,
    "completed": 120,
    "partial": 20,
    "failed": 10,
    "last_position": 149
  },
  "processed_records": [
    {
      "position": 0,
      "content_hash": "a1b2c3d4",
      "status": "completed",
      "turns_generated": 5,
      "processed_at": "2025-10-04T20:01:00.000Z"
    }
  ],
  "partial_states": {
    "5_b2c3d4e5": {
      "position": 5,
      "conversation": [...],
      "turns_completed": 3,
      "target_turns": 5
    }
  }
}
```
#### Best Practices
1. **Set Appropriate Save Frequency**
```yaml
checkpoint:
  auto_save_frequency: 10  # Balance performance vs safety
```
- Lower (5-10): Better safety, slight overhead
- Higher (50-100): Better performance, more risk
2. **Use Workspace Isolation**
```python
config = ConversationExtensionConfigBuilder(
    workspace_id="unique_project_id"  # Prevents conflicts
)
```
3. **Monitor Checkpoint Size**
- Large checkpoints (>100MB) may slow saves
- Consider processing in batches for huge datasets
4. **Backup Important Checkpoints**
```bash
cp checkpoint.json checkpoint.backup.json
```
5. **Clean Up After Completion**
```bash
# After successful run
rm checkpoint.json
```
#### Troubleshooting
**Problem: Checkpoint not resuming**
- Check `checkpoint.enabled` is `true`
- Verify checkpoint file exists at specified path
- Ensure `resume_mode` is not set to `"fresh"`
- Review logs for validation errors
**Problem: Input file changed warning**
- If intentional: Set `validate_input_hash: false`
- If unintentional: Restore original input file
- Or delete checkpoint to start fresh
**Problem: Partial conversations not resuming**
- Verify `partial_states` exists in checkpoint
- Check conversation has `_position` and `_content_hash`
- Review logs for partial state loading errors
**Problem: Duplicate conversations**
- Checkpoint prevents this automatically
- Check both `position` and `content_hash` are tracked
- Verify hybrid identification is working
#### Performance Impact
- **Checkpoint overhead**: ~10-50ms per save (amortized)
- **Memory usage**: Minimal (lightweight data structure)
- **Disk space**: ~1-2KB per conversation in checkpoint
- **Resume speed**: Instant (processed IDs are loaded into an in-memory set; see the sketch below)
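Given the checkpoint structure above, resume-time skipping reduces to a set-membership check; a rough sketch (hypothetical helper, not the pipeline's actual code):
```python
import json

def load_processed_keys(checkpoint_path: str) -> set:
    """Load processed record keys from a checkpoint into an in-memory set."""
    with open(checkpoint_path) as f:
        checkpoint = json.load(f)
    return {
        f"{rec['position']}_{rec['content_hash'][:8]}"
        for rec in checkpoint.get("processed_records", [])
    }

processed = load_processed_keys("checkpoint.json")
# During generation: skip any base record whose combined key is in `processed`
```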
#### Example: Complete Workflow
```python
import os
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

# Build configuration
config = (ConversationExtensionConfigBuilder(workspace_id="production_v1")
    .add_provider(
        role="user_followup",
        name="openai",
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )
    .add_provider(
        role="assistant_response",
        name="openai",
        api_key=os.getenv("OPENAI_API_KEY"),
        model="gpt-4o-mini"
    )
    .set_generation(
        num_conversations=10000,
        turn_range=(3, 8),
        parallel_workers=20
    )
    .set_data_source(
        source_type="file",
        file_path="data/base_conversations.jsonl"
    )
    .set_storage(
        type="jsonl",
        output_file="output.jsonl"
    )
    .set_checkpoint(
        enabled=True,
        checkpoint_file="checkpoint.json",
        auto_save_frequency=50,
        validate_input_hash=True,
        resume_mode="auto"
    )
    .build()
)

# Run with automatic resume
pipeline = ConversationExtensionPipeline(config)
pipeline.run()
# Can be interrupted and resumed multiple times
# Progress is never lost!
```
📚 **See also**: [`examples/checkpoint_resume_example.py`](examples/conversation_extension/checkpoint_resume_example.py) for a complete working example with checkpoint inspection utilities.
---
### Multi-Tenant SaaS Support
Perfect for platforms serving multiple users concurrently:
```python
# Each user gets isolated workspace
workspace_id = f"user_{user_id}_session_{session_id}"
config = (ConversationExtensionConfigBuilder(workspace_id=workspace_id)
    .add_provider('user_followup', 'ultrasafe', shared_api_key, 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', shared_api_key, 'usf-mini')
    .set_storage('jsonl', output_file='output.jsonl')  # Auto-isolated
    .build()
)

# Storage automatically goes to: workspaces/{workspace_id}/output.jsonl
```
### Parallel Dataset Generation
```python
from concurrent.futures import ProcessPoolExecutor

def process_dataset(input_file, output_file):
    config = (ConversationExtensionConfigBuilder()
        .add_provider('user_followup', 'ultrasafe', api_key, 'usf-mini')
        .add_provider('assistant_response', 'ultrasafe', api_key, 'usf-mini')
        .set_data_source('file', file_path=input_file)
        .set_storage('jsonl', output_file=output_file)
        .build()
    )
    ConversationExtensionPipeline(config).run()

# Process 3 datasets in parallel
with ProcessPoolExecutor(max_workers=3) as executor:
    executor.submit(process_dataset, 'data1.jsonl', 'out1.jsonl')
    executor.submit(process_dataset, 'data2.jsonl', 'out2.jsonl')
    executor.submit(process_dataset, 'data3.jsonl', 'out3.jsonl')
```
## 📋 Configuration Methods
OmniGen supports **multiple ways** to configure your pipeline. Choose the method that best fits your workflow:
### Method 1: YAML Configuration File (Recommended)
The most flexible and maintainable approach:
**Step 1:** Create `config.yaml`
```yaml
# NEW: Minimal configuration with smart defaults
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}  # Only name and API key required!
    # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
    # Defaults: usf-mini, 0.7 temp, 4096 tokens ✓

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: base_data.jsonl

storage:
  type: jsonl
  output_file: output.jsonl
```
**Step 2:** Load and run
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load from YAML file
config = ConversationExtensionConfig.from_yaml('config.yaml')

# Run pipeline
pipeline = ConversationExtensionPipeline(config)
pipeline.run()
```
**✅ Benefits:**
- Easy to version control and share
- Environment variable support with `${VAR_NAME}`
- No code changes needed for config updates
- Clear, readable configuration
### Method 2: Programmatic Configuration (Python)
Build configuration directly in code:
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', 'api-key', 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', 'api-key', 'usf-mini')
    .set_generation(num_conversations=100, turn_range=(3, 8))
    .set_data_source('file', file_path='base_data.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()
```
**✅ Benefits:**
- Type safety and IDE autocomplete
- Dynamic configuration based on runtime
- No external files needed
- Easy application integration
### Method 3: Dictionary Configuration
Create from Python dictionary:
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

config_dict = {
    'providers': {
        'user_followup': {'name': 'ultrasafe', 'api_key': 'key', 'model': 'usf-mini'},
        'assistant_response': {'name': 'ultrasafe', 'api_key': 'key', 'model': 'usf-mini'}
    },
    'generation': {'num_conversations': 100, 'turn_range': {'min': 3, 'max': 8}},
    'base_data': {'source_type': 'file', 'file_path': 'base_data.jsonl'},
    'storage': {'type': 'jsonl', 'output_file': 'output.jsonl'}
}

config = ConversationExtensionConfig.from_dict(config_dict)
pipeline = ConversationExtensionPipeline(config)
pipeline.run()
```
**✅ Benefits:**
- Load from JSON files or APIs
- Easy programmatic modification
- Flexible for dynamic scenarios
### Method 4: Hybrid Approach
Combine YAML with programmatic overrides:
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfig,
    ConversationExtensionPipeline
)

# Load base config from YAML
config = ConversationExtensionConfig.from_yaml('base_config.yaml')

# Modify specific settings
config_dict = config.to_dict()
config_dict['generation']['num_conversations'] = 500  # Override
config_dict['storage']['output_file'] = f'output_{user_id}.jsonl'  # Dynamic

# Rebuild and run
config = ConversationExtensionConfig.from_dict(config_dict)
pipeline = ConversationExtensionPipeline(config)
pipeline.run()
```
**✅ Benefits:**
- Base configuration in YAML
- Runtime customization in code
- Best of both worlds
### Environment Variables in YAML
Use `${VARIABLE_NAME}` syntax:
```yaml
providers:
  user_followup:
    name: ${PROVIDER_NAME}         # From environment
    api_key: ${ULTRASAFE_API_KEY}  # From environment
    model: ${USER_MODEL}           # From environment

base_data:
  file_path: ${INPUT_PATH}         # From environment

storage:
  output_file: ${OUTPUT_PATH}      # From environment
```
Set variables:
```bash
export PROVIDER_NAME="ultrasafe"
export ULTRASAFE_API_KEY="your-key"
export USER_MODEL="usf-mini"
export INPUT_PATH="data/input.jsonl"
export OUTPUT_PATH="data/output.jsonl"
# Then run
python your_script.py
```
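Conceptually, the `${VAR_NAME}` substitution boils down to a small expansion step like the sketch below (illustrative only; OmniGen's actual resolver may handle missing variables differently):
```python
import os
import re

_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")

def expand_env_vars(text: str) -> str:
    """Replace ${VAR_NAME} references with values from the environment."""
    # Leave the reference untouched if the variable is not set
    return _ENV_PATTERN.sub(lambda m: os.environ.get(m.group(1), m.group(0)), text)

print(expand_env_vars("api_key: ${ULTRASAFE_API_KEY}"))
```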
### CLI Usage with Config File
Run directly from command line:
```bash
# Using YAML config
omnigen conversation-extension --config config.yaml
# With overrides
omnigen conversation-extension \
  --config config.yaml \
  --num-conversations 500 \
  --output custom_output.jsonl
```
---
## 📚 Complete Configuration Reference
### All Configuration Options Explained
Below is a comprehensive YAML configuration showing **ALL** available options with detailed explanations:
```yaml
# ==============================================================================
# WORKSPACE ISOLATION (Optional)
# ==============================================================================
# Unique ID for multi-tenant environments - auto-isolates all output files
workspace_id: "user_123_session_abc"

# ==============================================================================
# PROVIDERS - AI Model Configuration
# ==============================================================================
# Configure different AI providers for each role
# Each role can use a different provider/model combination
providers:
  # Provider for generating user follow-up questions
  user_followup:
    name: ultrasafe       # Required: ultrasafe, openai, anthropic, openrouter
    api_key: ${API_KEY}   # Required: Use env var ${VAR_NAME} or direct key
    # Optional - Smart defaults applied if not specified:
    model: usf-mini       # Default varies by provider
    temperature: 0.7      # Default: 0.7
    max_tokens: 2048      # Default: 4096 (overridden here)
    timeout: 300          # Default: 300
    max_retries: 5        # Default: 5
    retry_delay: 2        # Default: 2

  # Provider for generating assistant responses
  assistant_response:
    name: ultrasafe       # Can use different provider than user_followup
    api_key: ${API_KEY}   # Only name and api_key are required!
    max_tokens: 8192      # Override default (4096) for detailed responses
    # model, temperature, timeout, retries use defaults ✓

# PROVIDER OPTIONS:
# ----------------
# ultrasafe:
#   models: usf-mini, usf-max
#
# openai:
#   models: gpt-4-turbo, gpt-4, gpt-3.5-turbo, gpt-4o, gpt-4o-mini
#
# anthropic:
#   models: claude-3-5-sonnet-20241022, claude-3-opus-20240229,
#           claude-3-sonnet-20240229, claude-3-haiku-20240307
#
# openrouter:
#   models: Any OpenRouter supported model
#   base_url: https://openrouter.ai/api/v1 (optional)

# ==============================================================================
# GENERATION SETTINGS
# ==============================================================================
generation:
  num_conversations: 100   # Total conversations to generate
                           # Use 0 or omit to process ALL available conversations
  turn_range:              # Number of turns per conversation
    min: 3                 # Minimum turns
    max: 8                 # Maximum turns
  parallel_workers: 10     # Concurrent workers (balance speed vs rate limits)

  # Extension behavior for multi-turn input
  extension_mode: "smart"  # Options: "smart" | "legacy"
                           # - smart: Intelligently handle multi-turn conversations
                           # - legacy: Always extract first user message only
  skip_invalid: true       # Skip invalid patterns (recommended: true)

  # Turn calculation method
  turn_calculation: "additional"  # Options: "additional" | "total"
                                  # - additional: Add NEW turns on top of existing (default)
                                  # - total: Keep total turns within range (never removes existing)

# ==============================================================================
# DATA SOURCE CONFIGURATION
# ==============================================================================
base_data:
  enabled: true                # Enable base data loading

  # OPTION 1: Local File
  source_type: file            # Use local JSONL/JSON file
  file_path: data/input.jsonl  # Path to file
  format: conversations        # JSON key containing conversation array
  shuffle: false               # Shuffle data before processing

  # OPTION 2: HuggingFace Dataset
  # source_type: huggingface   # Use HuggingFace dataset
  # hf_dataset: username/dataset  # HuggingFace dataset path
  # hf_split: train            # Dataset split: train, test, validation
  # hf_token: ${HF_TOKEN}      # HuggingFace API token (if private)
  # hf_streaming: false        # Stream dataset (for large datasets)
  # format: conversations      # Field name in dataset
  # shuffle: true              # Shuffle after loading

# ==============================================================================
# STORAGE CONFIGURATION
# ==============================================================================
storage:
  type: jsonl                  # Options: jsonl | mongodb

  # JSONL Storage (Default)
  output_file: output.jsonl    # Successful conversations
  partial_file: partial.jsonl  # Partial/incomplete conversations (legacy)
  failed_file: failed.jsonl    # Failed conversations

  # MongoDB Storage (Alternative)
  # type: mongodb
  # mongodb:
  #   connection_string: mongodb://localhost:27017
  #   database: omnigen
  #   collection: conversations
  #   output_collection: output    # Successful
  #   partial_collection: partial  # Partial
  #   failed_collection: failed    # Failed

# ==============================================================================
# CHECKPOINT/RESUME CONFIGURATION (NEW!)
# ==============================================================================
checkpoint:
  enabled: true                # Enable automatic checkpoint/resume
  checkpoint_file: "workspaces/{workspace_id}/checkpoint.json"
  auto_save_frequency: 10      # Save checkpoint every N conversations
  validate_input_hash: true    # Verify input file hasn't changed on resume
  resume_mode: "auto"          # Options: "auto" | "manual" | "fresh"

  # Features:
  # - Zero data loss from interruptions
  # - No duplicate processing
  # - Partial conversation resume
  # - Handles: Ctrl+C, errors, rate limits, crashes

# ==============================================================================
# DATETIME CONFIGURATION (Optional)
# ==============================================================================
datetime_config:
  enabled: true                # Enable datetime generation
  mode: random_from_range      # Options: random_from_range | current | fixed
  timezone: UTC                # Timezone (UTC, America/New_York, Asia/Dubai, etc.)
  format: "%Y-%m-%d %H:%M:%S"  # Python strftime format

  # For random_from_range mode
  range:
    start: "2024-01-01 00:00:00"  # Start datetime
    end: "2024-12-31 23:59:59"    # End datetime

  # For fixed mode
  # fixed_datetime: "2024-06-15 12:00:00"

# ==============================================================================
# SYSTEM MESSAGES (Optional)
# ==============================================================================
system_messages:
  # Prepend system message to every conversation
  prepend_always:
    enabled: true
    content: "You are a helpful AI assistant. Current time: {current_datetime} ({timezone})."

  # Append system message to every conversation
  append_always:
    enabled: false
    content: "Remember to be concise and helpful."

  # Add system message only if none exists
  add_if_missing:
    enabled: false
    content: "You are an AI assistant."

# Available variables in system messages:
# - {current_datetime}: Generated datetime
# - {timezone}: Configured timezone
# - {workspace_id}: Current workspace ID

# ==============================================================================
# CUSTOM PROMPTS (Optional - Smart Defaults Provided!)
# ==============================================================================
# NOTE: Prompts are completely OPTIONAL. The system automatically uses optimized
# default prompts if you don't specify any. Only add this section if you want
# to customize the prompt behavior.
prompts:
  # Custom prompt for user follow-up generation
  followup_question: |
    ## Your Task
    Generate an intelligent follow-up user question based on conversation history.

    ### CONVERSATION HISTORY:
    {history}

    ### INSTRUCTIONS:
    - Generate a meaningful follow-up question
    - Be conversational and natural
    - Vary your phrasing and tone
    - Build on the assistant's last response

    Return your follow-up question wrapped in XML tags:
    <user>Your follow-up question here</user>

  # Custom prompt for assistant response generation
  # assistant_response: |
  #   Your custom assistant response prompt here...

# ==============================================================================
# DEBUG OPTIONS (Optional)
# ==============================================================================
debug:
  log_api_timing: true       # Log API call timings
  log_parallel_status: true  # Log parallel worker status
  verbose: false             # Verbose logging
```
### Quick Configuration Examples
#### Example 1: Minimal Configuration (NEW!)
```yaml
# Only specify what's required - defaults applied automatically
providers:
  user_followup:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}
  assistant_response:
    name: ultrasafe
    api_key: ${ULTRASAFE_API_KEY}

generation:
  num_conversations: 100
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: input.jsonl

storage:
  type: jsonl
  output_file: output.jsonl

# Enable checkpoint/resume (NEW!)
checkpoint:
  enabled: true
  auto_save_frequency: 10
```
#### Example 2: HuggingFace Dataset Input
```yaml
providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    model: gpt-4-turbo
  assistant_response:
    name: anthropic
    api_key: ${ANTHROPIC_API_KEY}
    model: claude-3-5-sonnet-20241022

generation:
  num_conversations: 1000
  turn_range: {min: 5, max: 10}
  parallel_workers: 20

base_data:
  source_type: huggingface
  hf_dataset: username/my-dataset
  hf_split: train
  hf_token: ${HF_TOKEN}
  format: conversations
  shuffle: true

storage:
  type: jsonl
  output_file: output.jsonl

# Checkpoint/resume for fault tolerance
checkpoint:
  enabled: true
  checkpoint_file: "checkpoint.json"
  auto_save_frequency: 50
  validate_input_hash: true
```
#### Example 3: Mixed Providers with MongoDB
```yaml
providers:
  user_followup:
    name: openai
    api_key: ${OPENAI_API_KEY}
    model: gpt-3.5-turbo
    temperature: 0.8
  assistant_response:
    name: anthropic
    api_key: ${ANTHROPIC_API_KEY}
    model: claude-3-5-sonnet-20241022
    temperature: 0.7

generation:
  num_conversations: 500
  turn_range: {min: 3, max: 8}

base_data:
  source_type: file
  file_path: questions.jsonl

storage:
  type: mongodb
  mongodb:
    connection_string: mongodb://localhost:27017
    database: omnigen
    collection: conversations

# Checkpoint works with any storage type
checkpoint:
  enabled: true
  auto_save_frequency: 25
```
#### Example 4: Programmatic Configuration (Python)
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

# Build configuration programmatically
config = (ConversationExtensionConfigBuilder()
    # Workspace isolation
    .set_workspace_id("user_123_session_abc")

    # Providers
    .add_provider(
        role='user_followup',
        name='ultrasafe',
        api_key='your-api-key',
        model='usf-mini',
        temperature=0.7,
        max_tokens=2048
    )
    .add_provider(
        role='assistant_response',
        name='ultrasafe',
        api_key='your-api-key',
        model='usf-mini',
        temperature=0.7,
        max_tokens=8192
    )

    # Generation settings
    .set_generation(
        num_conversations=100,  # Or use 0/None to process ALL available
        turn_range=(3, 8),
        parallel_workers=10,
        extension_mode='smart',
        skip_invalid=True,
        turn_calculation='additional'
    )

    # Data source - Local file
    .set_data_source(
        source_type='file',
        file_path='input.jsonl',
        format='conversations',
        shuffle=False
    )

    # Data source - HuggingFace (alternative)
    # .set_data_source(
    #     source_type='huggingface',
    #     hf_dataset='username/dataset',
    #     hf_split='train',
    #     hf_token='your-token',
    #     format='conversations',
    #     shuffle=True
    # )

    # Storage
    .set_storage(
        type='jsonl',
        output_file='output.jsonl',
        partial_file='partial.jsonl',
        failed_file='failed.jsonl'
    )

    # Custom prompts (optional)
    .set_prompts(
        followup_question="Your custom prompt here with {history}"
    )
    .build()
)

# Run pipeline with checkpoint/resume support
pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Can be interrupted and resumed automatically!
```
---
## 🔧 Advanced Configuration Options - Deep Dive
### System Messages Configuration
System messages allow you to inject context or instructions into conversations. You have three modes of operation:
#### 1. Prepend Always
Add a system message at the **start** of every conversation:
```yaml
system_messages:
  prepend_always:
    enabled: true
    content: "You are a helpful AI assistant. Current time: {current_datetime} ({timezone})."
```
**Use Cases:**
- Set assistant personality/role
- Provide real-time context (date, time)
- Add global instructions
#### 2. Append Always
Add a system message at the **end** of every conversation:
```yaml
system_messages:
  append_always:
    enabled: true
    content: "Remember to be concise and provide sources when possible."
```
**Use Cases:**
- Add final reminders
- Set response style constraints
- Add quality guidelines
#### 3. Add If Missing
Add a system message **only if** the conversation doesn't already have one:
```yaml
system_messages:
  add_if_missing:
    enabled: true
    content: "You are an AI assistant."
```
**Use Cases:**
- Ensure all conversations have baseline instructions
- Fallback when base data lacks system messages
#### Available Template Variables
You can use these variables in your system message content:
| Variable | Description | Example Output |
|----------|-------------|----------------|
| `{current_datetime}` | Generated datetime | `2024-06-15 14:30:00` |
| `{timezone}` | Configured timezone | `UTC` or `America/New_York` |
| `{workspace_id}` | Current workspace ID | `user_123_session_abc` |
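As a rough illustration, substituting these variables amounts to a `str.format`-style fill (the values below are hypothetical; the pipeline supplies real ones at generation time):
```python
template = (
    "You are a helpful AI assistant. "
    "Current time: {current_datetime} ({timezone}). Workspace: {workspace_id}."
)

# Illustrative values; the pipeline fills these in during generation
message = template.format(
    current_datetime="2024-06-15 14:30:00",
    timezone="UTC",
    workspace_id="user_123_session_abc",
)
print(message)
```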
#### Complete Example - All Three Modes
```yaml
system_messages:
  prepend_always:
    enabled: true
    content: |
      You are a knowledgeable AI assistant.
      Current datetime: {current_datetime} ({timezone})
      Workspace: {workspace_id}

  append_always:
    enabled: true
    content: |
      IMPORTANT REMINDERS:
      - Always cite sources when providing facts
      - Be concise but thorough
      - Ask clarifying questions when needed

  add_if_missing:
    enabled: true
    content: "You are a helpful assistant."
```
**Note:** If both `prepend_always` and `add_if_missing` are enabled, `prepend_always` takes precedence.
#### Programmatic Usage
```python
from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .build()
)

# Add system messages to config dict
config_dict = config.to_dict()
config_dict['system_messages'] = {
    'prepend_always': {
        'enabled': True,
        'content': 'You are a helpful assistant. Time: {current_datetime}'
    }
}
```
---
### DateTime Configuration
Control how datetime values are generated and injected into system messages and conversations.
#### Mode 1: Random from Range (Default)
Generate random datetimes within a specified range:
```yaml
datetime_config:
  enabled: true
  mode: random_from_range
  timezone: UTC                # Any valid timezone
  format: "%Y-%m-%d %H:%M:%S"  # Python strftime format
  range:
    start: "2024-01-01 00:00:00"
    end: "2024-12-31 23:59:59"
```
**Use Cases:**
- Training data with temporal diversity
- Simulating historical conversations
- Creating time-aware datasets
**Common Timezones:**
- `UTC` - Coordinated Universal Time
- `America/New_York` - Eastern Time
- `Europe/London` - British Time
- `Asia/Dubai` - Gulf Standard Time
- `Asia/Tokyo` - Japan Standard Time
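Conceptually, `random_from_range` picks a uniformly random timestamp between `start` and `end` and formats it in the configured timezone. A minimal sketch of the idea (illustrative, not the exact implementation; uses `zoneinfo`, which requires Python 3.9+):
```python
import random
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def random_datetime(start: str, end: str, tz: str, fmt: str) -> str:
    """Pick a uniform random datetime in [start, end] and format it."""
    t0 = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
    t1 = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
    offset = random.uniform(0, (t1 - t0).total_seconds())
    picked = (t0 + timedelta(seconds=offset)).replace(tzinfo=ZoneInfo(tz))
    return picked.strftime(fmt)

print(random_datetime("2024-01-01 00:00:00", "2024-12-31 23:59:59",
                      "UTC", "%Y-%m-%d %H:%M:%S"))
```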
#### Mode 2: Current Time
Use actual current time when generating:
```yaml
datetime_config:
  enabled: true
  mode: current
  timezone: America/New_York
  format: "%B %d, %Y at %I:%M %p"  # December 15, 2024 at 02:30 PM
```
**Use Cases:**
- Real-time conversation simulation
- Current events discussions
- Live system demonstrations
#### Mode 3: Fixed DateTime
Use the same datetime for all conversations:
```yaml
datetime_config:
  enabled: true
  mode: fixed
  timezone: UTC
  format: "%Y-%m-%d %H:%M:%S"
  fixed_datetime: "2024-06-15 12:00:00"
```
**Use Cases:**
- Consistent training data
- Specific time period simulation
- Testing and debugging
#### Format String Examples
Common datetime formats using Python's strftime:
```yaml
# ISO 8601 Format
format: "%Y-%m-%d %H:%M:%S"      # 2024-12-15 14:30:00

# Human-Readable
format: "%B %d, %Y at %I:%M %p"  # December 15, 2024 at 02:30 PM

# Date Only
format: "%Y-%m-%d"               # 2024-12-15

# Time Only
format: "%H:%M:%S"               # 14:30:00

# Custom Format
format: "%A, %B %d, %Y"          # Sunday, December 15, 2024
```
#### Complete DateTime Example
```yaml
datetime_config:
  enabled: true
  mode: random_from_range
  timezone: America/New_York
  format: "%A, %B %d, %Y at %I:%M %p %Z"
  range:
    start: "2024-01-01 09:00:00"  # Business hours only
    end: "2024-12-31 17:00:00"

# Then use in system messages:
system_messages:
  prepend_always:
    enabled: true
    content: |
      You are a business assistant.
      Current datetime: {current_datetime}
      Timezone: {timezone}
```
**Output Example:**
```
You are a business assistant.
Current datetime: Saturday, June 15, 2024 at 02:30 PM EDT
Timezone: America/New_York
```
---
### Custom Prompts Configuration
**NOTE:** Custom prompts are **completely optional**. OmniGen automatically uses optimized default prompts if the `prompts` section is not included in your configuration. Only add custom prompts if you want to override the default behavior.
#### Default Prompts
OmniGen uses optimized default prompts that work well for most use cases. You can customize them if needed:
**1. Follow-up Question Prompt** (for `user_followup` role):
```yaml
prompts:
  followup_question: |
    ## Your Task
    Generate an intelligent follow-up user question based on conversation history.

    ### CONVERSATION HISTORY:
    {history}

    ### INSTRUCTIONS:
    - Generate a meaningful follow-up question
    - Be conversational and natural
    - Vary your phrasing and tone
    - Build on the assistant's last response
    - Make the question specific to the conversation context

    Return your follow-up question wrapped in XML tags:
    <user>Your follow-up question here</user>
```
**2. Assistant Response Prompt** (for `assistant_response` role):
```yaml
prompts:
  assistant_response: |
    ## Your Task
    Generate a helpful assistant response based on the conversation history.

    ### CONVERSATION HISTORY:
    {history}

    ### INSTRUCTIONS:
    - Provide accurate and helpful information
    - Be conversational and friendly
    - Reference previous context when relevant
    - Keep responses focused and concise

    Return your response wrapped in XML tags:
    <assistant>Your response here</assistant>
```
#### Available Template Variables
| Variable | Description | Content |
|----------|-------------|---------|
| `{history}` | Full conversation history | All previous messages formatted as text |
#### Custom Prompt Examples
**Example 1: Technical Documentation Assistant**
```yaml
prompts:
  followup_question: |
    Generate a technical follow-up question from a developer's perspective.

    CONVERSATION:
    {history}

    Create a question that:
    - Asks about implementation details
    - Seeks code examples or best practices
    - Explores edge cases or potential issues

    <user>Your technical question</user>

  assistant_response: |
    Provide a detailed technical response with code examples.

    CONVERSATION:
    {history}

    Your response should:
    - Include code snippets when helpful
    - Explain technical concepts clearly
    - Provide links to documentation
    - Mention potential pitfalls

    <assistant>Your technical response</assistant>
```
**Example 2: Customer Support Simulation**
```yaml
prompts:
  followup_question: |
    Simulate a customer asking for help.

    CONVERSATION:
    {history}

    Generate a question that:
    - Shows frustration or confusion (realistic)
    - Asks for clarification on previous response
    - Requests specific solutions or workarounds

    <user>Customer question</user>

  assistant_response: |
    Provide empathetic customer support.

    CONVERSATION:
    {history}

    Your response must:
    - Show empathy and understanding
    - Provide clear step-by-step solutions
    - Offer alternatives if applicable
    - End with "Is there anything else I can help with?"

    <assistant>Support response</assistant>
```
**Example 3: Educational Tutor**
```yaml
prompts:
  followup_question: |
    Generate a student's follow-up question showing learning progression.

    CONTEXT:
    {history}

    The question should:
    - Build on what was just explained
    - Show either understanding or confusion
    - Ask for examples or clarification
    - Demonstrate curiosity about the topic

    <user>Student question</user>

  assistant_response: |
    Respond as a patient, knowledgeable tutor.

    CONTEXT:
    {history}

    Your response should:
    - Use simple, clear language
    - Provide concrete examples
    - Check for understanding
    - Encourage further questions

    <assistant>Tutor response</assistant>
```
#### Programmatic Prompt Configuration
```python
from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

custom_prompts = {
    'followup_question': """
    Generate a follow-up based on: {history}
    Make it specific and contextual.
    <user>question</user>
    """,
    'assistant_response': """
    Respond helpfully to: {history}
    Be clear and concise.
    <assistant>response</assistant>
    """
}

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .set_prompts(**custom_prompts)
    .build()
)
```
---
### Debug Configuration
Enable detailed logging and monitoring for troubleshooting and optimization.
#### Debug Options
```yaml
debug:
  log_api_timing: true       # Log API call duration and performance
  log_parallel_status: true  # Log parallel worker status and progress
  verbose: false             # Enable verbose logging (all operations)
```
#### Option Details
**1. API Timing Logs** (`log_api_timing: true`)
Tracks API performance for each call:
```
[API TIMING] user_followup request took 1.23s
[API TIMING] assistant_response request took 2.45s
[API TIMING] Average response time: 1.84s
```
**Use Cases:**
- Identify slow API providers
- Optimize provider selection
- Monitor rate limits
- Debug timeout issues
**2. Parallel Status Logs** (`log_parallel_status: true`)
Shows worker activity in real-time:
```
[PARALLEL] Worker 1/10: Processing conversation 5
[PARALLEL] Worker 2/10: Processing conversation 6
[PARALLEL] Worker 3/10: Waiting for task...
[PARALLEL] Progress: 45/100 conversations complete (45%)
[PARALLEL] Active workers: 8/10 | Queue: 12 remaining
```
**Use Cases:**
- Monitor parallel processing
- Identify bottlenecks
- Optimize worker count
- Track progress in real-time
**3. Verbose Logging** (`verbose: true`)
Enables comprehensive logging of all operations:
```
[DEBUG] Loading base data from: data/input.jsonl
[DEBUG] Loaded 100 base conversations
[DEBUG] Initializing provider: ultrasafe (usf-mini)
[DEBUG] Starting parallel generation with 10 workers
[DEBUG] Conversation 1: Processing...
[DEBUG] Conversation 1: Generated 5 turns
[DEBUG] Conversation 1: Saved to output.jsonl
[DEBUG] Final stats: 95 success, 3 partial, 2 failed
```
**Use Cases:**
- Development and testing
- Debugging issues
- Understanding pipeline flow
- Troubleshooting data problems
#### Complete Debug Configuration
```yaml
# Full debug mode for development
debug:
  log_api_timing: true
  log_parallel_status: true
  verbose: true

# Production mode (minimal logging)
debug:
  log_api_timing: false
  log_parallel_status: false
  verbose: false

# Performance monitoring mode
debug:
  log_api_timing: true       # Track API performance
  log_parallel_status: true  # Monitor workers
  verbose: false             # Don't flood logs
```
#### Programmatic Debug Configuration
```python
from omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder

config = (ConversationExtensionConfigBuilder()
    # ... other config ...
    .build()
)

# Add debug settings
config_dict = config.to_dict()
config_dict['debug'] = {
    'log_api_timing': True,
    'log_parallel_status': True,
    'verbose': False
}
```
#### Debug Output Examples
**Scenario 1: API Performance Issue**
```yaml
debug:
  log_api_timing: true
```
Output helps identify slow providers:
```
[API TIMING] openai/gpt-4-turbo: 0.8s
[API TIMING] anthropic/claude-3-5-sonnet: 3.2s ← SLOW!
[API TIMING] ultrasafe/usf-mini: 0.5s
```
**Scenario 2: Parallel Processing Bottleneck**
```yaml
debug:
  log_parallel_status: true
```
Output shows worker utilization:
```
[PARALLEL] Active: 3/10 workers ← Only 30% utilized!
[PARALLEL] Queue: 0 tasks remaining
[PARALLEL] Suggestion: Reduce worker count to 5
```
**Scenario 3: Data Loading Issues**
```yaml
debug:
  verbose: true
```
Output reveals the problem:
```
[DEBUG] Loading: data/input.jsonl
[DEBUG] Line 1: Valid ✓
[DEBUG] Line 2: Valid ✓
[DEBUG] Line 3: ERROR - First message not from user
[DEBUG] Line 4: Valid ✓
[DEBUG] Loaded: 3 valid, 1 invalid
```
---
## 📖 Conversation Extension Pipeline - Complete Guide
### Overview
The **Conversation Extension Pipeline** intelligently transforms base conversations into rich multi-turn dialogues. It can handle both single-turn questions and extend existing multi-turn conversations.
### Key Features
- ✅ **Smart Extension** - Continues from existing conversations based on last role
- ✅ **Flexible Input** - Handles single-turn or multi-turn base data
- ✅ **Provider Mix** - Use different AI providers for user and assistant
- ✅ **Multi-Tenant** - Complete workspace isolation
- ✅ **Configurable** - Full control over generation behavior
### Configuration Options
#### Extension Modes
**Smart Mode (Default)**
```yaml
generation:
  extension_mode: "smart"
```
- **Single-turn input** → Generate new conversation from scratch
- **Multi-turn (user last)** → Add 1 assistant response, then continue
- **Multi-turn (assistant last)** → Add user + assistant, then continue
- **Invalid patterns** → Skip row entirely
**Legacy Mode**
```yaml
generation:
  extension_mode: "legacy"
```
- Always extracts first user message only (original behavior)
#### Turn Calculation
**Additional Mode (Default)** - Add NEW turns on top of existing
```yaml
generation:
  turn_calculation: "additional"  # Add 3-8 NEW turns
```
**Total Mode** - Keep total within range (never removes existing)
```yaml
generation:
  turn_calculation: "total"  # Total should be 3-8 turns
```
#### Complete Configuration
```yaml
# Workspace isolation (optional)
workspace_id: "user_123"

# AI Providers - Smart defaults applied!
providers:
  user_followup:
    name: "ultrasafe"
    api_key: "${ULTRASAFE_API_KEY}"
    # Defaults: usf-mini, 0.7 temp, 4096 tokens
    max_tokens: 2048  # Override default if needed
  assistant_response:
    name: "ultrasafe"
    api_key: "${ULTRASAFE_API_KEY}"
    # Defaults: usf-mini, 0.7 temp, 4096 tokens
    max_tokens: 8192  # Override for detailed responses

# Generation Settings
generation:
  num_conversations: 100
  turn_range:
    min: 3
    max: 8
  parallel_workers: 10

  # Extension behavior
  extension_mode: "smart"         # "smart" | "legacy"
  skip_invalid: true              # Skip invalid patterns
  turn_calculation: "additional"  # "additional" | "total"

# Input Data
base_data:
  enabled: true
  source_type: "file"
  file_path: "base_data.jsonl"
  format: "conversations"
  shuffle: false

# Output Storage
storage:
  type: "jsonl"
  output_file: "output.jsonl"
  partial_file: "partial.jsonl"
  failed_file: "failed.jsonl"

# Checkpoint/Resume (recommended for large datasets)
checkpoint:
  enabled: true
  checkpoint_file: "checkpoint.json"
  auto_save_frequency: 10
  validate_input_hash: true
  resume_mode: "auto"

# System Messages (optional)
system_messages:
  add_if_missing:
    enabled: true
    content: "You are a helpful assistant. Current datetime: {current_datetime}"

# DateTime (optional)
datetime_config:
  enabled: true
  timezone: "UTC"
  format: "%Y-%m-%d %H:%M:%S"
  range:
    start: "2024-01-01"
    end: "2024-12-31"
```
### Input Data Formats
#### Valid Patterns
**Single-turn** ✅
```json
{"conversations": [{"role": "user", "content": "How do I learn Python?"}]}
```
**Multi-turn (user last)** ✅
```json
{
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Start with basics..."},
    {"role": "user", "content": "What resources?"}
  ]
}
```
**Multi-turn (assistant last)** ✅
```json
{
  "conversations": [
    {"role": "user", "content": "How do I learn Python?"},
    {"role": "assistant", "content": "Start with basics..."}
  ]
}
```
#### Invalid Patterns (Skipped)
❌ First message not user
```json
{"conversations": [{"role": "assistant", "content": "Hello"}]}
```
❌ Empty conversations
```json
{"conversations": []}
```
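To pre-screen your base data against these rules before a run, here is a hedged convenience sketch (the pipeline performs its own validation; this only mirrors the patterns above):
```python
import json

def is_valid_record(line: str) -> bool:
    """Check a JSONL line: parseable, non-empty, first message from user."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return False
    conversations = record.get("conversations") or []
    if not conversations or not isinstance(conversations[0], dict):
        return False
    return conversations[0].get("role") == "user"

with open("base_data.jsonl") as f:
    valid = [line for line in f if line.strip() and is_valid_record(line)]
print(f"{len(valid)} valid records")
```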
### Programmatic Usage
```python
from omnigen.pipelines.conversation_extension import (
    ConversationExtensionConfigBuilder,
    ConversationExtensionPipeline
)

config = (ConversationExtensionConfigBuilder()
    .add_provider('user_followup', 'ultrasafe', 'api-key', 'usf-mini')
    .add_provider('assistant_response', 'ultrasafe', 'api-key', 'usf-mini')
    .set_generation(
        num_conversations=100,
        turn_range=(3, 8),
        parallel_workers=10,
        extension_mode='smart',        # Handle multi-turn intelligently
        skip_invalid=True,             # Skip invalid patterns
        turn_calculation='additional'  # Add new turns (default)
    )
    .set_data_source('file', file_path='base_data.jsonl')
    .set_storage('jsonl', output_file='output.jsonl')
    .set_checkpoint(enabled=True, auto_save_frequency=10)  # Enable checkpoint
    .build()
)

pipeline = ConversationExtensionPipeline(config)
pipeline.run()  # Automatically resumes if interrupted!
```
### Turn Calculation Examples
**Additional Mode (Default)**
```
Existing: 2 turns
Config: turn_range = (3, 8)
Result: Add 3-8 NEW turns → Total: 5-10 turns
```
**Total Mode**
```
Existing: 2 turns
Config: turn_range = (3, 8)
Result: Add 1-6 turns → Total: 3-8 turns

Existing: 10 turns (already > max)
Config: turn_range = (3, 8)
Result: Add 0 turns → Keep 10 turns (never remove)
```
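The two modes reduce to a small piece of arithmetic; the sketch below mirrors the examples above (illustrative helper, not the pipeline's actual code):
```python
import random

def turns_to_add(existing: int, turn_min: int, turn_max: int, mode: str) -> int:
    """How many NEW turns to generate for a conversation with `existing` turns."""
    if mode == "additional":
        # Always add a fresh random number of turns on top of existing ones
        return random.randint(turn_min, turn_max)
    # "total" mode: top up into the range, never removing existing turns
    if existing >= turn_max:
        return 0
    return random.randint(max(turn_min - existing, 1), turn_max - existing)

print(turns_to_add(2, 3, 8, "additional"))  # 3-8 new turns -> total 5-10
print(turns_to_add(2, 3, 8, "total"))       # 1-6 new turns -> total 3-8
print(turns_to_add(10, 3, 8, "total"))      # 0 -> keep all 10 existing turns
```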
### Best Practices
**Provider Selection**
- Use better models for assistant (claude-3-5-sonnet, gpt-4-turbo)
- Use cheaper models for user followups (usf-mini, gpt-3.5-turbo)
**Turn Range**
- Quick exchanges: `(2, 4)`
- In-depth: `(5, 10)`
- Balanced: `(3, 8)` ✅
**Parallel Workers**
- Conservative: `5` (avoid rate limits)
- Balanced: `10` ✅
- Aggressive: `20` (watch for rate limits)
### Troubleshooting
**Issue: Empty output**
- Check input data format (first message must be user)
- Set `skip_invalid: false` to see errors
**Issue: Rate limits**
- Reduce `parallel_workers`
- Check provider API limits
- Enable checkpoint to preserve progress: `checkpoint.enabled: true`
**Issue: Pipeline interrupted**
- ✅ Don't worry! Progress is automatically saved if checkpoint is enabled
- Simply run again - it will resume from where it stopped
- Check `checkpoint.json` for current state
**Issue: Low quality**
- Increase temperature (0.8-0.9)
- Use better models
- Add custom prompts and system messages
**Issue: Duplicate conversations in output**
- Checkpoint system prevents this automatically
- Both partial and complete saved to same file with `status` field
- Filter completed: `jq 'select(.status == "completed")' output.jsonl`
---
## License
MIT License - Ultrasafe AI © 2024
---
## About Ultrasafe AI
Enterprise-grade AI tools with focus on safety and performance.
- π Website: [us.inc](https://us.inc)
- π§ Email: support@us.inc
---
<div align="center">
**Made with ❤️ by [Ultrasafe AI](https://us.inc)**
</div>
Raw data
{
"_id": null,
"home_page": null,
"name": "omnigen-usf",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": "Ultrasafe AI <support@us.inc>",
"keywords": "synthetic-data, data-generation, conversational-ai, llm, pipeline, conversation-extension, machine-learning, ai",
"author": null,
"author_email": "Ultrasafe AI <support@us.inc>",
"download_url": "https://files.pythonhosted.org/packages/21/10/c0edb3f185aa9f583240b984d10996a310870b8ada2076dc0e5e9393f92d/omnigen_usf-0.0.1.post9.tar.gz",
"platform": null,
"description": "# OmniGen \ud83d\ude80\n\n**Generate synthetic data at scale using an enterprise-ready framework with full customizable configuration, security, and ease of use**\n\nBuilt by [Ultrasafe AI](https://us.inc) for production environments.\n\n---\n\n## What is OmniGen?\n\n**OmniGen** is an enterprise-grade framework for generating synthetic datasets at scale\u2014from scratch or from base data. Generate **trillions of tokens** and **billions of samples** across multiple modalities:\n\n### \ud83c\udfaf Data Types Supported\n- \ud83d\udcac **Conversational Data** - Single-turn to multi-turn dialogues\n- \ud83e\udd16 **Agentic Datasets** - Tool use, function calling, multi-step reasoning\n- \ud83c\udfa8 **Multimodal Datasets** - Text, images, audio, video combinations\n- \ud83d\uddbc\ufe0f **Images** - Synthetic image generation and editing\n- \ud83c\udfb5 **Audio** - Speech, music, sound effects\n- \ud83c\udfac **Video** - Synthetic video sequences\n\n### \ud83c\udf93 Use Cases\n- **Fine-Tuning** - Instruction following, task-specific models\n- **Supervised Fine-Tuning (SFT)** - High-quality labeled datasets\n- **Offline Reinforcement Learning** - Preference datasets with rewards\n- **Online Reinforcement Learning** - Ground truth with reward checking scripts\n- **Pre-Training** - Large-scale corpus generation\n- **Machine Learning** - Training data for any ML task\n\n### \ud83c\udfd7\ufe0f Why OmniGen?\n- \u2705 **Enterprise-Ready** - Built for production at scale\n- \u2705 **Fully Customizable** - Configure every aspect of generation\n- \u2705 **Secure** - Complete isolation, no data mixing\n- \u2705 **Easy** - Simple API, clear examples\n- \u2705 **Modular** - Independent pipelines for different data types\n\n---\n\n## \ud83d\ude80 Currently Available Pipeline\n\n### **conversation_extension** - Extend Single-Turn to Multi-Turn Conversations\n\nTurn your base questions into rich multi-turn dialogues. This is just the first pipeline\u2014more coming soon!\n\n---\n\n## Why OmniGen?\n\n\u2705 **Simple** - One command to generate thousands of conversations\n\u2705 **Scalable** - Parallel processing for fast generation\n\u2705 **Flexible** - Mix different AI providers (OpenAI, Anthropic, Ultrasafe AI)\n\u2705 **Production Ready** - Built for SaaS platforms with multi-tenant support\n\u2705 **Fault Tolerant** - Checkpoint/resume system prevents data loss from any interruption\n\n---\n\n## Quick Start\n\n### 1. Install\n\n```bash\npip install omnigen-usf\n```\n\n### 2. Prepare Base Data\n\nCreate a file `base_data.jsonl` with your starting questions:\n\n```jsonl\n{\"conversations\": [{\"role\": \"user\", \"content\": \"How do I learn Python?\"}]}\n{\"conversations\": [{\"role\": \"user\", \"content\": \"What is machine learning?\"}]}\n{\"conversations\": [{\"role\": \"user\", \"content\": \"Explain neural networks\"}]}\n```\n\n### 3. 
Generate Conversations\n\nYou can configure the pipeline in **two ways**:\n\n#### Option A: Using YAML Configuration File\n\nCreate a `config.yaml` file:\n\n```yaml\n# NEW: Minimal config with smart defaults!\nproviders:\n user_followup:\n name: ultrasafe\n api_key: ${ULTRASAFE_API_KEY}\n # Defaults: usf-mini, 0.7 temp, 4096 tokens \u2713\n assistant_response:\n name: ultrasafe\n api_key: ${ULTRASAFE_API_KEY}\n # Defaults: usf-mini, 0.7 temp, 4096 tokens \u2713\n\ngeneration:\n num_conversations: 100\n turn_range: {min: 3, max: 8}\n\nbase_data:\n source_type: file\n file_path: base_data.jsonl\n\nstorage:\n type: jsonl\n output_file: output.jsonl\n```\n\nThen load and run:\n\n```python\nfrom omnigen.pipelines.conversation_extension import (\n ConversationExtensionConfig,\n ConversationExtensionPipeline\n)\n\n# Load configuration from YAML file\nconfig = ConversationExtensionConfig.from_yaml('config.yaml')\n\n# Run the pipeline\npipeline = ConversationExtensionPipeline(config)\npipeline.run()\n```\n\n#### Option B: Using Programmatic Configuration (Python)\n\n```python\nfrom omnigen.pipelines.conversation_extension import (\n ConversationExtensionConfigBuilder,\n ConversationExtensionPipeline\n)\n\n# Configure the pipeline programmatically\nconfig = (ConversationExtensionConfigBuilder()\n # User followup generator - minimal config!\n .add_provider(\n role='user_followup',\n name='ultrasafe',\n api_key='your-api-key'\n # Defaults: usf-mini, 0.7 temp, 4096 tokens \u2713\n )\n # Assistant response generator\n .add_provider(\n role='assistant_response',\n name='ultrasafe',\n api_key='your-api-key'\n # Defaults: usf-mini, 0.7 temp, 4096 tokens \u2713\n )\n # Generation settings\n .set_generation(\n num_conversations=100,\n turn_range=(3, 8) # 3-8 turns per conversation\n )\n # Input data\n .set_data_source(\n source_type='file',\n file_path='base_data.jsonl'\n )\n # Output\n .set_storage(\n type='jsonl',\n output_file='output.jsonl'\n )\n .build()\n)\n\n# Run the pipeline\npipeline = ConversationExtensionPipeline(config)\npipeline.run()\n```\n\n### 4. Get Results\n\nYour generated conversations will be in `output.jsonl`:\n\n```jsonl\n{\n \"id\": 0,\n \"conversations\": [\n {\"role\": \"user\", \"content\": \"How do I learn Python?\"},\n {\"role\": \"assistant\", \"content\": \"Great choice! 
Start with the basics...\"},\n {\"role\": \"user\", \"content\": \"What resources do you recommend?\"},\n {\"role\": \"assistant\", \"content\": \"I recommend these resources...\"},\n {\"role\": \"user\", \"content\": \"How long will it take?\"},\n {\"role\": \"assistant\", \"content\": \"With consistent practice...\"}\n ],\n \"num_turns\": 3,\n \"success\": true\n}\n```\n\n---\n\n## Supported AI Providers\n\n**NEW: Smart defaults automatically applied!** Only specify `name` and `api_key` - model, temperature, and max_tokens use optimized defaults.\n\n| Provider | Default Model | Default Temp | Default Tokens |\n|----------|---------------|--------------|----------------|\n| **Ultrasafe AI** | `usf-mini` | `0.7` | `4096` |\n| **OpenAI** | `gpt-4-turbo` | `0.7` | `4096` |\n| **Anthropic** | `claude-3-5-sonnet-20241022` | `0.7` | `4096` |\n| **OpenRouter** | `openai/gpt-4-turbo` | `0.7` | `4096` |\n\n**Override defaults as needed:**\n```yaml\nproviders:\n  user_followup:\n    name: openai\n    api_key: ${OPENAI_API_KEY}\n    temperature: 0.9  # Override only what you need\n```\n\n### Mix Different Providers\n\n```python\nconfig = (ConversationExtensionConfigBuilder()\n    .add_provider('user_followup', 'openai', api_key, 'gpt-4-turbo')\n    .add_provider('assistant_response', 'anthropic', api_key, 'claude-3-5-sonnet-20241022')\n    # ... rest of config\n    .build()\n)\n```\n\n---\n\n## Advanced Features\n\n### \ud83d\udd04 Checkpoint & Resume System\n\n**NEW!** Automatic checkpoint/resume functionality prevents data loss and duplication from any interruption.\n\n#### Overview\n\nThe checkpoint system automatically saves progress and enables a seamless resume from where you left off - handling manual stops (Ctrl+C), errors, rate limits, server failures, and crashes without losing data or creating duplicates.\n\n#### How It Works\n\n**Conversation Tracking:**\n- **Position-based ID**: Index in the base data file (for ordering)\n- **Content Hash**: SHA256 hash of the conversation (for deduplication)\n- **Combined Key**: `{position}_{hash[:8]}` for unique identification (see the sketch below)\n\n**Checkpoint Saves:**\n- Automatically after every N conversations (configurable)\n- On graceful shutdown (Ctrl+C, SIGTERM)\n- Uses atomic file operations (write temp \u2192 sync \u2192 atomic rename)\n- Validates input file integrity with a SHA256 hash\n\n
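The hybrid identifier is straightforward to picture in code. Here is a minimal sketch of how such a combined key can be derived (illustrative only - `build_key` is a hypothetical helper, not OmniGen's internal API):\n\n```python\nimport hashlib\nimport json\n\ndef build_key(position: int, conversation: list) -> str:\n    # Deduplication part: SHA256 over the canonical JSON of the conversation\n    digest = hashlib.sha256(\n        json.dumps(conversation, sort_keys=True).encode('utf-8')\n    ).hexdigest()\n    # Ordering part: index in the base data file -> {position}_{hash[:8]}\n    return f'{position}_{digest[:8]}'\n\nrecord = {'conversations': [{'role': 'user', 'content': 'How do I learn Python?'}]}\nprint(build_key(0, record['conversations']))  # e.g. 0_a1b2c3d4\n```\n\nBecause the key includes a content hash, identical questions at different positions stay distinct, and a changed record at the same position is detected rather than silently reused.\n\n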
#### Configuration\n\n```yaml\ncheckpoint:\n  enabled: true                      # Enable checkpoint/resume\n  checkpoint_file: \"checkpoint.json\" # Path to checkpoint file\n  auto_save_frequency: 10            # Save every 10 conversations\n  validate_input_hash: true          # Verify input hasn't changed\n  resume_mode: \"auto\"                # auto | manual | fresh\n```\n\n**Configuration Options:**\n\n| Option | Description | Default |\n|--------|-------------|---------|\n| `enabled` | Enable checkpoint/resume | `true` |\n| `checkpoint_file` | Path to checkpoint file | `workspaces/{workspace_id}/checkpoint.json` |\n| `auto_save_frequency` | Save checkpoint every N conversations | `10` |\n| `validate_input_hash` | Verify input file unchanged on resume | `true` |\n| `resume_mode` | Resume behavior: `auto`, `manual`, or `fresh` | `auto` |\n\n#### Usage Examples\n\n**YAML Configuration:**\n```yaml\nworkspace_id: \"production_run\"\n\ncheckpoint:\n  enabled: true\n  checkpoint_file: \"checkpoint.json\"\n  auto_save_frequency: 10\n  validate_input_hash: true\n  resume_mode: \"auto\"\n\nproviders:\n  user_followup:\n    name: ultrasafe\n    api_key: ${ULTRASAFE_API_KEY}\n  assistant_response:\n    name: ultrasafe\n    api_key: ${ULTRASAFE_API_KEY}\n\ngeneration:\n  num_conversations: 1000\n  turn_range: {min: 3, max: 8}\n  parallel_workers: 10\n\nbase_data:\n  source_type: file\n  file_path: input.jsonl\n\nstorage:\n  type: jsonl\n  output_file: output.jsonl\n```\n\n**Programmatic Configuration:**\n```python\nfrom omnigen.pipelines.conversation_extension import (\n    ConversationExtensionConfigBuilder,\n    ConversationExtensionPipeline\n)\n\nconfig = (ConversationExtensionConfigBuilder()\n    .add_provider('user_followup', 'ultrasafe', api_key, 'usf-mini')\n    .add_provider('assistant_response', 'ultrasafe', api_key, 'usf-mini')\n    .set_generation(num_conversations=1000, turn_range=(3, 8))\n    .set_data_source('file', file_path='input.jsonl')\n    .set_storage('jsonl', output_file='output.jsonl')\n    .set_checkpoint(\n        enabled=True,\n        checkpoint_file='checkpoint.json',\n        auto_save_frequency=10,\n        validate_input_hash=True,\n        resume_mode='auto'\n    )\n    .build()\n)\n\npipeline = ConversationExtensionPipeline(config)\npipeline.run()  # Can interrupt and resume anytime!\n```\n\n#### Resume Behavior\n\n**First Run (No Checkpoint):**\n```\n==========================================\nCONVERSATION EXTENSION PIPELINE\n==========================================\nBase conversations: 1000\nGenerating: 1000\nParallel workers: 10\n==========================================\nGenerating: 15%|\u2588\u2588\u2588\u2591\u2591\u2591\u2591\u2591\u2591\u2591\u2591\u2591| 150/1000 [\u2713120 \u26a020 \u271710]\n^C\n\u26a0\ufe0f Shutdown signal received. Saving checkpoint...\n\u2713 Final checkpoint saved\n\u26a0\ufe0f Interrupted. Progress saved in checkpoint.\n```\n\n**Resume Run (Checkpoint Exists):**\n```\n==========================================\nRESUMING FROM CHECKPOINT\n==========================================\nPrevious Run: 2025-10-04 16:30:00 UTC\nAlready Processed: 150 (\u2713120 \u26a020 \u271710 ~0)\nResuming Partials: 20\nRemaining: 850\nParallel workers: 10\n==========================================\nGenerating: 100%|\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2588| 1000/1000 [\u2713850 \u26a0100 \u271750]\n\n==========================================\nGENERATION COMPLETE\n==========================================\n\u2713 Complete: 850\n\u26a0 Partial: 100\n\u2717 Failed: 50\n\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\u2501\n\ud83d\udcbe Saved: 950 (95.0%)\n==========================================\n```\n\n#### Output Format\n\nBoth partial and complete conversations are saved to the **same output file** with status indicators:\n\n```jsonl\n{\"id\": 0, \"status\": \"completed\", \"is_complete\": true, \"conversations\": [...], \"num_turns\": 5}\n{\"id\": 1, \"status\": \"partial\", \"is_complete\": false, \"conversations\": [...], \"num_turns\": 3}\n{\"id\": 2, \"status\": \"completed\", \"is_complete\": true, \"conversations\": [...], \"num_turns\": 7}\n```\n\n**Status Values:**\n- `\"completed\"` - Successfully generated all requested turns\n- `\"partial\"` - Interrupted during generation, may be incomplete\n- `\"failed\"` - Error occurred, saved to failed file\n\n**Filtering Results:**\n```bash\n# Get only completed conversations\njq 'select(.status == \"completed\")' output.jsonl > completed.jsonl\n\n# Get partial conversations (to review/complete)\njq 'select(.status == \"partial\")' output.jsonl > partial.jsonl\n\n# Count by status\njq -s 'group_by(.status) | map({status: .[0].status, count: 
length})' output.jsonl\n```\n\n#### Key Features\n\n\u2705 **Zero Data Loss**\n- Progress saved automatically at configurable intervals\n- Every conversation is saved before moving to the next\n- Atomic file operations prevent corruption\n\n\u2705 **No Duplicates**\n- Hybrid identification (position + content hash)\n- Skips already processed conversations automatically\n- Prevents reprocessing on resume\n\n\u2705 **Partial Resume**\n- Continues incomplete multi-turn conversations\n- Resumes from the exact turn where interrupted\n- Preserves all generated content\n\n\u2705 **Universal Coverage**\n- Handles ALL interruption types:\n  - Manual stops (Ctrl+C)\n  - Server/connection failures\n  - Rate limit errors\n  - System crashes\n  - Any other errors\n\n\u2705 **Input Validation**\n- SHA256 hash verification of the input file\n- Detects if the input changed between runs\n- Prevents processing the wrong data\n\n\u2705 **Safety Features**\n- Atomic file operations (corruption-proof)\n- Automatic backup of corrupted checkpoints\n- Graceful shutdown handlers (SIGINT/SIGTERM)\n\n#### Checkpoint Structure\n\n```json\n{\n  \"version\": \"1.0\",\n  \"run_id\": \"unique-uuid\",\n  \"started_at\": \"2025-10-04T20:00:00.000Z\",\n  \"last_checkpoint_at\": \"2025-10-04T20:15:30.000Z\",\n  \"base_data\": {\n    \"source_type\": \"file\",\n    \"file_path\": \"input.jsonl\",\n    \"file_hash\": \"sha256-hash\",\n    \"total_available\": 1000\n  },\n  \"progress\": {\n    \"total_processed\": 150,\n    \"completed\": 120,\n    \"partial\": 20,\n    \"failed\": 10,\n    \"last_position\": 149\n  },\n  \"processed_records\": [\n    {\n      \"position\": 0,\n      \"content_hash\": \"a1b2c3d4\",\n      \"status\": \"completed\",\n      \"turns_generated\": 5,\n      \"processed_at\": \"2025-10-04T20:01:00.000Z\"\n    }\n  ],\n  \"partial_states\": {\n    \"5_b2c3d4e5\": {\n      \"position\": 5,\n      \"conversation\": [...],\n      \"turns_completed\": 3,\n      \"target_turns\": 5\n    }\n  }\n}\n```\n\n
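Since the checkpoint is plain JSON, a run's progress can be inspected without touching the pipeline. A minimal sketch (field names follow the structure above; the script itself is illustrative, not a bundled utility):\n\n```python\nimport json\n\n# Summarize a checkpoint file using the documented structure\nwith open('checkpoint.json') as f:\n    ckpt = json.load(f)\n\nprogress = ckpt['progress']\ntotal = ckpt['base_data']['total_available']\nprint(f\"processed {progress['total_processed']}/{total}\")\nprint(f\"completed={progress['completed']} partial={progress['partial']} failed={progress['failed']}\")\nprint(f\"resumable partials: {len(ckpt.get('partial_states', {}))}\")\n```\n\n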
#### Best Practices\n\n1. **Set Appropriate Save Frequency**\n   ```yaml\n   checkpoint:\n     auto_save_frequency: 10  # Balance performance vs safety\n   ```\n   - Lower (5-10): Better safety, slight overhead\n   - Higher (50-100): Better performance, more risk\n\n2. **Use Workspace Isolation**\n   ```python\n   config = ConversationExtensionConfigBuilder(\n       workspace_id=\"unique_project_id\"  # Prevents conflicts\n   )\n   ```\n\n3. **Monitor Checkpoint Size**\n   - Large checkpoints (>100MB) may slow saves\n   - Consider processing in batches for huge datasets\n\n4. **Backup Important Checkpoints**\n   ```bash\n   cp checkpoint.json checkpoint.backup.json\n   ```\n\n5. **Clean Up After Completion**\n   ```bash\n   # After a successful run\n   rm checkpoint.json\n   ```\n\n#### Troubleshooting\n\n**Problem: Checkpoint not resuming**\n- Check `checkpoint.enabled` is `true`\n- Verify the checkpoint file exists at the specified path\n- Ensure `resume_mode` is not set to `\"fresh\"`\n- Review logs for validation errors\n\n**Problem: Input file changed warning**\n- If intentional: Set `validate_input_hash: false`\n- If unintentional: Restore the original input file\n- Or delete the checkpoint to start fresh\n\n**Problem: Partial conversations not resuming**\n- Verify `partial_states` exists in the checkpoint\n- Check the conversation has `_position` and `_content_hash`\n- Review logs for partial state loading errors\n\n**Problem: Duplicate conversations**\n- The checkpoint prevents this automatically\n- Check both `position` and `content_hash` are tracked\n- Verify hybrid identification is working\n\n#### Performance Impact\n\n- **Checkpoint overhead**: ~10-50ms per save (amortized)\n- **Memory usage**: Minimal (lightweight data structure)\n- **Disk space**: ~1-2KB per conversation in checkpoint\n- **Resume speed**: Instant (IDs loaded into a memory set)\n\n#### Example: Complete Workflow\n\n```python\nimport os\nfrom omnigen.pipelines.conversation_extension import (\n    ConversationExtensionConfigBuilder,\n    ConversationExtensionPipeline\n)\n\n# Build configuration\nconfig = (ConversationExtensionConfigBuilder(workspace_id=\"production_v1\")\n    .add_provider(\n        role=\"user_followup\",\n        name=\"openai\",\n        api_key=os.getenv(\"OPENAI_API_KEY\"),\n        model=\"gpt-4o-mini\"\n    )\n    .add_provider(\n        role=\"assistant_response\",\n        name=\"openai\",\n        api_key=os.getenv(\"OPENAI_API_KEY\"),\n        model=\"gpt-4o-mini\"\n    )\n    .set_generation(\n        num_conversations=10000,\n        turn_range=(3, 8),\n        parallel_workers=20\n    )\n    .set_data_source(\n        source_type=\"file\",\n        file_path=\"data/base_conversations.jsonl\"\n    )\n    .set_storage(\n        type=\"jsonl\",\n        output_file=\"output.jsonl\"\n    )\n    .set_checkpoint(\n        enabled=True,\n        checkpoint_file=\"checkpoint.json\",\n        auto_save_frequency=50,\n        validate_input_hash=True,\n        resume_mode=\"auto\"\n    )\n    .build()\n)\n\n# Run with automatic resume\npipeline = ConversationExtensionPipeline(config)\npipeline.run()\n\n# Can be interrupted and resumed multiple times\n# Progress is never lost!\n```\n\n\ud83d\udcdd **See also**: [`examples/conversation_extension/checkpoint_resume_example.py`](examples/conversation_extension/checkpoint_resume_example.py) for a complete working example with checkpoint inspection utilities.\n\n---\n\n### Multi-Tenant SaaS Support\n\nPerfect for platforms serving multiple users concurrently:\n\n```python\n# Each user gets an isolated workspace\nworkspace_id = f\"user_{user_id}_session_{session_id}\"\n\nconfig = (ConversationExtensionConfigBuilder(workspace_id=workspace_id)\n    .add_provider('user_followup', 'ultrasafe', shared_api_key, 'usf-mini')\n    .add_provider('assistant_response', 'ultrasafe', shared_api_key, 'usf-mini')\n    .set_storage('jsonl', output_file='output.jsonl')  # Auto-isolated\n    .build()\n)\n\n# Storage automatically goes to: workspaces/{workspace_id}/output.jsonl\n```\n\n### Parallel Dataset Generation\n\n```python\nfrom concurrent.futures import ProcessPoolExecutor\n\ndef process_dataset(input_file, output_file):\n    config = (ConversationExtensionConfigBuilder()\n        .add_provider('user_followup', 'ultrasafe', api_key, 'usf-mini')\n        .add_provider('assistant_response', 'ultrasafe', api_key, 'usf-mini')\n        .set_data_source('file', file_path=input_file)\n        .set_storage('jsonl', 
output_file=output_file)\n        .build()\n    )\n    ConversationExtensionPipeline(config).run()\n\n# Process 3 datasets in parallel\nwith ProcessPoolExecutor(max_workers=3) as executor:\n    executor.submit(process_dataset, 'data1.jsonl', 'out1.jsonl')\n    executor.submit(process_dataset, 'data2.jsonl', 'out2.jsonl')\n    executor.submit(process_dataset, 'data3.jsonl', 'out3.jsonl')\n```\n\n## \ud83d\udcdd Configuration Methods\n\nOmniGen supports **multiple ways** to configure your pipeline. Choose the method that best fits your workflow:\n\n### Method 1: YAML Configuration File \u2b50 (Recommended)\n\nThe most flexible and maintainable approach:\n\n**Step 1:** Create `config.yaml`\n```yaml\n# NEW: Minimal configuration with smart defaults\nproviders:\n  user_followup:\n    name: ultrasafe\n    api_key: ${ULTRASAFE_API_KEY}  # Only name and API key required!\n    # Defaults: usf-mini, 0.7 temp, 4096 tokens \u2713\n  assistant_response:\n    name: ultrasafe\n    api_key: ${ULTRASAFE_API_KEY}\n    # Defaults: usf-mini, 0.7 temp, 4096 tokens \u2713\n\ngeneration:\n  num_conversations: 100\n  turn_range: {min: 3, max: 8}\n\nbase_data:\n  source_type: file\n  file_path: base_data.jsonl\n\nstorage:\n  type: jsonl\n  output_file: output.jsonl\n```\n\n**Step 2:** Load and run\n```python\nfrom omnigen.pipelines.conversation_extension import (\n    ConversationExtensionConfig,\n    ConversationExtensionPipeline\n)\n\n# Load from YAML file\nconfig = ConversationExtensionConfig.from_yaml('config.yaml')\n\n# Run pipeline\npipeline = ConversationExtensionPipeline(config)\npipeline.run()\n```\n\n**\u2705 Benefits:**\n- Easy to version control and share\n- Environment variable support with `${VAR_NAME}`\n- No code changes needed for config updates\n- Clear, readable configuration\n\n### Method 2: Programmatic Configuration (Python)\n\nBuild configuration directly in code:\n\n```python\nfrom omnigen.pipelines.conversation_extension import (\n    ConversationExtensionConfigBuilder,\n    ConversationExtensionPipeline\n)\n\nconfig = (ConversationExtensionConfigBuilder()\n    .add_provider('user_followup', 'ultrasafe', 'api-key', 'usf-mini')\n    .add_provider('assistant_response', 'ultrasafe', 'api-key', 'usf-mini')\n    .set_generation(num_conversations=100, turn_range=(3, 8))\n    .set_data_source('file', file_path='base_data.jsonl')\n    .set_storage('jsonl', output_file='output.jsonl')\n    .build()\n)\n\npipeline = ConversationExtensionPipeline(config)\npipeline.run()\n```\n\n**\u2705 Benefits:**\n- Type safety and IDE autocomplete\n- Dynamic configuration based on runtime\n- No external files needed\n- Easy application integration\n\n### Method 3: Dictionary Configuration\n\nCreate from a Python dictionary:\n\n```python\nfrom omnigen.pipelines.conversation_extension import (\n    ConversationExtensionConfig,\n    ConversationExtensionPipeline\n)\n\nconfig_dict = {\n    'providers': {\n        'user_followup': {'name': 'ultrasafe', 'api_key': 'key', 'model': 'usf-mini'},\n        'assistant_response': {'name': 'ultrasafe', 'api_key': 'key', 'model': 'usf-mini'}\n    },\n    'generation': {'num_conversations': 100, 'turn_range': {'min': 3, 'max': 8}},\n    'base_data': {'source_type': 'file', 'file_path': 'base_data.jsonl'},\n    'storage': {'type': 'jsonl', 'output_file': 'output.jsonl'}\n}\n\nconfig = ConversationExtensionConfig.from_dict(config_dict)\npipeline = ConversationExtensionPipeline(config)\npipeline.run()\n```\n\n**\u2705 Benefits:**\n- Load from JSON files or APIs (see the sketch below)\n- Easy programmatic modification\n- Flexible for dynamic scenarios\n\n
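For example, a configuration kept on disk as JSON (a hypothetical `config.json` mirroring the dictionary above) can be loaded with nothing beyond the standard library and handed to `from_dict()`:\n\n```python\nimport json\n\nfrom omnigen.pipelines.conversation_extension import (\n    ConversationExtensionConfig,\n    ConversationExtensionPipeline\n)\n\n# Read the dictionary from disk, then build the config from it\nwith open('config.json') as f:\n    config_dict = json.load(f)\n\nconfig = ConversationExtensionConfig.from_dict(config_dict)\nConversationExtensionPipeline(config).run()\n```\n\n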
### Method 4: Hybrid Approach\n\nCombine YAML with programmatic overrides:\n\n```python\nfrom omnigen.pipelines.conversation_extension import (\n    ConversationExtensionConfig,\n    ConversationExtensionPipeline\n)\n\n# Load base config from YAML\nconfig = ConversationExtensionConfig.from_yaml('base_config.yaml')\n\n# Modify specific settings\nconfig_dict = config.to_dict()\nconfig_dict['generation']['num_conversations'] = 500  # Override\nconfig_dict['storage']['output_file'] = f'output_{user_id}.jsonl'  # Dynamic\n\n# Rebuild and run\nconfig = ConversationExtensionConfig.from_dict(config_dict)\npipeline = ConversationExtensionPipeline(config)\npipeline.run()\n```\n\n**\u2705 Benefits:**\n- Base configuration in YAML\n- Runtime customization in code\n- Best of both worlds\n\n### Environment Variables in YAML\n\nUse `${VARIABLE_NAME}` syntax:\n\n```yaml\nproviders:\n  user_followup:\n    name: ${PROVIDER_NAME}         # From environment\n    api_key: ${ULTRASAFE_API_KEY}  # From environment\n    model: ${USER_MODEL}           # From environment\n\nbase_data:\n  file_path: ${INPUT_PATH}         # From environment\n\nstorage:\n  output_file: ${OUTPUT_PATH}      # From environment\n```\n\nSet variables:\n```bash\nexport PROVIDER_NAME=\"ultrasafe\"\nexport ULTRASAFE_API_KEY=\"your-key\"\nexport USER_MODEL=\"usf-mini\"\nexport INPUT_PATH=\"data/input.jsonl\"\nexport OUTPUT_PATH=\"data/output.jsonl\"\n\n# Then run\npython your_script.py\n```\n\n### CLI Usage with Config File\n\nRun directly from the command line:\n\n```bash\n# Using YAML config\nomnigen conversation-extension --config config.yaml\n\n# With overrides\nomnigen conversation-extension \\\n  --config config.yaml \\\n  --num-conversations 500 \\\n  --output custom_output.jsonl\n```\n\n---\n\n## \ud83d\udcd6 Complete Configuration Reference\n\n### All Configuration Options Explained\n\nBelow is a comprehensive YAML configuration showing **ALL** available options with detailed explanations:\n\n```yaml\n# ==============================================================================\n# WORKSPACE ISOLATION (Optional)\n# ==============================================================================\n# Unique ID for multi-tenant environments - auto-isolates all output files\nworkspace_id: \"user_123_session_abc\"\n\n# ==============================================================================\n# PROVIDERS - AI Model Configuration\n# ==============================================================================\n# Configure different AI providers for each role\n# Each role can use a different provider/model combination\n\nproviders:\n # Provider for generating user follow-up questions\n user_followup:\n name: ultrasafe # Required: ultrasafe, openai, anthropic, openrouter\n api_key: ${API_KEY} # Required: Use env var ${VAR_NAME} or direct key\n \n # Optional - Smart defaults applied if not specified:\n model: usf-mini # Default varies by provider\n temperature: 0.7 # Default: 0.7\n max_tokens: 2048 # Default: 4096 (overridden here)\n timeout: 300 # Default: 300\n max_retries: 5 # Default: 5\n retry_delay: 2 # Default: 2\n \n # Provider for generating assistant responses\n assistant_response:\n name: ultrasafe # Can use different provider than user_followup\n api_key: ${API_KEY} # Only name and api_key are required!\n max_tokens: 8192 # Override default (4096) for detailed responses\n # model, temperature, timeout, retries use defaults \u2713\n\n# PROVIDER OPTIONS:\n# ----------------\n# ultrasafe:\n# models: usf-mini, usf-max\n#\n# openai:\n# models: gpt-4-turbo, gpt-4, gpt-3.5-turbo, gpt-4o, gpt-4o-mini\n#\n# anthropic:\n# models: 
claude-3-5-sonnet-20241022, claude-3-opus-20240229,\n# claude-3-sonnet-20240229, claude-3-haiku-20240307\n#\n# openrouter:\n# models: Any OpenRouter supported model\n# base_url: https://openrouter.ai/api/v1 (optional)\n\n# ==============================================================================\n# GENERATION SETTINGS\n# ==============================================================================\ngeneration:\n num_conversations: 100 # Total conversations to generate\n # Use 0 or omit to process ALL available conversations\n \n turn_range: # Number of turns per conversation\n min: 3 # Minimum turns\n max: 8 # Maximum turns\n \n parallel_workers: 10 # Concurrent workers (balance speed vs rate limits)\n \n # Extension behavior for multi-turn input\n extension_mode: \"smart\" # Options: \"smart\" | \"legacy\"\n # - smart: Intelligently handle multi-turn conversations\n # - legacy: Always extract first user message only\n \n skip_invalid: true # Skip invalid patterns (recommended: true)\n \n # Turn calculation method\n turn_calculation: \"additional\" # Options: \"additional\" | \"total\"\n # - additional: Add NEW turns on top of existing (default)\n # - total: Keep total turns within range (never removes existing)\n\n# ==============================================================================\n# DATA SOURCE CONFIGURATION\n# ==============================================================================\nbase_data:\n enabled: true # Enable base data loading\n \n # OPTION 1: Local File\n source_type: file # Use local JSONL/JSON file\n file_path: data/input.jsonl # Path to file\n format: conversations # JSON key containing conversation array\n shuffle: false # Shuffle data before processing\n \n # OPTION 2: HuggingFace Dataset\n # source_type: huggingface # Use HuggingFace dataset\n # hf_dataset: username/dataset # HuggingFace dataset path\n # hf_split: train # Dataset split: train, test, validation\n # hf_token: ${HF_TOKEN} # HuggingFace API token (if private)\n # hf_streaming: false # Stream dataset (for large datasets)\n # format: conversations # Field name in dataset\n # shuffle: true # Shuffle after loading\n\n# ==============================================================================\n# STORAGE CONFIGURATION\n# ==============================================================================\nstorage:\n type: jsonl # Options: jsonl | mongodb\n \n # JSONL Storage (Default)\n output_file: output.jsonl # Successful conversations\n partial_file: partial.jsonl # Partial/incomplete conversations (legacy)\n failed_file: failed.jsonl # Failed conversations\n \n # MongoDB Storage (Alternative)\n # type: mongodb\n # mongodb:\n # connection_string: mongodb://localhost:27017\n # database: omnigen\n # collection: conversations\n # output_collection: output # Successful\n # partial_collection: partial # Partial\n # failed_collection: failed # Failed\n\n# ==============================================================================\n# CHECKPOINT/RESUME CONFIGURATION (NEW!)\n# ==============================================================================\ncheckpoint:\n enabled: true # Enable automatic checkpoint/resume\n checkpoint_file: \"workspaces/{workspace_id}/checkpoint.json\"\n auto_save_frequency: 10 # Save checkpoint every N conversations\n validate_input_hash: true # Verify input file hasn't changed on resume\n resume_mode: \"auto\" # Options: \"auto\" | \"manual\" | \"fresh\"\n \n # Features:\n # - Zero data loss from interruptions\n # - No duplicate processing\n # - Partial 
conversation resume\n # - Handles: Ctrl+C, errors, rate limits, crashes\n\n# ==============================================================================\n# DATETIME CONFIGURATION (Optional)\n# ==============================================================================\ndatetime_config:\n enabled: true # Enable datetime generation\n mode: random_from_range # Options: random_from_range | current | fixed\n timezone: UTC # Timezone (UTC, America/New_York, Asia/Dubai, etc.)\n format: \"%Y-%m-%d %H:%M:%S\" # Python strftime format\n \n # For random_from_range mode\n range:\n start: \"2024-01-01 00:00:00\" # Start datetime\n end: \"2024-12-31 23:59:59\" # End datetime\n \n # For fixed mode\n # fixed_datetime: \"2024-06-15 12:00:00\"\n\n# ==============================================================================\n# SYSTEM MESSAGES (Optional)\n# ==============================================================================\nsystem_messages:\n # Prepend system message to every conversation\n prepend_always:\n enabled: true\n content: \"You are a helpful AI assistant. Current time: {current_datetime} ({timezone}).\"\n \n # Append system message to every conversation\n append_always:\n enabled: false\n content: \"Remember to be concise and helpful.\"\n \n # Add system message only if none exists\n add_if_missing:\n enabled: false\n content: \"You are an AI assistant.\"\n\n# Available variables in system messages:\n# - {current_datetime}: Generated datetime\n# - {timezone}: Configured timezone\n# - {workspace_id}: Current workspace ID\n\n# ==============================================================================\n# CUSTOM PROMPTS (Optional - Smart Defaults Provided!)\n# ==============================================================================\n# NOTE: Prompts are completely OPTIONAL. The system automatically uses optimized\n# default prompts if you don't specify any. 
Only add this section if you want\n# to customize the prompt behavior.\n\nprompts:\n # Custom prompt for user follow-up generation\n followup_question: |\n ## Your Task\n Generate an intelligent follow-up user question based on conversation history.\n \n ### CONVERSATION HISTORY:\n {history}\n \n ### INSTRUCTIONS:\n - Generate a meaningful follow-up question\n - Be conversational and natural\n - Vary your phrasing and tone\n - Build on the assistant's last response\n \n Return your follow-up question wrapped in XML tags:\n <user>Your follow-up question here</user>\n \n # Custom prompt for assistant response generation\n # assistant_response: |\n # Your custom assistant response prompt here...\n\n# ==============================================================================\n# DEBUG OPTIONS (Optional)\n# ==============================================================================\ndebug:\n log_api_timing: true # Log API call timings\n log_parallel_status: true # Log parallel worker status\n verbose: false # Verbose logging\n```\n\n### Quick Configuration Examples\n\n#### Example 1: Minimal Configuration (NEW!)\n```yaml\n# Only specify what's required - defaults applied automatically\nproviders:\n user_followup:\n name: ultrasafe\n api_key: ${ULTRASAFE_API_KEY}\n assistant_response:\n name: ultrasafe\n api_key: ${ULTRASAFE_API_KEY}\n\ngeneration:\n num_conversations: 100\n turn_range: {min: 3, max: 8}\n\nbase_data:\n source_type: file\n file_path: input.jsonl\n\nstorage:\n type: jsonl\n output_file: output.jsonl\n\n# Enable checkpoint/resume (NEW!)\ncheckpoint:\n enabled: true\n auto_save_frequency: 10\n```\n\n#### Example 2: HuggingFace Dataset Input\n```yaml\nproviders:\n user_followup:\n name: openai\n api_key: ${OPENAI_API_KEY}\n model: gpt-4-turbo\n assistant_response:\n name: anthropic\n api_key: ${ANTHROPIC_API_KEY}\n model: claude-3-5-sonnet-20241022\n\ngeneration:\n num_conversations: 1000\n turn_range: {min: 5, max: 10}\n parallel_workers: 20\n\nbase_data:\n source_type: huggingface\n hf_dataset: username/my-dataset\n hf_split: train\n hf_token: ${HF_TOKEN}\n format: conversations\n shuffle: true\n\nstorage:\n type: jsonl\n output_file: output.jsonl\n\n# Checkpoint/resume for fault tolerance\ncheckpoint:\n enabled: true\n checkpoint_file: \"checkpoint.json\"\n auto_save_frequency: 50\n validate_input_hash: true\n```\n\n#### Example 3: Mixed Providers with MongoDB\n```yaml\nproviders:\n user_followup:\n name: openai\n api_key: ${OPENAI_API_KEY}\n model: gpt-3.5-turbo\n temperature: 0.8\n assistant_response:\n name: anthropic\n api_key: ${ANTHROPIC_API_KEY}\n model: claude-3-5-sonnet-20241022\n temperature: 0.7\n\ngeneration:\n num_conversations: 500\n turn_range: {min: 3, max: 8}\n\nbase_data:\n source_type: file\n file_path: questions.jsonl\n\nstorage:\n type: mongodb\n mongodb:\n connection_string: mongodb://localhost:27017\n database: omnigen\n collection: conversations\n\n# Checkpoint works with any storage type\ncheckpoint:\n enabled: true\n auto_save_frequency: 25\n```\n\n#### Example 4: Programmatic Configuration (Python)\n```python\nfrom omnigen.pipelines.conversation_extension import (\n ConversationExtensionConfigBuilder,\n ConversationExtensionPipeline\n)\n\n# Build configuration programmatically\nconfig = (ConversationExtensionConfigBuilder()\n # Workspace isolation\n .set_workspace_id(\"user_123_session_abc\")\n \n # Providers\n .add_provider(\n role='user_followup',\n name='ultrasafe',\n api_key='your-api-key',\n model='usf-mini',\n temperature=0.7,\n 
max_tokens=2048\n )\n .add_provider(\n role='assistant_response',\n name='ultrasafe',\n api_key='your-api-key',\n model='usf-mini',\n temperature=0.7,\n max_tokens=8192\n )\n \n # Generation settings\n .set_generation(\n num_conversations=100, # Or use 0/None to process ALL available\n turn_range=(3, 8),\n parallel_workers=10,\n extension_mode='smart',\n skip_invalid=True,\n turn_calculation='additional'\n )\n \n # Data source - Local file\n .set_data_source(\n source_type='file',\n file_path='input.jsonl',\n format='conversations',\n shuffle=False\n )\n \n # Data source - HuggingFace (alternative)\n # .set_data_source(\n # source_type='huggingface',\n # hf_dataset='username/dataset',\n # hf_split='train',\n # hf_token='your-token',\n # format='conversations',\n # shuffle=True\n # )\n \n # Storage\n .set_storage(\n type='jsonl',\n output_file='output.jsonl',\n partial_file='partial.jsonl',\n failed_file='failed.jsonl'\n )\n \n # Custom prompts (optional)\n .set_prompts(\n followup_question=\"Your custom prompt here with {history}\"\n )\n \n .build()\n)\n\n# Run pipeline with checkpoint/resume support\npipeline = ConversationExtensionPipeline(config)\npipeline.run() # Can be interrupted and resumed automatically!\n```\n\n---\n\n## \ud83d\udd27 Advanced Configuration Options - Deep Dive\n\n### System Messages Configuration\n\nSystem messages allow you to inject context or instructions into conversations. You have three modes of operation:\n\n#### 1. Prepend Always\nAdd a system message at the **start** of every conversation:\n\n```yaml\nsystem_messages:\n prepend_always:\n enabled: true\n content: \"You are a helpful AI assistant. Current time: {current_datetime} ({timezone}).\"\n```\n\n**Use Cases:**\n- Set assistant personality/role\n- Provide real-time context (date, time)\n- Add global instructions\n\n#### 2. Append Always\nAdd a system message at the **end** of every conversation:\n\n```yaml\nsystem_messages:\n append_always:\n enabled: true\n content: \"Remember to be concise and provide sources when possible.\"\n```\n\n**Use Cases:**\n- Add final reminders\n- Set response style constraints\n- Add quality guidelines\n\n#### 3. 
Add If Missing\nAdd a system message **only if** the conversation doesn't already have one:\n\n```yaml\nsystem_messages:\n add_if_missing:\n enabled: true\n content: \"You are an AI assistant.\"\n```\n\n**Use Cases:**\n- Ensure all conversations have baseline instructions\n- Fallback when base data lacks system messages\n\n#### Available Template Variables\n\nYou can use these variables in your system message content:\n\n| Variable | Description | Example Output |\n|----------|-------------|----------------|\n| `{current_datetime}` | Generated datetime | `2024-06-15 14:30:00` |\n| `{timezone}` | Configured timezone | `UTC` or `America/New_York` |\n| `{workspace_id}` | Current workspace ID | `user_123_session_abc` |\n\n#### Complete Example - All Three Modes\n\n```yaml\nsystem_messages:\n prepend_always:\n enabled: true\n content: |\n You are a knowledgeable AI assistant.\n Current datetime: {current_datetime} ({timezone})\n Workspace: {workspace_id}\n \n append_always:\n enabled: true\n content: |\n IMPORTANT REMINDERS:\n - Always cite sources when providing facts\n - Be concise but thorough\n - Ask clarifying questions when needed\n \n add_if_missing:\n enabled: true\n content: \"You are a helpful assistant.\"\n```\n\n**Note:** If both `prepend_always` and `add_if_missing` are enabled, `prepend_always` takes precedence.\n\n#### Programmatic Usage\n\n```python\nfrom omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder\n\nconfig = (ConversationExtensionConfigBuilder()\n # ... other config ...\n .build()\n)\n\n# Add system messages to config dict\nconfig_dict = config.to_dict()\nconfig_dict['system_messages'] = {\n 'prepend_always': {\n 'enabled': True,\n 'content': 'You are a helpful assistant. Time: {current_datetime}'\n }\n}\n```\n\n---\n\n### DateTime Configuration\n\nControl how datetime values are generated and injected into system messages and conversations.\n\n#### Mode 1: Random from Range (Default)\nGenerate random datetimes within a specified range:\n\n```yaml\ndatetime_config:\n enabled: true\n mode: random_from_range\n timezone: UTC # Any valid timezone\n format: \"%Y-%m-%d %H:%M:%S\" # Python strftime format\n range:\n start: \"2024-01-01 00:00:00\"\n end: \"2024-12-31 23:59:59\"\n```\n\n**Use Cases:**\n- Training data with temporal diversity\n- Simulating historical conversations\n- Creating time-aware datasets\n\n**Common Timezones:**\n- `UTC` - Coordinated Universal Time\n- `America/New_York` - Eastern Time\n- `Europe/London` - British Time\n- `Asia/Dubai` - Gulf Standard Time\n- `Asia/Tokyo` - Japan Standard Time\n\n#### Mode 2: Current Time\nUse actual current time when generating:\n\n```yaml\ndatetime_config:\n enabled: true\n mode: current\n timezone: America/New_York\n format: \"%B %d, %Y at %I:%M %p\" # December 15, 2024 at 02:30 PM\n```\n\n**Use Cases:**\n- Real-time conversation simulation\n- Current events discussions\n- Live system demonstrations\n\n#### Mode 3: Fixed DateTime\nUse the same datetime for all conversations:\n\n```yaml\ndatetime_config:\n enabled: true\n mode: fixed\n timezone: UTC\n format: \"%Y-%m-%d %H:%M:%S\"\n fixed_datetime: \"2024-06-15 12:00:00\"\n```\n\n**Use Cases:**\n- Consistent training data\n- Specific time period simulation\n- Testing and debugging\n\n#### Format String Examples\n\nCommon datetime formats using Python's strftime:\n\n```yaml\n# ISO 8601 Format\nformat: \"%Y-%m-%d %H:%M:%S\" # 2024-12-15 14:30:00\n\n# Human-Readable\nformat: \"%B %d, %Y at %I:%M %p\" # December 15, 2024 at 02:30 PM\n\n# Date 
Only\nformat: \"%Y-%m-%d\" # 2024-12-15\n\n# Time Only\nformat: \"%H:%M:%S\" # 14:30:00\n\n# Custom Format\nformat: \"%A, %B %d, %Y\" # Sunday, December 15, 2024\n```\n\n#### Complete DateTime Example\n\n```yaml\ndatetime_config:\n  enabled: true\n  mode: random_from_range\n  timezone: America/New_York\n  format: \"%A, %B %d, %Y at %I:%M %p %Z\"\n  range:\n    start: \"2024-01-01 09:00:00\"  # Business hours only\n    end: \"2024-12-31 17:00:00\"\n\n# Then use in system messages:\nsystem_messages:\n  prepend_always:\n    enabled: true\n    content: |\n      You are a business assistant.\n      Current datetime: {current_datetime}\n      Timezone: {timezone}\n```\n\n**Output Example:**\n```\nYou are a business assistant.\nCurrent datetime: Saturday, June 15, 2024 at 02:30 PM EDT\nTimezone: America/New_York\n```\n\n---\n\n### Custom Prompts Configuration\n\n**NOTE:** Custom prompts are **completely optional**. OmniGen automatically uses optimized default prompts if the `prompts` section is not included in your configuration. Only add custom prompts if you want to override the default behavior.\n\n#### Default Prompts\n\nOmniGen uses optimized default prompts that work well for most use cases. You can customize them if needed:\n\n**1. Follow-up Question Prompt** (for `user_followup` role):\n\n```yaml\nprompts:\n  followup_question: |\n    ## Your Task\n    Generate an intelligent follow-up user question based on conversation history.\n\n    ### CONVERSATION HISTORY:\n    {history}\n\n    ### INSTRUCTIONS:\n    - Generate a meaningful follow-up question\n    - Be conversational and natural\n    - Vary your phrasing and tone\n    - Build on the assistant's last response\n    - Make the question specific to the conversation context\n\n    Return your follow-up question wrapped in XML tags:\n    <user>Your follow-up question here</user>\n```\n\n**2. 
Assistant Response Prompt** (for `assistant_response` role):\n\n```yaml\nprompts:\n assistant_response: |\n ## Your Task\n Generate a helpful assistant response based on the conversation history.\n \n ### CONVERSATION HISTORY:\n {history}\n \n ### INSTRUCTIONS:\n - Provide accurate and helpful information\n - Be conversational and friendly\n - Reference previous context when relevant\n - Keep responses focused and concise\n \n Return your response wrapped in XML tags:\n <assistant>Your response here</assistant>\n```\n\n#### Available Template Variables\n\n| Variable | Description | Content |\n|----------|-------------|---------|\n| `{history}` | Full conversation history | All previous messages formatted as text |\n\n#### Custom Prompt Examples\n\n**Example 1: Technical Documentation Assistant**\n```yaml\nprompts:\n followup_question: |\n Generate a technical follow-up question from a developer's perspective.\n \n CONVERSATION:\n {history}\n \n Create a question that:\n - Asks about implementation details\n - Seeks code examples or best practices\n - Explores edge cases or potential issues\n \n <user>Your technical question</user>\n \n assistant_response: |\n Provide a detailed technical response with code examples.\n \n CONVERSATION:\n {history}\n \n Your response should:\n - Include code snippets when helpful\n - Explain technical concepts clearly\n - Provide links to documentation\n - Mention potential pitfalls\n \n <assistant>Your technical response</assistant>\n```\n\n**Example 2: Customer Support Simulation**\n```yaml\nprompts:\n followup_question: |\n Simulate a customer asking for help.\n \n CONVERSATION:\n {history}\n \n Generate a question that:\n - Shows frustration or confusion (realistic)\n - Asks for clarification on previous response\n - Requests specific solutions or workarounds\n \n <user>Customer question</user>\n \n assistant_response: |\n Provide empathetic customer support.\n \n CONVERSATION:\n {history}\n \n Your response must:\n - Show empathy and understanding\n - Provide clear step-by-step solutions\n - Offer alternatives if applicable\n - End with \"Is there anything else I can help with?\"\n \n <assistant>Support response</assistant>\n```\n\n**Example 3: Educational Tutor**\n```yaml\nprompts:\n followup_question: |\n Generate a student's follow-up question showing learning progression.\n \n CONTEXT:\n {history}\n \n The question should:\n - Build on what was just explained\n - Show either understanding or confusion\n - Ask for examples or clarification\n - Demonstrate curiosity about the topic\n \n <user>Student question</user>\n \n assistant_response: |\n Respond as a patient, knowledgeable tutor.\n \n CONTEXT:\n {history}\n \n Your response should:\n - Use simple, clear language\n - Provide concrete examples\n - Check for understanding\n - Encourage further questions\n \n <assistant>Tutor response</assistant>\n```\n\n#### Programmatic Prompt Configuration\n\n```python\nfrom omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder\n\ncustom_prompts = {\n 'followup_question': \"\"\"\n Generate a follow-up based on: {history}\n Make it specific and contextual.\n <user>question</user>\n \"\"\",\n 'assistant_response': \"\"\"\n Respond helpfully to: {history}\n Be clear and concise.\n <assistant>response</assistant>\n \"\"\"\n}\n\nconfig = (ConversationExtensionConfigBuilder()\n # ... 
other config ...\n .set_prompts(**custom_prompts)\n .build()\n)\n```\n\n---\n\n### Debug Configuration\n\nEnable detailed logging and monitoring for troubleshooting and optimization.\n\n#### Debug Options\n\n```yaml\ndebug:\n log_api_timing: true # Log API call duration and performance\n log_parallel_status: true # Log parallel worker status and progress\n verbose: false # Enable verbose logging (all operations)\n```\n\n#### Option Details\n\n**1. API Timing Logs** (`log_api_timing: true`)\n\nTracks API performance for each call:\n\n```\n[API TIMING] user_followup request took 1.23s\n[API TIMING] assistant_response request took 2.45s\n[API TIMING] Average response time: 1.84s\n```\n\n**Use Cases:**\n- Identify slow API providers\n- Optimize provider selection\n- Monitor rate limits\n- Debug timeout issues\n\n**2. Parallel Status Logs** (`log_parallel_status: true`)\n\nShows worker activity in real-time:\n\n```\n[PARALLEL] Worker 1/10: Processing conversation 5\n[PARALLEL] Worker 2/10: Processing conversation 6\n[PARALLEL] Worker 3/10: Waiting for task...\n[PARALLEL] Progress: 45/100 conversations complete (45%)\n[PARALLEL] Active workers: 8/10 | Queue: 12 remaining\n```\n\n**Use Cases:**\n- Monitor parallel processing\n- Identify bottlenecks\n- Optimize worker count\n- Track progress in real-time\n\n**3. Verbose Logging** (`verbose: true`)\n\nEnables comprehensive logging of all operations:\n\n```\n[DEBUG] Loading base data from: data/input.jsonl\n[DEBUG] Loaded 100 base conversations\n[DEBUG] Initializing provider: ultrasafe (usf-mini)\n[DEBUG] Starting parallel generation with 10 workers\n[DEBUG] Conversation 1: Processing...\n[DEBUG] Conversation 1: Generated 5 turns\n[DEBUG] Conversation 1: Saved to output.jsonl\n[DEBUG] Final stats: 95 success, 3 partial, 2 failed\n```\n\n**Use Cases:**\n- Development and testing\n- Debugging issues\n- Understanding pipeline flow\n- Troubleshooting data problems\n\n#### Complete Debug Configuration\n\n```yaml\n# Full debug mode for development\ndebug:\n log_api_timing: true\n log_parallel_status: true\n verbose: true\n\n# Production mode (minimal logging)\ndebug:\n log_api_timing: false\n log_parallel_status: false\n verbose: false\n\n# Performance monitoring mode\ndebug:\n log_api_timing: true # Track API performance\n log_parallel_status: true # Monitor workers\n verbose: false # Don't flood logs\n```\n\n#### Programmatic Debug Configuration\n\n```python\nfrom omnigen.pipelines.conversation_extension import ConversationExtensionConfigBuilder\n\nconfig = (ConversationExtensionConfigBuilder()\n # ... 
other config ...\n .build()\n)\n\n# Add debug settings\nconfig_dict = config.to_dict()\nconfig_dict['debug'] = {\n 'log_api_timing': True,\n 'log_parallel_status': True,\n 'verbose': False\n}\n```\n\n#### Debug Output Examples\n\n**Scenario 1: API Performance Issue**\n```yaml\ndebug:\n log_api_timing: true\n```\n\nOutput helps identify slow providers:\n```\n[API TIMING] openai/gpt-4-turbo: 0.8s\n[API TIMING] anthropic/claude-3-5-sonnet: 3.2s \u2190 SLOW!\n[API TIMING] ultrasafe/usf-mini: 0.5s\n```\n\n**Scenario 2: Parallel Processing Bottleneck**\n```yaml\ndebug:\n log_parallel_status: true\n```\n\nOutput shows worker utilization:\n```\n[PARALLEL] Active: 3/10 workers \u2190 Only 30% utilized!\n[PARALLEL] Queue: 0 tasks remaining\n[PARALLEL] Suggestion: Reduce worker count to 5\n```\n\n**Scenario 3: Data Loading Issues**\n```yaml\ndebug:\n verbose: true\n```\n\nOutput reveals the problem:\n```\n[DEBUG] Loading: data/input.jsonl\n[DEBUG] Line 1: Valid \u2713\n[DEBUG] Line 2: Valid \u2713\n[DEBUG] Line 3: ERROR - First message not from user\n[DEBUG] Line 4: Valid \u2713\n[DEBUG] Loaded: 3 valid, 1 invalid\n```\n\n---\n\n## \ud83d\udcd6 Conversation Extension Pipeline - Complete Guide\n\n### Overview\n\nThe **Conversation Extension Pipeline** intelligently transforms base conversations into rich multi-turn dialogues. It can handle both single-turn questions and extend existing multi-turn conversations.\n\n### Key Features\n\n- \u2705 **Smart Extension** - Continues from existing conversations based on last role\n- \u2705 **Flexible Input** - Handles single-turn or multi-turn base data\n- \u2705 **Provider Mix** - Use different AI providers for user and assistant\n- \u2705 **Multi-Tenant** - Complete workspace isolation\n- \u2705 **Configurable** - Full control over generation behavior\n\n### Configuration Options\n\n#### Extension Modes\n\n**Smart Mode (Default)**\n```yaml\ngeneration:\n extension_mode: \"smart\"\n```\n\n- **Single-turn input** \u2192 Generate new conversation from scratch\n- **Multi-turn (user last)** \u2192 Add 1 assistant response, then continue\n- **Multi-turn (assistant last)** \u2192 Add user + assistant, then continue\n- **Invalid patterns** \u2192 Skip row entirely\n\n**Legacy Mode**\n```yaml\ngeneration:\n extension_mode: \"legacy\"\n```\n- Always extracts first user message only (original behavior)\n\n#### Turn Calculation\n\n**Additional Mode (Default)** - Add NEW turns on top of existing\n```yaml\ngeneration:\n turn_calculation: \"additional\" # Add 3-8 NEW turns\n```\n\n**Total Mode** - Keep total within range (never removes existing)\n```yaml\ngeneration:\n turn_calculation: \"total\" # Total should be 3-8 turns\n```\n\n#### Complete Configuration\n\n```yaml\n# Workspace isolation (optional)\nworkspace_id: \"user_123\"\n\n# AI Providers - Smart defaults applied!\nproviders:\n user_followup:\n name: \"ultrasafe\"\n api_key: \"${ULTRASAFE_API_KEY}\"\n # Defaults: usf-mini, 0.7 temp, 4096 tokens\n max_tokens: 2048 # Override default if needed\n \n assistant_response:\n name: \"ultrasafe\"\n api_key: \"${ULTRASAFE_API_KEY}\"\n # Defaults: usf-mini, 0.7 temp, 4096 tokens\n max_tokens: 8192 # Override for detailed responses\n\n# Generation Settings\ngeneration:\n num_conversations: 100\n turn_range:\n min: 3\n max: 8\n parallel_workers: 10\n \n # Extension behavior\n extension_mode: \"smart\" # \"smart\" | \"legacy\"\n skip_invalid: true # Skip invalid patterns\n turn_calculation: \"additional\" # \"additional\" | \"total\"\n\n# Input Data\nbase_data:\n enabled: 
true\n source_type: \"file\"\n file_path: \"base_data.jsonl\"\n format: \"conversations\"\n shuffle: false\n\n# Output Storage\nstorage:\n type: \"jsonl\"\n output_file: \"output.jsonl\"\n partial_file: \"partial.jsonl\"\n failed_file: \"failed.jsonl\"\n\n# Checkpoint/Resume (recommended for large datasets)\ncheckpoint:\n enabled: true\n checkpoint_file: \"checkpoint.json\"\n auto_save_frequency: 10\n validate_input_hash: true\n resume_mode: \"auto\"\n\n# System Messages (optional)\nsystem_messages:\n add_if_missing:\n enabled: true\n content: \"You are a helpful assistant. Current datetime: {current_datetime}\"\n\n# DateTime (optional)\ndatetime_config:\n enabled: true\n timezone: \"UTC\"\n format: \"%Y-%m-%d %H:%M:%S\"\n range:\n start: \"2024-01-01 00:00:00\"\n end: \"2024-12-31 23:59:59\"\n```\n\n### Input Data Formats\n\n#### Valid Patterns\n\n**Single-turn** \u2705\n```json\n{\"conversations\": [{\"role\": \"user\", \"content\": \"How do I learn Python?\"}]}\n```\n\n**Multi-turn (user last)** \u2705\n```json\n{\n  \"conversations\": [\n    {\"role\": \"user\", \"content\": \"How do I learn Python?\"},\n    {\"role\": \"assistant\", \"content\": \"Start with basics...\"},\n    {\"role\": \"user\", \"content\": \"What resources?\"}\n  ]\n}\n```\n\n**Multi-turn (assistant last)** \u2705\n```json\n{\n  \"conversations\": [\n    {\"role\": \"user\", \"content\": \"How do I learn Python?\"},\n    {\"role\": \"assistant\", \"content\": \"Start with basics...\"}\n  ]\n}\n```\n\n#### Invalid Patterns (Skipped)\n\n\u274c First message not from the user\n```json\n{\"conversations\": [{\"role\": \"assistant\", \"content\": \"Hello\"}]}\n```\n\n\u274c Empty conversations\n```json\n{\"conversations\": []}\n```\n\n### Programmatic Usage\n\n```python\nfrom omnigen.pipelines.conversation_extension import (\n    ConversationExtensionConfigBuilder,\n    ConversationExtensionPipeline\n)\n\nconfig = (ConversationExtensionConfigBuilder()\n    .add_provider('user_followup', 'ultrasafe', 'api-key', 'usf-mini')\n    .add_provider('assistant_response', 'ultrasafe', 'api-key', 'usf-mini')\n    .set_generation(\n        num_conversations=100,\n        turn_range=(3, 8),\n        parallel_workers=10,\n        extension_mode='smart',        # Handle multi-turn intelligently\n        skip_invalid=True,             # Skip invalid patterns\n        turn_calculation='additional'  # Add new turns (default)\n    )\n    .set_data_source('file', file_path='base_data.jsonl')\n    .set_storage('jsonl', output_file='output.jsonl')\n    .set_checkpoint(enabled=True, auto_save_frequency=10)  # Enable checkpoint\n    .build()\n)\n\npipeline = ConversationExtensionPipeline(config)\npipeline.run()  # Automatically resumes if interrupted!\n```\n\n### Turn Calculation Examples\n\n**Additional Mode (Default)**\n```\nExisting: 2 turns\nConfig: turn_range = (3, 8)\nResult: Add 3-8 NEW turns \u2192 Total: 5-10 turns\n```\n\n**Total Mode**\n```\nExisting: 2 turns\nConfig: turn_range = (3, 8)\nResult: Add 1-6 turns \u2192 Total: 3-8 turns\n\nExisting: 10 turns (already > max)\nConfig: turn_range = (3, 8)\nResult: Add 0 turns \u2192 Keep 10 turns (never remove)\n```\n\n
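The two modes reduce to a small piece of arithmetic. Here is a sketch of the rule the examples above follow (`turns_to_add` is a hypothetical helper, and `random.randint` merely stands in for however the pipeline samples the range):\n\n```python\nimport random\n\ndef turns_to_add(existing: int, turn_range: tuple, mode: str) -> int:\n    lo, hi = turn_range\n    if mode == 'additional':\n        # Always add a fresh sample from the configured range\n        return random.randint(lo, hi)\n    # 'total' mode: top up toward the range, never removing existing turns\n    if existing >= hi:\n        return 0\n    return random.randint(max(lo - existing, 1), hi - existing)\n\nprint(turns_to_add(2, (3, 8), 'additional'))  # 3-8 new turns -> total 5-10\nprint(turns_to_add(2, (3, 8), 'total'))       # 1-6 new turns -> total 3-8\nprint(turns_to_add(10, (3, 8), 'total'))      # 0 - already above the max\n```\n\n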
### Best Practices\n\n**Provider Selection**\n- Use better models for the assistant (claude-3-5-sonnet, gpt-4-turbo)\n- Use cheaper models for user follow-ups (usf-mini, gpt-3.5-turbo)\n\n**Turn Range**\n- Quick exchanges: `(2, 4)`\n- In-depth: `(5, 10)`\n- Balanced: `(3, 8)` \u2705\n\n**Parallel Workers**\n- Conservative: `5` (avoid rate limits)\n- Balanced: `10` \u2705\n- Aggressive: `20` (watch for rate limits)\n\n### Troubleshooting\n\n**Issue: Empty output**\n- Check the input data format (the first message must be from the user)\n- Set `skip_invalid: false` to see errors\n\n**Issue: Rate limits**\n- Reduce `parallel_workers`\n- Check provider API limits\n- Enable checkpointing to preserve progress: `checkpoint.enabled: true`\n\n**Issue: Pipeline interrupted**\n- \u2705 Don't worry! Progress is saved automatically if checkpointing is enabled\n- Simply run again - it will resume from where it stopped\n- Check `checkpoint.json` for the current state\n\n**Issue: Low quality**\n- Increase temperature (0.8-0.9)\n- Use better models\n- Add custom prompts and system messages\n\n**Issue: Duplicate conversations in output**\n- The checkpoint system prevents this automatically\n- Both partial and complete conversations are saved to the same file with a `status` field\n- Filter completed: `jq 'select(.status == \"completed\")' output.jsonl`\n\n---\n\n## License\n\nMIT License - Ultrasafe AI \u00a9 2024\n\n---\n\n## About Ultrasafe AI\n\nEnterprise-grade AI tools with a focus on safety and performance.\n\n- \ud83c\udf10 Website: [us.inc](https://us.inc)\n- \ud83d\udce7 Email: support@us.inc\n\n---\n\n<div align=\"center\">\n\n**Made with \u2764\ufe0f by [Ultrasafe AI](https://us.inc)**\n\n</div>\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Enterprise-Grade Synthetic Data Generation",
"version": "0.0.1.post9",
"project_urls": {
"Documentation": "https://github.com/ultrasafe-ai/omnigen",
"Homepage": "https://us.inc",
"Repository": "https://github.com/ultrasafe-ai/omnigen"
},
"split_keywords": [
"synthetic-data",
" data-generation",
" conversational-ai",
" llm",
" pipeline",
" conversation-extension",
" machine-learning",
" ai"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "5e25586af3cb2f8e972b60992651b55afda74ec01bf15ea44653db5cbdc03ad1",
"md5": "5d5984eb4ce5a238e4d78628dba2d50f",
"sha256": "9dd39a92bd404e9581fe35d76250a5443cb398340485230aabcdc25eb698d294"
},
"downloads": -1,
"filename": "omnigen_usf-0.0.1.post9-py3-none-any.whl",
"has_sig": false,
"md5_digest": "5d5984eb4ce5a238e4d78628dba2d50f",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 59433,
"upload_time": "2025-10-06T11:42:51",
"upload_time_iso_8601": "2025-10-06T11:42:51.939203Z",
"url": "https://files.pythonhosted.org/packages/5e/25/586af3cb2f8e972b60992651b55afda74ec01bf15ea44653db5cbdc03ad1/omnigen_usf-0.0.1.post9-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "2110c0edb3f185aa9f583240b984d10996a310870b8ada2076dc0e5e9393f92d",
"md5": "45d6b22f9e50b3a7fc50f078f273f9f5",
"sha256": "a3d223b874ce8e8aadfa91a18a5865c47036ea6b9d5a8b309c83fa4ea4b08caf"
},
"downloads": -1,
"filename": "omnigen_usf-0.0.1.post9.tar.gz",
"has_sig": false,
"md5_digest": "45d6b22f9e50b3a7fc50f078f273f9f5",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 93028,
"upload_time": "2025-10-06T11:42:53",
"upload_time_iso_8601": "2025-10-06T11:42:53.807710Z",
"url": "https://files.pythonhosted.org/packages/21/10/c0edb3f185aa9f583240b984d10996a310870b8ada2076dc0e5e9393f92d/omnigen_usf-0.0.1.post9.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-06 11:42:53",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "ultrasafe-ai",
"github_project": "omnigen",
"github_not_found": true,
"lcname": "omnigen-usf"
}