event-streamer-sdk


Nameevent-streamer-sdk JSON
Version 0.2.2 PyPI version JSON
download
home_pageNone
SummaryPython SDK for Event Streamer blockchain event monitoring service with HTTP streaming support
upload_time2025-07-28 06:05:47
maintainerNone
docs_urlNone
authorNone
requires_python>=3.11
licenseMIT
keywords blockchain ethereum events monitoring sdk streaming
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Event Streamer SDK

A Python SDK for interacting with the Event Streamer blockchain event monitoring service. This SDK provides a simple and powerful way to subscribe to blockchain events and receive them via HTTP streaming connections.

## Features

- 🔗 **Simple API Client**: Easy subscription management with typed responses
- 🌊 **HTTP Streaming**: Real-time event delivery via HTTP streaming connections
- 🔄 **Resume Capability**: Automatic resume from last processed position
- 🔒 **Type Safety**: Full type hints and Pydantic model validation
- ⚡ **Async/Await**: Modern async Python patterns throughout
- 🎯 **Decorator Pattern**: Clean event handler registration
- 🛡️ **Error Handling**: Comprehensive error handling and connection management
- 💓 **Health Monitoring**: Built-in heartbeat and connection health tracking
- 📝 **ABI Parsing**: Built-in contract ABI parsing for easy event extraction
- 🔮 **Future-Ready**: Prepared for authentication when the service adds it

## Installation

```bash
pip install event-streamer-sdk
```

## Quick Start

### HTTP Streaming Example

```python
import asyncio
from event_poller_sdk import EventStreamer
from event_poller_sdk.models.subscriptions import SubscriptionCreate
from event_poller_sdk.models.abi import ABIEvent, ABIInput

async def main():
    # Initialize the client
    async with EventStreamer(
        service_url="http://localhost:8000",
        subscriber_id="my-app"
    ) as client:

        # Create a subscription
        subscription = SubscriptionCreate(
            topic0="0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
            event_signature=ABIEvent(
                type="event",
                name="Transfer",
                inputs=[
                    ABIInput(name="from", type="address", indexed=True),
                    ABIInput(name="to", type="address", indexed=True),
                    ABIInput(name="value", type="uint256", indexed=False)
                ]
            ),
            addresses=["0xA0b86a33E6417b3c4555ba476F04245600306D5D"],
            start_block=19000000,
            end_block=19010000,
            chain_id=1,
            subscriber_id="my-app"
        )

        result = await client.create_subscription(subscription)
        print(f"Created subscription: {result.id}")

        # Create streaming client
        streaming_client = client.create_streaming_client(
            subscription_id=result.id,
            client_metadata={"version": "1.0.0"}
        )

        # Register event handlers
        @streaming_client.on_event("Transfer")
        async def handle_transfers(events):
            for event in events:
                print(f"Transfer: {event['from']} -> {event['to']}: {event['value']}")

        # Start streaming
        await streaming_client.start_streaming()

        # Keep running to receive events
        try:
            while streaming_client.is_running:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            print("Stopping...")
        finally:
            await streaming_client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
```

### Resume Capability Example

```python
import asyncio
from event_poller_sdk import EventStreamer

async def main():
    async with EventStreamer(
        service_url="http://localhost:8000",
        subscriber_id="my-app"
    ) as client:

        # Create streaming client with resume token
        streaming_client = client.create_streaming_client(
            subscription_id=123,
            resume_token="rt_eyJzdWJzY3JpcHRpb25faWQiOjEyMywi...",
            client_metadata={"version": "1.0.0"}
        )

        @streaming_client.on_event("Transfer")
        async def handle_transfers(events):
            for event in events:
                print(f"Transfer: {event['from']} -> {event['to']}: {event['value']}")

        # Optional: Handle heartbeats
        @streaming_client.on_heartbeat
        async def handle_heartbeat(heartbeat):
            print(f"Heartbeat: {heartbeat.timestamp}")

        # Optional: Handle errors
        @streaming_client.on_error
        async def handle_error(error):
            print(f"Error: {error.error_message}")

        # Start streaming
        await streaming_client.start_streaming()

        # Keep running and save resume token periodically
        try:
            while streaming_client.is_running:
                current_token = streaming_client.get_current_resume_token()
                # Save token to persistent storage
                await asyncio.sleep(30)
        except KeyboardInterrupt:
            print("Stopping...")
        finally:
            await streaming_client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())
```

## ABI Parsing

The SDK includes built-in ABI parsing functionality to make it easy to extract event definitions from contract ABIs without manually constructing `ABIEvent` objects.

### Extract Specific Event

```python
async def main():
    async with EventStreamer(
        service_url="http://localhost:8000",
        subscriber_id="my-app"
    ) as client:

        # Example ERC20 contract ABI
        erc20_abi = '''[
            {
                "type": "event",
                "name": "Transfer",
                "inputs": [
                    {"indexed": true, "name": "from", "type": "address"},
                    {"indexed": true, "name": "to", "type": "address"},
                    {"indexed": false, "name": "value", "type": "uint256"}
                ]
            }
        ]'''

        # Extract the Transfer event
        transfer_event = client.extract_abi_event(erc20_abi, "Transfer")

        # Use in subscription
        subscription = SubscriptionCreate(
            topic0="0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
            event_signature=transfer_event,  # Use parsed event
            addresses=["0x..."],
            start_block=19000000,
            chain_id=1
        )
```

### Extract All Events

```python
# Extract all events from an ABI
all_events = client.extract_abi_events(erc20_abi)
transfer_event = all_events["Transfer"]
approval_event = all_events["Approval"]
```

### Error Handling

```python
try:
    event = client.extract_abi_event(abi_json, "NonExistentEvent")
except Exception as e:
    print(f"Event not found: {e}")
    # Error message includes available events
```

### Supported ABI Features

- ✅ **Event definitions**: Full support for event parsing
- ✅ **Indexed parameters**: Correctly handles indexed/non-indexed inputs
- ✅ **Array types**: Supports `uint256[]`, `address[]`, etc.
- ✅ **Anonymous events**: Handles anonymous event flag
- ✅ **Complex types**: Support for most Solidity types
- ✅ **Error handling**: Clear error messages with available events
- ⚠️ **Tuple components**: Basic support (see TODO in DCE-50)

## API Reference

### EventStreamer

The main client class for interacting with the Event Streamer service.

```python
class EventStreamer:
    def __init__(
        self,
        service_url: str,
        subscriber_id: str,
        timeout: int = 30,
        api_key: Optional[str] = None,  # Future use
        auth_token: Optional[str] = None,  # Future use
    )
```

#### Subscription Management

```python
# Create a subscription
async def create_subscription(self, subscription: SubscriptionCreate) -> SubscriptionResponse

# List subscriptions
async def list_subscriptions(self, page: int = 1, page_size: int = 20) -> SubscriptionListResponse

# Get a specific subscription
async def get_subscription(self, subscription_id: int) -> SubscriptionResponse

# Update a subscription
async def update_subscription(self, subscription_id: int, update: SubscriptionUpdate) -> SubscriptionResponse

# Delete a subscription
async def delete_subscription(self, subscription_id: int) -> bool
```

#### Streaming Client Creation

```python
# Create a streaming client for a subscription
def create_streaming_client(
    self,
    subscription_id: int,
    resume_token: Optional[str] = None,
    client_metadata: Optional[Dict[str, Any]] = None
) -> StreamingClient
```

### StreamingClient

The streaming client handles real-time event delivery via HTTP streaming connections.

#### Connection Management

```python
# Connect to streaming endpoint
async def connect() -> None

# Start streaming events
async def start_streaming() -> None

# Stop streaming events
async def stop_streaming() -> None

# Disconnect from streaming
async def disconnect() -> None

# Resume from a specific position
async def resume(resume_token: str) -> None

# Get current resume token
def get_current_resume_token() -> Optional[str]
```

#### Event Handler Registration

```python
# Handle specific event types
@client.on_event("Transfer")
async def handle_transfers(events: List[Dict[str, Any]]):
    for event in events:
        # Process event
        pass

# Handle all events
@client.on_all_events
async def handle_all_events(events: Dict[str, List[Dict[str, Any]]]):
    for event_name, event_list in events.items():
        # Process events by type
        pass

# Handle heartbeat messages
@client.on_heartbeat
async def handle_heartbeat(heartbeat: StreamingHeartbeat):
    print(f"Heartbeat: {heartbeat.timestamp}")

# Handle error messages
@client.on_error
async def handle_error(error: StreamingError):
    print(f"Error: {error.error_message}")
```

### Models

#### SubscriptionCreate

```python
class SubscriptionCreate(BaseModel):
    topic0: str                    # Event signature hash
    event_signature: ABIEvent      # ABI event definition
    addresses: List[str] = []      # Contract addresses (empty = all)
    start_block: int               # Starting block number
    end_block: Optional[int] = None # Ending block (None = live)
    chain_id: int                  # Blockchain network ID
    subscriber_id: str             # Your service identifier
```

#### ABIEvent

```python
class ABIEvent(BaseModel):
    type: Literal["event"]
    name: str                      # Event name
    inputs: List[ABIInput] = []    # Event parameters
    anonymous: bool = False
```

#### ABIInput

```python
class ABIInput(BaseModel):
    name: Optional[str] = None     # Parameter name
    type: str                      # Solidity type (e.g., "address", "uint256")
    indexed: Optional[bool] = False # Whether parameter is indexed
```

## Event Data Format

Events are delivered via streaming in batches with the following format:

```python
{
    "type": "event_batch",
    "response_id": "550e8400-e29b-41d4-a716-446655440000",
    "subscription_id": 123,
    "connection_id": "conn_550e8400-e29b-41d4-a716-446655440000",
    "resume_token": "rt_eyJzdWJzY3JpcHRpb25faWQiOjEyMywi...",
    "events": {
        "Transfer": [
            {
                # Event-specific fields
                "from": "0x1234567890123456789012345678901234567890",
                "to": "0x0987654321098765432109876543210987654321",
                "value": "1000000000000000000",

                # Metadata fields
                "block_number": 19000001,
                "transaction_hash": "0xabcdef...",
                "log_index": 0,
                "address": "0xA0b86a33E6417b3c4555ba476F04245600306D5D",
                "timestamp": "2024-05-23T10:30:00.000Z"
            }
        ]
    },
    "batch_size": 1,
    "timestamp": "2024-05-23T10:30:00.000Z"
}
```

### Streaming Message Types

The streaming connection delivers different types of messages:

#### Event Batch
Contains actual blockchain events for processing.

#### Heartbeat
Periodic heartbeat messages to maintain connection health:
```python
{
    "type": "heartbeat",
    "connection_id": "conn_550e8400-e29b-41d4-a716-446655440000",
    "subscription_id": 123,
    "timestamp": "2024-05-23T10:30:00.000Z"
}
```

#### Error Messages
Error notifications and connection issues:
```python
{
    "type": "error",
    "connection_id": "conn_550e8400-e29b-41d4-a716-446655440000",
    "subscription_id": 123,
    "error_code": "CONNECTION_LOST",
    "error_message": "Connection lost due to network timeout",
    "timestamp": "2024-05-23T10:30:00.000Z"
}
```

## Supported Chains

The SDK supports all chains configured in your Event Streamer service:

- **Ethereum Mainnet** (Chain ID: 1)
- **Polygon** (Chain ID: 137)
- **Base** (Chain ID: 8453)
- **Arbitrum One** (Chain ID: 42161)
- **Optimism** (Chain ID: 10)

## Error Handling

The SDK provides comprehensive error handling:

```python
from event_poller_sdk.exceptions import (
    EventPollerSDKError,           # Base exception
    EventPollerConnectionError,    # Connection issues
    EventPollerTimeoutError,       # Request timeouts
    EventPollerValidationError,    # Validation errors
    EventPollerSubscriptionError,  # Subscription errors
)

try:
    subscription = await client.create_subscription(subscription_data)
except EventPollerValidationError as e:
    print(f"Invalid subscription data: {e}")
except EventPollerConnectionError as e:
    print(f"Connection failed: {e}")
```

## Best Practices

### 1. Use Context Managers

Always use the EventStreamer as an async context manager to ensure proper cleanup:

```python
async with EventStreamer(service_url, subscriber_id) as client:
    # Your code here
    pass
```

### 2. Handle Events Efficiently

Process events quickly in your handlers to avoid blocking the streaming connection:

```python
@client.on_event("Transfer")
async def handle_transfers(events):
    # Process quickly to avoid blocking
    for event in events:
        await process_event_async(event)
```

### 3. Use Specific Event Handlers

Register handlers for specific event types rather than using only the global handler:

```python
@client.on_event("Transfer")
async def handle_transfers(events):
    # Specific handling for transfers
    pass

@client.on_event("Approval")
async def handle_approvals(events):
    # Specific handling for approvals
    pass
```

### 4. Implement Resume Token Persistence

Save resume tokens to persistent storage to resume from the correct position after restarts:

```python
# Save resume token periodically
resume_token = streaming_client.get_current_resume_token()
await save_resume_token_to_storage(subscription_id, resume_token)

# Resume from saved position
saved_token = await load_resume_token_from_storage(subscription_id)
streaming_client = client.create_streaming_client(
    subscription_id=subscription_id,
    resume_token=saved_token
)
```

### 5. Handle Connection Errors Gracefully

Implement proper error handling for connection issues:

```python
@client.on_error
async def handle_error(error):
    if error.error_code == "CONNECTION_LOST":
        # Implement reconnection logic
        await reconnect_with_backoff()
    else:
        # Log and handle other errors
        logger.error(f"Streaming error: {error.error_message}")
```

### 6. Monitor Connection Health

Use heartbeat handlers to monitor connection health:

```python
@client.on_heartbeat
async def handle_heartbeat(heartbeat):
    # Update last heartbeat time
    last_heartbeat = heartbeat.timestamp
    # Check connection health
    await update_connection_health_metrics()
```

## Development

### Requirements

- Python 3.11+
- aiohttp
- pydantic
- eth-typing
- event-poller-schemas

### Installation for Development

```bash
git clone https://github.com/dcentralab/event-poller-sdk
cd event-poller-sdk
pip install -e ".[dev]"
```

### Running Examples

```bash
# Live streaming example
python examples/streaming_example.py

# Historical streaming example
python examples/historical_streaming_example.py
```

## License

MIT License - see LICENSE file for details.

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "event-streamer-sdk",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "blockchain, ethereum, events, monitoring, sdk, streaming",
    "author": null,
    "author_email": "DcentraLab <contact@dcentralab.com>",
    "download_url": "https://files.pythonhosted.org/packages/7b/c1/fc756f1734c9e98a8d66a507a014f5848bdcfe42e692baf6dcbd46f5564b/event_streamer_sdk-0.2.2.tar.gz",
    "platform": null,
    "description": "# Event Streamer SDK\n\nA Python SDK for interacting with the Event Streamer blockchain event monitoring service. This SDK provides a simple and powerful way to subscribe to blockchain events and receive them via HTTP streaming connections.\n\n## Features\n\n- \ud83d\udd17 **Simple API Client**: Easy subscription management with typed responses\n- \ud83c\udf0a **HTTP Streaming**: Real-time event delivery via HTTP streaming connections\n- \ud83d\udd04 **Resume Capability**: Automatic resume from last processed position\n- \ud83d\udd12 **Type Safety**: Full type hints and Pydantic model validation\n- \u26a1 **Async/Await**: Modern async Python patterns throughout\n- \ud83c\udfaf **Decorator Pattern**: Clean event handler registration\n- \ud83d\udee1\ufe0f **Error Handling**: Comprehensive error handling and connection management\n- \ud83d\udc93 **Health Monitoring**: Built-in heartbeat and connection health tracking\n- \ud83d\udcdd **ABI Parsing**: Built-in contract ABI parsing for easy event extraction\n- \ud83d\udd2e **Future-Ready**: Prepared for authentication when the service adds it\n\n## Installation\n\n```bash\npip install event-streamer-sdk\n```\n\n## Quick Start\n\n### HTTP Streaming Example\n\n```python\nimport asyncio\nfrom event_poller_sdk import EventStreamer\nfrom event_poller_sdk.models.subscriptions import SubscriptionCreate\nfrom event_poller_sdk.models.abi import ABIEvent, ABIInput\n\nasync def main():\n    # Initialize the client\n    async with EventStreamer(\n        service_url=\"http://localhost:8000\",\n        subscriber_id=\"my-app\"\n    ) as client:\n\n        # Create a subscription\n        subscription = SubscriptionCreate(\n            topic0=\"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef\",\n            event_signature=ABIEvent(\n                type=\"event\",\n                name=\"Transfer\",\n                inputs=[\n                    ABIInput(name=\"from\", type=\"address\", indexed=True),\n                    ABIInput(name=\"to\", type=\"address\", indexed=True),\n                    ABIInput(name=\"value\", type=\"uint256\", indexed=False)\n                ]\n            ),\n            addresses=[\"0xA0b86a33E6417b3c4555ba476F04245600306D5D\"],\n            start_block=19000000,\n            end_block=19010000,\n            chain_id=1,\n            subscriber_id=\"my-app\"\n        )\n\n        result = await client.create_subscription(subscription)\n        print(f\"Created subscription: {result.id}\")\n\n        # Create streaming client\n        streaming_client = client.create_streaming_client(\n            subscription_id=result.id,\n            client_metadata={\"version\": \"1.0.0\"}\n        )\n\n        # Register event handlers\n        @streaming_client.on_event(\"Transfer\")\n        async def handle_transfers(events):\n            for event in events:\n                print(f\"Transfer: {event['from']} -> {event['to']}: {event['value']}\")\n\n        # Start streaming\n        await streaming_client.start_streaming()\n\n        # Keep running to receive events\n        try:\n            while streaming_client.is_running:\n                await asyncio.sleep(1)\n        except KeyboardInterrupt:\n            print(\"Stopping...\")\n        finally:\n            await streaming_client.disconnect()\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n### Resume Capability Example\n\n```python\nimport asyncio\nfrom event_poller_sdk import EventStreamer\n\nasync def main():\n    async with EventStreamer(\n        service_url=\"http://localhost:8000\",\n        subscriber_id=\"my-app\"\n    ) as client:\n\n        # Create streaming client with resume token\n        streaming_client = client.create_streaming_client(\n            subscription_id=123,\n            resume_token=\"rt_eyJzdWJzY3JpcHRpb25faWQiOjEyMywi...\",\n            client_metadata={\"version\": \"1.0.0\"}\n        )\n\n        @streaming_client.on_event(\"Transfer\")\n        async def handle_transfers(events):\n            for event in events:\n                print(f\"Transfer: {event['from']} -> {event['to']}: {event['value']}\")\n\n        # Optional: Handle heartbeats\n        @streaming_client.on_heartbeat\n        async def handle_heartbeat(heartbeat):\n            print(f\"Heartbeat: {heartbeat.timestamp}\")\n\n        # Optional: Handle errors\n        @streaming_client.on_error\n        async def handle_error(error):\n            print(f\"Error: {error.error_message}\")\n\n        # Start streaming\n        await streaming_client.start_streaming()\n\n        # Keep running and save resume token periodically\n        try:\n            while streaming_client.is_running:\n                current_token = streaming_client.get_current_resume_token()\n                # Save token to persistent storage\n                await asyncio.sleep(30)\n        except KeyboardInterrupt:\n            print(\"Stopping...\")\n        finally:\n            await streaming_client.disconnect()\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n## ABI Parsing\n\nThe SDK includes built-in ABI parsing functionality to make it easy to extract event definitions from contract ABIs without manually constructing `ABIEvent` objects.\n\n### Extract Specific Event\n\n```python\nasync def main():\n    async with EventStreamer(\n        service_url=\"http://localhost:8000\",\n        subscriber_id=\"my-app\"\n    ) as client:\n\n        # Example ERC20 contract ABI\n        erc20_abi = '''[\n            {\n                \"type\": \"event\",\n                \"name\": \"Transfer\",\n                \"inputs\": [\n                    {\"indexed\": true, \"name\": \"from\", \"type\": \"address\"},\n                    {\"indexed\": true, \"name\": \"to\", \"type\": \"address\"},\n                    {\"indexed\": false, \"name\": \"value\", \"type\": \"uint256\"}\n                ]\n            }\n        ]'''\n\n        # Extract the Transfer event\n        transfer_event = client.extract_abi_event(erc20_abi, \"Transfer\")\n\n        # Use in subscription\n        subscription = SubscriptionCreate(\n            topic0=\"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef\",\n            event_signature=transfer_event,  # Use parsed event\n            addresses=[\"0x...\"],\n            start_block=19000000,\n            chain_id=1\n        )\n```\n\n### Extract All Events\n\n```python\n# Extract all events from an ABI\nall_events = client.extract_abi_events(erc20_abi)\ntransfer_event = all_events[\"Transfer\"]\napproval_event = all_events[\"Approval\"]\n```\n\n### Error Handling\n\n```python\ntry:\n    event = client.extract_abi_event(abi_json, \"NonExistentEvent\")\nexcept Exception as e:\n    print(f\"Event not found: {e}\")\n    # Error message includes available events\n```\n\n### Supported ABI Features\n\n- \u2705 **Event definitions**: Full support for event parsing\n- \u2705 **Indexed parameters**: Correctly handles indexed/non-indexed inputs\n- \u2705 **Array types**: Supports `uint256[]`, `address[]`, etc.\n- \u2705 **Anonymous events**: Handles anonymous event flag\n- \u2705 **Complex types**: Support for most Solidity types\n- \u2705 **Error handling**: Clear error messages with available events\n- \u26a0\ufe0f **Tuple components**: Basic support (see TODO in DCE-50)\n\n## API Reference\n\n### EventStreamer\n\nThe main client class for interacting with the Event Streamer service.\n\n```python\nclass EventStreamer:\n    def __init__(\n        self,\n        service_url: str,\n        subscriber_id: str,\n        timeout: int = 30,\n        api_key: Optional[str] = None,  # Future use\n        auth_token: Optional[str] = None,  # Future use\n    )\n```\n\n#### Subscription Management\n\n```python\n# Create a subscription\nasync def create_subscription(self, subscription: SubscriptionCreate) -> SubscriptionResponse\n\n# List subscriptions\nasync def list_subscriptions(self, page: int = 1, page_size: int = 20) -> SubscriptionListResponse\n\n# Get a specific subscription\nasync def get_subscription(self, subscription_id: int) -> SubscriptionResponse\n\n# Update a subscription\nasync def update_subscription(self, subscription_id: int, update: SubscriptionUpdate) -> SubscriptionResponse\n\n# Delete a subscription\nasync def delete_subscription(self, subscription_id: int) -> bool\n```\n\n#### Streaming Client Creation\n\n```python\n# Create a streaming client for a subscription\ndef create_streaming_client(\n    self,\n    subscription_id: int,\n    resume_token: Optional[str] = None,\n    client_metadata: Optional[Dict[str, Any]] = None\n) -> StreamingClient\n```\n\n### StreamingClient\n\nThe streaming client handles real-time event delivery via HTTP streaming connections.\n\n#### Connection Management\n\n```python\n# Connect to streaming endpoint\nasync def connect() -> None\n\n# Start streaming events\nasync def start_streaming() -> None\n\n# Stop streaming events\nasync def stop_streaming() -> None\n\n# Disconnect from streaming\nasync def disconnect() -> None\n\n# Resume from a specific position\nasync def resume(resume_token: str) -> None\n\n# Get current resume token\ndef get_current_resume_token() -> Optional[str]\n```\n\n#### Event Handler Registration\n\n```python\n# Handle specific event types\n@client.on_event(\"Transfer\")\nasync def handle_transfers(events: List[Dict[str, Any]]):\n    for event in events:\n        # Process event\n        pass\n\n# Handle all events\n@client.on_all_events\nasync def handle_all_events(events: Dict[str, List[Dict[str, Any]]]):\n    for event_name, event_list in events.items():\n        # Process events by type\n        pass\n\n# Handle heartbeat messages\n@client.on_heartbeat\nasync def handle_heartbeat(heartbeat: StreamingHeartbeat):\n    print(f\"Heartbeat: {heartbeat.timestamp}\")\n\n# Handle error messages\n@client.on_error\nasync def handle_error(error: StreamingError):\n    print(f\"Error: {error.error_message}\")\n```\n\n### Models\n\n#### SubscriptionCreate\n\n```python\nclass SubscriptionCreate(BaseModel):\n    topic0: str                    # Event signature hash\n    event_signature: ABIEvent      # ABI event definition\n    addresses: List[str] = []      # Contract addresses (empty = all)\n    start_block: int               # Starting block number\n    end_block: Optional[int] = None # Ending block (None = live)\n    chain_id: int                  # Blockchain network ID\n    subscriber_id: str             # Your service identifier\n```\n\n#### ABIEvent\n\n```python\nclass ABIEvent(BaseModel):\n    type: Literal[\"event\"]\n    name: str                      # Event name\n    inputs: List[ABIInput] = []    # Event parameters\n    anonymous: bool = False\n```\n\n#### ABIInput\n\n```python\nclass ABIInput(BaseModel):\n    name: Optional[str] = None     # Parameter name\n    type: str                      # Solidity type (e.g., \"address\", \"uint256\")\n    indexed: Optional[bool] = False # Whether parameter is indexed\n```\n\n## Event Data Format\n\nEvents are delivered via streaming in batches with the following format:\n\n```python\n{\n    \"type\": \"event_batch\",\n    \"response_id\": \"550e8400-e29b-41d4-a716-446655440000\",\n    \"subscription_id\": 123,\n    \"connection_id\": \"conn_550e8400-e29b-41d4-a716-446655440000\",\n    \"resume_token\": \"rt_eyJzdWJzY3JpcHRpb25faWQiOjEyMywi...\",\n    \"events\": {\n        \"Transfer\": [\n            {\n                # Event-specific fields\n                \"from\": \"0x1234567890123456789012345678901234567890\",\n                \"to\": \"0x0987654321098765432109876543210987654321\",\n                \"value\": \"1000000000000000000\",\n\n                # Metadata fields\n                \"block_number\": 19000001,\n                \"transaction_hash\": \"0xabcdef...\",\n                \"log_index\": 0,\n                \"address\": \"0xA0b86a33E6417b3c4555ba476F04245600306D5D\",\n                \"timestamp\": \"2024-05-23T10:30:00.000Z\"\n            }\n        ]\n    },\n    \"batch_size\": 1,\n    \"timestamp\": \"2024-05-23T10:30:00.000Z\"\n}\n```\n\n### Streaming Message Types\n\nThe streaming connection delivers different types of messages:\n\n#### Event Batch\nContains actual blockchain events for processing.\n\n#### Heartbeat\nPeriodic heartbeat messages to maintain connection health:\n```python\n{\n    \"type\": \"heartbeat\",\n    \"connection_id\": \"conn_550e8400-e29b-41d4-a716-446655440000\",\n    \"subscription_id\": 123,\n    \"timestamp\": \"2024-05-23T10:30:00.000Z\"\n}\n```\n\n#### Error Messages\nError notifications and connection issues:\n```python\n{\n    \"type\": \"error\",\n    \"connection_id\": \"conn_550e8400-e29b-41d4-a716-446655440000\",\n    \"subscription_id\": 123,\n    \"error_code\": \"CONNECTION_LOST\",\n    \"error_message\": \"Connection lost due to network timeout\",\n    \"timestamp\": \"2024-05-23T10:30:00.000Z\"\n}\n```\n\n## Supported Chains\n\nThe SDK supports all chains configured in your Event Streamer service:\n\n- **Ethereum Mainnet** (Chain ID: 1)\n- **Polygon** (Chain ID: 137)\n- **Base** (Chain ID: 8453)\n- **Arbitrum One** (Chain ID: 42161)\n- **Optimism** (Chain ID: 10)\n\n## Error Handling\n\nThe SDK provides comprehensive error handling:\n\n```python\nfrom event_poller_sdk.exceptions import (\n    EventPollerSDKError,           # Base exception\n    EventPollerConnectionError,    # Connection issues\n    EventPollerTimeoutError,       # Request timeouts\n    EventPollerValidationError,    # Validation errors\n    EventPollerSubscriptionError,  # Subscription errors\n)\n\ntry:\n    subscription = await client.create_subscription(subscription_data)\nexcept EventPollerValidationError as e:\n    print(f\"Invalid subscription data: {e}\")\nexcept EventPollerConnectionError as e:\n    print(f\"Connection failed: {e}\")\n```\n\n## Best Practices\n\n### 1. Use Context Managers\n\nAlways use the EventStreamer as an async context manager to ensure proper cleanup:\n\n```python\nasync with EventStreamer(service_url, subscriber_id) as client:\n    # Your code here\n    pass\n```\n\n### 2. Handle Events Efficiently\n\nProcess events quickly in your handlers to avoid blocking the streaming connection:\n\n```python\n@client.on_event(\"Transfer\")\nasync def handle_transfers(events):\n    # Process quickly to avoid blocking\n    for event in events:\n        await process_event_async(event)\n```\n\n### 3. Use Specific Event Handlers\n\nRegister handlers for specific event types rather than using only the global handler:\n\n```python\n@client.on_event(\"Transfer\")\nasync def handle_transfers(events):\n    # Specific handling for transfers\n    pass\n\n@client.on_event(\"Approval\")\nasync def handle_approvals(events):\n    # Specific handling for approvals\n    pass\n```\n\n### 4. Implement Resume Token Persistence\n\nSave resume tokens to persistent storage to resume from the correct position after restarts:\n\n```python\n# Save resume token periodically\nresume_token = streaming_client.get_current_resume_token()\nawait save_resume_token_to_storage(subscription_id, resume_token)\n\n# Resume from saved position\nsaved_token = await load_resume_token_from_storage(subscription_id)\nstreaming_client = client.create_streaming_client(\n    subscription_id=subscription_id,\n    resume_token=saved_token\n)\n```\n\n### 5. Handle Connection Errors Gracefully\n\nImplement proper error handling for connection issues:\n\n```python\n@client.on_error\nasync def handle_error(error):\n    if error.error_code == \"CONNECTION_LOST\":\n        # Implement reconnection logic\n        await reconnect_with_backoff()\n    else:\n        # Log and handle other errors\n        logger.error(f\"Streaming error: {error.error_message}\")\n```\n\n### 6. Monitor Connection Health\n\nUse heartbeat handlers to monitor connection health:\n\n```python\n@client.on_heartbeat\nasync def handle_heartbeat(heartbeat):\n    # Update last heartbeat time\n    last_heartbeat = heartbeat.timestamp\n    # Check connection health\n    await update_connection_health_metrics()\n```\n\n## Development\n\n### Requirements\n\n- Python 3.11+\n- aiohttp\n- pydantic\n- eth-typing\n- event-poller-schemas\n\n### Installation for Development\n\n```bash\ngit clone https://github.com/dcentralab/event-poller-sdk\ncd event-poller-sdk\npip install -e \".[dev]\"\n```\n\n### Running Examples\n\n```bash\n# Live streaming example\npython examples/streaming_example.py\n\n# Historical streaming example\npython examples/historical_streaming_example.py\n```\n\n## License\n\nMIT License - see LICENSE file for details.\n\n## Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Python SDK for Event Streamer blockchain event monitoring service with HTTP streaming support",
    "version": "0.2.2",
    "project_urls": {
        "Documentation": "https://event-streamer-sdk.readthedocs.io/",
        "Homepage": "https://github.com/dcentralab/event-streamer-sdk",
        "Repository": "https://github.com/dcentralab/event-streamer-sdk.git"
    },
    "split_keywords": [
        "blockchain",
        " ethereum",
        " events",
        " monitoring",
        " sdk",
        " streaming"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "143e520eca78075ccbbb5499929c6bb2cd3a11444d3c1c5e05bfaad2a5065f45",
                "md5": "f8bd117be7bad7bea97215d323fbf84a",
                "sha256": "138f25735303cb7d43f989d793a708dd83ac42d54d990a7eaf47d772b17bd08f"
            },
            "downloads": -1,
            "filename": "event_streamer_sdk-0.2.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "f8bd117be7bad7bea97215d323fbf84a",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 34996,
            "upload_time": "2025-07-28T06:05:45",
            "upload_time_iso_8601": "2025-07-28T06:05:45.873627Z",
            "url": "https://files.pythonhosted.org/packages/14/3e/520eca78075ccbbb5499929c6bb2cd3a11444d3c1c5e05bfaad2a5065f45/event_streamer_sdk-0.2.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "7bc1fc756f1734c9e98a8d66a507a014f5848bdcfe42e692baf6dcbd46f5564b",
                "md5": "a4b6d715dfcb4ef260ea4f85437bba49",
                "sha256": "51b32e32dfff09a11d4baac385ceed73f83d08336f37430545922a577c445095"
            },
            "downloads": -1,
            "filename": "event_streamer_sdk-0.2.2.tar.gz",
            "has_sig": false,
            "md5_digest": "a4b6d715dfcb4ef260ea4f85437bba49",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 28440,
            "upload_time": "2025-07-28T06:05:47",
            "upload_time_iso_8601": "2025-07-28T06:05:47.160773Z",
            "url": "https://files.pythonhosted.org/packages/7b/c1/fc756f1734c9e98a8d66a507a014f5848bdcfe42e692baf6dcbd46f5564b/event_streamer_sdk-0.2.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-07-28 06:05:47",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "dcentralab",
    "github_project": "event-streamer-sdk",
    "github_not_found": true,
    "lcname": "event-streamer-sdk"
}
        
Elapsed time: 1.73415s