ninja-kafka-sdk


Nameninja-kafka-sdk JSON
Version 0.3.31 PyPI version JSON
download
home_pageNone
SummaryKafka-based distributed task processing SDK
upload_time2025-10-30 22:19:13
maintainerNone
docs_urlNone
authorNone
requires_python>=3.8
licenseNone
keywords kafka task-queue distributed-computing microservices messaging
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Ninja Kafka SDK

**SDK for distributed task processing with Kafka messaging and automatic service isolation.**

Send tasks to Ninja services and get results back with automatic message routing based on consumer groups. Each service only receives its own results!

## šŸš€ Quick Start

### 1ļøāƒ£ Send a Task

```python
import asyncio
from ninja_kafka_sdk import NinjaClient

async def send_task_example():
    # Create client with YOUR service's unique consumer group
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="my-service"  # Your service's unique identifier
    )

    # Send task to browser-ninja for processing
    correlation_id = await client.send_task(
        task="linkedin_verification",
        account_id=123,
        email="user@example.com",

        # Optional: Add task-specific parameters
        parameters={
            'max_retries': 3,
            'timeout': 60
        }
    )

    print(f"āœ… Task sent!")
    print(f"šŸ“‹ Correlation ID: {correlation_id}")
    # Note: correlation_id will be 'my-service:uuid-...'

    client.stop()
    return correlation_id

# Run it
asyncio.run(send_task_example())
```

### 2ļøāƒ£ Listen for Results

```python
import asyncio
from ninja_kafka_sdk import NinjaClient

async def listen_for_results():
    # Use the SAME consumer group as when you sent the task
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="my-service"  # Same as sender!
    )

    print("šŸ‘‚ Listening for results...")

    # SDK automatically filters - you ONLY get results for YOUR service
    async for result in client.listen_results():
        print(f"\nšŸ“„ Received result:")
        print(f"   Task: {result.get('task')}")
        print(f"   Status: {result.get('status')}")
        print(f"   Account ID: {result.get('account_id')}")

        if result.get('status') == 'SUCCESS':
            data = result.get('data', {})
            print(f"   āœ… Success! Data: {data}")
        else:
            error = result.get('error', {})
            print(f"   āŒ Failed! Error: {error}")

# Run it
asyncio.run(listen_for_results())
```  

## šŸ“¦ Installation

```bash
# Install latest version (recommended)
pip install --upgrade ninja-kafka-sdk

# Install from PyPI (gets latest if not installed)
pip install ninja-kafka-sdk

# Install specific version
pip install ninja-kafka-sdk==0.3.24

# Force reinstall to latest version
pip install --force-reinstall --upgrade ninja-kafka-sdk
```

## šŸ“Ø Complete Example: Send and Listen

### Send a Task and Wait for Its Result

```python
import asyncio
from ninja_kafka_sdk import NinjaClient

async def send_and_wait_example():
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="my-service"
    )

    # Step 1: Send task
    correlation_id = await client.send_task(
        task="linkedin_like",
        account_id=456,
        parameters={
            'post_url': 'https://linkedin.com/posts/example'
        }
    )
    print(f"šŸ“¤ Task sent: {correlation_id}")

    # Step 2: Wait for this specific result
    async for result in client.listen_results(correlation_ids=[correlation_id]):
        if result.get('correlation_id') == correlation_id:
            print(f"šŸ“„ Got result: {result.get('status')}")
            break

    client.stop()

asyncio.run(send_and_wait_example())
```

### Listen Continuously for All Results

```python
import asyncio
from ninja_kafka_sdk import NinjaClient

async def continuous_listener():
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="my-service"
    )

    print("šŸŽ§ Listening for all results for 'my-service'...")

    # This will run forever, processing results as they arrive
    async for result in client.listen_results():
        correlation_id = result.get('correlation_id')
        status = result.get('status')

        # Only receives results for tasks sent by 'my-service'
        print(f"Result {correlation_id}: {status}")

        # Process based on status
        if status == 'SUCCESS':
            # Handle success
            pass
        else:
            # Handle failure
            pass

asyncio.run(continuous_listener())
```

## āš™ļø Configuration

### šŸ”‘ Key Concept: Consumer Groups

**IMPORTANT**: The `consumer_group` is your service's unique identity!
- Each service MUST have a UNIQUE consumer group name
- The SDK uses this to automatically route messages
- You only receive results for tasks YOUR service sent
- Consumer group names cannot contain ':' (reserved character)

```python
from ninja_kafka_sdk import NinjaClient

# āœ… Good - Unique consumer groups
client1 = NinjaClient(
    kafka_servers="localhost:9092",
    consumer_group="auto-login"  # Service 1
)

client2 = NinjaClient(
    kafka_servers="localhost:9092",
    consumer_group="like-service"  # Service 2
)

# āŒ Bad - Consumer group with colon
client = NinjaClient(
    kafka_servers="localhost:9092",
    consumer_group="my:service"  # Will raise ValueError!
)
```

### Configuration from Config Object
```python
from ninja_kafka_sdk import NinjaClient
from your_app.config import config  # Your application's config

client = NinjaClient(
    kafka_servers=config.KAFKA_SERVERS,
    consumer_group=config.KAFKA_CONSUMER_GROUP
)
```

### Configuration with Config Object
```python
from ninja_kafka_sdk import NinjaClient, NinjaKafkaConfig

# Create configuration object
config = NinjaKafkaConfig(
    kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
    consumer_group="my-service",
    environment="stage",
    tasks_topic="ninja-tasks",
    results_topic="ninja-results"
)

# Use with client
client = NinjaClient(config=config)
```

## šŸ†• Version 0.3.23+ Parameters Field Update

**āœ… IMPROVEMENT**: The `parameters` field is now properly handled in `send_task()`:

```python
# After v0.3.23+, parameters go to the correct field
await client.send_task(
    task="linkedin_like",
    account_id=123,

    # These go to their proper fields (not nested in metadata!)
    parameters={'post_url': 'https://...'},  # → request.parameters
    api_endpoints={'callback': 'https://...'}  # → request.api_endpoints
)
```

## šŸ†• Version 0.2.0 Breaking Changes

**āš ļø BREAKING CHANGE**: `kafka_servers` and `consumer_group` are now **REQUIRED** parameters.

### Migration from v0.1.x
```python
# OLD (v0.1.x) - Had auto-detection and localhost fallbacks
client = NinjaClient()  # āŒ This no longer works

# NEW (v0.2.0+) - Explicit configuration required  
client = NinjaClient(
    kafka_servers="your-kafka-servers:9092",  # āœ… Required
    consumer_group="your-service-name"        # āœ… Required
)
```

### Why This Change?
- **Production Safety**: Prevents localhost fallbacks in production environments
- **Explicit Configuration**: No more guessing what environment you're connecting to
- **Debugging**: Clear errors when configuration is missing
- **Environment Agnostic**: Same code works everywhere with different config

## šŸ’” How to Send Tasks

### Basic Task Execution
```python
from ninja_kafka_sdk import NinjaClient

async def verify_linkedin_account():
    # Explicit configuration for production
    client = NinjaClient(
        kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
        consumer_group="auto-login-service",
        environment="prod"
    )
    
    try:
        # Send task and wait for result (one method call)
        result = await client.execute_task(
            task="linkedin_verification",
            account_id=12345,
            email="user@example.com",
            timeout=300  # 5 minutes
        )
        
        if result.is_success:
            print("āœ… Verification successful!")
            return result.cookies
        else:
            print(f"āŒ Failed: {result.error_message}")
            return None
            
    finally:
        client.stop()
```



### Advanced Usage Patterns

#### Fire and Forget
```python
async def send_multiple_tasks():
    # Must provide explicit configuration
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="task-sender"
    )
    
    # Send task without waiting for result
    correlation_id = await client.send_task(
        task="linkedin_verification", 
        account_id=123
    )
    print(f"Task sent: {correlation_id}")
    client.stop()
```

#### Batch Processing
```python
async def process_multiple_accounts():
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="batch-processor"
    )
    accounts = [123, 456, 789]

    try:
        # Send all tasks
        task_ids = []
        for account_id in accounts:
            task_id = await client.send_task("linkedin_verification", account_id=account_id)
            task_ids.append(task_id)

        # Listen for all results
        completed = 0
        async for result in client.listen_results(correlation_ids=task_ids):
            completed += 1
            print(f"Account {result.account_id}: {result.status}")
            if completed >= len(accounts):
                break
                
    finally:
        client.stop()
```

#### Different Environment Examples
```python
# Local development
async def local_verification():
    client = NinjaClient(
        kafka_servers="localhost:9092",
        consumer_group="local-test",
        environment="local"  # Optional: for logging only
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result

# Production environment
async def production_verification():
    client = NinjaClient(
        kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
        consumer_group="auto-login-prod",
        environment="production"
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result

# Using config object  
async def config_based_verification():
    from your_app.config import config
    
    client = NinjaClient(
        kafka_servers=config.KAFKA_SERVERS,
        consumer_group=config.KAFKA_CONSUMER_GROUP
    )
    result = await client.execute_task("linkedin_verification", account_id=123)
    client.stop()
    return result
```

## šŸ—ļø Available Tasks

### LinkedIn Verification
```python
result = await client.execute_task(
    task="linkedin_verification",
    account_id=123,
    email="user@example.com",  # Optional but highly recommended
    timeout=300  # 5 minutes
)
```

### Future Tasks
More task types will be added for different platforms:
- `twitter_verification`
- `instagram_verification` 
- `facebook_verification`

## šŸ“ Message Models

### Task Request
```python
@dataclass
class NinjaTaskRequest:
    task: str              # "linkedin_verification"
    account_id: int        # Account ID
    correlation_id: str    # Auto-generated UUID
    email: Optional[str]   # Account email
    user_id: Optional[int] # User ID
    metadata: Dict[str, Any]  # Additional parameters
```

### Task Result
```python
@dataclass 
class NinjaTaskResult:
    correlation_id: str    # Matches request
    task: str             # Task type
    status: str           # "VERIFIED", "FAILED", etc.
    success: bool         # True if successful
    account_id: int       # Account ID
    cookies: Optional[str] # Extracted cookies
    data: Optional[Dict]   # Additional result data
    error: Optional[Dict]  # Error details if failed
    
    @property
    def is_success(self) -> bool:
        return self.success or self.status == 'VERIFIED'
```

## 🚨 Error Handling

```python
from ninja_kafka_sdk import (
    NinjaClient, NinjaTaskTimeoutError, 
    NinjaTaskError, NinjaKafkaConnectionError
)

try:
    result = await client.execute_task("linkedin_verification", account_id=123)
    
except NinjaTaskTimeoutError:
    print("Task took too long")
    
except NinjaTaskError as e:
    print(f"Ninja couldn't complete task: {e.details}")
    
except NinjaKafkaConnectionError:
    print("Can't connect to Kafka")
```

## šŸ”Œ Extending for New Services

```python
# Add new task types easily
await client.send_task(
    task="twitter_scraping",
    account_id=123,
    parameters={"target_user": "@elonmusk"}
)

# SDK handles routing to appropriate Ninja service
```



## šŸ”§ Troubleshooting

### Common Configuration Issues

#### Issue: "Can't connect to Kafka"
```python
# Check your servers configuration
from ninja_kafka_sdk.config import NinjaKafkaConfig
config = NinjaKafkaConfig()
print(f"Environment: {config.environment}")
print(f"Kafka servers: {config.kafka_servers}")
print(f"Consumer group: {config.consumer_group}")
```

**Solutions:**
1. **Local Development**: Ensure Kafka is running on `localhost:9092`
2. **Stage/Prod**: Verify `KAFKA_STAGE_SERVERS` or `KAFKA_PROD_SERVERS` are set
3. **Custom Provider**: Use `KAFKA_BOOTSTRAP_SERVERS` for explicit override

#### Issue: "No messages received"
```python
# Check consumer group conflicts
import os
print(f"Consumer group: {os.getenv('KAFKA_CONSUMER_GROUP', 'auto-detected')}")

# Force specific consumer group
os.environ['KAFKA_CONSUMER_GROUP'] = 'my-unique-group'
client = NinjaClient()
```

#### Issue: "Task timeout"
```python
# Increase timeout for slow operations
client = NinjaClient(timeout=600)  # 10 minutes
result = await client.execute_task("linkedin_verification", account_id=123, timeout=300)
```

### Environment Detection Debug

```python
from ninja_kafka_sdk.config import NinjaKafkaConfig

# Debug environment detection
config = NinjaKafkaConfig()
print(f"Environment: {config.environment}")
print(f"Servers: {config.kafka_servers}")

# Force specific environment
config = NinjaKafkaConfig(environment='stage')
print(f"Forced stage servers: {config.kafka_servers}")
```

### Quick Health Check

```python
from ninja_kafka_sdk import NinjaClient
import asyncio

async def health_check():
    client = NinjaClient()
    try:
        # Test connection by sending a test message
        correlation_id = await client.send_task("health_check", account_id=0)
        print(f"āœ… Connection OK - Test message sent: {correlation_id}")
        return True
    except Exception as e:
        print(f"āŒ Connection failed: {e}")
        return False
    finally:
        client.stop()

# Run health check
asyncio.run(health_check())
```




---

## šŸ“š Appendix: For Service Implementers

This section contains information for developers implementing Ninja services (like browser-ninja) that process tasks and send results back.

### Sending Task Results

If you're building a service that processes Ninja tasks, use these methods to send results:

```python
from ninja_kafka_sdk import NinjaClient

async def send_verification_result():
    # Configure client for service that processes tasks
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="browser-ninja",  # Service-specific consumer group
        environment="prod"
    )
    
    try:
        # Send success result
        await client.send_success_result(
            correlation_id="task-123-456",
            account_id=12345,
            email="user@example.com",
            cookies="extracted_cookies_data",
            screenshot="base64_screenshot"
        )
        
        # Or send error result
        await client.send_error_result(
            correlation_id="task-123-457",
            account_id=12346,
            email="user2@example.com",
            error_code="LOGIN_FAILED",
            error_message="Invalid credentials"
        )
        
    finally:
        client.stop()
```

### Listening for Tasks (Future Feature)

```python
from ninja_kafka_sdk import NinjaClient

async def process_ninja_tasks():
    client = NinjaClient(
        kafka_servers="your-kafka-servers:9092",
        consumer_group="browser-ninja"
    )
    
    try:
        # Listen for incoming tasks
        async for task in client.listen_tasks():
            print(f"šŸ“„ Received task: {task.task} for account {task.account_id}")
            
            # Process the task
            if task.task == "linkedin_verification":
                result = await process_linkedin_verification(task)
                
                # Send result back
                if result["success"]:
                    await client.send_success_result(
                        correlation_id=task.correlation_id,
                        account_id=task.account_id,
                        email=task.email,
                        cookies=result["cookies"]
                    )
                else:
                    await client.send_error_result(
                        correlation_id=task.correlation_id,
                        account_id=task.account_id,
                        email=task.email,
                        error_code=result["error_code"],
                        error_message=result["error_message"]
                    )
                    
    finally:
        client.stop()
```



---

**The Ninja Kafka SDK simplifies task-based communication while maintaining enterprise-grade reliability.**

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "ninja-kafka-sdk",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "kafka, task-queue, distributed-computing, microservices, messaging",
    "author": null,
    "author_email": "Blazel Team <dev@blazel.ai>",
    "download_url": "https://files.pythonhosted.org/packages/18/4d/6cff14954d1b06581f8913b9162e4597a213a49ff79bcc9b93d5ce17a3d6/ninja_kafka_sdk-0.3.31.tar.gz",
    "platform": null,
    "description": "# Ninja Kafka SDK\n\n**SDK for distributed task processing with Kafka messaging and automatic service isolation.**\n\nSend tasks to Ninja services and get results back with automatic message routing based on consumer groups. Each service only receives its own results!\n\n## \ud83d\ude80 Quick Start\n\n### 1\ufe0f\u20e3 Send a Task\n\n```python\nimport asyncio\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def send_task_example():\n    # Create client with YOUR service's unique consumer group\n    client = NinjaClient(\n        kafka_servers=\"localhost:9092\",\n        consumer_group=\"my-service\"  # Your service's unique identifier\n    )\n\n    # Send task to browser-ninja for processing\n    correlation_id = await client.send_task(\n        task=\"linkedin_verification\",\n        account_id=123,\n        email=\"user@example.com\",\n\n        # Optional: Add task-specific parameters\n        parameters={\n            'max_retries': 3,\n            'timeout': 60\n        }\n    )\n\n    print(f\"\u2705 Task sent!\")\n    print(f\"\ud83d\udccb Correlation ID: {correlation_id}\")\n    # Note: correlation_id will be 'my-service:uuid-...'\n\n    client.stop()\n    return correlation_id\n\n# Run it\nasyncio.run(send_task_example())\n```\n\n### 2\ufe0f\u20e3 Listen for Results\n\n```python\nimport asyncio\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def listen_for_results():\n    # Use the SAME consumer group as when you sent the task\n    client = NinjaClient(\n        kafka_servers=\"localhost:9092\",\n        consumer_group=\"my-service\"  # Same as sender!\n    )\n\n    print(\"\ud83d\udc42 Listening for results...\")\n\n    # SDK automatically filters - you ONLY get results for YOUR service\n    async for result in client.listen_results():\n        print(f\"\\n\ud83d\udce5 Received result:\")\n        print(f\"   Task: {result.get('task')}\")\n        print(f\"   Status: {result.get('status')}\")\n        print(f\"   Account ID: {result.get('account_id')}\")\n\n        if result.get('status') == 'SUCCESS':\n            data = result.get('data', {})\n            print(f\"   \u2705 Success! Data: {data}\")\n        else:\n            error = result.get('error', {})\n            print(f\"   \u274c Failed! Error: {error}\")\n\n# Run it\nasyncio.run(listen_for_results())\n```  \n\n## \ud83d\udce6 Installation\n\n```bash\n# Install latest version (recommended)\npip install --upgrade ninja-kafka-sdk\n\n# Install from PyPI (gets latest if not installed)\npip install ninja-kafka-sdk\n\n# Install specific version\npip install ninja-kafka-sdk==0.3.24\n\n# Force reinstall to latest version\npip install --force-reinstall --upgrade ninja-kafka-sdk\n```\n\n## \ud83d\udce8 Complete Example: Send and Listen\n\n### Send a Task and Wait for Its Result\n\n```python\nimport asyncio\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def send_and_wait_example():\n    client = NinjaClient(\n        kafka_servers=\"localhost:9092\",\n        consumer_group=\"my-service\"\n    )\n\n    # Step 1: Send task\n    correlation_id = await client.send_task(\n        task=\"linkedin_like\",\n        account_id=456,\n        parameters={\n            'post_url': 'https://linkedin.com/posts/example'\n        }\n    )\n    print(f\"\ud83d\udce4 Task sent: {correlation_id}\")\n\n    # Step 2: Wait for this specific result\n    async for result in client.listen_results(correlation_ids=[correlation_id]):\n        if result.get('correlation_id') == correlation_id:\n            print(f\"\ud83d\udce5 Got result: {result.get('status')}\")\n            break\n\n    client.stop()\n\nasyncio.run(send_and_wait_example())\n```\n\n### Listen Continuously for All Results\n\n```python\nimport asyncio\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def continuous_listener():\n    client = NinjaClient(\n        kafka_servers=\"localhost:9092\",\n        consumer_group=\"my-service\"\n    )\n\n    print(\"\ud83c\udfa7 Listening for all results for 'my-service'...\")\n\n    # This will run forever, processing results as they arrive\n    async for result in client.listen_results():\n        correlation_id = result.get('correlation_id')\n        status = result.get('status')\n\n        # Only receives results for tasks sent by 'my-service'\n        print(f\"Result {correlation_id}: {status}\")\n\n        # Process based on status\n        if status == 'SUCCESS':\n            # Handle success\n            pass\n        else:\n            # Handle failure\n            pass\n\nasyncio.run(continuous_listener())\n```\n\n## \u2699\ufe0f Configuration\n\n### \ud83d\udd11 Key Concept: Consumer Groups\n\n**IMPORTANT**: The `consumer_group` is your service's unique identity!\n- Each service MUST have a UNIQUE consumer group name\n- The SDK uses this to automatically route messages\n- You only receive results for tasks YOUR service sent\n- Consumer group names cannot contain ':' (reserved character)\n\n```python\nfrom ninja_kafka_sdk import NinjaClient\n\n# \u2705 Good - Unique consumer groups\nclient1 = NinjaClient(\n    kafka_servers=\"localhost:9092\",\n    consumer_group=\"auto-login\"  # Service 1\n)\n\nclient2 = NinjaClient(\n    kafka_servers=\"localhost:9092\",\n    consumer_group=\"like-service\"  # Service 2\n)\n\n# \u274c Bad - Consumer group with colon\nclient = NinjaClient(\n    kafka_servers=\"localhost:9092\",\n    consumer_group=\"my:service\"  # Will raise ValueError!\n)\n```\n\n### Configuration from Config Object\n```python\nfrom ninja_kafka_sdk import NinjaClient\nfrom your_app.config import config  # Your application's config\n\nclient = NinjaClient(\n    kafka_servers=config.KAFKA_SERVERS,\n    consumer_group=config.KAFKA_CONSUMER_GROUP\n)\n```\n\n### Configuration with Config Object\n```python\nfrom ninja_kafka_sdk import NinjaClient, NinjaKafkaConfig\n\n# Create configuration object\nconfig = NinjaKafkaConfig(\n    kafka_servers=\"b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092\",\n    consumer_group=\"my-service\",\n    environment=\"stage\",\n    tasks_topic=\"ninja-tasks\",\n    results_topic=\"ninja-results\"\n)\n\n# Use with client\nclient = NinjaClient(config=config)\n```\n\n## \ud83c\udd95 Version 0.3.23+ Parameters Field Update\n\n**\u2705 IMPROVEMENT**: The `parameters` field is now properly handled in `send_task()`:\n\n```python\n# After v0.3.23+, parameters go to the correct field\nawait client.send_task(\n    task=\"linkedin_like\",\n    account_id=123,\n\n    # These go to their proper fields (not nested in metadata!)\n    parameters={'post_url': 'https://...'},  # \u2192 request.parameters\n    api_endpoints={'callback': 'https://...'}  # \u2192 request.api_endpoints\n)\n```\n\n## \ud83c\udd95 Version 0.2.0 Breaking Changes\n\n**\u26a0\ufe0f BREAKING CHANGE**: `kafka_servers` and `consumer_group` are now **REQUIRED** parameters.\n\n### Migration from v0.1.x\n```python\n# OLD (v0.1.x) - Had auto-detection and localhost fallbacks\nclient = NinjaClient()  # \u274c This no longer works\n\n# NEW (v0.2.0+) - Explicit configuration required  \nclient = NinjaClient(\n    kafka_servers=\"your-kafka-servers:9092\",  # \u2705 Required\n    consumer_group=\"your-service-name\"        # \u2705 Required\n)\n```\n\n### Why This Change?\n- **Production Safety**: Prevents localhost fallbacks in production environments\n- **Explicit Configuration**: No more guessing what environment you're connecting to\n- **Debugging**: Clear errors when configuration is missing\n- **Environment Agnostic**: Same code works everywhere with different config\n\n## \ud83d\udca1 How to Send Tasks\n\n### Basic Task Execution\n```python\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def verify_linkedin_account():\n    # Explicit configuration for production\n    client = NinjaClient(\n        kafka_servers=\"b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092\",\n        consumer_group=\"auto-login-service\",\n        environment=\"prod\"\n    )\n    \n    try:\n        # Send task and wait for result (one method call)\n        result = await client.execute_task(\n            task=\"linkedin_verification\",\n            account_id=12345,\n            email=\"user@example.com\",\n            timeout=300  # 5 minutes\n        )\n        \n        if result.is_success:\n            print(\"\u2705 Verification successful!\")\n            return result.cookies\n        else:\n            print(f\"\u274c Failed: {result.error_message}\")\n            return None\n            \n    finally:\n        client.stop()\n```\n\n\n\n### Advanced Usage Patterns\n\n#### Fire and Forget\n```python\nasync def send_multiple_tasks():\n    # Must provide explicit configuration\n    client = NinjaClient(\n        kafka_servers=\"localhost:9092\",\n        consumer_group=\"task-sender\"\n    )\n    \n    # Send task without waiting for result\n    correlation_id = await client.send_task(\n        task=\"linkedin_verification\", \n        account_id=123\n    )\n    print(f\"Task sent: {correlation_id}\")\n    client.stop()\n```\n\n#### Batch Processing\n```python\nasync def process_multiple_accounts():\n    client = NinjaClient(\n        kafka_servers=\"your-kafka-servers:9092\",\n        consumer_group=\"batch-processor\"\n    )\n    accounts = [123, 456, 789]\n\n    try:\n        # Send all tasks\n        task_ids = []\n        for account_id in accounts:\n            task_id = await client.send_task(\"linkedin_verification\", account_id=account_id)\n            task_ids.append(task_id)\n\n        # Listen for all results\n        completed = 0\n        async for result in client.listen_results(correlation_ids=task_ids):\n            completed += 1\n            print(f\"Account {result.account_id}: {result.status}\")\n            if completed >= len(accounts):\n                break\n                \n    finally:\n        client.stop()\n```\n\n#### Different Environment Examples\n```python\n# Local development\nasync def local_verification():\n    client = NinjaClient(\n        kafka_servers=\"localhost:9092\",\n        consumer_group=\"local-test\",\n        environment=\"local\"  # Optional: for logging only\n    )\n    result = await client.execute_task(\"linkedin_verification\", account_id=123)\n    client.stop()\n    return result\n\n# Production environment\nasync def production_verification():\n    client = NinjaClient(\n        kafka_servers=\"b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092\",\n        consumer_group=\"auto-login-prod\",\n        environment=\"production\"\n    )\n    result = await client.execute_task(\"linkedin_verification\", account_id=123)\n    client.stop()\n    return result\n\n# Using config object  \nasync def config_based_verification():\n    from your_app.config import config\n    \n    client = NinjaClient(\n        kafka_servers=config.KAFKA_SERVERS,\n        consumer_group=config.KAFKA_CONSUMER_GROUP\n    )\n    result = await client.execute_task(\"linkedin_verification\", account_id=123)\n    client.stop()\n    return result\n```\n\n## \ud83c\udfd7\ufe0f Available Tasks\n\n### LinkedIn Verification\n```python\nresult = await client.execute_task(\n    task=\"linkedin_verification\",\n    account_id=123,\n    email=\"user@example.com\",  # Optional but highly recommended\n    timeout=300  # 5 minutes\n)\n```\n\n### Future Tasks\nMore task types will be added for different platforms:\n- `twitter_verification`\n- `instagram_verification` \n- `facebook_verification`\n\n## \ud83d\udcdd Message Models\n\n### Task Request\n```python\n@dataclass\nclass NinjaTaskRequest:\n    task: str              # \"linkedin_verification\"\n    account_id: int        # Account ID\n    correlation_id: str    # Auto-generated UUID\n    email: Optional[str]   # Account email\n    user_id: Optional[int] # User ID\n    metadata: Dict[str, Any]  # Additional parameters\n```\n\n### Task Result\n```python\n@dataclass \nclass NinjaTaskResult:\n    correlation_id: str    # Matches request\n    task: str             # Task type\n    status: str           # \"VERIFIED\", \"FAILED\", etc.\n    success: bool         # True if successful\n    account_id: int       # Account ID\n    cookies: Optional[str] # Extracted cookies\n    data: Optional[Dict]   # Additional result data\n    error: Optional[Dict]  # Error details if failed\n    \n    @property\n    def is_success(self) -> bool:\n        return self.success or self.status == 'VERIFIED'\n```\n\n## \ud83d\udea8 Error Handling\n\n```python\nfrom ninja_kafka_sdk import (\n    NinjaClient, NinjaTaskTimeoutError, \n    NinjaTaskError, NinjaKafkaConnectionError\n)\n\ntry:\n    result = await client.execute_task(\"linkedin_verification\", account_id=123)\n    \nexcept NinjaTaskTimeoutError:\n    print(\"Task took too long\")\n    \nexcept NinjaTaskError as e:\n    print(f\"Ninja couldn't complete task: {e.details}\")\n    \nexcept NinjaKafkaConnectionError:\n    print(\"Can't connect to Kafka\")\n```\n\n## \ud83d\udd0c Extending for New Services\n\n```python\n# Add new task types easily\nawait client.send_task(\n    task=\"twitter_scraping\",\n    account_id=123,\n    parameters={\"target_user\": \"@elonmusk\"}\n)\n\n# SDK handles routing to appropriate Ninja service\n```\n\n\n\n## \ud83d\udd27 Troubleshooting\n\n### Common Configuration Issues\n\n#### Issue: \"Can't connect to Kafka\"\n```python\n# Check your servers configuration\nfrom ninja_kafka_sdk.config import NinjaKafkaConfig\nconfig = NinjaKafkaConfig()\nprint(f\"Environment: {config.environment}\")\nprint(f\"Kafka servers: {config.kafka_servers}\")\nprint(f\"Consumer group: {config.consumer_group}\")\n```\n\n**Solutions:**\n1. **Local Development**: Ensure Kafka is running on `localhost:9092`\n2. **Stage/Prod**: Verify `KAFKA_STAGE_SERVERS` or `KAFKA_PROD_SERVERS` are set\n3. **Custom Provider**: Use `KAFKA_BOOTSTRAP_SERVERS` for explicit override\n\n#### Issue: \"No messages received\"\n```python\n# Check consumer group conflicts\nimport os\nprint(f\"Consumer group: {os.getenv('KAFKA_CONSUMER_GROUP', 'auto-detected')}\")\n\n# Force specific consumer group\nos.environ['KAFKA_CONSUMER_GROUP'] = 'my-unique-group'\nclient = NinjaClient()\n```\n\n#### Issue: \"Task timeout\"\n```python\n# Increase timeout for slow operations\nclient = NinjaClient(timeout=600)  # 10 minutes\nresult = await client.execute_task(\"linkedin_verification\", account_id=123, timeout=300)\n```\n\n### Environment Detection Debug\n\n```python\nfrom ninja_kafka_sdk.config import NinjaKafkaConfig\n\n# Debug environment detection\nconfig = NinjaKafkaConfig()\nprint(f\"Environment: {config.environment}\")\nprint(f\"Servers: {config.kafka_servers}\")\n\n# Force specific environment\nconfig = NinjaKafkaConfig(environment='stage')\nprint(f\"Forced stage servers: {config.kafka_servers}\")\n```\n\n### Quick Health Check\n\n```python\nfrom ninja_kafka_sdk import NinjaClient\nimport asyncio\n\nasync def health_check():\n    client = NinjaClient()\n    try:\n        # Test connection by sending a test message\n        correlation_id = await client.send_task(\"health_check\", account_id=0)\n        print(f\"\u2705 Connection OK - Test message sent: {correlation_id}\")\n        return True\n    except Exception as e:\n        print(f\"\u274c Connection failed: {e}\")\n        return False\n    finally:\n        client.stop()\n\n# Run health check\nasyncio.run(health_check())\n```\n\n\n\n\n---\n\n## \ud83d\udcda Appendix: For Service Implementers\n\nThis section contains information for developers implementing Ninja services (like browser-ninja) that process tasks and send results back.\n\n### Sending Task Results\n\nIf you're building a service that processes Ninja tasks, use these methods to send results:\n\n```python\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def send_verification_result():\n    # Configure client for service that processes tasks\n    client = NinjaClient(\n        kafka_servers=\"your-kafka-servers:9092\",\n        consumer_group=\"browser-ninja\",  # Service-specific consumer group\n        environment=\"prod\"\n    )\n    \n    try:\n        # Send success result\n        await client.send_success_result(\n            correlation_id=\"task-123-456\",\n            account_id=12345,\n            email=\"user@example.com\",\n            cookies=\"extracted_cookies_data\",\n            screenshot=\"base64_screenshot\"\n        )\n        \n        # Or send error result\n        await client.send_error_result(\n            correlation_id=\"task-123-457\",\n            account_id=12346,\n            email=\"user2@example.com\",\n            error_code=\"LOGIN_FAILED\",\n            error_message=\"Invalid credentials\"\n        )\n        \n    finally:\n        client.stop()\n```\n\n### Listening for Tasks (Future Feature)\n\n```python\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def process_ninja_tasks():\n    client = NinjaClient(\n        kafka_servers=\"your-kafka-servers:9092\",\n        consumer_group=\"browser-ninja\"\n    )\n    \n    try:\n        # Listen for incoming tasks\n        async for task in client.listen_tasks():\n            print(f\"\ud83d\udce5 Received task: {task.task} for account {task.account_id}\")\n            \n            # Process the task\n            if task.task == \"linkedin_verification\":\n                result = await process_linkedin_verification(task)\n                \n                # Send result back\n                if result[\"success\"]:\n                    await client.send_success_result(\n                        correlation_id=task.correlation_id,\n                        account_id=task.account_id,\n                        email=task.email,\n                        cookies=result[\"cookies\"]\n                    )\n                else:\n                    await client.send_error_result(\n                        correlation_id=task.correlation_id,\n                        account_id=task.account_id,\n                        email=task.email,\n                        error_code=result[\"error_code\"],\n                        error_message=result[\"error_message\"]\n                    )\n                    \n    finally:\n        client.stop()\n```\n\n\n\n---\n\n**The Ninja Kafka SDK simplifies task-based communication while maintaining enterprise-grade reliability.**\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Kafka-based distributed task processing SDK",
    "version": "0.3.31",
    "project_urls": {
        "Bug Reports": "https://github.com/blazel/ninja-kafka-sdk/issues",
        "Homepage": "https://github.com/blazel/ninja-kafka-sdk",
        "Source": "https://github.com/blazel/ninja-kafka-sdk"
    },
    "split_keywords": [
        "kafka",
        " task-queue",
        " distributed-computing",
        " microservices",
        " messaging"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "2fd8d8df3ad892a3a61239ad0e9efb2160f7052118d7c7aba758afd361da7249",
                "md5": "49c8b6ccaad1c4860f5cab1ce616f54d",
                "sha256": "72fb79b355ec7d4fbffa177727b28302de425ebd0e1c680a295cc92c0f5a4e77"
            },
            "downloads": -1,
            "filename": "ninja_kafka_sdk-0.3.31-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "49c8b6ccaad1c4860f5cab1ce616f54d",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 43259,
            "upload_time": "2025-10-30T22:19:12",
            "upload_time_iso_8601": "2025-10-30T22:19:12.511408Z",
            "url": "https://files.pythonhosted.org/packages/2f/d8/d8df3ad892a3a61239ad0e9efb2160f7052118d7c7aba758afd361da7249/ninja_kafka_sdk-0.3.31-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "184d6cff14954d1b06581f8913b9162e4597a213a49ff79bcc9b93d5ce17a3d6",
                "md5": "ab4e16ff7fdf5879da7f5c0c6672078f",
                "sha256": "54dacebbd99c3b8e7a81141f6a7dc64a33739d3573fa8aefa2d3d02be03aabb7"
            },
            "downloads": -1,
            "filename": "ninja_kafka_sdk-0.3.31.tar.gz",
            "has_sig": false,
            "md5_digest": "ab4e16ff7fdf5879da7f5c0c6672078f",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 43004,
            "upload_time": "2025-10-30T22:19:13",
            "upload_time_iso_8601": "2025-10-30T22:19:13.518054Z",
            "url": "https://files.pythonhosted.org/packages/18/4d/6cff14954d1b06581f8913b9162e4597a213a49ff79bcc9b93d5ce17a3d6/ninja_kafka_sdk-0.3.31.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-30 22:19:13",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "blazel",
    "github_project": "ninja-kafka-sdk",
    "github_not_found": true,
    "lcname": "ninja-kafka-sdk"
}
        
Elapsed time: 3.95710s