# Ninja Kafka SDK
**SDK for distributed task processing with Kafka messaging and automatic service isolation.**
Send tasks to Ninja services and get results back with automatic message routing based on consumer groups. Each service only receives its own results!
## š Quick Start
### 1ļøā£ Send a Task
```python
import asyncio
from ninja_kafka_sdk import NinjaClient
async def send_task_example():
# Create client with YOUR service's unique consumer group
client = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="my-service" # Your service's unique identifier
)
# Send task to browser-ninja for processing
correlation_id = await client.send_task(
task="linkedin_verification",
account_id=123,
email="user@example.com",
# Optional: Add task-specific parameters
parameters={
'max_retries': 3,
'timeout': 60
}
)
print(f"ā
Task sent!")
print(f"š Correlation ID: {correlation_id}")
# Note: correlation_id will be 'my-service:uuid-...'
client.stop()
return correlation_id
# Run it
asyncio.run(send_task_example())
```
### 2ļøā£ Listen for Results
```python
import asyncio
from ninja_kafka_sdk import NinjaClient
async def listen_for_results():
# Use the SAME consumer group as when you sent the task
client = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="my-service" # Same as sender!
)
print("š Listening for results...")
# SDK automatically filters - you ONLY get results for YOUR service
async for result in client.listen_results():
print(f"\nš„ Received result:")
print(f" Task: {result.get('task')}")
print(f" Status: {result.get('status')}")
print(f" Account ID: {result.get('account_id')}")
if result.get('status') == 'SUCCESS':
data = result.get('data', {})
print(f" ā
Success! Data: {data}")
else:
error = result.get('error', {})
print(f" ā Failed! Error: {error}")
# Run it
asyncio.run(listen_for_results())
```
## š¦ Installation
```bash
# Install latest version (recommended)
pip install --upgrade ninja-kafka-sdk
# Install from PyPI (gets latest if not installed)
pip install ninja-kafka-sdk
# Install specific version
pip install ninja-kafka-sdk==0.3.24
# Force reinstall to latest version
pip install --force-reinstall --upgrade ninja-kafka-sdk
```
## šØ Complete Example: Send and Listen
### Send a Task and Wait for Its Result
```python
import asyncio
from ninja_kafka_sdk import NinjaClient
async def send_and_wait_example():
client = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="my-service"
)
# Step 1: Send task
correlation_id = await client.send_task(
task="linkedin_like",
account_id=456,
parameters={
'post_url': 'https://linkedin.com/posts/example'
}
)
print(f"š¤ Task sent: {correlation_id}")
# Step 2: Wait for this specific result
async for result in client.listen_results(correlation_ids=[correlation_id]):
if result.get('correlation_id') == correlation_id:
print(f"š„ Got result: {result.get('status')}")
break
client.stop()
asyncio.run(send_and_wait_example())
```
### Listen Continuously for All Results
```python
import asyncio
from ninja_kafka_sdk import NinjaClient
async def continuous_listener():
client = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="my-service"
)
print("š§ Listening for all results for 'my-service'...")
# This will run forever, processing results as they arrive
async for result in client.listen_results():
correlation_id = result.get('correlation_id')
status = result.get('status')
# Only receives results for tasks sent by 'my-service'
print(f"Result {correlation_id}: {status}")
# Process based on status
if status == 'SUCCESS':
# Handle success
pass
else:
# Handle failure
pass
asyncio.run(continuous_listener())
```
## āļø Configuration
### š Key Concept: Consumer Groups
**IMPORTANT**: The `consumer_group` is your service's unique identity!
- Each service MUST have a UNIQUE consumer group name
- The SDK uses this to automatically route messages
- You only receive results for tasks YOUR service sent
- Consumer group names cannot contain ':' (reserved character)
```python
from ninja_kafka_sdk import NinjaClient
# ā
Good - Unique consumer groups
client1 = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="auto-login" # Service 1
)
client2 = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="like-service" # Service 2
)
# ā Bad - Consumer group with colon
client = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="my:service" # Will raise ValueError!
)
```
### Configuration from Config Object
```python
from ninja_kafka_sdk import NinjaClient
from your_app.config import config # Your application's config
client = NinjaClient(
kafka_servers=config.KAFKA_SERVERS,
consumer_group=config.KAFKA_CONSUMER_GROUP
)
```
### Configuration with Config Object
```python
from ninja_kafka_sdk import NinjaClient, NinjaKafkaConfig
# Create configuration object
config = NinjaKafkaConfig(
kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
consumer_group="my-service",
environment="stage",
tasks_topic="ninja-tasks",
results_topic="ninja-results"
)
# Use with client
client = NinjaClient(config=config)
```
## š Version 0.3.23+ Parameters Field Update
**ā
IMPROVEMENT**: The `parameters` field is now properly handled in `send_task()`:
```python
# After v0.3.23+, parameters go to the correct field
await client.send_task(
task="linkedin_like",
account_id=123,
# These go to their proper fields (not nested in metadata!)
parameters={'post_url': 'https://...'}, # ā request.parameters
api_endpoints={'callback': 'https://...'} # ā request.api_endpoints
)
```
## š Version 0.2.0 Breaking Changes
**ā ļø BREAKING CHANGE**: `kafka_servers` and `consumer_group` are now **REQUIRED** parameters.
### Migration from v0.1.x
```python
# OLD (v0.1.x) - Had auto-detection and localhost fallbacks
client = NinjaClient() # ā This no longer works
# NEW (v0.2.0+) - Explicit configuration required
client = NinjaClient(
kafka_servers="your-kafka-servers:9092", # ā
Required
consumer_group="your-service-name" # ā
Required
)
```
### Why This Change?
- **Production Safety**: Prevents localhost fallbacks in production environments
- **Explicit Configuration**: No more guessing what environment you're connecting to
- **Debugging**: Clear errors when configuration is missing
- **Environment Agnostic**: Same code works everywhere with different config
## š” How to Send Tasks
### Basic Task Execution
```python
from ninja_kafka_sdk import NinjaClient
async def verify_linkedin_account():
# Explicit configuration for production
client = NinjaClient(
kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
consumer_group="auto-login-service",
environment="prod"
)
try:
# Send task and wait for result (one method call)
result = await client.execute_task(
task="linkedin_verification",
account_id=12345,
email="user@example.com",
timeout=300 # 5 minutes
)
if result.is_success:
print("ā
Verification successful!")
return result.cookies
else:
print(f"ā Failed: {result.error_message}")
return None
finally:
client.stop()
```
### Advanced Usage Patterns
#### Fire and Forget
```python
async def send_multiple_tasks():
# Must provide explicit configuration
client = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="task-sender"
)
# Send task without waiting for result
correlation_id = await client.send_task(
task="linkedin_verification",
account_id=123
)
print(f"Task sent: {correlation_id}")
client.stop()
```
#### Batch Processing
```python
async def process_multiple_accounts():
client = NinjaClient(
kafka_servers="your-kafka-servers:9092",
consumer_group="batch-processor"
)
accounts = [123, 456, 789]
try:
# Send all tasks
task_ids = []
for account_id in accounts:
task_id = await client.send_task("linkedin_verification", account_id=account_id)
task_ids.append(task_id)
# Listen for all results
completed = 0
async for result in client.listen_results(correlation_ids=task_ids):
completed += 1
print(f"Account {result.account_id}: {result.status}")
if completed >= len(accounts):
break
finally:
client.stop()
```
#### Different Environment Examples
```python
# Local development
async def local_verification():
client = NinjaClient(
kafka_servers="localhost:9092",
consumer_group="local-test",
environment="local" # Optional: for logging only
)
result = await client.execute_task("linkedin_verification", account_id=123)
client.stop()
return result
# Production environment
async def production_verification():
client = NinjaClient(
kafka_servers="b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092",
consumer_group="auto-login-prod",
environment="production"
)
result = await client.execute_task("linkedin_verification", account_id=123)
client.stop()
return result
# Using config object
async def config_based_verification():
from your_app.config import config
client = NinjaClient(
kafka_servers=config.KAFKA_SERVERS,
consumer_group=config.KAFKA_CONSUMER_GROUP
)
result = await client.execute_task("linkedin_verification", account_id=123)
client.stop()
return result
```
## šļø Available Tasks
### LinkedIn Verification
```python
result = await client.execute_task(
task="linkedin_verification",
account_id=123,
email="user@example.com", # Optional but highly recommended
timeout=300 # 5 minutes
)
```
### Future Tasks
More task types will be added for different platforms:
- `twitter_verification`
- `instagram_verification`
- `facebook_verification`
## š Message Models
### Task Request
```python
@dataclass
class NinjaTaskRequest:
task: str # "linkedin_verification"
account_id: int # Account ID
correlation_id: str # Auto-generated UUID
email: Optional[str] # Account email
user_id: Optional[int] # User ID
metadata: Dict[str, Any] # Additional parameters
```
### Task Result
```python
@dataclass
class NinjaTaskResult:
correlation_id: str # Matches request
task: str # Task type
status: str # "VERIFIED", "FAILED", etc.
success: bool # True if successful
account_id: int # Account ID
cookies: Optional[str] # Extracted cookies
data: Optional[Dict] # Additional result data
error: Optional[Dict] # Error details if failed
@property
def is_success(self) -> bool:
return self.success or self.status == 'VERIFIED'
```
## šØ Error Handling
```python
from ninja_kafka_sdk import (
NinjaClient, NinjaTaskTimeoutError,
NinjaTaskError, NinjaKafkaConnectionError
)
try:
result = await client.execute_task("linkedin_verification", account_id=123)
except NinjaTaskTimeoutError:
print("Task took too long")
except NinjaTaskError as e:
print(f"Ninja couldn't complete task: {e.details}")
except NinjaKafkaConnectionError:
print("Can't connect to Kafka")
```
## š Extending for New Services
```python
# Add new task types easily
await client.send_task(
task="twitter_scraping",
account_id=123,
parameters={"target_user": "@elonmusk"}
)
# SDK handles routing to appropriate Ninja service
```
## š§ Troubleshooting
### Common Configuration Issues
#### Issue: "Can't connect to Kafka"
```python
# Check your servers configuration
from ninja_kafka_sdk.config import NinjaKafkaConfig
config = NinjaKafkaConfig()
print(f"Environment: {config.environment}")
print(f"Kafka servers: {config.kafka_servers}")
print(f"Consumer group: {config.consumer_group}")
```
**Solutions:**
1. **Local Development**: Ensure Kafka is running on `localhost:9092`
2. **Stage/Prod**: Verify `KAFKA_STAGE_SERVERS` or `KAFKA_PROD_SERVERS` are set
3. **Custom Provider**: Use `KAFKA_BOOTSTRAP_SERVERS` for explicit override
#### Issue: "No messages received"
```python
# Check consumer group conflicts
import os
print(f"Consumer group: {os.getenv('KAFKA_CONSUMER_GROUP', 'auto-detected')}")
# Force specific consumer group
os.environ['KAFKA_CONSUMER_GROUP'] = 'my-unique-group'
client = NinjaClient()
```
#### Issue: "Task timeout"
```python
# Increase timeout for slow operations
client = NinjaClient(timeout=600) # 10 minutes
result = await client.execute_task("linkedin_verification", account_id=123, timeout=300)
```
### Environment Detection Debug
```python
from ninja_kafka_sdk.config import NinjaKafkaConfig
# Debug environment detection
config = NinjaKafkaConfig()
print(f"Environment: {config.environment}")
print(f"Servers: {config.kafka_servers}")
# Force specific environment
config = NinjaKafkaConfig(environment='stage')
print(f"Forced stage servers: {config.kafka_servers}")
```
### Quick Health Check
```python
from ninja_kafka_sdk import NinjaClient
import asyncio
async def health_check():
client = NinjaClient()
try:
# Test connection by sending a test message
correlation_id = await client.send_task("health_check", account_id=0)
print(f"ā
Connection OK - Test message sent: {correlation_id}")
return True
except Exception as e:
print(f"ā Connection failed: {e}")
return False
finally:
client.stop()
# Run health check
asyncio.run(health_check())
```
---
## š Appendix: For Service Implementers
This section contains information for developers implementing Ninja services (like browser-ninja) that process tasks and send results back.
### Sending Task Results
If you're building a service that processes Ninja tasks, use these methods to send results:
```python
from ninja_kafka_sdk import NinjaClient
async def send_verification_result():
# Configure client for service that processes tasks
client = NinjaClient(
kafka_servers="your-kafka-servers:9092",
consumer_group="browser-ninja", # Service-specific consumer group
environment="prod"
)
try:
# Send success result
await client.send_success_result(
correlation_id="task-123-456",
account_id=12345,
email="user@example.com",
cookies="extracted_cookies_data",
screenshot="base64_screenshot"
)
# Or send error result
await client.send_error_result(
correlation_id="task-123-457",
account_id=12346,
email="user2@example.com",
error_code="LOGIN_FAILED",
error_message="Invalid credentials"
)
finally:
client.stop()
```
### Listening for Tasks (Future Feature)
```python
from ninja_kafka_sdk import NinjaClient
async def process_ninja_tasks():
client = NinjaClient(
kafka_servers="your-kafka-servers:9092",
consumer_group="browser-ninja"
)
try:
# Listen for incoming tasks
async for task in client.listen_tasks():
print(f"š„ Received task: {task.task} for account {task.account_id}")
# Process the task
if task.task == "linkedin_verification":
result = await process_linkedin_verification(task)
# Send result back
if result["success"]:
await client.send_success_result(
correlation_id=task.correlation_id,
account_id=task.account_id,
email=task.email,
cookies=result["cookies"]
)
else:
await client.send_error_result(
correlation_id=task.correlation_id,
account_id=task.account_id,
email=task.email,
error_code=result["error_code"],
error_message=result["error_message"]
)
finally:
client.stop()
```
---
**The Ninja Kafka SDK simplifies task-based communication while maintaining enterprise-grade reliability.**
Raw data
{
"_id": null,
"home_page": null,
"name": "ninja-kafka-sdk",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "kafka, task-queue, distributed-computing, microservices, messaging",
"author": null,
"author_email": "Blazel Team <dev@blazel.ai>",
"download_url": "https://files.pythonhosted.org/packages/18/4d/6cff14954d1b06581f8913b9162e4597a213a49ff79bcc9b93d5ce17a3d6/ninja_kafka_sdk-0.3.31.tar.gz",
"platform": null,
"description": "# Ninja Kafka SDK\n\n**SDK for distributed task processing with Kafka messaging and automatic service isolation.**\n\nSend tasks to Ninja services and get results back with automatic message routing based on consumer groups. Each service only receives its own results!\n\n## \ud83d\ude80 Quick Start\n\n### 1\ufe0f\u20e3 Send a Task\n\n```python\nimport asyncio\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def send_task_example():\n # Create client with YOUR service's unique consumer group\n client = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"my-service\" # Your service's unique identifier\n )\n\n # Send task to browser-ninja for processing\n correlation_id = await client.send_task(\n task=\"linkedin_verification\",\n account_id=123,\n email=\"user@example.com\",\n\n # Optional: Add task-specific parameters\n parameters={\n 'max_retries': 3,\n 'timeout': 60\n }\n )\n\n print(f\"\u2705 Task sent!\")\n print(f\"\ud83d\udccb Correlation ID: {correlation_id}\")\n # Note: correlation_id will be 'my-service:uuid-...'\n\n client.stop()\n return correlation_id\n\n# Run it\nasyncio.run(send_task_example())\n```\n\n### 2\ufe0f\u20e3 Listen for Results\n\n```python\nimport asyncio\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def listen_for_results():\n # Use the SAME consumer group as when you sent the task\n client = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"my-service\" # Same as sender!\n )\n\n print(\"\ud83d\udc42 Listening for results...\")\n\n # SDK automatically filters - you ONLY get results for YOUR service\n async for result in client.listen_results():\n print(f\"\\n\ud83d\udce5 Received result:\")\n print(f\" Task: {result.get('task')}\")\n print(f\" Status: {result.get('status')}\")\n print(f\" Account ID: {result.get('account_id')}\")\n\n if result.get('status') == 'SUCCESS':\n data = result.get('data', {})\n print(f\" \u2705 Success! Data: {data}\")\n else:\n error = result.get('error', {})\n print(f\" \u274c Failed! Error: {error}\")\n\n# Run it\nasyncio.run(listen_for_results())\n``` \n\n## \ud83d\udce6 Installation\n\n```bash\n# Install latest version (recommended)\npip install --upgrade ninja-kafka-sdk\n\n# Install from PyPI (gets latest if not installed)\npip install ninja-kafka-sdk\n\n# Install specific version\npip install ninja-kafka-sdk==0.3.24\n\n# Force reinstall to latest version\npip install --force-reinstall --upgrade ninja-kafka-sdk\n```\n\n## \ud83d\udce8 Complete Example: Send and Listen\n\n### Send a Task and Wait for Its Result\n\n```python\nimport asyncio\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def send_and_wait_example():\n client = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"my-service\"\n )\n\n # Step 1: Send task\n correlation_id = await client.send_task(\n task=\"linkedin_like\",\n account_id=456,\n parameters={\n 'post_url': 'https://linkedin.com/posts/example'\n }\n )\n print(f\"\ud83d\udce4 Task sent: {correlation_id}\")\n\n # Step 2: Wait for this specific result\n async for result in client.listen_results(correlation_ids=[correlation_id]):\n if result.get('correlation_id') == correlation_id:\n print(f\"\ud83d\udce5 Got result: {result.get('status')}\")\n break\n\n client.stop()\n\nasyncio.run(send_and_wait_example())\n```\n\n### Listen Continuously for All Results\n\n```python\nimport asyncio\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def continuous_listener():\n client = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"my-service\"\n )\n\n print(\"\ud83c\udfa7 Listening for all results for 'my-service'...\")\n\n # This will run forever, processing results as they arrive\n async for result in client.listen_results():\n correlation_id = result.get('correlation_id')\n status = result.get('status')\n\n # Only receives results for tasks sent by 'my-service'\n print(f\"Result {correlation_id}: {status}\")\n\n # Process based on status\n if status == 'SUCCESS':\n # Handle success\n pass\n else:\n # Handle failure\n pass\n\nasyncio.run(continuous_listener())\n```\n\n## \u2699\ufe0f Configuration\n\n### \ud83d\udd11 Key Concept: Consumer Groups\n\n**IMPORTANT**: The `consumer_group` is your service's unique identity!\n- Each service MUST have a UNIQUE consumer group name\n- The SDK uses this to automatically route messages\n- You only receive results for tasks YOUR service sent\n- Consumer group names cannot contain ':' (reserved character)\n\n```python\nfrom ninja_kafka_sdk import NinjaClient\n\n# \u2705 Good - Unique consumer groups\nclient1 = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"auto-login\" # Service 1\n)\n\nclient2 = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"like-service\" # Service 2\n)\n\n# \u274c Bad - Consumer group with colon\nclient = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"my:service\" # Will raise ValueError!\n)\n```\n\n### Configuration from Config Object\n```python\nfrom ninja_kafka_sdk import NinjaClient\nfrom your_app.config import config # Your application's config\n\nclient = NinjaClient(\n kafka_servers=config.KAFKA_SERVERS,\n consumer_group=config.KAFKA_CONSUMER_GROUP\n)\n```\n\n### Configuration with Config Object\n```python\nfrom ninja_kafka_sdk import NinjaClient, NinjaKafkaConfig\n\n# Create configuration object\nconfig = NinjaKafkaConfig(\n kafka_servers=\"b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092\",\n consumer_group=\"my-service\",\n environment=\"stage\",\n tasks_topic=\"ninja-tasks\",\n results_topic=\"ninja-results\"\n)\n\n# Use with client\nclient = NinjaClient(config=config)\n```\n\n## \ud83c\udd95 Version 0.3.23+ Parameters Field Update\n\n**\u2705 IMPROVEMENT**: The `parameters` field is now properly handled in `send_task()`:\n\n```python\n# After v0.3.23+, parameters go to the correct field\nawait client.send_task(\n task=\"linkedin_like\",\n account_id=123,\n\n # These go to their proper fields (not nested in metadata!)\n parameters={'post_url': 'https://...'}, # \u2192 request.parameters\n api_endpoints={'callback': 'https://...'} # \u2192 request.api_endpoints\n)\n```\n\n## \ud83c\udd95 Version 0.2.0 Breaking Changes\n\n**\u26a0\ufe0f BREAKING CHANGE**: `kafka_servers` and `consumer_group` are now **REQUIRED** parameters.\n\n### Migration from v0.1.x\n```python\n# OLD (v0.1.x) - Had auto-detection and localhost fallbacks\nclient = NinjaClient() # \u274c This no longer works\n\n# NEW (v0.2.0+) - Explicit configuration required \nclient = NinjaClient(\n kafka_servers=\"your-kafka-servers:9092\", # \u2705 Required\n consumer_group=\"your-service-name\" # \u2705 Required\n)\n```\n\n### Why This Change?\n- **Production Safety**: Prevents localhost fallbacks in production environments\n- **Explicit Configuration**: No more guessing what environment you're connecting to\n- **Debugging**: Clear errors when configuration is missing\n- **Environment Agnostic**: Same code works everywhere with different config\n\n## \ud83d\udca1 How to Send Tasks\n\n### Basic Task Execution\n```python\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def verify_linkedin_account():\n # Explicit configuration for production\n client = NinjaClient(\n kafka_servers=\"b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092\",\n consumer_group=\"auto-login-service\",\n environment=\"prod\"\n )\n \n try:\n # Send task and wait for result (one method call)\n result = await client.execute_task(\n task=\"linkedin_verification\",\n account_id=12345,\n email=\"user@example.com\",\n timeout=300 # 5 minutes\n )\n \n if result.is_success:\n print(\"\u2705 Verification successful!\")\n return result.cookies\n else:\n print(f\"\u274c Failed: {result.error_message}\")\n return None\n \n finally:\n client.stop()\n```\n\n\n\n### Advanced Usage Patterns\n\n#### Fire and Forget\n```python\nasync def send_multiple_tasks():\n # Must provide explicit configuration\n client = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"task-sender\"\n )\n \n # Send task without waiting for result\n correlation_id = await client.send_task(\n task=\"linkedin_verification\", \n account_id=123\n )\n print(f\"Task sent: {correlation_id}\")\n client.stop()\n```\n\n#### Batch Processing\n```python\nasync def process_multiple_accounts():\n client = NinjaClient(\n kafka_servers=\"your-kafka-servers:9092\",\n consumer_group=\"batch-processor\"\n )\n accounts = [123, 456, 789]\n\n try:\n # Send all tasks\n task_ids = []\n for account_id in accounts:\n task_id = await client.send_task(\"linkedin_verification\", account_id=account_id)\n task_ids.append(task_id)\n\n # Listen for all results\n completed = 0\n async for result in client.listen_results(correlation_ids=task_ids):\n completed += 1\n print(f\"Account {result.account_id}: {result.status}\")\n if completed >= len(accounts):\n break\n \n finally:\n client.stop()\n```\n\n#### Different Environment Examples\n```python\n# Local development\nasync def local_verification():\n client = NinjaClient(\n kafka_servers=\"localhost:9092\",\n consumer_group=\"local-test\",\n environment=\"local\" # Optional: for logging only\n )\n result = await client.execute_task(\"linkedin_verification\", account_id=123)\n client.stop()\n return result\n\n# Production environment\nasync def production_verification():\n client = NinjaClient(\n kafka_servers=\"b-1.msk-cluster.amazonaws.com:9092,b-2.msk-cluster.amazonaws.com:9092\",\n consumer_group=\"auto-login-prod\",\n environment=\"production\"\n )\n result = await client.execute_task(\"linkedin_verification\", account_id=123)\n client.stop()\n return result\n\n# Using config object \nasync def config_based_verification():\n from your_app.config import config\n \n client = NinjaClient(\n kafka_servers=config.KAFKA_SERVERS,\n consumer_group=config.KAFKA_CONSUMER_GROUP\n )\n result = await client.execute_task(\"linkedin_verification\", account_id=123)\n client.stop()\n return result\n```\n\n## \ud83c\udfd7\ufe0f Available Tasks\n\n### LinkedIn Verification\n```python\nresult = await client.execute_task(\n task=\"linkedin_verification\",\n account_id=123,\n email=\"user@example.com\", # Optional but highly recommended\n timeout=300 # 5 minutes\n)\n```\n\n### Future Tasks\nMore task types will be added for different platforms:\n- `twitter_verification`\n- `instagram_verification` \n- `facebook_verification`\n\n## \ud83d\udcdd Message Models\n\n### Task Request\n```python\n@dataclass\nclass NinjaTaskRequest:\n task: str # \"linkedin_verification\"\n account_id: int # Account ID\n correlation_id: str # Auto-generated UUID\n email: Optional[str] # Account email\n user_id: Optional[int] # User ID\n metadata: Dict[str, Any] # Additional parameters\n```\n\n### Task Result\n```python\n@dataclass \nclass NinjaTaskResult:\n correlation_id: str # Matches request\n task: str # Task type\n status: str # \"VERIFIED\", \"FAILED\", etc.\n success: bool # True if successful\n account_id: int # Account ID\n cookies: Optional[str] # Extracted cookies\n data: Optional[Dict] # Additional result data\n error: Optional[Dict] # Error details if failed\n \n @property\n def is_success(self) -> bool:\n return self.success or self.status == 'VERIFIED'\n```\n\n## \ud83d\udea8 Error Handling\n\n```python\nfrom ninja_kafka_sdk import (\n NinjaClient, NinjaTaskTimeoutError, \n NinjaTaskError, NinjaKafkaConnectionError\n)\n\ntry:\n result = await client.execute_task(\"linkedin_verification\", account_id=123)\n \nexcept NinjaTaskTimeoutError:\n print(\"Task took too long\")\n \nexcept NinjaTaskError as e:\n print(f\"Ninja couldn't complete task: {e.details}\")\n \nexcept NinjaKafkaConnectionError:\n print(\"Can't connect to Kafka\")\n```\n\n## \ud83d\udd0c Extending for New Services\n\n```python\n# Add new task types easily\nawait client.send_task(\n task=\"twitter_scraping\",\n account_id=123,\n parameters={\"target_user\": \"@elonmusk\"}\n)\n\n# SDK handles routing to appropriate Ninja service\n```\n\n\n\n## \ud83d\udd27 Troubleshooting\n\n### Common Configuration Issues\n\n#### Issue: \"Can't connect to Kafka\"\n```python\n# Check your servers configuration\nfrom ninja_kafka_sdk.config import NinjaKafkaConfig\nconfig = NinjaKafkaConfig()\nprint(f\"Environment: {config.environment}\")\nprint(f\"Kafka servers: {config.kafka_servers}\")\nprint(f\"Consumer group: {config.consumer_group}\")\n```\n\n**Solutions:**\n1. **Local Development**: Ensure Kafka is running on `localhost:9092`\n2. **Stage/Prod**: Verify `KAFKA_STAGE_SERVERS` or `KAFKA_PROD_SERVERS` are set\n3. **Custom Provider**: Use `KAFKA_BOOTSTRAP_SERVERS` for explicit override\n\n#### Issue: \"No messages received\"\n```python\n# Check consumer group conflicts\nimport os\nprint(f\"Consumer group: {os.getenv('KAFKA_CONSUMER_GROUP', 'auto-detected')}\")\n\n# Force specific consumer group\nos.environ['KAFKA_CONSUMER_GROUP'] = 'my-unique-group'\nclient = NinjaClient()\n```\n\n#### Issue: \"Task timeout\"\n```python\n# Increase timeout for slow operations\nclient = NinjaClient(timeout=600) # 10 minutes\nresult = await client.execute_task(\"linkedin_verification\", account_id=123, timeout=300)\n```\n\n### Environment Detection Debug\n\n```python\nfrom ninja_kafka_sdk.config import NinjaKafkaConfig\n\n# Debug environment detection\nconfig = NinjaKafkaConfig()\nprint(f\"Environment: {config.environment}\")\nprint(f\"Servers: {config.kafka_servers}\")\n\n# Force specific environment\nconfig = NinjaKafkaConfig(environment='stage')\nprint(f\"Forced stage servers: {config.kafka_servers}\")\n```\n\n### Quick Health Check\n\n```python\nfrom ninja_kafka_sdk import NinjaClient\nimport asyncio\n\nasync def health_check():\n client = NinjaClient()\n try:\n # Test connection by sending a test message\n correlation_id = await client.send_task(\"health_check\", account_id=0)\n print(f\"\u2705 Connection OK - Test message sent: {correlation_id}\")\n return True\n except Exception as e:\n print(f\"\u274c Connection failed: {e}\")\n return False\n finally:\n client.stop()\n\n# Run health check\nasyncio.run(health_check())\n```\n\n\n\n\n---\n\n## \ud83d\udcda Appendix: For Service Implementers\n\nThis section contains information for developers implementing Ninja services (like browser-ninja) that process tasks and send results back.\n\n### Sending Task Results\n\nIf you're building a service that processes Ninja tasks, use these methods to send results:\n\n```python\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def send_verification_result():\n # Configure client for service that processes tasks\n client = NinjaClient(\n kafka_servers=\"your-kafka-servers:9092\",\n consumer_group=\"browser-ninja\", # Service-specific consumer group\n environment=\"prod\"\n )\n \n try:\n # Send success result\n await client.send_success_result(\n correlation_id=\"task-123-456\",\n account_id=12345,\n email=\"user@example.com\",\n cookies=\"extracted_cookies_data\",\n screenshot=\"base64_screenshot\"\n )\n \n # Or send error result\n await client.send_error_result(\n correlation_id=\"task-123-457\",\n account_id=12346,\n email=\"user2@example.com\",\n error_code=\"LOGIN_FAILED\",\n error_message=\"Invalid credentials\"\n )\n \n finally:\n client.stop()\n```\n\n### Listening for Tasks (Future Feature)\n\n```python\nfrom ninja_kafka_sdk import NinjaClient\n\nasync def process_ninja_tasks():\n client = NinjaClient(\n kafka_servers=\"your-kafka-servers:9092\",\n consumer_group=\"browser-ninja\"\n )\n \n try:\n # Listen for incoming tasks\n async for task in client.listen_tasks():\n print(f\"\ud83d\udce5 Received task: {task.task} for account {task.account_id}\")\n \n # Process the task\n if task.task == \"linkedin_verification\":\n result = await process_linkedin_verification(task)\n \n # Send result back\n if result[\"success\"]:\n await client.send_success_result(\n correlation_id=task.correlation_id,\n account_id=task.account_id,\n email=task.email,\n cookies=result[\"cookies\"]\n )\n else:\n await client.send_error_result(\n correlation_id=task.correlation_id,\n account_id=task.account_id,\n email=task.email,\n error_code=result[\"error_code\"],\n error_message=result[\"error_message\"]\n )\n \n finally:\n client.stop()\n```\n\n\n\n---\n\n**The Ninja Kafka SDK simplifies task-based communication while maintaining enterprise-grade reliability.**\n",
"bugtrack_url": null,
"license": null,
"summary": "Kafka-based distributed task processing SDK",
"version": "0.3.31",
"project_urls": {
"Bug Reports": "https://github.com/blazel/ninja-kafka-sdk/issues",
"Homepage": "https://github.com/blazel/ninja-kafka-sdk",
"Source": "https://github.com/blazel/ninja-kafka-sdk"
},
"split_keywords": [
"kafka",
" task-queue",
" distributed-computing",
" microservices",
" messaging"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "2fd8d8df3ad892a3a61239ad0e9efb2160f7052118d7c7aba758afd361da7249",
"md5": "49c8b6ccaad1c4860f5cab1ce616f54d",
"sha256": "72fb79b355ec7d4fbffa177727b28302de425ebd0e1c680a295cc92c0f5a4e77"
},
"downloads": -1,
"filename": "ninja_kafka_sdk-0.3.31-py3-none-any.whl",
"has_sig": false,
"md5_digest": "49c8b6ccaad1c4860f5cab1ce616f54d",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 43259,
"upload_time": "2025-10-30T22:19:12",
"upload_time_iso_8601": "2025-10-30T22:19:12.511408Z",
"url": "https://files.pythonhosted.org/packages/2f/d8/d8df3ad892a3a61239ad0e9efb2160f7052118d7c7aba758afd361da7249/ninja_kafka_sdk-0.3.31-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "184d6cff14954d1b06581f8913b9162e4597a213a49ff79bcc9b93d5ce17a3d6",
"md5": "ab4e16ff7fdf5879da7f5c0c6672078f",
"sha256": "54dacebbd99c3b8e7a81141f6a7dc64a33739d3573fa8aefa2d3d02be03aabb7"
},
"downloads": -1,
"filename": "ninja_kafka_sdk-0.3.31.tar.gz",
"has_sig": false,
"md5_digest": "ab4e16ff7fdf5879da7f5c0c6672078f",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 43004,
"upload_time": "2025-10-30T22:19:13",
"upload_time_iso_8601": "2025-10-30T22:19:13.518054Z",
"url": "https://files.pythonhosted.org/packages/18/4d/6cff14954d1b06581f8913b9162e4597a213a49ff79bcc9b93d5ce17a3d6/ninja_kafka_sdk-0.3.31.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-10-30 22:19:13",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "blazel",
"github_project": "ninja-kafka-sdk",
"github_not_found": true,
"lcname": "ninja-kafka-sdk"
}