trading-common

- Name: trading-common
- Version: 0.2.3
- Summary: Common utils for trading platform (Kafka/Postgres/outbox/inbox)
- Upload time: 2025-10-14 08:39:44
- Author: Trading Platform Team
- Requires Python: >=3.10
- License: MIT
- Keywords: trading, kafka, postgres, async, outbox, inbox

# Trading Common

Async utilities shared across the trading platform for working with PostgreSQL inbox/outbox tables, Kafka producers/consumers, and schema validation via `trading-contracts`.

## Components
- `trading_common.db.DB` — connection pool with inbox/outbox bootstrap, idempotent helpers, and convenience wrappers for writing to the outbox.
- `trading_common.kafka.Producer` — resilient `aiokafka` producer prepared for SASL/SSL clusters and controlled retry behaviour.
- `trading_common.consumer_app.ConsumerApp` — batching Kafka consumer that wraps message handling in PostgreSQL transactions with inbox-based deduplication.
- `trading_common.outbox.OutboxProcessor` — helpers for reading, acknowledging, and vacuuming the outbox table.
- `trading_common.schema.ensure` — thin wrapper around `trading-contracts` JSON schema validation.
- `trading_common.settings_kafka` — baseline Kafka client settings hydrated from environment variables.

## Installation
```bash
# Inside a virtualenv; install runtime dependencies only
pip install -e .

# Install with development tooling (pytest, black, mypy, ...)
pip install -e ".[dev]"
```

## Configuration
### PostgreSQL
Provide a DSN string understood by `asyncpg`. The `DB.start()` method creates (or verifies) the `core.inbox` and `core.outbox` tables so that services can boot against an empty database.

```python
from trading_common.db import DB

db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
await db.start()
```
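The "creates (or verifies)" behaviour is what lets a service start against an empty database and restart against a populated one. A minimal sketch of such an idempotent bootstrap follows. It uses the standard-library `sqlite3` module as a stand-in for PostgreSQL, and the `bootstrap` function and the column layout are assumptions made for illustration, not the schema `DB.start()` actually creates.

```python
import sqlite3

def bootstrap(con: sqlite3.Connection) -> None:
    """Create the inbox/outbox tables if missing; safe to call on every start."""
    with con:
        # Hypothetical columns: the real core.inbox/core.outbox layout is not shown here.
        con.execute(
            "CREATE TABLE IF NOT EXISTS inbox ("
            " consumer TEXT NOT NULL, event_id TEXT NOT NULL,"
            " PRIMARY KEY (consumer, event_id))"
        )
        con.execute(
            "CREATE TABLE IF NOT EXISTS outbox ("
            " id INTEGER PRIMARY KEY, topic TEXT NOT NULL,"
            " key TEXT, payload TEXT NOT NULL)"
        )

con = sqlite3.connect(":memory:")
bootstrap(con)
bootstrap(con)  # second call finds the tables and changes nothing
tables = sorted(
    row[0] for row in con.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
```

Because every statement is guarded by `IF NOT EXISTS`, running the bootstrap on each service start is harmless.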

### Kafka
Baseline Kafka options come from `trading_common.settings_kafka`. They honour the following environment variables (defaults in parentheses):

| Variable | Purpose |
| --- | --- |
| `KAFKA_BOOTSTRAP_SERVERS` (`localhost:9092`) | Broker connection string |
| `KAFKA_SECURITY_PROTOCOL` (`SASL_SSL`) | `PLAINTEXT`, `SASL_SSL`, ... |
| `KAFKA_SASL_MECHANISM` (`PLAIN`) | Auth mechanism |
| `KAFKA_SASL_USERNAME` / `KAFKA_SASL_PASSWORD` (empty) | SASL credentials |
| `KAFKA_CLIENT_ID` (`market-data-service`) | Default client id |
| `KAFKA_ENABLE_IDEMPOTENCE` (`true`) | Producer idempotence flag |
| `KAFKA_LINGER_MS`, `KAFKA_BATCH_SIZE`, `KAFKA_COMPRESSION_TYPE`, ... | Producer tuning knobs |
| `KAFKA_ENABLE_AUTO_COMMIT` (`false`) | Consumer offset mode |
| `KAFKA_AUTO_OFFSET_RESET`, `KAFKA_MAX_POLL_INTERVAL_MS`, ... | Consumer runtime tuning |
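To make the table concrete, here is a sketch of how settings like these could be hydrated from the environment. The `load_kafka_common` function, the `_as_bool` helper, and the dictionary keys other than `client_id` are assumptions for illustration (the keys follow `aiokafka` parameter names); the actual `settings_kafka` module may be organised differently.

```python
import os
from typing import Mapping

def _as_bool(raw: str) -> bool:
    """Interpret common truthy spellings; anything else is False."""
    return raw.strip().lower() in {"1", "true", "yes", "on"}

def load_kafka_common(env: Mapping[str, str] = os.environ) -> dict:
    """Hypothetical loader: build a baseline config from environment variables."""
    return {
        "bootstrap_servers": env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "security_protocol": env.get("KAFKA_SECURITY_PROTOCOL", "SASL_SSL"),
        "sasl_mechanism": env.get("KAFKA_SASL_MECHANISM", "PLAIN"),
        "sasl_plain_username": env.get("KAFKA_SASL_USERNAME", ""),
        "sasl_plain_password": env.get("KAFKA_SASL_PASSWORD", ""),
        "client_id": env.get("KAFKA_CLIENT_ID", "market-data-service"),
        "enable_idempotence": _as_bool(env.get("KAFKA_ENABLE_IDEMPOTENCE", "true")),
    }

# Override one variable for a local broker without SASL; the rest keep their defaults.
local_cfg = load_kafka_common({"KAFKA_SECURITY_PROTOCOL": "PLAINTEXT"})
```

Passing the environment as a parameter keeps the loader easy to test without touching the real process environment.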

Copy the defaults before tweaking them (for example with `dict(...)` or `{**...}` unpacking, as below) and pass the copy into the Kafka helpers, so that the shared module-level dictionaries are never mutated:

```python
from trading_common import settings_kafka
from trading_common.kafka import Producer

producer = Producer(
    base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "strategy-service"},
    tuning=settings_kafka.PRODUCER_TUNING,
)
```
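The effect of copying can be checked with plain dictionaries. In this sketch `SHARED_DEFAULTS` is a stand-in for a module-level dictionary such as `settings_kafka.KAFKA_COMMON` (its real contents are not shown in this README), and `with_client_id` is a hypothetical helper.

```python
# Stand-in for a shared module-level dictionary; the values are illustrative.
SHARED_DEFAULTS = {"bootstrap_servers": "localhost:9092", "client_id": "market-data-service"}

def with_client_id(defaults: dict, client_id: str) -> dict:
    """Return a new dict with client_id overridden; the input is left untouched."""
    return {**defaults, "client_id": client_id}

strategy_cfg = with_client_id(SHARED_DEFAULTS, "strategy-service")

# dict(...) with keyword overrides builds an equivalent copy.
same_via_dict = dict(SHARED_DEFAULTS, client_id="strategy-service")
```

Both forms make shallow copies, which is enough as long as the values are flat strings and numbers; nested mutable values would still be shared.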

## Usage Examples
### Producer
```python
import asyncio
from trading_common import settings_kafka
from trading_common.kafka import Producer

async def publish() -> None:
    producer = Producer(
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "md-publisher"},
        tuning=settings_kafka.PRODUCER_TUNING,
    )
    await producer.start()
    try:
        await producer.send(
            topic="md.candles",
            key="BTCUSDT",
            payload={"event_id": "...", "open": 52000, "close": 52120},
        )
    finally:
        await producer.stop()

asyncio.run(publish())
```

### Consumer with Inbox/Outbox
```python
import asyncio
from trading_common import settings_kafka
from trading_common.consumer_app import ConsumerApp
from trading_common.db import DB
from trading_common.schema import ensure

async def handle(con, topic, key, payload):
    ensure("md.candle.closed@v1", payload)
    # Business logic ...

async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    consumer = ConsumerApp(
        name="market-data-service",
        db=db,
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "market-data-service"},
        tuning=settings_kafka.CONSUMER_TUNING,
        topics=["md.candles"],
        group_id="md-service",
        handler=handle,
    )
    await consumer.start()
    try:
        await consumer.run()
    finally:
        await consumer.stop()

asyncio.run(main())
```
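The idea behind inbox-based deduplication can be sketched independently of Kafka: record the event id and run the handler in the same transaction, and skip the handler when the id is already recorded. The example below uses the standard-library `sqlite3` module as a stand-in for PostgreSQL (where the equivalent insert would use `ON CONFLICT DO NOTHING`). The `process_once` function and the table layout are assumptions for illustration, not how `ConsumerApp` is implemented.

```python
import sqlite3

def process_once(con: sqlite3.Connection, consumer: str, event_id: str, handler) -> bool:
    """Run handler(con) at most once per (consumer, event_id); return True if it ran."""
    with con:  # one transaction: commit on success, roll back on exception
        cur = con.execute(
            "INSERT OR IGNORE INTO inbox (consumer, event_id) VALUES (?, ?)",
            (consumer, event_id),
        )
        if cur.rowcount == 0:  # row already present, so this is a redelivery
            return False
        handler(con)
        return True

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE inbox (consumer TEXT, event_id TEXT, PRIMARY KEY (consumer, event_id))"
)
con.execute("CREATE TABLE candles (symbol TEXT, close REAL)")
con.commit()

def save_candle(c: sqlite3.Connection) -> None:
    c.execute("INSERT INTO candles VALUES ('BTCUSDT', 52120)")

first = process_once(con, "market-data-service", "evt-1", save_candle)   # handler runs
second = process_once(con, "market-data-service", "evt-1", save_candle)  # skipped
```

Because the inbox row and the handler's writes share a transaction, a handler failure rolls back the inbox row as well, so the message can be retried later.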

### Outbox Processing
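```python
from trading_common.db import DB
from trading_common.kafka import Producer
from trading_common.outbox import OutboxProcessor

async def flush_outbox(db: DB, producer: Producer) -> None:
    """Publish pending outbox events; `db` and `producer` must already be started."""
    processor = OutboxProcessor(db)
    async with db.pool.acquire() as con:  # pool is available after db.start()
        events = await processor.get_pending_events(con, limit=100)
        for event_id, topic, key, payload in events:
            await producer.send(topic, key, payload)
            # Mark only after the send succeeds, so a failed send is retried later.
            await processor.mark_published(con, event_id)
        # Remove events older than seven days.
        await processor.cleanup_old_events(con, days_old=7)
```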
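For readers new to the pattern, the following sketch shows the write side and the relay side of a transactional outbox end to end. It uses the standard-library `sqlite3` module in place of PostgreSQL and a plain callback in place of a Kafka producer. The table layout, the `orders.placed` topic, and the `place_order` and `relay` functions are assumptions for illustration, not this package's schema or API.

```python
import json
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, symbol TEXT)")
con.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, topic TEXT, key TEXT,"
    " payload TEXT, published INTEGER NOT NULL DEFAULT 0)"
)
con.commit()

def place_order(symbol: str) -> None:
    """Write the business row and its event in the same transaction."""
    with con:
        con.execute("INSERT INTO orders (symbol) VALUES (?)", (symbol,))
        con.execute(
            "INSERT INTO outbox (topic, key, payload) VALUES (?, ?, ?)",
            ("orders.placed", symbol, json.dumps({"symbol": symbol})),
        )

def relay(send) -> int:
    """Publish pending rows in insertion order and mark each one as published."""
    rows = con.execute(
        "SELECT id, topic, key, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, topic, key, payload in rows:
        send(topic, key, json.loads(payload))
        with con:
            con.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)

sent = []
place_order("BTCUSDT")
relay(lambda topic, key, payload: sent.append((topic, key, payload)))
```

If the process stops between `send` and the `UPDATE`, the row stays pending and is sent again on the next pass. Delivery is therefore at-least-once, which is why consumers pair this pattern with inbox deduplication.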

## Development Workflow
```bash
pytest -m "not slow"       # Run fast tests
black src tests             # Format
isort src tests             # Import sorting
mypy src                    # Static typing
pytest --cov=trading_common --cov-report=term-missing
```

## Testing
- `pytest` — run the whole suite (asyncio tests use strict mode).
- `pytest -m "not slow"` — focus on the fast checks used in CI.
- `pytest --cov=trading_common --cov-report=term-missing` — optional coverage report.

These commands are mirrored in `.github/workflows/test.yml`, so a green local run is a strong indication that the CI job will pass.

## Related Projects
- [`trading-contracts`](../contracts) — JSON schemas for every event type; install alongside this package to validate messages with `schema.ensure`.
- `infra/` in the monorepo provides Kafka/PostgreSQL containers for local development (`docker compose -f infra/docker-compose.yml up -d`).

## License
MIT License. See `LICENSE` for details.

            
