# KRules Framework
**Modern async-first event-driven application framework for Python**
KRules is a Python framework for building reactive, event-driven applications with a focus on dynamic state management and declarative event handling.
## Features
- **Reactive Subjects** - Dynamic entities with schema-less properties that automatically emit events on changes
- **Declarative Handlers** - Clean decorator-based API (`@on`, `@when`, `@middleware`)
- **Async Native** - Built on asyncio for high-performance concurrent event processing
- **Type Safe** - Full type hints for excellent IDE support and type checking
- **Dependency Injection** - Container-based architecture for testability and flexibility
- **Storage Agnostic** - Pluggable backends (Redis, SQLite, in-memory, custom)
- **Production Ready** - Middleware support, error isolation, monitoring hooks
## Installation
```bash
pip install krules-framework
```
With optional features:
```bash
# Redis storage backend
pip install "krules-framework[redis]"
# Google Cloud Pub/Sub
pip install "krules-framework[pubsub]"
# FastAPI integration
pip install "krules-framework[fastapi]"
```
## Quick Example
This example demonstrates **reactive state composition** - building complex states from simple properties, where each layer reacts to changes in lower layers.
```python
from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED
container = KRulesContainer()
on, when, middleware, emit = container.handlers()
# Layer 1: Derive health status from metrics (ONLY for device: subjects)
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.subject.name.startswith("device:"))
@when(lambda ctx: ctx.property_name in ["cpu_usage", "memory_usage", "error_rate"])
async def compute_device_health(ctx):
"""Aggregate device metrics into health status"""
device = ctx.subject
# Read from subject's internal cache (even if not yet persisted)
cpu = device.get("cpu_usage", 0)
memory = device.get("memory_usage", 0)
errors = device.get("error_rate", 0)
if cpu > 90 or memory > 90 or errors > 10:
device.set("health", "critical")
elif cpu > 70 or memory > 70 or errors > 5:
device.set("health", "warning")
else:
device.set("health", "healthy")
# Layer 2: React to health transitions (ONLY for device: subjects)
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.subject.name.startswith("device:"))
@when(lambda ctx: ctx.property_name == "health")
async def handle_device_health_change(ctx):
"""Take action based on health state transition"""
print(f"{ctx.subject.name}: {ctx.old_value} → {ctx.new_value}")
if ctx.new_value == "critical":
await ctx.emit("device.alert.critical", ctx.subject)
elif ctx.new_value == "healthy" and ctx.old_value == "critical":
await ctx.emit("device.alert.recovered", ctx.subject)
# Usage
device = container.subject("device:prod-01")
# Batch mode: multiple sets + single store
device.set("cpu_usage", 75) # → triggers handler, health="warning"
device.set("memory_usage", 60)
device.set("error_rate", 2)
device.store() # Single persistence, flushes cache to Redis
# Single update mode: bypass cache, write directly
await device.set("cpu_usage", 95, use_cache=False) # → health="critical" → alert!
await device.set("cpu_usage", 50, use_cache=False) # → health="healthy" → recovered!
await device.set("cpu_usage", 45, use_cache=False) # → NO EVENT (health unchanged)
```
**Key Concepts:**
1. **Reactive Composition** - `health` state is automatically derived from metrics
2. **Subject Type Filtering** - Handlers target `device:*` subjects using naming conventions
3. **Events on Change Only** - Property change events fire only when values actually change
4. **State Transitions** - Access `old_value` and `new_value` to handle transitions
5. **Efficient Persistence** - Batch updates with single `store()`, or `use_cache=False` for single updates
6. **Bounded Entities** - Devices are predictable, limited entities (not infinite like orders)
## Core Concepts
### Subjects - Reactive State Entities
Subjects are dynamic entities with persistent, reactive properties. Setting a property automatically emits a `subject-property-changed` event.
```python
from krules_core.container import KRulesContainer
container = KRulesContainer()
# Create subject
device = container.subject("device-456")
# Set properties (schema-less, fully dynamic)
device.set("temperature", 75.5)
device.set("status", "online")
device.set("metadata", {"location": "room-1", "floor": 2})
# Lambda values for atomic operations
device.set("count", 0)
device.set("count", lambda c: c + 1) # Atomic increment
# Get with defaults
temp = device.get("temperature")
status = device.get("status", default="offline")
# Extended properties (metadata, no events)
device.set_ext("tags", ["production", "critical"])
# Persist to storage
device.store()
```
### Event Handlers - Declarative Processing
Define handlers using decorators. Supports glob patterns and conditional filters.
```python
from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED
container = KRulesContainer()
on, when, middleware, emit = container.handlers()
# Single event
@on("order.created")
async def process_order(ctx):
ctx.subject.set("status", "processing")
await ctx.emit("order.processing")
# Multiple events
@on("user.created", "user.updated", "user.deleted")
async def log_user_change(ctx):
print(f"User event: {ctx.event_type}")
# Glob patterns
@on("device.*")
async def handle_device(ctx):
print(f"Device event: {ctx.event_type}")
# Property change with filters
@on(SUBJECT_PROPERTY_CHANGED)
@when(lambda ctx: ctx.property_name == "status")
@when(lambda ctx: ctx.new_value == "error")
async def on_error_status(ctx):
await ctx.emit("alert.device_error", {
"device_id": ctx.subject.name
})
```
### Filters - Conditional Execution
Stack multiple `@when` decorators for conditional execution (all must pass).
```python
# Multiple filters (AND logic)
@on("payment.process")
@when(lambda ctx: ctx.payload.get("amount") > 0)
@when(lambda ctx: ctx.subject.get("verified") == True)
async def process_payment(ctx):
# Only for verified users with amount > 0
pass
# Reusable filters
def is_premium(ctx):
return ctx.subject.get("tier") == "premium"
def has_credits(ctx):
return ctx.subject.get("credits", 0) > 0
@on("feature.use")
@when(is_premium)
@when(has_credits)
async def use_premium_feature(ctx):
ctx.subject.set("credits", lambda c: c - 1)
```
### Middleware - Cross-Cutting Concerns
Middleware runs for all events, enabling logging, timing, error handling, etc.
```python
from krules_core.container import KRulesContainer
import time
container = KRulesContainer()
on, when, middleware, emit = container.handlers()
@middleware
async def timing_middleware(ctx, next):
"""Measure handler execution time"""
start = time.time()
await next()
duration = time.time() - start
print(f"{ctx.event_type} took {duration:.3f}s")
@middleware
async def error_handling(ctx, next):
"""Global error handler"""
try:
await next()
except Exception as e:
print(f"Handler error: {e}")
await ctx.emit("error.handler_failed", {"error": str(e)})
```
## Storage Backends
KRules supports pluggable storage backends for subject persistence.
### Redis Storage
```python
from dependency_injector import providers
from krules_core.container import KRulesContainer
from redis_subjects_storage.storage_impl import create_redis_storage
# Create container
container = KRulesContainer()
# Override storage with Redis
redis_factory = create_redis_storage(
url="redis://localhost:6379",
key_prefix="myapp:"
)
container.subject_storage.override(providers.Object(redis_factory))
# Now all subjects use Redis
user = container.subject("user-123")
user.set("name", "John") # Persisted in Redis
user.store()
```
### Custom Storage
Implement the storage interface to create custom backends:
```python
class CustomStorage:
def __init__(self, subject_name, event_info=None, event_data=None):
self._subject = subject_name
def load(self):
"""Return (properties_dict, ext_properties_dict)"""
return {}, {}
def store(self, inserts=[], updates=[], deletes=[]):
"""Persist property changes"""
pass
def set(self, prop):
"""Set single property, return (new_value, old_value)"""
pass
def get(self, prop):
"""Get property value"""
pass
def delete(self, prop):
"""Delete property"""
pass
def flush(self):
"""Delete entire subject"""
pass
def get_ext_props(self):
"""Return extended properties dict"""
return {}
```
## Testing
KRules provides utilities for easy testing:
```python
import pytest
from krules_core.container import KRulesContainer
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED
@pytest.fixture
def container():
"""Create fresh container for each test"""
return KRulesContainer()
@pytest.mark.asyncio
async def test_user_login(container):
"""Test user login handler"""
on, when, middleware, emit = container.handlers()
results = []
@on("user.login")
async def handler(ctx):
results.append(ctx.event_type)
ctx.subject.set("logged_in", True)
user = container.subject("test-user")
await emit("user.login", user)
assert len(results) == 1
assert user.get("logged_in") == True
```
## Documentation
- [Quick Start Guide](docs/QUICKSTART.md) - 5-minute tutorial
- [Core Concepts](docs/CORE_CONCEPTS.md) - Framework fundamentals
- [Subjects](docs/SUBJECTS.md) - Reactive property store deep dive
- [Event Handlers](docs/EVENT_HANDLERS.md) - Handlers, filters, patterns
- [Middleware](docs/MIDDLEWARE.md) - Cross-cutting concerns
- [Container & DI](docs/CONTAINER_DI.md) - Dependency injection
- [Storage Backends](docs/STORAGE_BACKENDS.md) - Persistence layer
- [Integrations](docs/INTEGRATIONS.md) - FastAPI, Pub/Sub, CloudEvents
- [Testing](docs/TESTING.md) - Testing strategies
- [Advanced Patterns](docs/ADVANCED_PATTERNS.md) - Production best practices
- [Shell Mode](docs/SHELL_MODE.md) - Interactive REPL usage
- [API Reference](docs/API_REFERENCE.md) - Complete API documentation
## Integrations
KRules supports event-driven communication with external systems through **event receivers** (inbound) and **event emitters** (outbound).
### Event Receivers (Inbound)
**FastAPI Integration** - Receive HTTP CloudEvents
```python
from krules_fastapi_env import KRulesApp
from krules_core.container import KRulesContainer
container = KRulesContainer()
on, when, middleware, emit = container.handlers()
# Define handlers (same as local events)
@on("order.created")
async def handle_order(ctx):
print(f"Received order: {ctx.subject.name}")
# Create FastAPI app that receives CloudEvents
app = KRulesApp(krules_container=container)
# POST /krules endpoint now receives CloudEvents and triggers handlers
```
**Pub/Sub Subscriber** - Receive events from Google Pub/Sub
```python
from krules_cloudevents_pubsub import PubSubSubscriber
# Subscribe to Pub/Sub topic
subscriber = PubSubSubscriber(
project_id="my-project",
subscription_name="my-subscription",
container=container
)
# Same handlers work for Pub/Sub events
await subscriber.run()
```
### Event Emitters (Outbound)
**HTTP CloudEvents** - Send events to external HTTP endpoints
```python
from krules_cloudevents import CloudEventsDispatcher, create_dispatcher_middleware
# Create dispatcher
dispatcher = CloudEventsDispatcher(
dispatch_url="https://api.example.com/events",
source="my-service",
krules_container=container
)
# Register as middleware
dispatcher_mw = create_dispatcher_middleware(dispatcher)
container.event_bus().add_middleware(dispatcher_mw)
# Now emit events to external URL
await emit("user.created", user, dispatch_url="https://api.example.com/events")
```
**Pub/Sub Publisher** - Send events to Google Pub/Sub
```python
from krules_cloudevents_pubsub import CloudEventsDispatcher, create_dispatcher_middleware
# Create dispatcher
dispatcher = CloudEventsDispatcher(
project_id="my-project",
default_topic="krules-events",
source="my-service",
krules_container=container
)
# Register as middleware
dispatcher_mw = create_dispatcher_middleware(dispatcher)
container.event_bus().add_middleware(dispatcher_mw)
# Emit to Pub/Sub topic
await emit("user.created", user, topic="user-events")
```
See [Integrations](docs/INTEGRATIONS.md) for detailed guides.
## Requirements
- Python >=3.11
- asyncio support
## License
Apache License 2.0
## Contributing
This framework is maintained by [Airspot](mailto:info@airspot.tech) for internal use, but contributions are welcome.
## Support
For issues and questions, please open a GitHub issue.
---
**Built with ❤️ by Airspot**
Raw data
{
"_id": null,
"home_page": null,
"name": "krules-framework",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.11",
"maintainer_email": null,
"keywords": "async, event-driven, framework, krules, reactive",
"author": null,
"author_email": "Airspot <info@airspot.tech>",
"download_url": "https://files.pythonhosted.org/packages/6e/00/ed54226a6f2d37b9b171da606e3fc47f9e12855aaaa2d76c5190c1921d1a/krules_framework-2.0.0.tar.gz",
"platform": null,
"description": "# KRules Framework\n\n**Modern async-first event-driven application framework for Python**\n\nKRules is a Python framework for building reactive, event-driven applications with a focus on dynamic state management and declarative event handling.\n\n## Features\n\n- **Reactive Subjects** - Dynamic entities with schema-less properties that automatically emit events on changes\n- **Declarative Handlers** - Clean decorator-based API (`@on`, `@when`, `@middleware`)\n- **Async Native** - Built on asyncio for high-performance concurrent event processing\n- **Type Safe** - Full type hints for excellent IDE support and type checking\n- **Dependency Injection** - Container-based architecture for testability and flexibility\n- **Storage Agnostic** - Pluggable backends (Redis, SQLite, in-memory, custom)\n- **Production Ready** - Middleware support, error isolation, monitoring hooks\n\n## Installation\n\n```bash\npip install krules-framework\n```\n\nWith optional features:\n\n```bash\n# Redis storage backend\npip install \"krules-framework[redis]\"\n\n# Google Cloud Pub/Sub\npip install \"krules-framework[pubsub]\"\n\n# FastAPI integration\npip install \"krules-framework[fastapi]\"\n```\n\n## Quick Example\n\nThis example demonstrates **reactive state composition** - building complex states from simple properties, where each layer reacts to changes in lower layers.\n\n```python\nfrom krules_core.container import KRulesContainer\nfrom krules_core.event_types import SUBJECT_PROPERTY_CHANGED\n\ncontainer = KRulesContainer()\non, when, middleware, emit = container.handlers()\n\n# Layer 1: Derive health status from metrics (ONLY for device: subjects)\n@on(SUBJECT_PROPERTY_CHANGED)\n@when(lambda ctx: ctx.subject.name.startswith(\"device:\"))\n@when(lambda ctx: ctx.property_name in [\"cpu_usage\", \"memory_usage\", \"error_rate\"])\nasync def compute_device_health(ctx):\n \"\"\"Aggregate device metrics into health status\"\"\"\n device = ctx.subject\n # Read from subject's internal cache (even if not yet persisted)\n cpu = device.get(\"cpu_usage\", 0)\n memory = device.get(\"memory_usage\", 0)\n errors = device.get(\"error_rate\", 0)\n\n if cpu > 90 or memory > 90 or errors > 10:\n device.set(\"health\", \"critical\")\n elif cpu > 70 or memory > 70 or errors > 5:\n device.set(\"health\", \"warning\")\n else:\n device.set(\"health\", \"healthy\")\n\n# Layer 2: React to health transitions (ONLY for device: subjects)\n@on(SUBJECT_PROPERTY_CHANGED)\n@when(lambda ctx: ctx.subject.name.startswith(\"device:\"))\n@when(lambda ctx: ctx.property_name == \"health\")\nasync def handle_device_health_change(ctx):\n \"\"\"Take action based on health state transition\"\"\"\n print(f\"{ctx.subject.name}: {ctx.old_value} \u2192 {ctx.new_value}\")\n\n if ctx.new_value == \"critical\":\n await ctx.emit(\"device.alert.critical\", ctx.subject)\n elif ctx.new_value == \"healthy\" and ctx.old_value == \"critical\":\n await ctx.emit(\"device.alert.recovered\", ctx.subject)\n\n# Usage\ndevice = container.subject(\"device:prod-01\")\n\n# Batch mode: multiple sets + single store\ndevice.set(\"cpu_usage\", 75) # \u2192 triggers handler, health=\"warning\"\ndevice.set(\"memory_usage\", 60)\ndevice.set(\"error_rate\", 2)\ndevice.store() # Single persistence, flushes cache to Redis\n\n# Single update mode: bypass cache, write directly\nawait device.set(\"cpu_usage\", 95, use_cache=False) # \u2192 health=\"critical\" \u2192 alert!\nawait device.set(\"cpu_usage\", 50, use_cache=False) # \u2192 health=\"healthy\" \u2192 recovered!\nawait device.set(\"cpu_usage\", 45, use_cache=False) # \u2192 NO EVENT (health unchanged)\n```\n\n**Key Concepts:**\n\n1. **Reactive Composition** - `health` state is automatically derived from metrics\n2. **Subject Type Filtering** - Handlers target `device:*` subjects using naming conventions\n3. **Events on Change Only** - Property change events fire only when values actually change\n4. **State Transitions** - Access `old_value` and `new_value` to handle transitions\n5. **Efficient Persistence** - Batch updates with single `store()`, or `use_cache=False` for single updates\n6. **Bounded Entities** - Devices are predictable, limited entities (not infinite like orders)\n\n## Core Concepts\n\n### Subjects - Reactive State Entities\n\nSubjects are dynamic entities with persistent, reactive properties. Setting a property automatically emits a `subject-property-changed` event.\n\n```python\nfrom krules_core.container import KRulesContainer\n\ncontainer = KRulesContainer()\n\n# Create subject\ndevice = container.subject(\"device-456\")\n\n# Set properties (schema-less, fully dynamic)\ndevice.set(\"temperature\", 75.5)\ndevice.set(\"status\", \"online\")\ndevice.set(\"metadata\", {\"location\": \"room-1\", \"floor\": 2})\n\n# Lambda values for atomic operations\ndevice.set(\"count\", 0)\ndevice.set(\"count\", lambda c: c + 1) # Atomic increment\n\n# Get with defaults\ntemp = device.get(\"temperature\")\nstatus = device.get(\"status\", default=\"offline\")\n\n# Extended properties (metadata, no events)\ndevice.set_ext(\"tags\", [\"production\", \"critical\"])\n\n# Persist to storage\ndevice.store()\n```\n\n### Event Handlers - Declarative Processing\n\nDefine handlers using decorators. Supports glob patterns and conditional filters.\n\n```python\nfrom krules_core.container import KRulesContainer\nfrom krules_core.event_types import SUBJECT_PROPERTY_CHANGED\n\ncontainer = KRulesContainer()\non, when, middleware, emit = container.handlers()\n\n# Single event\n@on(\"order.created\")\nasync def process_order(ctx):\n ctx.subject.set(\"status\", \"processing\")\n await ctx.emit(\"order.processing\")\n\n# Multiple events\n@on(\"user.created\", \"user.updated\", \"user.deleted\")\nasync def log_user_change(ctx):\n print(f\"User event: {ctx.event_type}\")\n\n# Glob patterns\n@on(\"device.*\")\nasync def handle_device(ctx):\n print(f\"Device event: {ctx.event_type}\")\n\n# Property change with filters\n@on(SUBJECT_PROPERTY_CHANGED)\n@when(lambda ctx: ctx.property_name == \"status\")\n@when(lambda ctx: ctx.new_value == \"error\")\nasync def on_error_status(ctx):\n await ctx.emit(\"alert.device_error\", {\n \"device_id\": ctx.subject.name\n })\n```\n\n### Filters - Conditional Execution\n\nStack multiple `@when` decorators for conditional execution (all must pass).\n\n```python\n# Multiple filters (AND logic)\n@on(\"payment.process\")\n@when(lambda ctx: ctx.payload.get(\"amount\") > 0)\n@when(lambda ctx: ctx.subject.get(\"verified\") == True)\nasync def process_payment(ctx):\n # Only for verified users with amount > 0\n pass\n\n# Reusable filters\ndef is_premium(ctx):\n return ctx.subject.get(\"tier\") == \"premium\"\n\ndef has_credits(ctx):\n return ctx.subject.get(\"credits\", 0) > 0\n\n@on(\"feature.use\")\n@when(is_premium)\n@when(has_credits)\nasync def use_premium_feature(ctx):\n ctx.subject.set(\"credits\", lambda c: c - 1)\n```\n\n### Middleware - Cross-Cutting Concerns\n\nMiddleware runs for all events, enabling logging, timing, error handling, etc.\n\n```python\nfrom krules_core.container import KRulesContainer\nimport time\n\ncontainer = KRulesContainer()\non, when, middleware, emit = container.handlers()\n\n@middleware\nasync def timing_middleware(ctx, next):\n \"\"\"Measure handler execution time\"\"\"\n start = time.time()\n await next()\n duration = time.time() - start\n print(f\"{ctx.event_type} took {duration:.3f}s\")\n\n@middleware\nasync def error_handling(ctx, next):\n \"\"\"Global error handler\"\"\"\n try:\n await next()\n except Exception as e:\n print(f\"Handler error: {e}\")\n await ctx.emit(\"error.handler_failed\", {\"error\": str(e)})\n```\n\n## Storage Backends\n\nKRules supports pluggable storage backends for subject persistence.\n\n### Redis Storage\n\n```python\nfrom dependency_injector import providers\nfrom krules_core.container import KRulesContainer\nfrom redis_subjects_storage.storage_impl import create_redis_storage\n\n# Create container\ncontainer = KRulesContainer()\n\n# Override storage with Redis\nredis_factory = create_redis_storage(\n url=\"redis://localhost:6379\",\n key_prefix=\"myapp:\"\n)\ncontainer.subject_storage.override(providers.Object(redis_factory))\n\n# Now all subjects use Redis\nuser = container.subject(\"user-123\")\nuser.set(\"name\", \"John\") # Persisted in Redis\nuser.store()\n```\n\n### Custom Storage\n\nImplement the storage interface to create custom backends:\n\n```python\nclass CustomStorage:\n def __init__(self, subject_name, event_info=None, event_data=None):\n self._subject = subject_name\n\n def load(self):\n \"\"\"Return (properties_dict, ext_properties_dict)\"\"\"\n return {}, {}\n\n def store(self, inserts=[], updates=[], deletes=[]):\n \"\"\"Persist property changes\"\"\"\n pass\n\n def set(self, prop):\n \"\"\"Set single property, return (new_value, old_value)\"\"\"\n pass\n\n def get(self, prop):\n \"\"\"Get property value\"\"\"\n pass\n\n def delete(self, prop):\n \"\"\"Delete property\"\"\"\n pass\n\n def flush(self):\n \"\"\"Delete entire subject\"\"\"\n pass\n\n def get_ext_props(self):\n \"\"\"Return extended properties dict\"\"\"\n return {}\n```\n\n## Testing\n\nKRules provides utilities for easy testing:\n\n```python\nimport pytest\nfrom krules_core.container import KRulesContainer\nfrom krules_core.event_types import SUBJECT_PROPERTY_CHANGED\n\n@pytest.fixture\ndef container():\n \"\"\"Create fresh container for each test\"\"\"\n return KRulesContainer()\n\n@pytest.mark.asyncio\nasync def test_user_login(container):\n \"\"\"Test user login handler\"\"\"\n on, when, middleware, emit = container.handlers()\n results = []\n\n @on(\"user.login\")\n async def handler(ctx):\n results.append(ctx.event_type)\n ctx.subject.set(\"logged_in\", True)\n\n user = container.subject(\"test-user\")\n await emit(\"user.login\", user)\n\n assert len(results) == 1\n assert user.get(\"logged_in\") == True\n```\n\n## Documentation\n\n- [Quick Start Guide](docs/QUICKSTART.md) - 5-minute tutorial\n- [Core Concepts](docs/CORE_CONCEPTS.md) - Framework fundamentals\n- [Subjects](docs/SUBJECTS.md) - Reactive property store deep dive\n- [Event Handlers](docs/EVENT_HANDLERS.md) - Handlers, filters, patterns\n- [Middleware](docs/MIDDLEWARE.md) - Cross-cutting concerns\n- [Container & DI](docs/CONTAINER_DI.md) - Dependency injection\n- [Storage Backends](docs/STORAGE_BACKENDS.md) - Persistence layer\n- [Integrations](docs/INTEGRATIONS.md) - FastAPI, Pub/Sub, CloudEvents\n- [Testing](docs/TESTING.md) - Testing strategies\n- [Advanced Patterns](docs/ADVANCED_PATTERNS.md) - Production best practices\n- [Shell Mode](docs/SHELL_MODE.md) - Interactive REPL usage\n- [API Reference](docs/API_REFERENCE.md) - Complete API documentation\n\n## Integrations\n\nKRules supports event-driven communication with external systems through **event receivers** (inbound) and **event emitters** (outbound).\n\n### Event Receivers (Inbound)\n\n**FastAPI Integration** - Receive HTTP CloudEvents\n\n```python\nfrom krules_fastapi_env import KRulesApp\nfrom krules_core.container import KRulesContainer\n\ncontainer = KRulesContainer()\non, when, middleware, emit = container.handlers()\n\n# Define handlers (same as local events)\n@on(\"order.created\")\nasync def handle_order(ctx):\n print(f\"Received order: {ctx.subject.name}\")\n\n# Create FastAPI app that receives CloudEvents\napp = KRulesApp(krules_container=container)\n# POST /krules endpoint now receives CloudEvents and triggers handlers\n```\n\n**Pub/Sub Subscriber** - Receive events from Google Pub/Sub\n\n```python\nfrom krules_cloudevents_pubsub import PubSubSubscriber\n\n# Subscribe to Pub/Sub topic\nsubscriber = PubSubSubscriber(\n project_id=\"my-project\",\n subscription_name=\"my-subscription\",\n container=container\n)\n\n# Same handlers work for Pub/Sub events\nawait subscriber.run()\n```\n\n### Event Emitters (Outbound)\n\n**HTTP CloudEvents** - Send events to external HTTP endpoints\n\n```python\nfrom krules_cloudevents import CloudEventsDispatcher, create_dispatcher_middleware\n\n# Create dispatcher\ndispatcher = CloudEventsDispatcher(\n dispatch_url=\"https://api.example.com/events\",\n source=\"my-service\",\n krules_container=container\n)\n\n# Register as middleware\ndispatcher_mw = create_dispatcher_middleware(dispatcher)\ncontainer.event_bus().add_middleware(dispatcher_mw)\n\n# Now emit events to external URL\nawait emit(\"user.created\", user, dispatch_url=\"https://api.example.com/events\")\n```\n\n**Pub/Sub Publisher** - Send events to Google Pub/Sub\n\n```python\nfrom krules_cloudevents_pubsub import CloudEventsDispatcher, create_dispatcher_middleware\n\n# Create dispatcher\ndispatcher = CloudEventsDispatcher(\n project_id=\"my-project\",\n default_topic=\"krules-events\",\n source=\"my-service\",\n krules_container=container\n)\n\n# Register as middleware\ndispatcher_mw = create_dispatcher_middleware(dispatcher)\ncontainer.event_bus().add_middleware(dispatcher_mw)\n\n# Emit to Pub/Sub topic\nawait emit(\"user.created\", user, topic=\"user-events\")\n```\n\nSee [Integrations](docs/INTEGRATIONS.md) for detailed guides.\n\n## Requirements\n\n- Python >=3.11\n- asyncio support\n\n## License\n\nApache License 2.0\n\n## Contributing\n\nThis framework is maintained by [Airspot](mailto:info@airspot.tech) for internal use, but contributions are welcome.\n\n## Support\n\nFor issues and questions, please open a GitHub issue.\n\n---\n\n**Built with \u2764\ufe0f by Airspot**\n",
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "KRules Framework - Async event-driven framework",
"version": "2.0.0",
"project_urls": null,
"split_keywords": [
"async",
" event-driven",
" framework",
" krules",
" reactive"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "42df05cae35b851927e9669cfd529fa801d307bb3e3098f93476919afff0d74b",
"md5": "4b0034a95840451e2f2968b4b5db2a90",
"sha256": "4c5747c4a182d573e83d543390b0886b2b714142d87aa70e493e47c0216e3a3b"
},
"downloads": -1,
"filename": "krules_framework-2.0.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "4b0034a95840451e2f2968b4b5db2a90",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.11",
"size": 52027,
"upload_time": "2025-11-03T10:23:34",
"upload_time_iso_8601": "2025-11-03T10:23:34.204822Z",
"url": "https://files.pythonhosted.org/packages/42/df/05cae35b851927e9669cfd529fa801d307bb3e3098f93476919afff0d74b/krules_framework-2.0.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "6e00ed54226a6f2d37b9b171da606e3fc47f9e12855aaaa2d76c5190c1921d1a",
"md5": "e15fc311acca1f8445b8e8b0c0752e32",
"sha256": "845df60315d750f05c7a4394437f84f984167d17af422659a0297efce60a5a2e"
},
"downloads": -1,
"filename": "krules_framework-2.0.0.tar.gz",
"has_sig": false,
"md5_digest": "e15fc311acca1f8445b8e8b0c0752e32",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.11",
"size": 30338,
"upload_time": "2025-11-03T10:23:35",
"upload_time_iso_8601": "2025-11-03T10:23:35.226979Z",
"url": "https://files.pythonhosted.org/packages/6e/00/ed54226a6f2d37b9b171da606e3fc47f9e12855aaaa2d76c5190c1921d1a/krules_framework-2.0.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-11-03 10:23:35",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "krules-framework"
}