streamforge


Namestreamforge JSON
Version 0.1.0 PyPI version JSON
download
home_pagehttps://github.com/paulobueno90/streamforge
SummaryReal-time cryptocurrency and financial data ingestion system
upload_time2025-10-15 18:31:27
maintainerNone
docs_urlNone
authorPaulo Bueno
requires_python>=3.8
licenseMIT
keywords cryptocurrency crypto stocks options trading data ingestion websocket binance kraken okx streamforge real-time market-data
VCS
bugtrack_url
requirements aiohttp websockets sqlalchemy pandas pydantic orjson aiokafka asyncpg aiolimiter python-dateutil numpy requests ciso8601
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # StreamForge

[![PyPI version](https://badge.fury.io/py/streamforge.svg)](https://badge.fury.io/py/streamforge)
[![Python Support](https://img.shields.io/pypi/pyversions/streamforge.svg)](https://pypi.org/project/streamforge/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

**Real-time cryptocurrency and financial data ingestion made simple.**

StreamForge is a unified, async-first framework for ingesting real-time market data from cryptocurrency exchanges. Built with Python's asyncio, it offers high-performance data streaming, normalization, and multiple output formats.

---

## Features

- **Real-time WebSocket Streaming** - Live market data from multiple exchanges
- **Multi-Exchange Support** - Binance, Kraken, OKX with unified API
- **Multiple Output Formats** - CSV, PostgreSQL, Kafka, or custom emitters
- **Timeframe Aggregation** - Automatic aggregation to higher timeframes
- **Historical Backfilling** - Load months of historical data effortlessly
- **Data Transformation** - Built-in transformers for custom data processing
- **Stream Merging** - Combine multiple exchanges into unified streams
- **Type-Safe** - Full type hints and Pydantic validation
- **Production Ready** - Async architecture for high-performance streaming

---

## Installation

```bash
pip install streamforge
```

**Requirements:** Python 3.8+

---

## Quick Start

Stream Bitcoin price data in 3 lines:

```python
import asyncio
import streamforge as sf

async def main():
    # Configure what to stream
    stream = sf.DataInput(
        type="kline",
        symbols=["BTCUSDT"],
        timeframe="1m"
    )
    
    # Create runner and add logger
    runner = sf.BinanceRunner(stream_input=stream)
    runner.register_emitter(sf.Logger(prefix="Binance"))
    
    # Start streaming!
    await runner.run()

asyncio.run(main())
```

**Output:**
```
[Binance] BTCUSDT 1m | Open: $43,250.00 | High: $43,275.00 | Low: $43,240.00 | Close: $43,260.00
```

[📖 Read the full documentation →](https://paulobueno90.github.io/streamforge/)

---

## Supported Exchanges

| Exchange | Symbol Format | Type Name | Backfilling |
|----------|---------------|-----------|-------------|
| **Binance** | `BTCUSDT` | `kline` | ✓ |
| **Kraken** | `BTC/USD` | `ohlc` | Limited |
| **OKX** | `BTC-USDT` | `candle` | ✓ |

---

## Usage Examples

### Save to CSV

```python
import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
    csv_emitter = sf.CSVEmitter(
        source="Binance",
        symbol="BTCUSDT",
        timeframe="1m",
        file_path="btc_data.csv"
    )
    
    runner.register_emitter(csv_emitter)
    await runner.run()

asyncio.run(main())
```

### Save to PostgreSQL

```python
import asyncio
import streamforge as sf
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, String, Float, BigInteger

Base = declarative_base()

class KlineTable(Base):
    __tablename__ = 'klines'
    source = Column(String, primary_key=True)
    symbol = Column(String, primary_key=True)
    timeframe = Column(String, primary_key=True)
    open_ts = Column(BigInteger, primary_key=True)
    end_ts = Column(BigInteger)
    open = Column(Float)
    high = Column(Float)
    low = Column(Float)
    close = Column(Float)
    volume = Column(Float)

async def main():
    postgres = (sf.PostgresEmitter(
            host="localhost",
            dbname="crypto",
            user="postgres",
            password="password"
        )
        .set_model(KlineTable)
        .on_conflict(["source", "symbol", "timeframe", "open_ts"])
    )
    
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT", "ETHUSDT"],
            timeframe="1m"
        )
    )
    
    runner.register_emitter(postgres)
    await runner.run()

asyncio.run(main())
```

### Multi-Timeframe Aggregation

Stream 1-minute data and automatically create 5m, 15m, and 1h candles:

```python
import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m",
            aggregate_list=["5m", "15m", "1h"]  # Auto-aggregate!
        ),
        active_warmup=True  # Required for aggregation
    )
    
    runner.register_emitter(sf.Logger(prefix="Multi-TF"))
    await runner.run()

asyncio.run(main())
```

### Historical Backfilling

Load historical data:

```python
import streamforge as sf

backfiller = sf.BinanceBackfilling(
    symbol="BTCUSDT",
    timeframe="1h",
    from_date="2024-01-01",
    to_date="2024-12-31"
)

backfiller.register_emitter(postgres_emitter)
backfiller.run()  # Downloads and saves year of data
```

### Multi-Exchange Streaming

Merge data from multiple exchanges:

```python
import asyncio
import streamforge as sf
from streamforge.merge_stream import merge_streams

async def main():
    binance = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
    okx = sf.OKXRunner(
        stream_input=sf.DataInput(
            type="candle",
            symbols=["BTC-USDT"],
            timeframe="1m"
        )
    )
    
    async for data in merge_streams(binance, okx):
        print(f"{data.source} | {data.symbol} | ${data.close:,.2f}")

asyncio.run(main())
```

---

## Documentation

**Full documentation:** https://paulobueno90.github.io/streamforge/

- [Installation Guide](https://paulobueno90.github.io/streamforge/getting-started/installation/)
- [Quick Start Tutorial](https://paulobueno90.github.io/streamforge/getting-started/quick-start/)
- [User Guide](https://paulobueno90.github.io/streamforge/user-guide/emitters/)
- [Examples Gallery](https://paulobueno90.github.io/streamforge/examples/)
- [API Reference](https://paulobueno90.github.io/streamforge/api-reference/)
- [Exchange Guides](https://paulobueno90.github.io/streamforge/exchanges/binance/)

---

## Key Concepts

### Runners

Connect to exchanges and manage data flow:

```python
runner = sf.BinanceRunner(stream_input=stream)  # Binance
runner = sf.KrakenRunner(stream_input=stream)   # Kraken  
runner = sf.OKXRunner(stream_input=stream)      # OKX
```

### Emitters

Define where data goes:

```python
sf.Logger()              # Print to console
sf.CSVEmitter()          # Save to CSV
sf.PostgresEmitter()     # Save to PostgreSQL
sf.KafkaEmitter()        # Stream to Kafka
```

### DataInput

Configure what to stream:

```python
stream = sf.DataInput(
    type="kline",                           # Data type
    symbols=["BTCUSDT", "ETHUSDT"],        # Trading pairs
    timeframe="1m",                         # Candle interval
    aggregate_list=["5m", "15m", "1h"]     # Optional aggregation
)
```

---

## Development

### Install from Source

```bash
git clone https://github.com/paulobueno90/streamforge.git
cd streamforge
pip install -e ".[dev]"
```

### Run Tests

```bash
pytest
```

### Code Formatting

```bash
black streamforge/
isort streamforge/
flake8 streamforge/
```

---

## Requirements

Core dependencies (installed automatically):

- `aiohttp` - Async HTTP client
- `websockets` - WebSocket client
- `sqlalchemy` - SQL ORM
- `pandas` - Data manipulation
- `pydantic` - Data validation
- `aiokafka` - Kafka client
- `asyncpg` - PostgreSQL driver
- `aiolimiter` - Rate limiting

---

## Examples

### Stream Multiple Symbols

```python
import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT", "ETHUSDT", "SOLUSDT"],
            timeframe="1m"
        )
    )
    
    runner.register_emitter(sf.Logger(prefix="Crypto"))
    await runner.run()

asyncio.run(main())
```

### Multiple Output Destinations

```python
import asyncio
import streamforge as sf

async def main():
    runner = sf.BinanceRunner(
        stream_input=sf.DataInput(
            type="kline",
            symbols=["BTCUSDT"],
            timeframe="1m"
        )
    )
    
    # Register multiple emitters - data goes to ALL
    runner.register_emitter(sf.Logger(prefix="Monitor"))
    runner.register_emitter(csv_emitter)
    runner.register_emitter(postgres_emitter)
    runner.register_emitter(kafka_emitter)
    
    await runner.run()

asyncio.run(main())
```

[See more examples →](https://paulobueno90.github.io/streamforge/examples/)

---

## Architecture

```
Exchange WebSocket → Runner → Normalizer → Processor → Aggregator → Transformer → Emitter(s)
```

1. **Runner** - Manages WebSocket connections
2. **Normalizer** - Standardizes data across exchanges
3. **Processor** - Buffers and processes data
4. **Aggregator** - Creates higher timeframe candles (optional)
5. **Transformer** - Applies custom transformations (optional)
6. **Emitter** - Outputs to your destination(s)

[Learn more about architecture →](https://paulobueno90.github.io/streamforge/getting-started/core-concepts/)

---

## Use Cases

- **Trading Bots** - Real-time market data for algorithmic trading
- **Data Analysis** - Collect data for backtesting and research
- **Price Monitoring** - Track cryptocurrency prices across exchanges
- **Arbitrage Detection** - Find price differences between exchanges
- **Market Research** - Analyze market trends and patterns
- **Portfolio Tracking** - Monitor your cryptocurrency holdings

---

## Contributing

Contributions are welcome! Please see our [Contributing Guide](https://paulobueno90.github.io/streamforge/contributing/).

### Development Setup

1. Fork the repository
2. Clone your fork: `git clone https://github.com/YOUR_USERNAME/streamforge.git`
3. Install dev dependencies: `pip install -e ".[dev]"`
4. Create a branch: `git checkout -b feature/my-feature`
5. Make changes and add tests
6. Run tests: `pytest`
7. Submit a pull request

---

## Links

- **Documentation:** https://paulobueno90.github.io/streamforge/
- **PyPI:** https://pypi.org/project/streamforge/
- **GitHub:** https://github.com/paulobueno90/streamforge
- **Issues:** https://github.com/paulobueno90/streamforge/issues
- **Changelog:** [CHANGELOG.md](CHANGELOG.md)

---

## License

MIT License - see [LICENSE](LICENSE) file for details.

---

## Author

**Paulo Bueno**  
Email: paulohmbueno@gmail.com  
GitHub: [@paulobueno90](https://github.com/paulobueno90)

---

## Acknowledgments

Built with:
- [aiohttp](https://github.com/aio-libs/aiohttp) - Async HTTP
- [websockets](https://github.com/python-websockets/websockets) - WebSocket support
- [Pydantic](https://github.com/pydantic/pydantic) - Data validation
- [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy) - Database ORM
- [Pandas](https://github.com/pandas-dev/pandas) - Data manipulation

---

**Happy Streaming!** 🚀

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/paulobueno90/streamforge",
    "name": "streamforge",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": "Paulo Bueno <paulohmbueno@gmail.com>",
    "keywords": "cryptocurrency, crypto, stocks, options, trading, data, ingestion, websocket, binance, kraken, okx, streamforge, real-time, market-data",
    "author": "Paulo Bueno",
    "author_email": "Paulo Bueno <paulohmbueno@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/6d/e6/ff63a0db740581cb18940e33a951261a5c543a699b09ba5e580108101ff4/streamforge-0.1.0.tar.gz",
    "platform": null,
    "description": "# StreamForge\r\n\r\n[![PyPI version](https://badge.fury.io/py/streamforge.svg)](https://badge.fury.io/py/streamforge)\r\n[![Python Support](https://img.shields.io/pypi/pyversions/streamforge.svg)](https://pypi.org/project/streamforge/)\r\n[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)\r\n\r\n**Real-time cryptocurrency and financial data ingestion made simple.**\r\n\r\nStreamForge is a unified, async-first framework for ingesting real-time market data from cryptocurrency exchanges. Built with Python's asyncio, it offers high-performance data streaming, normalization, and multiple output formats.\r\n\r\n---\r\n\r\n## Features\r\n\r\n- **Real-time WebSocket Streaming** - Live market data from multiple exchanges\r\n- **Multi-Exchange Support** - Binance, Kraken, OKX with unified API\r\n- **Multiple Output Formats** - CSV, PostgreSQL, Kafka, or custom emitters\r\n- **Timeframe Aggregation** - Automatic aggregation to higher timeframes\r\n- **Historical Backfilling** - Load months of historical data effortlessly\r\n- **Data Transformation** - Built-in transformers for custom data processing\r\n- **Stream Merging** - Combine multiple exchanges into unified streams\r\n- **Type-Safe** - Full type hints and Pydantic validation\r\n- **Production Ready** - Async architecture for high-performance streaming\r\n\r\n---\r\n\r\n## Installation\r\n\r\n```bash\r\npip install streamforge\r\n```\r\n\r\n**Requirements:** Python 3.8+\r\n\r\n---\r\n\r\n## Quick Start\r\n\r\nStream Bitcoin price data in 3 lines:\r\n\r\n```python\r\nimport asyncio\r\nimport streamforge as sf\r\n\r\nasync def main():\r\n    # Configure what to stream\r\n    stream = sf.DataInput(\r\n        type=\"kline\",\r\n        symbols=[\"BTCUSDT\"],\r\n        timeframe=\"1m\"\r\n    )\r\n    \r\n    # Create runner and add logger\r\n    runner = sf.BinanceRunner(stream_input=stream)\r\n    runner.register_emitter(sf.Logger(prefix=\"Binance\"))\r\n    \r\n    # Start streaming!\r\n    await runner.run()\r\n\r\nasyncio.run(main())\r\n```\r\n\r\n**Output:**\r\n```\r\n[Binance] BTCUSDT 1m | Open: $43,250.00 | High: $43,275.00 | Low: $43,240.00 | Close: $43,260.00\r\n```\r\n\r\n[\ud83d\udcd6 Read the full documentation \u2192](https://paulobueno90.github.io/streamforge/)\r\n\r\n---\r\n\r\n## Supported Exchanges\r\n\r\n| Exchange | Symbol Format | Type Name | Backfilling |\r\n|----------|---------------|-----------|-------------|\r\n| **Binance** | `BTCUSDT` | `kline` | \u2713 |\r\n| **Kraken** | `BTC/USD` | `ohlc` | Limited |\r\n| **OKX** | `BTC-USDT` | `candle` | \u2713 |\r\n\r\n---\r\n\r\n## Usage Examples\r\n\r\n### Save to CSV\r\n\r\n```python\r\nimport asyncio\r\nimport streamforge as sf\r\n\r\nasync def main():\r\n    runner = sf.BinanceRunner(\r\n        stream_input=sf.DataInput(\r\n            type=\"kline\",\r\n            symbols=[\"BTCUSDT\"],\r\n            timeframe=\"1m\"\r\n        )\r\n    )\r\n    \r\n    csv_emitter = sf.CSVEmitter(\r\n        source=\"Binance\",\r\n        symbol=\"BTCUSDT\",\r\n        timeframe=\"1m\",\r\n        file_path=\"btc_data.csv\"\r\n    )\r\n    \r\n    runner.register_emitter(csv_emitter)\r\n    await runner.run()\r\n\r\nasyncio.run(main())\r\n```\r\n\r\n### Save to PostgreSQL\r\n\r\n```python\r\nimport asyncio\r\nimport streamforge as sf\r\nfrom sqlalchemy.orm import declarative_base\r\nfrom sqlalchemy import Column, String, Float, BigInteger\r\n\r\nBase = declarative_base()\r\n\r\nclass KlineTable(Base):\r\n    __tablename__ = 'klines'\r\n    source = Column(String, primary_key=True)\r\n    symbol = Column(String, primary_key=True)\r\n    timeframe = Column(String, primary_key=True)\r\n    open_ts = Column(BigInteger, primary_key=True)\r\n    end_ts = Column(BigInteger)\r\n    open = Column(Float)\r\n    high = Column(Float)\r\n    low = Column(Float)\r\n    close = Column(Float)\r\n    volume = Column(Float)\r\n\r\nasync def main():\r\n    postgres = (sf.PostgresEmitter(\r\n            host=\"localhost\",\r\n            dbname=\"crypto\",\r\n            user=\"postgres\",\r\n            password=\"password\"\r\n        )\r\n        .set_model(KlineTable)\r\n        .on_conflict([\"source\", \"symbol\", \"timeframe\", \"open_ts\"])\r\n    )\r\n    \r\n    runner = sf.BinanceRunner(\r\n        stream_input=sf.DataInput(\r\n            type=\"kline\",\r\n            symbols=[\"BTCUSDT\", \"ETHUSDT\"],\r\n            timeframe=\"1m\"\r\n        )\r\n    )\r\n    \r\n    runner.register_emitter(postgres)\r\n    await runner.run()\r\n\r\nasyncio.run(main())\r\n```\r\n\r\n### Multi-Timeframe Aggregation\r\n\r\nStream 1-minute data and automatically create 5m, 15m, and 1h candles:\r\n\r\n```python\r\nimport asyncio\r\nimport streamforge as sf\r\n\r\nasync def main():\r\n    runner = sf.BinanceRunner(\r\n        stream_input=sf.DataInput(\r\n            type=\"kline\",\r\n            symbols=[\"BTCUSDT\"],\r\n            timeframe=\"1m\",\r\n            aggregate_list=[\"5m\", \"15m\", \"1h\"]  # Auto-aggregate!\r\n        ),\r\n        active_warmup=True  # Required for aggregation\r\n    )\r\n    \r\n    runner.register_emitter(sf.Logger(prefix=\"Multi-TF\"))\r\n    await runner.run()\r\n\r\nasyncio.run(main())\r\n```\r\n\r\n### Historical Backfilling\r\n\r\nLoad historical data:\r\n\r\n```python\r\nimport streamforge as sf\r\n\r\nbackfiller = sf.BinanceBackfilling(\r\n    symbol=\"BTCUSDT\",\r\n    timeframe=\"1h\",\r\n    from_date=\"2024-01-01\",\r\n    to_date=\"2024-12-31\"\r\n)\r\n\r\nbackfiller.register_emitter(postgres_emitter)\r\nbackfiller.run()  # Downloads and saves year of data\r\n```\r\n\r\n### Multi-Exchange Streaming\r\n\r\nMerge data from multiple exchanges:\r\n\r\n```python\r\nimport asyncio\r\nimport streamforge as sf\r\nfrom streamforge.merge_stream import merge_streams\r\n\r\nasync def main():\r\n    binance = sf.BinanceRunner(\r\n        stream_input=sf.DataInput(\r\n            type=\"kline\",\r\n            symbols=[\"BTCUSDT\"],\r\n            timeframe=\"1m\"\r\n        )\r\n    )\r\n    \r\n    okx = sf.OKXRunner(\r\n        stream_input=sf.DataInput(\r\n            type=\"candle\",\r\n            symbols=[\"BTC-USDT\"],\r\n            timeframe=\"1m\"\r\n        )\r\n    )\r\n    \r\n    async for data in merge_streams(binance, okx):\r\n        print(f\"{data.source} | {data.symbol} | ${data.close:,.2f}\")\r\n\r\nasyncio.run(main())\r\n```\r\n\r\n---\r\n\r\n## Documentation\r\n\r\n**Full documentation:** https://paulobueno90.github.io/streamforge/\r\n\r\n- [Installation Guide](https://paulobueno90.github.io/streamforge/getting-started/installation/)\r\n- [Quick Start Tutorial](https://paulobueno90.github.io/streamforge/getting-started/quick-start/)\r\n- [User Guide](https://paulobueno90.github.io/streamforge/user-guide/emitters/)\r\n- [Examples Gallery](https://paulobueno90.github.io/streamforge/examples/)\r\n- [API Reference](https://paulobueno90.github.io/streamforge/api-reference/)\r\n- [Exchange Guides](https://paulobueno90.github.io/streamforge/exchanges/binance/)\r\n\r\n---\r\n\r\n## Key Concepts\r\n\r\n### Runners\r\n\r\nConnect to exchanges and manage data flow:\r\n\r\n```python\r\nrunner = sf.BinanceRunner(stream_input=stream)  # Binance\r\nrunner = sf.KrakenRunner(stream_input=stream)   # Kraken  \r\nrunner = sf.OKXRunner(stream_input=stream)      # OKX\r\n```\r\n\r\n### Emitters\r\n\r\nDefine where data goes:\r\n\r\n```python\r\nsf.Logger()              # Print to console\r\nsf.CSVEmitter()          # Save to CSV\r\nsf.PostgresEmitter()     # Save to PostgreSQL\r\nsf.KafkaEmitter()        # Stream to Kafka\r\n```\r\n\r\n### DataInput\r\n\r\nConfigure what to stream:\r\n\r\n```python\r\nstream = sf.DataInput(\r\n    type=\"kline\",                           # Data type\r\n    symbols=[\"BTCUSDT\", \"ETHUSDT\"],        # Trading pairs\r\n    timeframe=\"1m\",                         # Candle interval\r\n    aggregate_list=[\"5m\", \"15m\", \"1h\"]     # Optional aggregation\r\n)\r\n```\r\n\r\n---\r\n\r\n## Development\r\n\r\n### Install from Source\r\n\r\n```bash\r\ngit clone https://github.com/paulobueno90/streamforge.git\r\ncd streamforge\r\npip install -e \".[dev]\"\r\n```\r\n\r\n### Run Tests\r\n\r\n```bash\r\npytest\r\n```\r\n\r\n### Code Formatting\r\n\r\n```bash\r\nblack streamforge/\r\nisort streamforge/\r\nflake8 streamforge/\r\n```\r\n\r\n---\r\n\r\n## Requirements\r\n\r\nCore dependencies (installed automatically):\r\n\r\n- `aiohttp` - Async HTTP client\r\n- `websockets` - WebSocket client\r\n- `sqlalchemy` - SQL ORM\r\n- `pandas` - Data manipulation\r\n- `pydantic` - Data validation\r\n- `aiokafka` - Kafka client\r\n- `asyncpg` - PostgreSQL driver\r\n- `aiolimiter` - Rate limiting\r\n\r\n---\r\n\r\n## Examples\r\n\r\n### Stream Multiple Symbols\r\n\r\n```python\r\nimport asyncio\r\nimport streamforge as sf\r\n\r\nasync def main():\r\n    runner = sf.BinanceRunner(\r\n        stream_input=sf.DataInput(\r\n            type=\"kline\",\r\n            symbols=[\"BTCUSDT\", \"ETHUSDT\", \"SOLUSDT\"],\r\n            timeframe=\"1m\"\r\n        )\r\n    )\r\n    \r\n    runner.register_emitter(sf.Logger(prefix=\"Crypto\"))\r\n    await runner.run()\r\n\r\nasyncio.run(main())\r\n```\r\n\r\n### Multiple Output Destinations\r\n\r\n```python\r\nimport asyncio\r\nimport streamforge as sf\r\n\r\nasync def main():\r\n    runner = sf.BinanceRunner(\r\n        stream_input=sf.DataInput(\r\n            type=\"kline\",\r\n            symbols=[\"BTCUSDT\"],\r\n            timeframe=\"1m\"\r\n        )\r\n    )\r\n    \r\n    # Register multiple emitters - data goes to ALL\r\n    runner.register_emitter(sf.Logger(prefix=\"Monitor\"))\r\n    runner.register_emitter(csv_emitter)\r\n    runner.register_emitter(postgres_emitter)\r\n    runner.register_emitter(kafka_emitter)\r\n    \r\n    await runner.run()\r\n\r\nasyncio.run(main())\r\n```\r\n\r\n[See more examples \u2192](https://paulobueno90.github.io/streamforge/examples/)\r\n\r\n---\r\n\r\n## Architecture\r\n\r\n```\r\nExchange WebSocket \u2192 Runner \u2192 Normalizer \u2192 Processor \u2192 Aggregator \u2192 Transformer \u2192 Emitter(s)\r\n```\r\n\r\n1. **Runner** - Manages WebSocket connections\r\n2. **Normalizer** - Standardizes data across exchanges\r\n3. **Processor** - Buffers and processes data\r\n4. **Aggregator** - Creates higher timeframe candles (optional)\r\n5. **Transformer** - Applies custom transformations (optional)\r\n6. **Emitter** - Outputs to your destination(s)\r\n\r\n[Learn more about architecture \u2192](https://paulobueno90.github.io/streamforge/getting-started/core-concepts/)\r\n\r\n---\r\n\r\n## Use Cases\r\n\r\n- **Trading Bots** - Real-time market data for algorithmic trading\r\n- **Data Analysis** - Collect data for backtesting and research\r\n- **Price Monitoring** - Track cryptocurrency prices across exchanges\r\n- **Arbitrage Detection** - Find price differences between exchanges\r\n- **Market Research** - Analyze market trends and patterns\r\n- **Portfolio Tracking** - Monitor your cryptocurrency holdings\r\n\r\n---\r\n\r\n## Contributing\r\n\r\nContributions are welcome! Please see our [Contributing Guide](https://paulobueno90.github.io/streamforge/contributing/).\r\n\r\n### Development Setup\r\n\r\n1. Fork the repository\r\n2. Clone your fork: `git clone https://github.com/YOUR_USERNAME/streamforge.git`\r\n3. Install dev dependencies: `pip install -e \".[dev]\"`\r\n4. Create a branch: `git checkout -b feature/my-feature`\r\n5. Make changes and add tests\r\n6. Run tests: `pytest`\r\n7. Submit a pull request\r\n\r\n---\r\n\r\n## Links\r\n\r\n- **Documentation:** https://paulobueno90.github.io/streamforge/\r\n- **PyPI:** https://pypi.org/project/streamforge/\r\n- **GitHub:** https://github.com/paulobueno90/streamforge\r\n- **Issues:** https://github.com/paulobueno90/streamforge/issues\r\n- **Changelog:** [CHANGELOG.md](CHANGELOG.md)\r\n\r\n---\r\n\r\n## License\r\n\r\nMIT License - see [LICENSE](LICENSE) file for details.\r\n\r\n---\r\n\r\n## Author\r\n\r\n**Paulo Bueno**  \r\nEmail: paulohmbueno@gmail.com  \r\nGitHub: [@paulobueno90](https://github.com/paulobueno90)\r\n\r\n---\r\n\r\n## Acknowledgments\r\n\r\nBuilt with:\r\n- [aiohttp](https://github.com/aio-libs/aiohttp) - Async HTTP\r\n- [websockets](https://github.com/python-websockets/websockets) - WebSocket support\r\n- [Pydantic](https://github.com/pydantic/pydantic) - Data validation\r\n- [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy) - Database ORM\r\n- [Pandas](https://github.com/pandas-dev/pandas) - Data manipulation\r\n\r\n---\r\n\r\n**Happy Streaming!** \ud83d\ude80\r\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Real-time cryptocurrency and financial data ingestion system",
    "version": "0.1.0",
    "project_urls": {
        "Bug Tracker": "https://github.com/paulobueno90/streamforge/issues",
        "Changelog": "https://github.com/paulobueno90/streamforge/blob/main/CHANGELOG.md",
        "Documentation": "https://paulobueno90.github.io/streamforge/",
        "Homepage": "https://github.com/paulobueno90/streamforge",
        "Repository": "https://github.com/paulobueno90/streamforge"
    },
    "split_keywords": [
        "cryptocurrency",
        " crypto",
        " stocks",
        " options",
        " trading",
        " data",
        " ingestion",
        " websocket",
        " binance",
        " kraken",
        " okx",
        " streamforge",
        " real-time",
        " market-data"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "53edd637e195b3edd656ff03c84a88c5b59d895e53a42fa11c1d4cca8d08d109",
                "md5": "18216034a02902911094d567d40f6fae",
                "sha256": "0d3a2075653a237e1a1a0d86f4091fa632c38052b16bef558455b4a403f2e40a"
            },
            "downloads": -1,
            "filename": "streamforge-0.1.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "18216034a02902911094d567d40f6fae",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 87601,
            "upload_time": "2025-10-15T18:31:24",
            "upload_time_iso_8601": "2025-10-15T18:31:24.143802Z",
            "url": "https://files.pythonhosted.org/packages/53/ed/d637e195b3edd656ff03c84a88c5b59d895e53a42fa11c1d4cca8d08d109/streamforge-0.1.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "6de6ff63a0db740581cb18940e33a951261a5c543a699b09ba5e580108101ff4",
                "md5": "4732faa530c51fb47243472238e4c7fa",
                "sha256": "c5c35c15481180aa7c6580a31b7f157b46b3bd6aa1e5e1cd3a2ada9b5e052d5b"
            },
            "downloads": -1,
            "filename": "streamforge-0.1.0.tar.gz",
            "has_sig": false,
            "md5_digest": "4732faa530c51fb47243472238e4c7fa",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 63085,
            "upload_time": "2025-10-15T18:31:27",
            "upload_time_iso_8601": "2025-10-15T18:31:27.987002Z",
            "url": "https://files.pythonhosted.org/packages/6d/e6/ff63a0db740581cb18940e33a951261a5c543a699b09ba5e580108101ff4/streamforge-0.1.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-15 18:31:27",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "paulobueno90",
    "github_project": "streamforge",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [
        {
            "name": "aiohttp",
            "specs": [
                [
                    ">=",
                    "3.8.0"
                ]
            ]
        },
        {
            "name": "websockets",
            "specs": [
                [
                    ">=",
                    "10.0"
                ]
            ]
        },
        {
            "name": "sqlalchemy",
            "specs": [
                [
                    ">=",
                    "1.4.0"
                ]
            ]
        },
        {
            "name": "pandas",
            "specs": [
                [
                    ">=",
                    "1.3.0"
                ]
            ]
        },
        {
            "name": "pydantic",
            "specs": [
                [
                    ">=",
                    "1.8.0"
                ]
            ]
        },
        {
            "name": "orjson",
            "specs": [
                [
                    ">=",
                    "3.6.0"
                ]
            ]
        },
        {
            "name": "aiokafka",
            "specs": [
                [
                    ">=",
                    "0.8.0"
                ]
            ]
        },
        {
            "name": "asyncpg",
            "specs": [
                [
                    ">=",
                    "0.27.0"
                ]
            ]
        },
        {
            "name": "aiolimiter",
            "specs": [
                [
                    ">=",
                    "1.1.0"
                ]
            ]
        },
        {
            "name": "python-dateutil",
            "specs": [
                [
                    ">=",
                    "2.8.0"
                ]
            ]
        },
        {
            "name": "numpy",
            "specs": [
                [
                    ">=",
                    "1.20.0"
                ]
            ]
        },
        {
            "name": "requests",
            "specs": [
                [
                    ">=",
                    "2.25.0"
                ]
            ]
        },
        {
            "name": "ciso8601",
            "specs": [
                [
                    ">=",
                    "2.2.0"
                ]
            ]
        }
    ],
    "lcname": "streamforge"
}
        
Elapsed time: 1.19665s