# Event Streamer SDK
A Python SDK for interacting with the Event Streamer blockchain event monitoring service. This SDK provides a simple and powerful way to subscribe to blockchain events and receive them via HTTP streaming connections.
## Features
- 🔗 **Simple API Client**: Easy subscription management with typed responses
- 🌊 **HTTP Streaming**: Real-time event delivery via HTTP streaming connections
- 🔄 **Resume Capability**: Automatic resume from last processed position
- 🔒 **Type Safety**: Full type hints and Pydantic model validation
- ⚡ **Async/Await**: Modern async Python patterns throughout
- 🎯 **Decorator Pattern**: Clean event handler registration
- 🛡️ **Error Handling**: Comprehensive error handling and connection management
- 💓 **Health Monitoring**: Built-in heartbeat and connection health tracking
- 📝 **ABI Parsing**: Built-in contract ABI parsing for easy event extraction
- 🔮 **Future-Ready**: Prepared for authentication when the service adds it
## Installation
```bash
pip install event-streamer-sdk
```
## Quick Start
### HTTP Streaming Example
```python
import asyncio
from event_poller_sdk import EventStreamer
from event_poller_sdk.models.subscriptions import SubscriptionCreate
from event_poller_sdk.models.abi import ABIEvent, ABIInput
async def main():
# Initialize the client
async with EventStreamer(
service_url="http://localhost:8000",
subscriber_id="my-app"
) as client:
# Create a subscription
subscription = SubscriptionCreate(
topic0="0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
event_signature=ABIEvent(
type="event",
name="Transfer",
inputs=[
ABIInput(name="from", type="address", indexed=True),
ABIInput(name="to", type="address", indexed=True),
ABIInput(name="value", type="uint256", indexed=False)
]
),
addresses=["0xA0b86a33E6417b3c4555ba476F04245600306D5D"],
start_block=19000000,
end_block=19010000,
chain_id=1,
subscriber_id="my-app"
)
result = await client.create_subscription(subscription)
print(f"Created subscription: {result.id}")
# Create streaming client
streaming_client = client.create_streaming_client(
subscription_id=result.id,
client_metadata={"version": "1.0.0"}
)
# Register event handlers
@streaming_client.on_event("Transfer")
async def handle_transfers(events):
for event in events:
print(f"Transfer: {event['from']} -> {event['to']}: {event['value']}")
# Start streaming
await streaming_client.start_streaming()
# Keep running to receive events
try:
while streaming_client.is_running:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("Stopping...")
finally:
await streaming_client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
```
### Resume Capability Example
```python
import asyncio
from event_poller_sdk import EventStreamer
async def main():
async with EventStreamer(
service_url="http://localhost:8000",
subscriber_id="my-app"
) as client:
# Create streaming client with resume token
streaming_client = client.create_streaming_client(
subscription_id=123,
resume_token="rt_eyJzdWJzY3JpcHRpb25faWQiOjEyMywi...",
client_metadata={"version": "1.0.0"}
)
@streaming_client.on_event("Transfer")
async def handle_transfers(events):
for event in events:
print(f"Transfer: {event['from']} -> {event['to']}: {event['value']}")
# Optional: Handle heartbeats
@streaming_client.on_heartbeat
async def handle_heartbeat(heartbeat):
print(f"Heartbeat: {heartbeat.timestamp}")
# Optional: Handle errors
@streaming_client.on_error
async def handle_error(error):
print(f"Error: {error.error_message}")
# Start streaming
await streaming_client.start_streaming()
# Keep running and save resume token periodically
try:
while streaming_client.is_running:
current_token = streaming_client.get_current_resume_token()
# Save token to persistent storage
await asyncio.sleep(30)
except KeyboardInterrupt:
print("Stopping...")
finally:
await streaming_client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
```
## ABI Parsing
The SDK includes built-in ABI parsing functionality to make it easy to extract event definitions from contract ABIs without manually constructing `ABIEvent` objects.
### Extract Specific Event
```python
async def main():
async with EventStreamer(
service_url="http://localhost:8000",
subscriber_id="my-app"
) as client:
# Example ERC20 contract ABI
erc20_abi = '''[
{
"type": "event",
"name": "Transfer",
"inputs": [
{"indexed": true, "name": "from", "type": "address"},
{"indexed": true, "name": "to", "type": "address"},
{"indexed": false, "name": "value", "type": "uint256"}
]
}
]'''
# Extract the Transfer event
transfer_event = client.extract_abi_event(erc20_abi, "Transfer")
# Use in subscription
subscription = SubscriptionCreate(
topic0="0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
event_signature=transfer_event, # Use parsed event
addresses=["0x..."],
start_block=19000000,
chain_id=1
)
```
### Extract All Events
```python
# Extract all events from an ABI
all_events = client.extract_abi_events(erc20_abi)
transfer_event = all_events["Transfer"]
approval_event = all_events["Approval"]
```
### Error Handling
```python
try:
event = client.extract_abi_event(abi_json, "NonExistentEvent")
except Exception as e:
print(f"Event not found: {e}")
# Error message includes available events
```
### Supported ABI Features
- ✅ **Event definitions**: Full support for event parsing
- ✅ **Indexed parameters**: Correctly handles indexed/non-indexed inputs
- ✅ **Array types**: Supports `uint256[]`, `address[]`, etc.
- ✅ **Anonymous events**: Handles anonymous event flag
- ✅ **Complex types**: Support for most Solidity types
- ✅ **Error handling**: Clear error messages with available events
- ⚠️ **Tuple components**: Basic support (see TODO in DCE-50)
## API Reference
### EventStreamer
The main client class for interacting with the Event Streamer service.
```python
class EventStreamer:
def __init__(
self,
service_url: str,
subscriber_id: str,
timeout: int = 30,
api_key: Optional[str] = None, # Future use
auth_token: Optional[str] = None, # Future use
)
```
#### Subscription Management
```python
# Create a subscription
async def create_subscription(self, subscription: SubscriptionCreate) -> SubscriptionResponse
# List subscriptions
async def list_subscriptions(self, page: int = 1, page_size: int = 20) -> SubscriptionListResponse
# Get a specific subscription
async def get_subscription(self, subscription_id: int) -> SubscriptionResponse
# Update a subscription
async def update_subscription(self, subscription_id: int, update: SubscriptionUpdate) -> SubscriptionResponse
# Delete a subscription
async def delete_subscription(self, subscription_id: int) -> bool
```
#### Streaming Client Creation
```python
# Create a streaming client for a subscription
def create_streaming_client(
self,
subscription_id: int,
resume_token: Optional[str] = None,
client_metadata: Optional[Dict[str, Any]] = None
) -> StreamingClient
```
### StreamingClient
The streaming client handles real-time event delivery via HTTP streaming connections.
#### Connection Management
```python
# Connect to streaming endpoint
async def connect() -> None
# Start streaming events
async def start_streaming() -> None
# Stop streaming events
async def stop_streaming() -> None
# Disconnect from streaming
async def disconnect() -> None
# Resume from a specific position
async def resume(resume_token: str) -> None
# Get current resume token
def get_current_resume_token() -> Optional[str]
```
#### Event Handler Registration
```python
# Handle specific event types
@client.on_event("Transfer")
async def handle_transfers(events: List[Dict[str, Any]]):
for event in events:
# Process event
pass
# Handle all events
@client.on_all_events
async def handle_all_events(events: Dict[str, List[Dict[str, Any]]]):
for event_name, event_list in events.items():
# Process events by type
pass
# Handle heartbeat messages
@client.on_heartbeat
async def handle_heartbeat(heartbeat: StreamingHeartbeat):
print(f"Heartbeat: {heartbeat.timestamp}")
# Handle error messages
@client.on_error
async def handle_error(error: StreamingError):
print(f"Error: {error.error_message}")
```
### Models
#### SubscriptionCreate
```python
class SubscriptionCreate(BaseModel):
topic0: str # Event signature hash
event_signature: ABIEvent # ABI event definition
addresses: List[str] = [] # Contract addresses (empty = all)
start_block: int # Starting block number
end_block: Optional[int] = None # Ending block (None = live)
chain_id: int # Blockchain network ID
subscriber_id: str # Your service identifier
```
#### ABIEvent
```python
class ABIEvent(BaseModel):
type: Literal["event"]
name: str # Event name
inputs: List[ABIInput] = [] # Event parameters
anonymous: bool = False
```
#### ABIInput
```python
class ABIInput(BaseModel):
name: Optional[str] = None # Parameter name
type: str # Solidity type (e.g., "address", "uint256")
indexed: Optional[bool] = False # Whether parameter is indexed
```
## Event Data Format
Events are streamed in batches. Each batch message has the following format:
```python
{
"type": "event_batch",
"response_id": "550e8400-e29b-41d4-a716-446655440000",
"subscription_id": 123,
"connection_id": "conn_550e8400-e29b-41d4-a716-446655440000",
"resume_token": "rt_eyJzdWJzY3JpcHRpb25faWQiOjEyMywi...",
"events": {
"Transfer": [
{
# Event-specific fields
"from": "0x1234567890123456789012345678901234567890",
"to": "0x0987654321098765432109876543210987654321",
"value": "1000000000000000000",
# Metadata fields
"block_number": 19000001,
"transaction_hash": "0xabcdef...",
"log_index": 0,
"address": "0xA0b86a33E6417b3c4555ba476F04245600306D5D",
"timestamp": "2024-05-23T10:30:00.000Z"
}
]
},
"batch_size": 1,
"timestamp": "2024-05-23T10:30:00.000Z"
}
```
### Streaming Message Types
The streaming connection delivers different types of messages:
#### Event Batch
Contains actual blockchain events for processing.
#### Heartbeat
Periodic heartbeat messages to maintain connection health:
```python
{
"type": "heartbeat",
"connection_id": "conn_550e8400-e29b-41d4-a716-446655440000",
"subscription_id": 123,
"timestamp": "2024-05-23T10:30:00.000Z"
}
```
#### Error Messages
Notifications about errors, including connection issues:
```python
{
"type": "error",
"connection_id": "conn_550e8400-e29b-41d4-a716-446655440000",
"subscription_id": 123,
"error_code": "CONNECTION_LOST",
"error_message": "Connection lost due to network timeout",
"timestamp": "2024-05-23T10:30:00.000Z"
}
```
## Supported Chains
The SDK supports all chains configured in your Event Streamer service:
- **Ethereum Mainnet** (Chain ID: 1)
- **Polygon** (Chain ID: 137)
- **Base** (Chain ID: 8453)
- **Arbitrum One** (Chain ID: 42161)
- **Optimism** (Chain ID: 10)
## Error Handling
The SDK provides comprehensive error handling:
```python
from event_poller_sdk.exceptions import (
EventPollerSDKError, # Base exception
EventPollerConnectionError, # Connection issues
EventPollerTimeoutError, # Request timeouts
EventPollerValidationError, # Validation errors
EventPollerSubscriptionError, # Subscription errors
)
try:
subscription = await client.create_subscription(subscription_data)
except EventPollerValidationError as e:
print(f"Invalid subscription data: {e}")
except EventPollerConnectionError as e:
print(f"Connection failed: {e}")
```
## Best Practices
### 1. Use Context Managers
Always use `EventStreamer` as an async context manager to ensure proper cleanup:
```python
async with EventStreamer(service_url, subscriber_id) as client:
# Your code here
pass
```
### 2. Handle Events Efficiently
Process events quickly in your handlers to avoid blocking the streaming connection:
```python
@client.on_event("Transfer")
async def handle_transfers(events):
# Process quickly to avoid blocking
for event in events:
await process_event_async(event)
```
### 3. Use Specific Event Handlers
Register handlers for specific event types with `on_event` rather than relying only on the catch-all `on_all_events` handler:
```python
@client.on_event("Transfer")
async def handle_transfers(events):
# Specific handling for transfers
pass
@client.on_event("Approval")
async def handle_approvals(events):
# Specific handling for approvals
pass
```
### 4. Implement Resume Token Persistence
Save resume tokens to persistent storage so that streaming can resume from the correct position after a restart:
```python
# Save resume token periodically
resume_token = streaming_client.get_current_resume_token()
await save_resume_token_to_storage(subscription_id, resume_token)
# Resume from saved position
saved_token = await load_resume_token_from_storage(subscription_id)
streaming_client = client.create_streaming_client(
subscription_id=subscription_id,
resume_token=saved_token
)
```
### 5. Handle Connection Errors Gracefully
Implement proper error handling for connection issues:
```python
@client.on_error
async def handle_error(error):
if error.error_code == "CONNECTION_LOST":
# Implement reconnection logic
await reconnect_with_backoff()
else:
# Log and handle other errors
logger.error(f"Streaming error: {error.error_message}")
```
### 6. Monitor Connection Health
Use heartbeat handlers to monitor connection health:
```python
@client.on_heartbeat
async def handle_heartbeat(heartbeat):
# Update last heartbeat time
last_heartbeat = heartbeat.timestamp
# Check connection health
await update_connection_health_metrics()
```
## Development
### Requirements
- Python 3.11+
- aiohttp
- pydantic
- eth-typing
- event-poller-schemas
### Installation for Development
```bash
git clone https://github.com/dcentralab/event-poller-sdk
cd event-poller-sdk
pip install -e ".[dev]"
```
### Running Examples
```bash
# Live streaming example
python examples/streaming_example.py
# Historical streaming example
python examples/historical_streaming_example.py
```
## License
MIT License - see LICENSE file for details.
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
Raw data
{
"_id": null,
"home_page": null,
"name": "event-streamer-sdk",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": null,
"keywords": "blockchain, ethereum, events, monitoring, sdk, streaming",
"author": null,
"author_email": "DcentraLab <contact@dcentralab.com>",
"download_url": "https://files.pythonhosted.org/packages/7b/c1/fc756f1734c9e98a8d66a507a014f5848bdcfe42e692baf6dcbd46f5564b/event_streamer_sdk-0.2.2.tar.gz",
"platform": null,
"description": "# Event Streamer SDK\n\nA Python SDK for interacting with the Event Streamer blockchain event monitoring service. This SDK provides a simple and powerful way to subscribe to blockchain events and receive them via HTTP streaming connections.\n\n## Features\n\n- \ud83d\udd17 **Simple API Client**: Easy subscription management with typed responses\n- \ud83c\udf0a **HTTP Streaming**: Real-time event delivery via HTTP streaming connections\n- \ud83d\udd04 **Resume Capability**: Automatic resume from last processed position\n- \ud83d\udd12 **Type Safety**: Full type hints and Pydantic model validation\n- \u26a1 **Async/Await**: Modern async Python patterns throughout\n- \ud83c\udfaf **Decorator Pattern**: Clean event handler registration\n- \ud83d\udee1\ufe0f **Error Handling**: Comprehensive error handling and connection management\n- \ud83d\udc93 **Health Monitoring**: Built-in heartbeat and connection health tracking\n- \ud83d\udcdd **ABI Parsing**: Built-in contract ABI parsing for easy event extraction\n- \ud83d\udd2e **Future-Ready**: Prepared for authentication when the service adds it\n\n## Installation\n\n```bash\npip install event-streamer-sdk\n```\n\n## Quick Start\n\n### HTTP Streaming Example\n\n```python\nimport asyncio\nfrom event_poller_sdk import EventStreamer\nfrom event_poller_sdk.models.subscriptions import SubscriptionCreate\nfrom event_poller_sdk.models.abi import ABIEvent, ABIInput\n\nasync def main():\n # Initialize the client\n async with EventStreamer(\n service_url=\"http://localhost:8000\",\n subscriber_id=\"my-app\"\n ) as client:\n\n # Create a subscription\n subscription = SubscriptionCreate(\n topic0=\"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef\",\n event_signature=ABIEvent(\n type=\"event\",\n name=\"Transfer\",\n inputs=[\n ABIInput(name=\"from\", type=\"address\", indexed=True),\n ABIInput(name=\"to\", type=\"address\", indexed=True),\n ABIInput(name=\"value\", type=\"uint256\", indexed=False)\n 
]\n ),\n addresses=[\"0xA0b86a33E6417b3c4555ba476F04245600306D5D\"],\n start_block=19000000,\n end_block=19010000,\n chain_id=1,\n subscriber_id=\"my-app\"\n )\n\n result = await client.create_subscription(subscription)\n print(f\"Created subscription: {result.id}\")\n\n # Create streaming client\n streaming_client = client.create_streaming_client(\n subscription_id=result.id,\n client_metadata={\"version\": \"1.0.0\"}\n )\n\n # Register event handlers\n @streaming_client.on_event(\"Transfer\")\n async def handle_transfers(events):\n for event in events:\n print(f\"Transfer: {event['from']} -> {event['to']}: {event['value']}\")\n\n # Start streaming\n await streaming_client.start_streaming()\n\n # Keep running to receive events\n try:\n while streaming_client.is_running:\n await asyncio.sleep(1)\n except KeyboardInterrupt:\n print(\"Stopping...\")\n finally:\n await streaming_client.disconnect()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n### Resume Capability Example\n\n```python\nimport asyncio\nfrom event_poller_sdk import EventStreamer\n\nasync def main():\n async with EventStreamer(\n service_url=\"http://localhost:8000\",\n subscriber_id=\"my-app\"\n ) as client:\n\n # Create streaming client with resume token\n streaming_client = client.create_streaming_client(\n subscription_id=123,\n resume_token=\"rt_eyJzdWJzY3JpcHRpb25faWQiOjEyMywi...\",\n client_metadata={\"version\": \"1.0.0\"}\n )\n\n @streaming_client.on_event(\"Transfer\")\n async def handle_transfers(events):\n for event in events:\n print(f\"Transfer: {event['from']} -> {event['to']}: {event['value']}\")\n\n # Optional: Handle heartbeats\n @streaming_client.on_heartbeat\n async def handle_heartbeat(heartbeat):\n print(f\"Heartbeat: {heartbeat.timestamp}\")\n\n # Optional: Handle errors\n @streaming_client.on_error\n async def handle_error(error):\n print(f\"Error: {error.error_message}\")\n\n # Start streaming\n await streaming_client.start_streaming()\n\n # Keep running and save 
resume token periodically\n try:\n while streaming_client.is_running:\n current_token = streaming_client.get_current_resume_token()\n # Save token to persistent storage\n await asyncio.sleep(30)\n except KeyboardInterrupt:\n print(\"Stopping...\")\n finally:\n await streaming_client.disconnect()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n## ABI Parsing\n\nThe SDK includes built-in ABI parsing functionality to make it easy to extract event definitions from contract ABIs without manually constructing `ABIEvent` objects.\n\n### Extract Specific Event\n\n```python\nasync def main():\n async with EventStreamer(\n service_url=\"http://localhost:8000\",\n subscriber_id=\"my-app\"\n ) as client:\n\n # Example ERC20 contract ABI\n erc20_abi = '''[\n {\n \"type\": \"event\",\n \"name\": \"Transfer\",\n \"inputs\": [\n {\"indexed\": true, \"name\": \"from\", \"type\": \"address\"},\n {\"indexed\": true, \"name\": \"to\", \"type\": \"address\"},\n {\"indexed\": false, \"name\": \"value\", \"type\": \"uint256\"}\n ]\n }\n ]'''\n\n # Extract the Transfer event\n transfer_event = client.extract_abi_event(erc20_abi, \"Transfer\")\n\n # Use in subscription\n subscription = SubscriptionCreate(\n topic0=\"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef\",\n event_signature=transfer_event, # Use parsed event\n addresses=[\"0x...\"],\n start_block=19000000,\n chain_id=1\n )\n```\n\n### Extract All Events\n\n```python\n# Extract all events from an ABI\nall_events = client.extract_abi_events(erc20_abi)\ntransfer_event = all_events[\"Transfer\"]\napproval_event = all_events[\"Approval\"]\n```\n\n### Error Handling\n\n```python\ntry:\n event = client.extract_abi_event(abi_json, \"NonExistentEvent\")\nexcept Exception as e:\n print(f\"Event not found: {e}\")\n # Error message includes available events\n```\n\n### Supported ABI Features\n\n- \u2705 **Event definitions**: Full support for event parsing\n- \u2705 **Indexed parameters**: Correctly handles 
indexed/non-indexed inputs\n- \u2705 **Array types**: Supports `uint256[]`, `address[]`, etc.\n- \u2705 **Anonymous events**: Handles anonymous event flag\n- \u2705 **Complex types**: Support for most Solidity types\n- \u2705 **Error handling**: Clear error messages with available events\n- \u26a0\ufe0f **Tuple components**: Basic support (see TODO in DCE-50)\n\n## API Reference\n\n### EventStreamer\n\nThe main client class for interacting with the Event Streamer service.\n\n```python\nclass EventStreamer:\n def __init__(\n self,\n service_url: str,\n subscriber_id: str,\n timeout: int = 30,\n api_key: Optional[str] = None, # Future use\n auth_token: Optional[str] = None, # Future use\n )\n```\n\n#### Subscription Management\n\n```python\n# Create a subscription\nasync def create_subscription(self, subscription: SubscriptionCreate) -> SubscriptionResponse\n\n# List subscriptions\nasync def list_subscriptions(self, page: int = 1, page_size: int = 20) -> SubscriptionListResponse\n\n# Get a specific subscription\nasync def get_subscription(self, subscription_id: int) -> SubscriptionResponse\n\n# Update a subscription\nasync def update_subscription(self, subscription_id: int, update: SubscriptionUpdate) -> SubscriptionResponse\n\n# Delete a subscription\nasync def delete_subscription(self, subscription_id: int) -> bool\n```\n\n#### Streaming Client Creation\n\n```python\n# Create a streaming client for a subscription\ndef create_streaming_client(\n self,\n subscription_id: int,\n resume_token: Optional[str] = None,\n client_metadata: Optional[Dict[str, Any]] = None\n) -> StreamingClient\n```\n\n### StreamingClient\n\nThe streaming client handles real-time event delivery via HTTP streaming connections.\n\n#### Connection Management\n\n```python\n# Connect to streaming endpoint\nasync def connect() -> None\n\n# Start streaming events\nasync def start_streaming() -> None\n\n# Stop streaming events\nasync def stop_streaming() -> None\n\n# Disconnect from streaming\nasync 
def disconnect() -> None\n\n# Resume from a specific position\nasync def resume(resume_token: str) -> None\n\n# Get current resume token\ndef get_current_resume_token() -> Optional[str]\n```\n\n#### Event Handler Registration\n\n```python\n# Handle specific event types\n@client.on_event(\"Transfer\")\nasync def handle_transfers(events: List[Dict[str, Any]]):\n for event in events:\n # Process event\n pass\n\n# Handle all events\n@client.on_all_events\nasync def handle_all_events(events: Dict[str, List[Dict[str, Any]]]):\n for event_name, event_list in events.items():\n # Process events by type\n pass\n\n# Handle heartbeat messages\n@client.on_heartbeat\nasync def handle_heartbeat(heartbeat: StreamingHeartbeat):\n print(f\"Heartbeat: {heartbeat.timestamp}\")\n\n# Handle error messages\n@client.on_error\nasync def handle_error(error: StreamingError):\n print(f\"Error: {error.error_message}\")\n```\n\n### Models\n\n#### SubscriptionCreate\n\n```python\nclass SubscriptionCreate(BaseModel):\n topic0: str # Event signature hash\n event_signature: ABIEvent # ABI event definition\n addresses: List[str] = [] # Contract addresses (empty = all)\n start_block: int # Starting block number\n end_block: Optional[int] = None # Ending block (None = live)\n chain_id: int # Blockchain network ID\n subscriber_id: str # Your service identifier\n```\n\n#### ABIEvent\n\n```python\nclass ABIEvent(BaseModel):\n type: Literal[\"event\"]\n name: str # Event name\n inputs: List[ABIInput] = [] # Event parameters\n anonymous: bool = False\n```\n\n#### ABIInput\n\n```python\nclass ABIInput(BaseModel):\n name: Optional[str] = None # Parameter name\n type: str # Solidity type (e.g., \"address\", \"uint256\")\n indexed: Optional[bool] = False # Whether parameter is indexed\n```\n\n## Event Data Format\n\nEvents are delivered via streaming in batches with the following format:\n\n```python\n{\n \"type\": \"event_batch\",\n \"response_id\": \"550e8400-e29b-41d4-a716-446655440000\",\n 
\"subscription_id\": 123,\n \"connection_id\": \"conn_550e8400-e29b-41d4-a716-446655440000\",\n \"resume_token\": \"rt_eyJzdWJzY3JpcHRpb25faWQiOjEyMywi...\",\n \"events\": {\n \"Transfer\": [\n {\n # Event-specific fields\n \"from\": \"0x1234567890123456789012345678901234567890\",\n \"to\": \"0x0987654321098765432109876543210987654321\",\n \"value\": \"1000000000000000000\",\n\n # Metadata fields\n \"block_number\": 19000001,\n \"transaction_hash\": \"0xabcdef...\",\n \"log_index\": 0,\n \"address\": \"0xA0b86a33E6417b3c4555ba476F04245600306D5D\",\n \"timestamp\": \"2024-05-23T10:30:00.000Z\"\n }\n ]\n },\n \"batch_size\": 1,\n \"timestamp\": \"2024-05-23T10:30:00.000Z\"\n}\n```\n\n### Streaming Message Types\n\nThe streaming connection delivers different types of messages:\n\n#### Event Batch\nContains actual blockchain events for processing.\n\n#### Heartbeat\nPeriodic heartbeat messages to maintain connection health:\n```python\n{\n \"type\": \"heartbeat\",\n \"connection_id\": \"conn_550e8400-e29b-41d4-a716-446655440000\",\n \"subscription_id\": 123,\n \"timestamp\": \"2024-05-23T10:30:00.000Z\"\n}\n```\n\n#### Error Messages\nError notifications and connection issues:\n```python\n{\n \"type\": \"error\",\n \"connection_id\": \"conn_550e8400-e29b-41d4-a716-446655440000\",\n \"subscription_id\": 123,\n \"error_code\": \"CONNECTION_LOST\",\n \"error_message\": \"Connection lost due to network timeout\",\n \"timestamp\": \"2024-05-23T10:30:00.000Z\"\n}\n```\n\n## Supported Chains\n\nThe SDK supports all chains configured in your Event Streamer service:\n\n- **Ethereum Mainnet** (Chain ID: 1)\n- **Polygon** (Chain ID: 137)\n- **Base** (Chain ID: 8453)\n- **Arbitrum One** (Chain ID: 42161)\n- **Optimism** (Chain ID: 10)\n\n## Error Handling\n\nThe SDK provides comprehensive error handling:\n\n```python\nfrom event_poller_sdk.exceptions import (\n EventPollerSDKError, # Base exception\n EventPollerConnectionError, # Connection issues\n EventPollerTimeoutError, # 
Request timeouts\n EventPollerValidationError, # Validation errors\n EventPollerSubscriptionError, # Subscription errors\n)\n\ntry:\n subscription = await client.create_subscription(subscription_data)\nexcept EventPollerValidationError as e:\n print(f\"Invalid subscription data: {e}\")\nexcept EventPollerConnectionError as e:\n print(f\"Connection failed: {e}\")\n```\n\n## Best Practices\n\n### 1. Use Context Managers\n\nAlways use the EventStreamer as an async context manager to ensure proper cleanup:\n\n```python\nasync with EventStreamer(service_url, subscriber_id) as client:\n # Your code here\n pass\n```\n\n### 2. Handle Events Efficiently\n\nProcess events quickly in your handlers to avoid blocking the streaming connection:\n\n```python\n@client.on_event(\"Transfer\")\nasync def handle_transfers(events):\n # Process quickly to avoid blocking\n for event in events:\n await process_event_async(event)\n```\n\n### 3. Use Specific Event Handlers\n\nRegister handlers for specific event types rather than using only the global handler:\n\n```python\n@client.on_event(\"Transfer\")\nasync def handle_transfers(events):\n # Specific handling for transfers\n pass\n\n@client.on_event(\"Approval\")\nasync def handle_approvals(events):\n # Specific handling for approvals\n pass\n```\n\n### 4. Implement Resume Token Persistence\n\nSave resume tokens to persistent storage to resume from the correct position after restarts:\n\n```python\n# Save resume token periodically\nresume_token = streaming_client.get_current_resume_token()\nawait save_resume_token_to_storage(subscription_id, resume_token)\n\n# Resume from saved position\nsaved_token = await load_resume_token_from_storage(subscription_id)\nstreaming_client = client.create_streaming_client(\n subscription_id=subscription_id,\n resume_token=saved_token\n)\n```\n\n### 5. 
Handle Connection Errors Gracefully\n\nImplement proper error handling for connection issues:\n\n```python\n@client.on_error\nasync def handle_error(error):\n if error.error_code == \"CONNECTION_LOST\":\n # Implement reconnection logic\n await reconnect_with_backoff()\n else:\n # Log and handle other errors\n logger.error(f\"Streaming error: {error.error_message}\")\n```\n\n### 6. Monitor Connection Health\n\nUse heartbeat handlers to monitor connection health:\n\n```python\n@client.on_heartbeat\nasync def handle_heartbeat(heartbeat):\n # Update last heartbeat time\n last_heartbeat = heartbeat.timestamp\n # Check connection health\n await update_connection_health_metrics()\n```\n\n## Development\n\n### Requirements\n\n- Python 3.11+\n- aiohttp\n- pydantic\n- eth-typing\n- event-poller-schemas\n\n### Installation for Development\n\n```bash\ngit clone https://github.com/dcentralab/event-poller-sdk\ncd event-poller-sdk\npip install -e \".[dev]\"\n```\n\n### Running Examples\n\n```bash\n# Live streaming example\npython examples/streaming_example.py\n\n# Historical streaming example\npython examples/historical_streaming_example.py\n```\n\n## License\n\nMIT License - see LICENSE file for details.\n\n## Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Python SDK for Event Streamer blockchain event monitoring service with HTTP streaming support",
"version": "0.2.2",
"project_urls": {
"Documentation": "https://event-streamer-sdk.readthedocs.io/",
"Homepage": "https://github.com/dcentralab/event-streamer-sdk",
"Repository": "https://github.com/dcentralab/event-streamer-sdk.git"
},
"split_keywords": [
"blockchain",
" ethereum",
" events",
" monitoring",
" sdk",
" streaming"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "143e520eca78075ccbbb5499929c6bb2cd3a11444d3c1c5e05bfaad2a5065f45",
"md5": "f8bd117be7bad7bea97215d323fbf84a",
"sha256": "138f25735303cb7d43f989d793a708dd83ac42d54d990a7eaf47d772b17bd08f"
},
"downloads": -1,
"filename": "event_streamer_sdk-0.2.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "f8bd117be7bad7bea97215d323fbf84a",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 34996,
"upload_time": "2025-07-28T06:05:45",
"upload_time_iso_8601": "2025-07-28T06:05:45.873627Z",
"url": "https://files.pythonhosted.org/packages/14/3e/520eca78075ccbbb5499929c6bb2cd3a11444d3c1c5e05bfaad2a5065f45/event_streamer_sdk-0.2.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "7bc1fc756f1734c9e98a8d66a507a014f5848bdcfe42e692baf6dcbd46f5564b",
"md5": "a4b6d715dfcb4ef260ea4f85437bba49",
"sha256": "51b32e32dfff09a11d4baac385ceed73f83d08336f37430545922a577c445095"
},
"downloads": -1,
"filename": "event_streamer_sdk-0.2.2.tar.gz",
"has_sig": false,
"md5_digest": "a4b6d715dfcb4ef260ea4f85437bba49",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 28440,
"upload_time": "2025-07-28T06:05:47",
"upload_time_iso_8601": "2025-07-28T06:05:47.160773Z",
"url": "https://files.pythonhosted.org/packages/7b/c1/fc756f1734c9e98a8d66a507a014f5848bdcfe42e692baf6dcbd46f5564b/event_streamer_sdk-0.2.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-28 06:05:47",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "dcentralab",
"github_project": "event-streamer-sdk",
"github_not_found": true,
"lcname": "event-streamer-sdk"
}