trading-common


Nametrading-common JSON
Version 0.1.5 PyPI version JSON
download
home_pageNone
SummaryCommon utils for trading platform (Kafka/Postgres/outbox/inbox)
upload_time2025-08-31 10:33:09
maintainerNone
docs_urlNone
authorTrading Platform Team
requires_python>=3.10
licenseMIT
keywords trading kafka postgres async outbox inbox
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Trading Common

Common utilities for trading platform including Kafka producer/consumer wrappers, Postgres inbox/outbox patterns, and event validation through trading-contracts.

## Features

- **Database**: AsyncPG wrapper with inbox/outbox tables and idempotency support
- **Kafka**: AIOKafka producer/consumer wrappers with transaction handling
- **Schema Validation**: Event validation using trading-contracts schemas
- **Outbox Pattern**: Reliable message publishing with database persistence

## Installation

```bash
# Install in development mode
pip install -e ".[dev]"

# Install production dependencies only
pip install -e .
```

## Quick Start

### Database Setup

```python
from trading_common.db import DB

db = DB("postgresql://user:pass@localhost:5432/dbname")
await db.start()

# Use in transactions
async with db.pool.acquire() as con:
    tx = con.transaction()
    await tx.start()
    try:
        # Your business logic here
        await db.outbox_put(con, "topic", "key", {"data": "value"})
        await tx.commit()
    except Exception:
        await tx.rollback()
        raise

await db.stop()
```

### Kafka Consumer

```python
from trading_common.kafka import ConsumerApp
from trading_common.schema import ensure

async def handler(con, topic, key, payload):
    # Validate event schema
    ensure("md.candle.closed@v1", payload)

    # Process the event
    # ... your business logic ...

    # Optionally publish to outbox
    await db.outbox_put(con, "strategy.signal@v1", key, {"signal": "buy"})

# Create and run consumer
consumer = ConsumerApp(
    name="strategy-service",
    db=db,
    bootstrap="localhost:9092",
    topics=["market-data"],
    group_id="strategy-group",
    handler=handler
)

await consumer.start()
await consumer.run()  # Runs indefinitely
```

### Kafka Producer

```python
from trading_common.kafka import Producer

producer = Producer("localhost:9092")
await producer.start()

# Send messages
await producer.send("topic", "key", {"data": "value"})

await producer.stop()
```

### Outbox Processing

```python
from trading_common.outbox import OutboxProcessor

outbox = OutboxProcessor(db)

async with db.pool.acquire() as con:
    # Get pending events
    events = await outbox.get_pending_events(con, limit=100)

    for event_id, topic, key, payload in events:
        # Publish to Kafka
        await producer.send(topic, key, payload)

        # Mark as published
        await outbox.mark_published(con, event_id)

    # Clean up old events
    await outbox.cleanup_old_events(con, days_old=7)
```

## Development

```bash
# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black src tests
isort src tests

# Type checking
mypy src

# Run with coverage
pytest --cov=trading_common --cov-report=term-missing
```

## Architecture

- **Inbox Pattern**: Ensures idempotent message processing
- **Outbox Pattern**: Reliable message publishing with database persistence
- **Transaction Safety**: All operations wrapped in database transactions
- **Schema Validation**: Events validated against JSON schemas before processing

## Dependencies

- Python 3.10+
- asyncpg: Async PostgreSQL driver
- aiokafka: Async Kafka client
- ujson: Fast JSON serializer
- trading-contracts: Schema validation
- jsonschema: JSON schema validation

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "trading-common",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "trading, kafka, postgres, async, outbox, inbox",
    "author": "Trading Platform Team",
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/55/13/fbf143e8d8799b74bbdeb7373e95b7ced6c2ed314a617dcf7d77eb5f1a82/trading_common-0.1.5.tar.gz",
    "platform": null,
    "description": "# Trading Common\n\nCommon utilities for trading platform including Kafka producer/consumer wrappers, Postgres inbox/outbox patterns, and event validation through trading-contracts.\n\n## Features\n\n- **Database**: AsyncPG wrapper with inbox/outbox tables and idempotency support\n- **Kafka**: AIOKafka producer/consumer wrappers with transaction handling\n- **Schema Validation**: Event validation using trading-contracts schemas\n- **Outbox Pattern**: Reliable message publishing with database persistence\n\n## Installation\n\n```bash\n# Install in development mode\npip install -e \".[dev]\"\n\n# Install production dependencies only\npip install -e .\n```\n\n## Quick Start\n\n### Database Setup\n\n```python\nfrom trading_common.db import DB\n\ndb = DB(\"postgresql://user:pass@localhost:5432/dbname\")\nawait db.start()\n\n# Use in transactions\nasync with db.pool.acquire() as con:\n    tx = con.transaction()\n    await tx.start()\n    try:\n        # Your business logic here\n        await db.outbox_put(con, \"topic\", \"key\", {\"data\": \"value\"})\n        await tx.commit()\n    except Exception:\n        await tx.rollback()\n        raise\n\nawait db.stop()\n```\n\n### Kafka Consumer\n\n```python\nfrom trading_common.kafka import ConsumerApp\nfrom trading_common.schema import ensure\n\nasync def handler(con, topic, key, payload):\n    # Validate event schema\n    ensure(\"md.candle.closed@v1\", payload)\n\n    # Process the event\n    # ... your business logic ...\n\n    # Optionally publish to outbox\n    await db.outbox_put(con, \"strategy.signal@v1\", key, {\"signal\": \"buy\"})\n\n# Create and run consumer\nconsumer = ConsumerApp(\n    name=\"strategy-service\",\n    db=db,\n    bootstrap=\"localhost:9092\",\n    topics=[\"market-data\"],\n    group_id=\"strategy-group\",\n    handler=handler\n)\n\nawait consumer.start()\nawait consumer.run()  # Runs indefinitely\n```\n\n### Kafka Producer\n\n```python\nfrom trading_common.kafka import Producer\n\nproducer = Producer(\"localhost:9092\")\nawait producer.start()\n\n# Send messages\nawait producer.send(\"topic\", \"key\", {\"data\": \"value\"})\n\nawait producer.stop()\n```\n\n### Outbox Processing\n\n```python\nfrom trading_common.outbox import OutboxProcessor\n\noutbox = OutboxProcessor(db)\n\nasync with db.pool.acquire() as con:\n    # Get pending events\n    events = await outbox.get_pending_events(con, limit=100)\n\n    for event_id, topic, key, payload in events:\n        # Publish to Kafka\n        await producer.send(topic, key, payload)\n\n        # Mark as published\n        await outbox.mark_published(con, event_id)\n\n    # Clean up old events\n    await outbox.cleanup_old_events(con, days_old=7)\n```\n\n## Development\n\n```bash\n# Install development dependencies\npip install -e \".[dev]\"\n\n# Run tests\npytest\n\n# Format code\nblack src tests\nisort src tests\n\n# Type checking\nmypy src\n\n# Run with coverage\npytest --cov=trading_common --cov-report=term-missing\n```\n\n## Architecture\n\n- **Inbox Pattern**: Ensures idempotent message processing\n- **Outbox Pattern**: Reliable message publishing with database persistence\n- **Transaction Safety**: All operations wrapped in database transactions\n- **Schema Validation**: Events validated against JSON schemas before processing\n\n## Dependencies\n\n- Python 3.10+\n- asyncpg: Async PostgreSQL driver\n- aiokafka: Async Kafka client\n- ujson: Fast JSON serializer\n- trading-contracts: Schema validation\n- jsonschema: JSON schema validation\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Common utils for trading platform (Kafka/Postgres/outbox/inbox)",
    "version": "0.1.5",
    "project_urls": null,
    "split_keywords": [
        "trading",
        " kafka",
        " postgres",
        " async",
        " outbox",
        " inbox"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "aa7d4cb1e89c8ff6d9ab65adf4c92180a5491e3832c7fdd1ddbebdff77aa2f5f",
                "md5": "f8474bed59eb77655dbad1aa6e20037a",
                "sha256": "c3a11f80c9cced14ea4fe8f34ae94f64f81ebd050e0f7dbad8143b9c9f6fa744"
            },
            "downloads": -1,
            "filename": "trading_common-0.1.5-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "f8474bed59eb77655dbad1aa6e20037a",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 7325,
            "upload_time": "2025-08-31T10:33:09",
            "upload_time_iso_8601": "2025-08-31T10:33:09.112894Z",
            "url": "https://files.pythonhosted.org/packages/aa/7d/4cb1e89c8ff6d9ab65adf4c92180a5491e3832c7fdd1ddbebdff77aa2f5f/trading_common-0.1.5-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "5513fbf143e8d8799b74bbdeb7373e95b7ced6c2ed314a617dcf7d77eb5f1a82",
                "md5": "99395dd972fdfe6ec693a48b7923ab87",
                "sha256": "e90e690e8390d1833cb788e38028063f2aa1c15d6ca1628f2e76527537f7b216"
            },
            "downloads": -1,
            "filename": "trading_common-0.1.5.tar.gz",
            "has_sig": false,
            "md5_digest": "99395dd972fdfe6ec693a48b7923ab87",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 17234,
            "upload_time": "2025-08-31T10:33:09",
            "upload_time_iso_8601": "2025-08-31T10:33:09.936917Z",
            "url": "https://files.pythonhosted.org/packages/55/13/fbf143e8d8799b74bbdeb7373e95b7ced6c2ed314a617dcf7d77eb5f1a82/trading_common-0.1.5.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-31 10:33:09",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "trading-common"
}
        
Elapsed time: 1.33292s