aiosafeconsumer


:Name: aiosafeconsumer
:Version: 0.0.7
:Summary: Safely consume and process data.
:Upload time: 2025-11-06 14:30:36
:Requires Python: >=3.10
:Keywords: asyncio, consumer, microservices
:Requirements: No requirements were recorded.

aiosafeconsumer
===============

.. image:: https://github.com/lostclus/aiosafeconsumer/actions/workflows/tests.yml/badge.svg
    :target: https://github.com/lostclus/aiosafeconsumer/actions

.. image:: https://img.shields.io/pypi/v/aiosafeconsumer.svg
    :target: https://pypi.org/project/aiosafeconsumer/
    :alt: Current version on PyPI

.. image:: https://img.shields.io/pypi/pyversions/aiosafeconsumer
    :alt: PyPI - Python Version

aiosafeconsumer is a library that provides abstractions and some implementations
for consuming data from a source and processing it.

Features:

* Based on AsyncIO
* Type annotated
* Uses logging with contextual information
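
To illustrate what logging with contextual information can look like, here is a minimal sketch built only on the standard library. It is not the library's own mechanism, which this README does not describe; the names ``worker_name``, ``ContextFilter`` and ``make_logger`` are hypothetical.

```python
import contextvars
import io
import logging

# Hypothetical context variable; the library's real mechanism may differ.
worker_name: contextvars.ContextVar[str] = contextvars.ContextVar(
    "worker_name", default="-"
)


class ContextFilter(logging.Filter):
    """Copy the current worker name onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.worker = worker_name.get()
        return True


def make_logger(stream: io.StringIO) -> logging.Logger:
    """Build a logger whose output is prefixed with the worker name."""
    logger = logging.getLogger("context_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("[%(worker)s] %(message)s"))
    handler.addFilter(ContextFilter())
    logger.handlers = [handler]
    return logger


stream = io.StringIO()
log = make_logger(stream)
worker_name.set("consumer-1")  # set once, picked up by every later log call
log.info("batch processed")
print(stream.getvalue(), end="")  # [consumer-1] batch processed
```

Because ``contextvars`` values are tracked per asyncio task, each concurrently running worker would see its own name without passing it through every call.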

Abstractions:

* `DataSource` - waits for data and returns batches of records using a Python generator
* `DataProcessor` - accepts a batch of records and processes it
* `DataTransformer` - accepts a batch of records, transforms it, and calls
  another processor to process it. Extends `DataProcessor`
* `Worker` - abstract worker. Performs a long-running task
* `ConsumerWorker` - connects a `DataSource` to a `DataProcessor`. Extends `Worker`
* `DataWriter` - base abstraction for data synchronization. Extends `DataProcessor`
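
The way these abstractions fit together can be sketched with plain asyncio. The code below does not use the aiosafeconsumer API, whose class signatures are not shown in this README; ``number_source``, ``CollectProcessor``, ``DoubleTransformer`` and ``consume`` are hypothetical stand-ins for a source, a processor, a transformer and a consumer worker.

```python
import asyncio
from collections.abc import AsyncIterator
from typing import Protocol

Batch = list[int]


class Processor(Protocol):
    async def process(self, batch: Batch) -> None: ...


async def number_source(batches: list[Batch]) -> AsyncIterator[Batch]:
    """Stand-in data source: an async generator that yields batches."""
    for batch in batches:
        await asyncio.sleep(0)  # pretend to wait for data to arrive
        yield batch


class CollectProcessor:
    """Stand-in processor: remembers every record it receives."""

    def __init__(self) -> None:
        self.seen: list[int] = []

    async def process(self, batch: Batch) -> None:
        self.seen.extend(batch)


class DoubleTransformer:
    """Stand-in transformer: changes the batch, then delegates."""

    def __init__(self, target: Processor) -> None:
        self.target = target

    async def process(self, batch: Batch) -> None:
        await self.target.process([x * 2 for x in batch])


async def consume(source: AsyncIterator[Batch], processor: Processor) -> None:
    """Stand-in consumer worker: feeds each batch to the processor."""
    async for batch in source:
        await processor.process(batch)


async def demo() -> list[int]:
    sink = CollectProcessor()
    await consume(number_source([[1, 2], [3]]), DoubleTransformer(sink))
    return sink.seen


print(asyncio.run(demo()))  # [2, 4, 6]
```

Keeping the transformer behind the same ``process`` interface as the final processor is what lets steps be chained without the consumer knowing how many there are.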

Current implementations:

* `KafkaSource` - reads data from Kafka
* `RedisWriter` - synchronizes data in Redis
* `ElasticsearchWriter` - synchronizes data in Elasticsearch
* `WorkerPool` - controller that sets up and runs workers in parallel. Handles worker failures and restarts a worker when it fails or exits.
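
The restart behaviour described for `WorkerPool` can be approximated with a small supervisor loop. This is a sketch under stated assumptions, not the library's implementation: ``supervise``, ``run_pool`` and ``make_flaky`` are hypothetical names, and the number of restarts is capped so the example terminates, whereas a real pool would presumably keep running.

```python
import asyncio
from collections.abc import Awaitable, Callable

WorkerFactory = Callable[[], Awaitable[None]]


async def supervise(
    name: str, factory: WorkerFactory, max_restarts: int, log: list[str]
) -> None:
    """Run one worker and start it again whenever it fails or exits."""
    for run in range(1, max_restarts + 2):
        try:
            await factory()
            log.append(f"{name}: exited (run {run})")
        except Exception as exc:  # a failing worker must not stop the pool
            log.append(f"{name}: failed with {exc!r} (run {run})")


async def run_pool(
    factories: dict[str, WorkerFactory], max_restarts: int = 1
) -> list[str]:
    """Supervise all workers concurrently and return the event log."""
    log: list[str] = []
    await asyncio.gather(
        *(supervise(name, f, max_restarts, log) for name, f in factories.items())
    )
    return log


def make_flaky() -> WorkerFactory:
    """Hypothetical worker that fails on its first run only."""
    calls = 0

    async def worker() -> None:
        nonlocal calls
        calls += 1
        if calls == 1:
            raise RuntimeError("boom")

    return worker


def demo() -> list[str]:
    return asyncio.run(run_pool({"flaky": make_flaky()}, max_restarts=1))


for line in demo():
    print(line)
```

Each worker gets its own supervising coroutine, so one worker crashing or restarting does not interrupt the others.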

Install::

    pip install aiosafeconsumer

Install with Kafka::

    pip install aiosafeconsumer[kafka]

Install with Redis::

    pip install aiosafeconsumer[redis]

Install with Elasticsearch::

    pip install aiosafeconsumer[elasticsearch]

Install with MongoDB::

    pip install aiosafeconsumer[mongo]

Install with PostgreSQL::

    pip install aiosafeconsumer[postgres]

Links:

* Producer library: https://github.com/lostclus/django-kafka-streamer
* Example application: https://github.com/lostclus/WeatherApp

TODO:

* Redis Streams source
* Enumerate IDs message type support in PostgreSQL writer
* ClickHouse writer

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "aiosafeconsumer",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "asyncio, consumer, microservices",
    "author": null,
    "author_email": "Kostiantyn Korikov <lostclus@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/ee/20/dc0a32baa844b97a07284fade711a9e60852c07e2e7826438c91d24ec16f/aiosafeconsumer-0.0.7.tar.gz",
    "platform": null,
    "description": "aiosafeconsumer\n===============\n\n.. image:: https://github.com/lostclus/aiosafeconsumer/actions/workflows/tests.yml/badge.svg\n    :target: https://github.com/lostclus/aiosafeconsumer/actions\n\n.. image:: https://img.shields.io/pypi/v/aiosafeconsumer.svg\n    :target: https://pypi.org/project/aiosafeconsumer/\n    :alt: Current version on PyPi\n\n.. image:: https://img.shields.io/pypi/pyversions/aiosafeconsumer\n    :alt: PyPI - Python Version\n\naiosafeconsumer is a library that provides abstractions and some implementations\nto consume data somewhere and process it.\n\nFeatures:\n\n* Based on AsyncIO\n* Type annotated\n* Use logging with contextual information\n\nAbstractions:\n\n* `DataSource` - waits for data and returns batch of records using Python generator\n* `DataProcessor` - accepts batch of records and precess it\n* `DataTransformer` - accepts batch of records and transform it and calls\n  another processor to precess it. Extends `DataProcessor`\n* `Worker` - abstract worker. Do a long running task\n* `ConsumerWorker` - connects `DataSource` and `DataProcessor`. Extends Worker\n* `DataWriter` - base abstraction to perform data synchronization. Extends DataProcessor\n\nCurrent implementations:\n\n* `KafkaSource` - read data from Kafka\n* `RedisWriter` - synchronize data in Redis\n* `ElasticsearchWriter` - synchronize data in Elasticsearch\n* `WorkerPool` - controller to setup and run workers in parallel. Can handle worker failures and restarts workers when it fails or exits.\n\nInstall::\n\n    pip install aiosafeconsumer\n\nInstall with Kafka::\n\n    pip install aiosafeconsumer[kafka]\n\nInstall with Redis::\n\n    pip install aiosafeconsumer[redis]\n\nInstall with Elasticsearch::\n\n    pip install aiosafeconsumer[elasticsearch]\n\nInstall with MongoDB::\n\n    pip install aiosafeconsumer[mongo]\n\nInstall with PostgreSQL::\n\n    pip install aiosafeconsumer[postgres]\n\nLinks:\n\n* Producer library: https://github.com/lostclus/django-kafka-streamer\n* Example application: https://github.com/lostclus/WeatherApp\n\nTODO:\n\n* Redis Streams source\n* Enumerate IDs message type support in PostgreSQL writer\n* ClickHouse writer\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Safely consume and process data.",
    "version": "0.0.7",
    "project_urls": {
        "Repository": "http://github.com/lostclus/aiosafeconsumer"
    },
    "split_keywords": [
        "asyncio",
        " consumer",
        " microservices"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "76e313ddad855e937cb1a7d55681c01f5c0e40b6bb0f6baf0b575802bee29a2e",
                "md5": "93ddfe5247735a1ff8c02a53a3b5e986",
                "sha256": "d6871aaa0e4a51df322fbe0a8399acc04869a47b26e43fe7144bc67a48c48553"
            },
            "downloads": -1,
            "filename": "aiosafeconsumer-0.0.7-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "93ddfe5247735a1ff8c02a53a3b5e986",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 24644,
            "upload_time": "2025-11-06T14:30:35",
            "upload_time_iso_8601": "2025-11-06T14:30:35.479105Z",
            "url": "https://files.pythonhosted.org/packages/76/e3/13ddad855e937cb1a7d55681c01f5c0e40b6bb0f6baf0b575802bee29a2e/aiosafeconsumer-0.0.7-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "ee20dc0a32baa844b97a07284fade711a9e60852c07e2e7826438c91d24ec16f",
                "md5": "3f0faf6138945846aa453d9be601e565",
                "sha256": "32f086905c30ed811e11ead3d9665ee3b1ff115d8458d6fb21e30a931d751c0f"
            },
            "downloads": -1,
            "filename": "aiosafeconsumer-0.0.7.tar.gz",
            "has_sig": false,
            "md5_digest": "3f0faf6138945846aa453d9be601e565",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 37714,
            "upload_time": "2025-11-06T14:30:36",
            "upload_time_iso_8601": "2025-11-06T14:30:36.735693Z",
            "url": "https://files.pythonhosted.org/packages/ee/20/dc0a32baa844b97a07284fade711a9e60852c07e2e7826438c91d24ec16f/aiosafeconsumer-0.0.7.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-11-06 14:30:36",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "lostclus",
    "github_project": "aiosafeconsumer",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "tox": true,
    "lcname": "aiosafeconsumer"
}
        