# Trading Common
Common utilities for trading platform including Kafka producer/consumer wrappers, Postgres inbox/outbox patterns, and event validation through trading-contracts.
## Features
- **Database**: AsyncPG wrapper with inbox/outbox tables and idempotency support
- **Kafka**: AIOKafka producer/consumer wrappers with transaction handling
- **Schema Validation**: Event validation using trading-contracts schemas
- **Outbox Pattern**: Reliable message publishing with database persistence
## Installation
```bash
# Install in development mode
pip install -e ".[dev]"
# Install production dependencies only
pip install -e .
```
## Quick Start
### Database Setup
```python
from trading_common.db import DB
db = DB("postgresql://user:pass@localhost:5432/dbname")
await db.start()
# Use in transactions
async with db.pool.acquire() as con:
tx = con.transaction()
await tx.start()
try:
# Your business logic here
await db.outbox_put(con, "topic", "key", {"data": "value"})
await tx.commit()
except Exception:
await tx.rollback()
raise
await db.stop()
```
### Kafka Consumer
```python
from trading_common.kafka import ConsumerApp
from trading_common.schema import ensure
async def handler(con, topic, key, payload):
# Validate event schema
ensure("md.candle.closed@v1", payload)
# Process the event
# ... your business logic ...
# Optionally publish to outbox
await db.outbox_put(con, "strategy.signal@v1", key, {"signal": "buy"})
# Create and run consumer
consumer = ConsumerApp(
name="strategy-service",
db=db,
bootstrap="localhost:9092",
topics=["market-data"],
group_id="strategy-group",
handler=handler
)
await consumer.start()
await consumer.run() # Runs indefinitely
```
### Kafka Producer
```python
from trading_common.kafka import Producer
producer = Producer("localhost:9092")
await producer.start()
# Send messages
await producer.send("topic", "key", {"data": "value"})
await producer.stop()
```
### Outbox Processing
```python
from trading_common.outbox import OutboxProcessor
outbox = OutboxProcessor(db)
async with db.pool.acquire() as con:
# Get pending events
events = await outbox.get_pending_events(con, limit=100)
for event_id, topic, key, payload in events:
# Publish to Kafka
await producer.send(topic, key, payload)
# Mark as published
await outbox.mark_published(con, event_id)
# Clean up old events
await outbox.cleanup_old_events(con, days_old=7)
```
## Development
```bash
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Format code
black src tests
isort src tests
# Type checking
mypy src
# Run with coverage
pytest --cov=trading_common --cov-report=term-missing
```
## Architecture
- **Inbox Pattern**: Ensures idempotent message processing
- **Outbox Pattern**: Reliable message publishing with database persistence
- **Transaction Safety**: All operations wrapped in database transactions
- **Schema Validation**: Events validated against JSON schemas before processing
## Dependencies
- Python 3.10+
- asyncpg: Async PostgreSQL driver
- aiokafka: Async Kafka client
- ujson: Fast JSON serializer
- trading-contracts: Schema validation
- jsonschema: JSON schema validation
Raw data
{
"_id": null,
"home_page": null,
"name": "trading-common",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": null,
"keywords": "trading, kafka, postgres, async, outbox, inbox",
"author": "Trading Platform Team",
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/55/13/fbf143e8d8799b74bbdeb7373e95b7ced6c2ed314a617dcf7d77eb5f1a82/trading_common-0.1.5.tar.gz",
"platform": null,
"description": "# Trading Common\n\nCommon utilities for trading platform including Kafka producer/consumer wrappers, Postgres inbox/outbox patterns, and event validation through trading-contracts.\n\n## Features\n\n- **Database**: AsyncPG wrapper with inbox/outbox tables and idempotency support\n- **Kafka**: AIOKafka producer/consumer wrappers with transaction handling\n- **Schema Validation**: Event validation using trading-contracts schemas\n- **Outbox Pattern**: Reliable message publishing with database persistence\n\n## Installation\n\n```bash\n# Install in development mode\npip install -e \".[dev]\"\n\n# Install production dependencies only\npip install -e .\n```\n\n## Quick Start\n\n### Database Setup\n\n```python\nfrom trading_common.db import DB\n\ndb = DB(\"postgresql://user:pass@localhost:5432/dbname\")\nawait db.start()\n\n# Use in transactions\nasync with db.pool.acquire() as con:\n tx = con.transaction()\n await tx.start()\n try:\n # Your business logic here\n await db.outbox_put(con, \"topic\", \"key\", {\"data\": \"value\"})\n await tx.commit()\n except Exception:\n await tx.rollback()\n raise\n\nawait db.stop()\n```\n\n### Kafka Consumer\n\n```python\nfrom trading_common.kafka import ConsumerApp\nfrom trading_common.schema import ensure\n\nasync def handler(con, topic, key, payload):\n # Validate event schema\n ensure(\"md.candle.closed@v1\", payload)\n\n # Process the event\n # ... your business logic ...\n\n # Optionally publish to outbox\n await db.outbox_put(con, \"strategy.signal@v1\", key, {\"signal\": \"buy\"})\n\n# Create and run consumer\nconsumer = ConsumerApp(\n name=\"strategy-service\",\n db=db,\n bootstrap=\"localhost:9092\",\n topics=[\"market-data\"],\n group_id=\"strategy-group\",\n handler=handler\n)\n\nawait consumer.start()\nawait consumer.run() # Runs indefinitely\n```\n\n### Kafka Producer\n\n```python\nfrom trading_common.kafka import Producer\n\nproducer = Producer(\"localhost:9092\")\nawait producer.start()\n\n# Send messages\nawait producer.send(\"topic\", \"key\", {\"data\": \"value\"})\n\nawait producer.stop()\n```\n\n### Outbox Processing\n\n```python\nfrom trading_common.outbox import OutboxProcessor\n\noutbox = OutboxProcessor(db)\n\nasync with db.pool.acquire() as con:\n # Get pending events\n events = await outbox.get_pending_events(con, limit=100)\n\n for event_id, topic, key, payload in events:\n # Publish to Kafka\n await producer.send(topic, key, payload)\n\n # Mark as published\n await outbox.mark_published(con, event_id)\n\n # Clean up old events\n await outbox.cleanup_old_events(con, days_old=7)\n```\n\n## Development\n\n```bash\n# Install development dependencies\npip install -e \".[dev]\"\n\n# Run tests\npytest\n\n# Format code\nblack src tests\nisort src tests\n\n# Type checking\nmypy src\n\n# Run with coverage\npytest --cov=trading_common --cov-report=term-missing\n```\n\n## Architecture\n\n- **Inbox Pattern**: Ensures idempotent message processing\n- **Outbox Pattern**: Reliable message publishing with database persistence\n- **Transaction Safety**: All operations wrapped in database transactions\n- **Schema Validation**: Events validated against JSON schemas before processing\n\n## Dependencies\n\n- Python 3.10+\n- asyncpg: Async PostgreSQL driver\n- aiokafka: Async Kafka client\n- ujson: Fast JSON serializer\n- trading-contracts: Schema validation\n- jsonschema: JSON schema validation\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Common utils for trading platform (Kafka/Postgres/outbox/inbox)",
"version": "0.1.5",
"project_urls": null,
"split_keywords": [
"trading",
" kafka",
" postgres",
" async",
" outbox",
" inbox"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "aa7d4cb1e89c8ff6d9ab65adf4c92180a5491e3832c7fdd1ddbebdff77aa2f5f",
"md5": "f8474bed59eb77655dbad1aa6e20037a",
"sha256": "c3a11f80c9cced14ea4fe8f34ae94f64f81ebd050e0f7dbad8143b9c9f6fa744"
},
"downloads": -1,
"filename": "trading_common-0.1.5-py3-none-any.whl",
"has_sig": false,
"md5_digest": "f8474bed59eb77655dbad1aa6e20037a",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10",
"size": 7325,
"upload_time": "2025-08-31T10:33:09",
"upload_time_iso_8601": "2025-08-31T10:33:09.112894Z",
"url": "https://files.pythonhosted.org/packages/aa/7d/4cb1e89c8ff6d9ab65adf4c92180a5491e3832c7fdd1ddbebdff77aa2f5f/trading_common-0.1.5-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "5513fbf143e8d8799b74bbdeb7373e95b7ced6c2ed314a617dcf7d77eb5f1a82",
"md5": "99395dd972fdfe6ec693a48b7923ab87",
"sha256": "e90e690e8390d1833cb788e38028063f2aa1c15d6ca1628f2e76527537f7b216"
},
"downloads": -1,
"filename": "trading_common-0.1.5.tar.gz",
"has_sig": false,
"md5_digest": "99395dd972fdfe6ec693a48b7923ab87",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 17234,
"upload_time": "2025-08-31T10:33:09",
"upload_time_iso_8601": "2025-08-31T10:33:09.936917Z",
"url": "https://files.pythonhosted.org/packages/55/13/fbf143e8d8799b74bbdeb7373e95b7ced6c2ed314a617dcf7d77eb5f1a82/trading_common-0.1.5.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-31 10:33:09",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "trading-common"
}