runnelpy


* Name: runnelpy
* Version: 0.2.13
* Summary: Distributed event processing for Python based on Redis Streams
* Upload time: 2024-10-15 09:28:57
* Author: Matt Westcott
* Requires Python: <4.0,>=3.7
* License: LGPL-3.0-only
* Keywords: data, stream, processing, redis, async

# Runnel

[![pyversions](https://img.shields.io/pypi/pyversions/runnelpy.svg)](https://pypi.python.org/pypi/runnelpy/)
[![LGPLv3](https://img.shields.io/badge/License-LGPLv3-blue.svg)](https://github.com/mjwestcott/runnelpy/blob/master/LICENSE)
[![version](https://img.shields.io/pypi/v/runnelpy.svg)](https://pypi.python.org/pypi/runnelpy/)

**Distributed event processing for Python based on Redis Streams.**

RunnelPy allows you to easily create scalable stream processors, which operate on
partitions of event streams in Redis. RunnelPy takes care of assigning partitions
to workers and acknowledging events automatically, so you can focus on your
application logic.

Whereas traditional job queues do not provide ordering guarantees, RunnelPy is
designed to process partitions of your event stream strictly in the order
events are created.
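
To make the ordering guarantee concrete, here is a minimal, library-free sketch of key-based partitioning. It is not RunnelPy's implementation: the partition count, the `partition_for` helper, and the choice of `md5` as a stable hash are all assumptions made for illustration. The point is only that events sharing a key always land in the same partition, so their relative order survives.

```python
import hashlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical partition count, chosen for illustration


def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition index using a stable hash (md5 is an assumption)."""
    digest = hashlib.md5(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


# Events as (order_id, sequence_number) pairs, listed in creation order.
events = [(1, 0), (2, 0), (1, 1), (3, 0), (2, 1), (1, 2)]

partitions = defaultdict(list)
for order_id, seq in events:
    partitions[partition_for(order_id)].append((order_id, seq))

# Within each partition, events for a given order_id keep their creation order.
for index, items in sorted(partitions.items()):
    print(index, items)
```

Ordering holds per partition only: two events with different keys may be processed in either order if they fall into different partitions.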

### Installation

```bash
pip install runnelpy
```

### Basic Usage

```python
from datetime import datetime

from runnelpy import App, Record

app = App(name="myapp", redis_url="redis://127.0.0.1")


# Specify event types using the Record class.
class Order(Record):
    order_id: int
    created_at: datetime
    amount: float


orders = app.stream("orders", record=Order, partition_by="order_id")


# Every 4 seconds, send an example record to the stream.
@app.timer(interval=4)
async def sender():
    await orders.send(Order(order_id=1, created_at=datetime.utcnow(), amount=9.99))


# Iterate over a continuous stream of events in your processors.
@app.processor(orders)
async def printer(events):
    async for order in events.records():
        print(f"processed {order.amount}")
```

Then run the worker (assuming the code above is saved in `example.py` and `PYTHONPATH` is set):
```bash
$ runnelpy worker example:app
```

### Features

RunnelPy is designed to support a paradigm similar to Kafka Streams, but on top of Redis.

* At-least-once processing semantics
* Automatic partitioning of events by key
* Each partition maintains strict ordering
* Dynamic rebalance algorithm distributes partitions among workers on-the-fly
* Support for nested Record types with custom serialisation and compression
* Background tasks, including timers and cron-style scheduling
* User-defined middleware for exception handling, e.g. dead-letter-queueing
* A built-in batching mechanism to efficiently process events in bulk
* A `runnelpy[fast]` bundle for C or Rust extension dependencies ([uvloop](https://github.com/MagicStack/uvloop), [xxhash](https://github.com/Cyan4973/xxHash), [orjson](https://github.com/ijl/orjson), [lz4](https://github.com/python-lz4/python-lz4))
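
The at-least-once and dead-letter items above can be illustrated with a small simulation. This is a sketch of the general idea, not RunnelPy's code: `process_at_least_once`, `flaky_handler`, and `max_attempts` are hypothetical names. An event is acknowledged only after its handler returns; if the handler fails first, the same event is delivered again, so side effects may be observed more than once.

```python
def process_at_least_once(events, handler, max_attempts=3):
    """Deliver events in order; acknowledge each only after the handler succeeds."""
    acked = []
    dead_letters = []
    for event in events:
        for _attempt in range(max_attempts):
            try:
                handler(event)
            except Exception:
                # Not acknowledged: the event stays pending and is delivered again.
                continue
            acked.append(event)
            break
        else:
            # Attempts exhausted: park the event rather than block the partition.
            dead_letters.append(event)
    return acked, dead_letters


seen = []            # every delivery the handler observed
failed_once = set()  # events that have already failed one time


def flaky_handler(event):
    seen.append(event)  # the side effect happens before the simulated crash
    if event == "b" and event not in failed_once:
        failed_once.add(event)
        raise RuntimeError("simulated crash before acknowledgement")


acked, dead = process_at_least_once(["a", "b", "c"], flaky_handler)
print(seen)   # "b" is delivered twice
print(acked)  # each event is acknowledged exactly once
```

Because a redelivered event repeats its side effects, handlers under at-least-once semantics are usually written to be idempotent.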

### Documentation

Full documentation is available at https://runnelpy.dev.

* [Guide](https://runnelpy.dev/guide.html)
* [Motivation](https://runnelpy.dev/motivation.html)
* [Architecture](https://runnelpy.dev/architecture.html)
* [API Reference](https://runnelpy.dev/reference.html)

### Blog posts

Essays about this project or the technology it's using:

* [Redis streams vs. Kafka](https://mattwestcott.co.uk/blog/redis-streams-vs-kafka)
* [Structured concurrency in Python with AnyIO](https://mattwestcott.co.uk/blog/structured-concurrency-in-python-with-anyio)

### Local development

To run the test suite locally, clone the repo and install the optional deps
(e.g. via `poetry install -E fast`). Make sure Redis is running on localhost at
port 6379, then run `pytest`.
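
Before running the tests, a quick check along these lines can confirm that something is listening on the expected port. It is a sketch using only the standard library; the `port_is_open` helper is a hypothetical name, and a successful TCP connection does not prove the listener is actually Redis.

```python
import socket


def port_is_open(host="127.0.0.1", port=6379, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if port_is_open():
    print("Something is listening on 127.0.0.1:6379; run pytest next.")
else:
    print("Nothing is listening on 127.0.0.1:6379; start Redis first.")
```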

### See also

For a traditional task queue that doesn't provide ordering guarantees, see our
sister project [Fennel](https://github.com/mjwestcott/fennel).

            

        