krules-framework


Namekrules-framework JSON
Version 2.0.0 PyPI version JSON
download
home_pageNone
SummaryKRules Framework - Async event-driven framework
upload_time2025-11-03 10:23:35
maintainerNone
docs_urlNone
authorNone
requires_python>=3.11
licenseApache-2.0
keywords async event-driven framework krules reactive
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # KRules Framework

**Modern async-first event-driven application framework for Python**

KRules is a Python framework for building reactive, event-driven applications with a focus on dynamic state management and declarative event handling.

## Features

- **Reactive Subjects** - Dynamic entities with schema-less properties that automatically emit events on changes
- **Declarative Handlers** - Clean decorator-based API (`@on`, `@when`, `@middleware`)
- **Async Native** - Built on asyncio for high-performance concurrent event processing
- **Type Safe** - Full type hints for excellent IDE support and type checking
- **Dependency Injection** - Container-based architecture for testability and flexibility
- **Storage Agnostic** - Pluggable backends (Redis, SQLite, in-memory, custom)
- **Production Ready** - Middleware support, error isolation, monitoring hooks

## Installation

```bash
pip install krules-framework
```

With optional features:

```bash
# Redis storage backend
pip install "krules-framework[redis]"

# Google Cloud Pub/Sub
pip install "krules-framework[pubsub]"

# FastAPI integration
pip install "krules-framework[fastapi]"
```

## Quick Example

This example demonstrates **reactive state composition** - building complex states from simple properties, where each layer reacts to changes in lower layers.

```python
from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED

container = KRulesContainer()
on, when, middleware, emit = container.handlers()

# Layer 1: Derive health status from metrics (ONLY for device: subjects)
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.subject.name.startswith("device:"))
@when(lambda ctx: ctx.property_name in ["cpu_usage", "memory_usage", "error_rate"])
async def compute_device_health(ctx):
    """Aggregate device metrics into health status"""
    device = ctx.subject
    # Read from subject's internal cache (even if not yet persisted)
    cpu = device.get("cpu_usage", 0)
    memory = device.get("memory_usage", 0)
    errors = device.get("error_rate", 0)

    if cpu > 90 or memory > 90 or errors > 10:
        device.set("health", "critical")
    elif cpu > 70 or memory > 70 or errors > 5:
        device.set("health", "warning")
    else:
        device.set("health", "healthy")

# Layer 2: React to health transitions (ONLY for device: subjects)
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.subject.name.startswith("device:"))
@when(lambda ctx: ctx.property_name == "health")
async def handle_device_health_change(ctx):
    """Take action based on health state transition"""
    print(f"{ctx.subject.name}: {ctx.old_value} → {ctx.new_value}")

    if ctx.new_value == "critical":
        await ctx.emit("device.alert.critical", ctx.subject)
    elif ctx.new_value == "healthy" and ctx.old_value == "critical":
        await ctx.emit("device.alert.recovered", ctx.subject)

# Usage
device = container.subject("device:prod-01")

# Batch mode: multiple sets + single store
device.set("cpu_usage", 75)      # → triggers handler, health="warning"
device.set("memory_usage", 60)
device.set("error_rate", 2)
device.store()  # Single persistence, flushes cache to Redis

# Single update mode: bypass cache, write directly
await device.set("cpu_usage", 95, use_cache=False)  # → health="critical" → alert!
await device.set("cpu_usage", 50, use_cache=False)  # → health="healthy" → recovered!
await device.set("cpu_usage", 45, use_cache=False)  # → NO EVENT (health unchanged)
```

**Key Concepts:**

1. **Reactive Composition** - `health` state is automatically derived from metrics
2. **Subject Type Filtering** - Handlers target `device:*` subjects using naming conventions
3. **Events on Change Only** - Property change events fire only when values actually change
4. **State Transitions** - Access `old_value` and `new_value` to handle transitions
5. **Efficient Persistence** - Batch updates with single `store()`, or `use_cache=False` for single updates
6. **Bounded Entities** - Devices are predictable, limited entities (not infinite like orders)

## Core Concepts

### Subjects - Reactive State Entities

Subjects are dynamic entities with persistent, reactive properties. Setting a property automatically emits a `subject-property-changed` event.

```python
from krules_core.container import KRulesContainer

container = KRulesContainer()

# Create subject
device = container.subject("device-456")

# Set properties (schema-less, fully dynamic)
device.set("temperature", 75.5)
device.set("status", "online")
device.set("metadata", {"location": "room-1", "floor": 2})

# Lambda values for atomic operations
device.set("count", 0)
device.set("count", lambda c: c + 1)  # Atomic increment

# Get with defaults
temp = device.get("temperature")
status = device.get("status", default="offline")

# Extended properties (metadata, no events)
device.set_ext("tags", ["production", "critical"])

# Persist to storage
device.store()
```

### Event Handlers - Declarative Processing

Define handlers using decorators. Supports glob patterns and conditional filters.

```python
from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED

container = KRulesContainer()
on, when, middleware, emit = container.handlers()

# Single event
@on("order.created")
async def process_order(ctx):
    ctx.subject.set("status", "processing")
    await ctx.emit("order.processing")

# Multiple events
@on("user.created", "user.updated", "user.deleted")
async def log_user_change(ctx):
    print(f"User event: {ctx.event_type}")

# Glob patterns
@on("device.*")
async def handle_device(ctx):
    print(f"Device event: {ctx.event_type}")

# Property change with filters
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.property_name == "status")
@when(lambda ctx: ctx.new_value == "error")
async def on_error_status(ctx):
    await ctx.emit("alert.device_error", {
        "device_id": ctx.subject.name
    })
```

### Filters - Conditional Execution

Stack multiple `@when` decorators for conditional execution (all must pass).

```python
# Multiple filters (AND logic)
@on("payment.process")
@when(lambda ctx: ctx.payload.get("amount") > 0)
@when(lambda ctx: ctx.subject.get("verified") == True)
async def process_payment(ctx):
    # Only for verified users with amount > 0
    pass

# Reusable filters
def is_premium(ctx):
    return ctx.subject.get("tier") == "premium"

def has_credits(ctx):
    return ctx.subject.get("credits", 0) > 0

@on("feature.use")
@when(is_premium)
@when(has_credits)
async def use_premium_feature(ctx):
    ctx.subject.set("credits", lambda c: c - 1)
```

### Middleware - Cross-Cutting Concerns

Middleware runs for all events, enabling logging, timing, error handling, etc.

```python
from krules_core.container import KRulesContainer
import time

container = KRulesContainer()
on, when, middleware, emit = container.handlers()

@middleware
async def timing_middleware(ctx, next):
    """Measure handler execution time"""
    start = time.time()
    await next()
    duration = time.time() - start
    print(f"{ctx.event_type} took {duration:.3f}s")

@middleware
async def error_handling(ctx, next):
    """Global error handler"""
    try:
        await next()
    except Exception as e:
        print(f"Handler error: {e}")
        await ctx.emit("error.handler_failed", {"error": str(e)})
```

## Storage Backends

KRules supports pluggable storage backends for subject persistence.

### Redis Storage

```python
from dependency_injector import providers
from krules_core.container import KRulesContainer
from redis_subjects_storage.storage_impl import create_redis_storage

# Create container
container = KRulesContainer()

# Override storage with Redis
redis_factory = create_redis_storage(
    url="redis://localhost:6379",
    key_prefix="myapp:"
)
container.subject_storage.override(providers.Object(redis_factory))

# Now all subjects use Redis
user = container.subject("user-123")
user.set("name", "John")  # Persisted in Redis
user.store()
```

### Custom Storage

Implement the storage interface to create custom backends:

```python
class CustomStorage:
    def __init__(self, subject_name, event_info=None, event_data=None):
        self._subject = subject_name

    def load(self):
        """Return (properties_dict, ext_properties_dict)"""
        return {}, {}

    def store(self, inserts=[], updates=[], deletes=[]):
        """Persist property changes"""
        pass

    def set(self, prop):
        """Set single property, return (new_value, old_value)"""
        pass

    def get(self, prop):
        """Get property value"""
        pass

    def delete(self, prop):
        """Delete property"""
        pass

    def flush(self):
        """Delete entire subject"""
        pass

    def get_ext_props(self):
        """Return extended properties dict"""
        return {}
```

## Testing

KRules provides utilities for easy testing:

```python
import pytest
from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED

@pytest.fixture
def container():
    """Create fresh container for each test"""
    return KRulesContainer()

@pytest.mark.asyncio
async def test_user_login(container):
    """Test user login handler"""
    on, when, middleware, emit = container.handlers()
    results = []

    @on("user.login")
    async def handler(ctx):
        results.append(ctx.event_type)
        ctx.subject.set("logged_in", True)

    user = container.subject("test-user")
    await emit("user.login", user)

    assert len(results) == 1
    assert user.get("logged_in") == True
```

## Documentation

- [Quick Start Guide](docs/QUICKSTART.md) - 5-minute tutorial
- [Core Concepts](docs/CORE_CONCEPTS.md) - Framework fundamentals
- [Subjects](docs/SUBJECTS.md) - Reactive property store deep dive
- [Event Handlers](docs/EVENT_HANDLERS.md) - Handlers, filters, patterns
- [Middleware](docs/MIDDLEWARE.md) - Cross-cutting concerns
- [Container & DI](docs/CONTAINER_DI.md) - Dependency injection
- [Storage Backends](docs/STORAGE_BACKENDS.md) - Persistence layer
- [Integrations](docs/INTEGRATIONS.md) - FastAPI, Pub/Sub, CloudEvents
- [Testing](docs/TESTING.md) - Testing strategies
- [Advanced Patterns](docs/ADVANCED_PATTERNS.md) - Production best practices
- [Shell Mode](docs/SHELL_MODE.md) - Interactive REPL usage
- [API Reference](docs/API_REFERENCE.md) - Complete API documentation

## Integrations

KRules supports event-driven communication with external systems through **event receivers** (inbound) and **event emitters** (outbound).

### Event Receivers (Inbound)

**FastAPI Integration** - Receive HTTP CloudEvents

```python
from krules_fastapi_env import KRulesApp
from krules_core.container import KRulesContainer

container = KRulesContainer()
on, when, middleware, emit = container.handlers()

# Define handlers (same as local events)
@on("order.created")
async def handle_order(ctx):
    print(f"Received order: {ctx.subject.name}")

# Create FastAPI app that receives CloudEvents
app = KRulesApp(krules_container=container)
# POST /krules endpoint now receives CloudEvents and triggers handlers
```

**Pub/Sub Subscriber** - Receive events from Google Pub/Sub

```python
from krules_cloudevents_pubsub import PubSubSubscriber

# Subscribe to Pub/Sub topic
subscriber = PubSubSubscriber(
    project_id="my-project",
    subscription_name="my-subscription",
    container=container
)

# Same handlers work for Pub/Sub events
await subscriber.run()
```

### Event Emitters (Outbound)

**HTTP CloudEvents** - Send events to external HTTP endpoints

```python
from krules_cloudevents import CloudEventsDispatcher, create_dispatcher_middleware

# Create dispatcher
dispatcher = CloudEventsDispatcher(
    dispatch_url="https://api.example.com/events",
    source="my-service",
    krules_container=container
)

# Register as middleware
dispatcher_mw = create_dispatcher_middleware(dispatcher)
container.event_bus().add_middleware(dispatcher_mw)

# Now emit events to external URL
await emit("user.created", user, dispatch_url="https://api.example.com/events")
```

**Pub/Sub Publisher** - Send events to Google Pub/Sub

```python
from krules_cloudevents_pubsub import CloudEventsDispatcher, create_dispatcher_middleware

# Create dispatcher
dispatcher = CloudEventsDispatcher(
    project_id="my-project",
    default_topic="krules-events",
    source="my-service",
    krules_container=container
)

# Register as middleware
dispatcher_mw = create_dispatcher_middleware(dispatcher)
container.event_bus().add_middleware(dispatcher_mw)

# Emit to Pub/Sub topic
await emit("user.created", user, topic="user-events")
```

See [Integrations](docs/INTEGRATIONS.md) for detailed guides.

## Requirements

- Python >=3.11
- asyncio support

## License

Apache License 2.0

## Contributing

This framework is maintained by [Airspot](mailto:info@airspot.tech) for internal use, but contributions are welcome.

## Support

For issues and questions, please open a GitHub issue.

---

**Built with ❤️ by Airspot**

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "krules-framework",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "async, event-driven, framework, krules, reactive",
    "author": null,
    "author_email": "Airspot <info@airspot.tech>",
    "download_url": "https://files.pythonhosted.org/packages/6e/00/ed54226a6f2d37b9b171da606e3fc47f9e12855aaaa2d76c5190c1921d1a/krules_framework-2.0.0.tar.gz",
    "platform": null,
    "description": "# KRules Framework\n\n**Modern async-first event-driven application framework for Python**\n\nKRules is a Python framework for building reactive, event-driven applications with a focus on dynamic state management and declarative event handling.\n\n## Features\n\n- **Reactive Subjects** - Dynamic entities with schema-less properties that automatically emit events on changes\n- **Declarative Handlers** - Clean decorator-based API (`@on`, `@when`, `@middleware`)\n- **Async Native** - Built on asyncio for high-performance concurrent event processing\n- **Type Safe** - Full type hints for excellent IDE support and type checking\n- **Dependency Injection** - Container-based architecture for testability and flexibility\n- **Storage Agnostic** - Pluggable backends (Redis, SQLite, in-memory, custom)\n- **Production Ready** - Middleware support, error isolation, monitoring hooks\n\n## Installation\n\n```bash\npip install krules-framework\n```\n\nWith optional features:\n\n```bash\n# Redis storage backend\npip install \"krules-framework[redis]\"\n\n# Google Cloud Pub/Sub\npip install \"krules-framework[pubsub]\"\n\n# FastAPI integration\npip install \"krules-framework[fastapi]\"\n```\n\n## Quick Example\n\nThis example demonstrates **reactive state composition** - building complex states from simple properties, where each layer reacts to changes in lower layers.\n\n```python\nfrom krules_core.container import KRulesContainer\nfrom krules_core.event_types import SUBJECT_PROPERTY_CHANGED\n\ncontainer = KRulesContainer()\non, when, middleware, emit = container.handlers()\n\n# Layer 1: Derive health status from metrics (ONLY for device: subjects)\n@on(SUBJECT_PROPERTY_CHANGED)\n@when(lambda ctx: ctx.subject.name.startswith(\"device:\"))\n@when(lambda ctx: ctx.property_name in [\"cpu_usage\", \"memory_usage\", \"error_rate\"])\nasync def compute_device_health(ctx):\n    \"\"\"Aggregate device metrics into health status\"\"\"\n    device = ctx.subject\n    # Read from subject's internal cache (even if not yet persisted)\n    cpu = device.get(\"cpu_usage\", 0)\n    memory = device.get(\"memory_usage\", 0)\n    errors = device.get(\"error_rate\", 0)\n\n    if cpu > 90 or memory > 90 or errors > 10:\n        device.set(\"health\", \"critical\")\n    elif cpu > 70 or memory > 70 or errors > 5:\n        device.set(\"health\", \"warning\")\n    else:\n        device.set(\"health\", \"healthy\")\n\n# Layer 2: React to health transitions (ONLY for device: subjects)\n@on(SUBJECT_PROPERTY_CHANGED)\n@when(lambda ctx: ctx.subject.name.startswith(\"device:\"))\n@when(lambda ctx: ctx.property_name == \"health\")\nasync def handle_device_health_change(ctx):\n    \"\"\"Take action based on health state transition\"\"\"\n    print(f\"{ctx.subject.name}: {ctx.old_value} \u2192 {ctx.new_value}\")\n\n    if ctx.new_value == \"critical\":\n        await ctx.emit(\"device.alert.critical\", ctx.subject)\n    elif ctx.new_value == \"healthy\" and ctx.old_value == \"critical\":\n        await ctx.emit(\"device.alert.recovered\", ctx.subject)\n\n# Usage\ndevice = container.subject(\"device:prod-01\")\n\n# Batch mode: multiple sets + single store\ndevice.set(\"cpu_usage\", 75)      # \u2192 triggers handler, health=\"warning\"\ndevice.set(\"memory_usage\", 60)\ndevice.set(\"error_rate\", 2)\ndevice.store()  # Single persistence, flushes cache to Redis\n\n# Single update mode: bypass cache, write directly\nawait device.set(\"cpu_usage\", 95, use_cache=False)  # \u2192 health=\"critical\" \u2192 alert!\nawait device.set(\"cpu_usage\", 50, use_cache=False)  # \u2192 health=\"healthy\" \u2192 recovered!\nawait device.set(\"cpu_usage\", 45, use_cache=False)  # \u2192 NO EVENT (health unchanged)\n```\n\n**Key Concepts:**\n\n1. **Reactive Composition** - `health` state is automatically derived from metrics\n2. **Subject Type Filtering** - Handlers target `device:*` subjects using naming conventions\n3. **Events on Change Only** - Property change events fire only when values actually change\n4. **State Transitions** - Access `old_value` and `new_value` to handle transitions\n5. **Efficient Persistence** - Batch updates with single `store()`, or `use_cache=False` for single updates\n6. **Bounded Entities** - Devices are predictable, limited entities (not infinite like orders)\n\n## Core Concepts\n\n### Subjects - Reactive State Entities\n\nSubjects are dynamic entities with persistent, reactive properties. Setting a property automatically emits a `subject-property-changed` event.\n\n```python\nfrom krules_core.container import KRulesContainer\n\ncontainer = KRulesContainer()\n\n# Create subject\ndevice = container.subject(\"device-456\")\n\n# Set properties (schema-less, fully dynamic)\ndevice.set(\"temperature\", 75.5)\ndevice.set(\"status\", \"online\")\ndevice.set(\"metadata\", {\"location\": \"room-1\", \"floor\": 2})\n\n# Lambda values for atomic operations\ndevice.set(\"count\", 0)\ndevice.set(\"count\", lambda c: c + 1)  # Atomic increment\n\n# Get with defaults\ntemp = device.get(\"temperature\")\nstatus = device.get(\"status\", default=\"offline\")\n\n# Extended properties (metadata, no events)\ndevice.set_ext(\"tags\", [\"production\", \"critical\"])\n\n# Persist to storage\ndevice.store()\n```\n\n### Event Handlers - Declarative Processing\n\nDefine handlers using decorators. Supports glob patterns and conditional filters.\n\n```python\nfrom krules_core.container import KRulesContainer\nfrom krules_core.event_types import SUBJECT_PROPERTY_CHANGED\n\ncontainer = KRulesContainer()\non, when, middleware, emit = container.handlers()\n\n# Single event\n@on(\"order.created\")\nasync def process_order(ctx):\n    ctx.subject.set(\"status\", \"processing\")\n    await ctx.emit(\"order.processing\")\n\n# Multiple events\n@on(\"user.created\", \"user.updated\", \"user.deleted\")\nasync def log_user_change(ctx):\n    print(f\"User event: {ctx.event_type}\")\n\n# Glob patterns\n@on(\"device.*\")\nasync def handle_device(ctx):\n    print(f\"Device event: {ctx.event_type}\")\n\n# Property change with filters\n@on(SUBJECT_PROPERTY_CHANGED)\n@when(lambda ctx: ctx.property_name == \"status\")\n@when(lambda ctx: ctx.new_value == \"error\")\nasync def on_error_status(ctx):\n    await ctx.emit(\"alert.device_error\", {\n        \"device_id\": ctx.subject.name\n    })\n```\n\n### Filters - Conditional Execution\n\nStack multiple `@when` decorators for conditional execution (all must pass).\n\n```python\n# Multiple filters (AND logic)\n@on(\"payment.process\")\n@when(lambda ctx: ctx.payload.get(\"amount\") > 0)\n@when(lambda ctx: ctx.subject.get(\"verified\") == True)\nasync def process_payment(ctx):\n    # Only for verified users with amount > 0\n    pass\n\n# Reusable filters\ndef is_premium(ctx):\n    return ctx.subject.get(\"tier\") == \"premium\"\n\ndef has_credits(ctx):\n    return ctx.subject.get(\"credits\", 0) > 0\n\n@on(\"feature.use\")\n@when(is_premium)\n@when(has_credits)\nasync def use_premium_feature(ctx):\n    ctx.subject.set(\"credits\", lambda c: c - 1)\n```\n\n### Middleware - Cross-Cutting Concerns\n\nMiddleware runs for all events, enabling logging, timing, error handling, etc.\n\n```python\nfrom krules_core.container import KRulesContainer\nimport time\n\ncontainer = KRulesContainer()\non, when, middleware, emit = container.handlers()\n\n@middleware\nasync def timing_middleware(ctx, next):\n    \"\"\"Measure handler execution time\"\"\"\n    start = time.time()\n    await next()\n    duration = time.time() - start\n    print(f\"{ctx.event_type} took {duration:.3f}s\")\n\n@middleware\nasync def error_handling(ctx, next):\n    \"\"\"Global error handler\"\"\"\n    try:\n        await next()\n    except Exception as e:\n        print(f\"Handler error: {e}\")\n        await ctx.emit(\"error.handler_failed\", {\"error\": str(e)})\n```\n\n## Storage Backends\n\nKRules supports pluggable storage backends for subject persistence.\n\n### Redis Storage\n\n```python\nfrom dependency_injector import providers\nfrom krules_core.container import KRulesContainer\nfrom redis_subjects_storage.storage_impl import create_redis_storage\n\n# Create container\ncontainer = KRulesContainer()\n\n# Override storage with Redis\nredis_factory = create_redis_storage(\n    url=\"redis://localhost:6379\",\n    key_prefix=\"myapp:\"\n)\ncontainer.subject_storage.override(providers.Object(redis_factory))\n\n# Now all subjects use Redis\nuser = container.subject(\"user-123\")\nuser.set(\"name\", \"John\")  # Persisted in Redis\nuser.store()\n```\n\n### Custom Storage\n\nImplement the storage interface to create custom backends:\n\n```python\nclass CustomStorage:\n    def __init__(self, subject_name, event_info=None, event_data=None):\n        self._subject = subject_name\n\n    def load(self):\n        \"\"\"Return (properties_dict, ext_properties_dict)\"\"\"\n        return {}, {}\n\n    def store(self, inserts=[], updates=[], deletes=[]):\n        \"\"\"Persist property changes\"\"\"\n        pass\n\n    def set(self, prop):\n        \"\"\"Set single property, return (new_value, old_value)\"\"\"\n        pass\n\n    def get(self, prop):\n        \"\"\"Get property value\"\"\"\n        pass\n\n    def delete(self, prop):\n        \"\"\"Delete property\"\"\"\n        pass\n\n    def flush(self):\n        \"\"\"Delete entire subject\"\"\"\n        pass\n\n    def get_ext_props(self):\n        \"\"\"Return extended properties dict\"\"\"\n        return {}\n```\n\n## Testing\n\nKRules provides utilities for easy testing:\n\n```python\nimport pytest\nfrom krules_core.container import KRulesContainer\nfrom krules_core.event_types import SUBJECT_PROPERTY_CHANGED\n\n@pytest.fixture\ndef container():\n    \"\"\"Create fresh container for each test\"\"\"\n    return KRulesContainer()\n\n@pytest.mark.asyncio\nasync def test_user_login(container):\n    \"\"\"Test user login handler\"\"\"\n    on, when, middleware, emit = container.handlers()\n    results = []\n\n    @on(\"user.login\")\n    async def handler(ctx):\n        results.append(ctx.event_type)\n        ctx.subject.set(\"logged_in\", True)\n\n    user = container.subject(\"test-user\")\n    await emit(\"user.login\", user)\n\n    assert len(results) == 1\n    assert user.get(\"logged_in\") == True\n```\n\n## Documentation\n\n- [Quick Start Guide](docs/QUICKSTART.md) - 5-minute tutorial\n- [Core Concepts](docs/CORE_CONCEPTS.md) - Framework fundamentals\n- [Subjects](docs/SUBJECTS.md) - Reactive property store deep dive\n- [Event Handlers](docs/EVENT_HANDLERS.md) - Handlers, filters, patterns\n- [Middleware](docs/MIDDLEWARE.md) - Cross-cutting concerns\n- [Container & DI](docs/CONTAINER_DI.md) - Dependency injection\n- [Storage Backends](docs/STORAGE_BACKENDS.md) - Persistence layer\n- [Integrations](docs/INTEGRATIONS.md) - FastAPI, Pub/Sub, CloudEvents\n- [Testing](docs/TESTING.md) - Testing strategies\n- [Advanced Patterns](docs/ADVANCED_PATTERNS.md) - Production best practices\n- [Shell Mode](docs/SHELL_MODE.md) - Interactive REPL usage\n- [API Reference](docs/API_REFERENCE.md) - Complete API documentation\n\n## Integrations\n\nKRules supports event-driven communication with external systems through **event receivers** (inbound) and **event emitters** (outbound).\n\n### Event Receivers (Inbound)\n\n**FastAPI Integration** - Receive HTTP CloudEvents\n\n```python\nfrom krules_fastapi_env import KRulesApp\nfrom krules_core.container import KRulesContainer\n\ncontainer = KRulesContainer()\non, when, middleware, emit = container.handlers()\n\n# Define handlers (same as local events)\n@on(\"order.created\")\nasync def handle_order(ctx):\n    print(f\"Received order: {ctx.subject.name}\")\n\n# Create FastAPI app that receives CloudEvents\napp = KRulesApp(krules_container=container)\n# POST /krules endpoint now receives CloudEvents and triggers handlers\n```\n\n**Pub/Sub Subscriber** - Receive events from Google Pub/Sub\n\n```python\nfrom krules_cloudevents_pubsub import PubSubSubscriber\n\n# Subscribe to Pub/Sub topic\nsubscriber = PubSubSubscriber(\n    project_id=\"my-project\",\n    subscription_name=\"my-subscription\",\n    container=container\n)\n\n# Same handlers work for Pub/Sub events\nawait subscriber.run()\n```\n\n### Event Emitters (Outbound)\n\n**HTTP CloudEvents** - Send events to external HTTP endpoints\n\n```python\nfrom krules_cloudevents import CloudEventsDispatcher, create_dispatcher_middleware\n\n# Create dispatcher\ndispatcher = CloudEventsDispatcher(\n    dispatch_url=\"https://api.example.com/events\",\n    source=\"my-service\",\n    krules_container=container\n)\n\n# Register as middleware\ndispatcher_mw = create_dispatcher_middleware(dispatcher)\ncontainer.event_bus().add_middleware(dispatcher_mw)\n\n# Now emit events to external URL\nawait emit(\"user.created\", user, dispatch_url=\"https://api.example.com/events\")\n```\n\n**Pub/Sub Publisher** - Send events to Google Pub/Sub\n\n```python\nfrom krules_cloudevents_pubsub import CloudEventsDispatcher, create_dispatcher_middleware\n\n# Create dispatcher\ndispatcher = CloudEventsDispatcher(\n    project_id=\"my-project\",\n    default_topic=\"krules-events\",\n    source=\"my-service\",\n    krules_container=container\n)\n\n# Register as middleware\ndispatcher_mw = create_dispatcher_middleware(dispatcher)\ncontainer.event_bus().add_middleware(dispatcher_mw)\n\n# Emit to Pub/Sub topic\nawait emit(\"user.created\", user, topic=\"user-events\")\n```\n\nSee [Integrations](docs/INTEGRATIONS.md) for detailed guides.\n\n## Requirements\n\n- Python >=3.11\n- asyncio support\n\n## License\n\nApache License 2.0\n\n## Contributing\n\nThis framework is maintained by [Airspot](mailto:info@airspot.tech) for internal use, but contributions are welcome.\n\n## Support\n\nFor issues and questions, please open a GitHub issue.\n\n---\n\n**Built with \u2764\ufe0f by Airspot**\n",
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "KRules Framework - Async event-driven framework",
    "version": "2.0.0",
    "project_urls": null,
    "split_keywords": [
        "async",
        " event-driven",
        " framework",
        " krules",
        " reactive"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "42df05cae35b851927e9669cfd529fa801d307bb3e3098f93476919afff0d74b",
                "md5": "4b0034a95840451e2f2968b4b5db2a90",
                "sha256": "4c5747c4a182d573e83d543390b0886b2b714142d87aa70e493e47c0216e3a3b"
            },
            "downloads": -1,
            "filename": "krules_framework-2.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4b0034a95840451e2f2968b4b5db2a90",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 52027,
            "upload_time": "2025-11-03T10:23:34",
            "upload_time_iso_8601": "2025-11-03T10:23:34.204822Z",
            "url": "https://files.pythonhosted.org/packages/42/df/05cae35b851927e9669cfd529fa801d307bb3e3098f93476919afff0d74b/krules_framework-2.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "6e00ed54226a6f2d37b9b171da606e3fc47f9e12855aaaa2d76c5190c1921d1a",
                "md5": "e15fc311acca1f8445b8e8b0c0752e32",
                "sha256": "845df60315d750f05c7a4394437f84f984167d17af422659a0297efce60a5a2e"
            },
            "downloads": -1,
            "filename": "krules_framework-2.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "e15fc311acca1f8445b8e8b0c0752e32",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 30338,
            "upload_time": "2025-11-03T10:23:35",
            "upload_time_iso_8601": "2025-11-03T10:23:35.226979Z",
            "url": "https://files.pythonhosted.org/packages/6e/00/ed54226a6f2d37b9b171da606e3fc47f9e12855aaaa2d76c5190c1921d1a/krules_framework-2.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-11-03 10:23:35",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "krules-framework"
}
        
Elapsed time: 3.83942s