# Trading Common
Async utilities shared across the trading platform for working with PostgreSQL inbox/outbox tables, Kafka producers/consumers, and schema validation via `trading-contracts`.
## Components
- `trading_common.db.DB` — connection pool with inbox/outbox bootstrap, idempotent helpers, and convenience wrappers for writing to the outbox.
- `trading_common.kafka.Producer` — resilient `aiokafka` producer prepared for SASL/SSL clusters and controlled retry behaviour.
- `trading_common.consumer_app.ConsumerApp` — batching Kafka consumer that wraps message handling in PostgreSQL transactions with inbox-based deduplication.
- `trading_common.outbox.OutboxProcessor` — helpers for reading, acknowledging, and vacuuming the outbox table.
- `trading_common.schema.ensure` — thin wrapper around `trading-contracts` JSON schema validation.
- `trading_common.settings_kafka` — baseline Kafka client settings hydrated from environment variables.
## Installation
```bash
# Inside a virtualenv; install runtime dependencies only
pip install -e .
# Install with development tooling (pytest, black, mypy, ...)
pip install -e ".[dev]"
```
## Configuration
### PostgreSQL
Provide a DSN string understood by `asyncpg`. The `DB.start()` method creates (or verifies) the `core.inbox` and `core.outbox` tables so that services can boot against an empty database.
```python
from trading_common.db import DB
db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
await db.start()
```
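Services usually persist business rows and the matching outbox event in one transaction, so an event can never be published without its data (or lost with it). `DB` ships convenience wrappers for this; as a rough illustration of the underlying pattern only, here is a raw-SQL sketch. The `core.outbox` columns (`event_id`, `topic`, `key`, `payload`) are inferred from the `OutboxProcessor` example below, and `core.orders` is purely hypothetical:

```python
import json
import uuid

async def save_order_with_event(db, order: dict) -> None:
    async with db.pool.acquire() as con:
        async with con.transaction():
            # Hypothetical business table used only for illustration.
            await con.execute(
                "INSERT INTO core.orders (id, symbol, qty) VALUES ($1, $2, $3)",
                order["id"], order["symbol"], order["qty"],
            )
            # Outbox row written in the same transaction; column names are assumed.
            await con.execute(
                "INSERT INTO core.outbox (event_id, topic, key, payload) "
                "VALUES ($1, $2, $3, $4)",
                str(uuid.uuid4()), "orders.created", order["id"], json.dumps(order),
            )
```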
### Kafka
Baseline Kafka options come from `trading_common.settings_kafka`. They honour the following environment variables (defaults in parentheses):
| Variable | Purpose |
| --- | --- |
| `KAFKA_BOOTSTRAP_SERVERS` (`localhost:9092`) | Broker connection string |
| `KAFKA_SECURITY_PROTOCOL` (`SASL_SSL`) | `PLAINTEXT`, `SASL_SSL`, ... |
| `KAFKA_SASL_MECHANISM` (`PLAIN`) | Auth mechanism |
| `KAFKA_SASL_USERNAME` / `KAFKA_SASL_PASSWORD` (empty) | SASL credentials |
| `KAFKA_CLIENT_ID` (`market-data-service`) | Default client id |
| `KAFKA_ENABLE_IDEMPOTENCE` (`true`) | Producer idempotence flag |
| `KAFKA_LINGER_MS`, `KAFKA_BATCH_SIZE`, `KAFKA_COMPRESSION_TYPE`, ... | Producer tuning knobs |
| `KAFKA_ENABLE_AUTO_COMMIT` (`false`) | Consumer offset mode |
| `KAFKA_AUTO_OFFSET_RESET`, `KAFKA_MAX_POLL_INTERVAL_MS`, ... | Consumer runtime tuning |
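For local development against an unauthenticated broker (such as the one from `infra/`), you can override the secure defaults. The variable names come from the table above; that the settings module reads them at import time is an assumption, so set them in the process environment before importing:

```python
import os

# Illustrative overrides for a local PLAINTEXT broker.
os.environ["KAFKA_BOOTSTRAP_SERVERS"] = "localhost:9092"
os.environ["KAFKA_SECURITY_PROTOCOL"] = "PLAINTEXT"
os.environ["KAFKA_CLIENT_ID"] = "strategy-service"

from trading_common import settings_kafka  # assumed to read the variables on import
```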
Clone and tweak the defaults (for example with `dict(...)` or `{**...}` unpacking) before passing them into the Kafka helpers, so that you do not mutate the shared module-level dictionaries:
```python
from trading_common import settings_kafka
from trading_common.kafka import Producer

producer = Producer(
    base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "strategy-service"},
    tuning=settings_kafka.PRODUCER_TUNING,
)
```
## Usage Examples
### Producer
```python
import asyncio

from trading_common import settings_kafka
from trading_common.kafka import Producer


async def publish() -> None:
    producer = Producer(
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "md-publisher"},
        tuning=settings_kafka.PRODUCER_TUNING,
    )
    await producer.start()
    try:
        await producer.send(
            topic="md.candles",
            key="BTCUSDT",
            payload={"event_id": "...", "open": 52000, "close": 52120},
        )
    finally:
        await producer.stop()


asyncio.run(publish())
```
### Consumer with Inbox/Outbox
```python
import asyncio

from trading_common import settings_kafka
from trading_common.consumer_app import ConsumerApp
from trading_common.db import DB
from trading_common.schema import ensure


async def handle(con, topic, key, payload):
    ensure("md.candle.closed@v1", payload)
    # Business logic ...


async def main() -> None:
    db = DB("postgresql://postgres:postgres@127.0.0.1:55432/trading")
    consumer = ConsumerApp(
        name="market-data-service",
        db=db,
        base_cfg={**settings_kafka.KAFKA_COMMON, "client_id": "market-data-service"},
        tuning=settings_kafka.CONSUMER_TUNING,
        topics=["md.candles"],
        group_id="md-service",
        handler=handle,
    )
    await consumer.start()
    try:
        await consumer.run()
    finally:
        await consumer.stop()


asyncio.run(main())
```
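Deduplication in `ConsumerApp` is inbox-based: each message id is recorded inside the handling transaction, and duplicates are skipped. A sketch of the classic pattern (the `core.inbox` layout is assumed here; this is not the library's exact SQL):

```python
async def handle_once(con, message_id: str, process) -> None:
    # Try to record the message id; a conflict means it was already processed.
    status = await con.execute(
        "INSERT INTO core.inbox (message_id) VALUES ($1) "
        "ON CONFLICT (message_id) DO NOTHING",
        message_id,
    )
    # asyncpg returns a status tag such as "INSERT 0 1"; "INSERT 0 0" means duplicate.
    if status.endswith(" 0"):
        return  # duplicate delivery: skip the handler
    await process(con)  # business logic shares the transaction with the insert
```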
### Outbox Processing
```python
from trading_common.outbox import OutboxProcessor

processor = OutboxProcessor(db)

# `producer` is a started Producer (see the earlier example);
# the pool is available after db.start().
async with db.pool.acquire() as con:
    events = await processor.get_pending_events(con, limit=100)
    for event_id, topic, key, payload in events:
        await producer.send(topic, key, payload)
        await processor.mark_published(con, event_id)
    await processor.cleanup_old_events(con, days_old=7)
```
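In a long-running service the processor is typically driven by a small relay loop. A minimal sketch, reusing the started `db`, `producer`, and `processor` from the examples above:

```python
import asyncio


async def relay_outbox(db, producer, processor, poll_interval: float = 1.0) -> None:
    """Poll the outbox, publish pending events, acknowledge, repeat."""
    while True:
        async with db.pool.acquire() as con:
            events = await processor.get_pending_events(con, limit=100)
            for event_id, topic, key, payload in events:
                await producer.send(topic, key, payload)
                await processor.mark_published(con, event_id)
        await asyncio.sleep(poll_interval)
```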
## Development Workflow
```bash
pytest -m "not slow" # Run fast tests
black src tests # Format
isort src tests # Import sorting
mypy src # Static typing
pytest --cov=trading_common --cov-report=term-missing
```
## Testing
- `pytest` — run the whole suite (asyncio tests use strict mode).
- `pytest -m "not slow"` — focus on the fast checks used in CI.
- `pytest --cov=trading_common --cov-report=term-missing` — optional coverage report.
These commands are mirrored in `.github/workflows/test.yml`, so keeping them green locally should keep the CI job green as well.
## Related Projects
- [`trading-contracts`](../contracts) — JSON schemas for every event type; install alongside this package to validate messages with `schema.ensure`.
- `infra/` in the monorepo provides Kafka/PostgreSQL containers for local development (`docker compose -f infra/docker-compose.yml up -d`).
## License
MIT License. See `LICENSE` for details.