queueplus


Name: queueplus
Version: 0.7.0
Home page: https://github.com/mitchelllisle/queueplus
Summary: A Python library that adds functionality to asyncio queues
Upload time: 2024-01-13 23:24:40
Docs URL: None
Author: Mitchell Lisle
License: MIT license
Keywords: queueplus
Requirements: No requirements were recorded.
## QueuePlus ➕

> 1️⃣ version: 0.7.0

> ✍️ author: Mitchell Lisle

A Python library that adds functionality to asyncio queues

## Install

```shell
pip install queueplus
```

## Usage

`AioQueue` supports everything a regular `asyncio.Queue` does.

```python
from queueplus import AioQueue

q = AioQueue()
await q.put("hello world")

message = await q.get()
# hello world
```

It also adds a few extra capabilities:

**Iterate over a queue (can be async or not)**
```python
from queueplus import AioQueue
q = AioQueue()

for i in range(10):
    await q.put(i)  # in non-async code, call q.put_nowait(i) instead

async for row in q:
    print(row)
```
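To make the iteration behaviour concrete, here is a minimal sketch of how a queue could support both `async for` and a plain `for` loop. It is built only on the standard library; `DrainableQueue`, `drain_async` and `drain_sync` are hypothetical names for illustration, and this is not necessarily how queueplus implements `AioQueue`.

```python
import asyncio


class DrainableQueue(asyncio.Queue):
    """Hypothetical stand-in for AioQueue, built only on the standard library."""

    async def __aiter__(self):
        # Yield items until the queue is empty, then stop instead of blocking.
        while not self.empty():
            yield await self.get()

    def __iter__(self):
        # Synchronous counterpart: get_nowait() does not need to be awaited.
        while not self.empty():
            yield self.get_nowait()


async def drain_async(n: int) -> list:
    q = DrainableQueue()
    for i in range(n):
        await q.put(i)
    return [row async for row in q]


async def drain_sync(n: int) -> list:
    # The queue is created inside a coroutine so the sketch also runs on
    # older Python versions that bind a queue to the current event loop.
    q = DrainableQueue()
    for i in range(n):
        q.put_nowait(i)
    return list(q)


if __name__ == "__main__":
    print(asyncio.run(drain_async(5)))
    print(asyncio.run(drain_sync(3)))
```

In this sketch, iteration stops as soon as the queue is empty, so it suits draining a queue that has already been filled rather than waiting on a live producer.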

**Collect all values into a list**
```python
from queueplus import AioQueue
q = AioQueue()

for i in range(10):
    await q.put(i)
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

**Create a callback every time a message is added**
```python
from queueplus import AioQueue
inq = AioQueue()
outq = AioQueue()

async def copy_to_q(message: str):
    await outq.put(message)

inq.add_consumer(copy_to_q)

await inq.put("hello world")

await inq.wait_for_consumer()
```
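The consumer pattern above can be approximated with the standard library alone: a background task reads each message, hands it to the callback, and calls `task_done()` so that `join()` can return. This is a sketch of the general technique, assuming nothing about queueplus internals; `run_consumer` and `copy_demo` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_consumer(
    q: asyncio.Queue, callback: Callable[[Any], Awaitable[None]]
) -> None:
    """Hypothetical worker loop: pass every message to the callback."""
    while True:
        message = await q.get()
        try:
            await callback(message)
        finally:
            # Mark the message as handled so q.join() can return.
            q.task_done()


async def copy_demo() -> list:
    inq: asyncio.Queue = asyncio.Queue()
    outq: asyncio.Queue = asyncio.Queue()

    async def copy_to_q(message: str) -> None:
        await outq.put(message)

    worker = asyncio.create_task(run_consumer(inq, copy_to_q))
    await inq.put("hello world")
    await inq.join()  # wait until the consumer has processed everything
    worker.cancel()  # the worker loop never ends on its own
    await asyncio.gather(worker, return_exceptions=True)
    return [outq.get_nowait() for _ in range(outq.qsize())]


if __name__ == "__main__":
    print(asyncio.run(copy_demo()))
```

Calling `task_done()` in a `finally` block keeps `join()` from hanging if the callback raises, at the cost of the worker task ending with that exception.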

**Enforce a type on a queue, error if violated**
```python
from queueplus import TypedAioQueue, RaiseOnViolation
q = TypedAioQueue(int, violations_strategy=RaiseOnViolation)

for i in range(10):
    await q.put(i)
messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

await q.put("hello") # Raises a ViolationError
```

**Enforce a type on a queue, ignore if violated**
```python
from queueplus import TypedAioQueue, DiscardOnViolation
q = TypedAioQueue(int, violations_strategy=DiscardOnViolation)

for i in range(10):
    await q.put(i)
await q.put("hello")

messages = q.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```
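The two violation strategies can be illustrated with a small standard-library sketch that checks each item with `isinstance` before it is enqueued. `CheckedQueue`, its `raise_on_violation` flag, the local `ViolationError` class and `typed_demo` are all hypothetical and stand in for `TypedAioQueue` and its strategies; the real library may work differently.

```python
import asyncio


class ViolationError(TypeError):
    """Hypothetical error type used only in this sketch."""


class CheckedQueue(asyncio.Queue):
    """Hypothetical typed queue; not queueplus's TypedAioQueue."""

    def __init__(self, item_type: type, raise_on_violation: bool = True):
        super().__init__()
        self.item_type = item_type
        self.raise_on_violation = raise_on_violation

    def put_nowait(self, item) -> None:
        # asyncio.Queue.put() ends by calling put_nowait(), so one check
        # covers both ways of adding an item.
        if not isinstance(item, self.item_type):
            if self.raise_on_violation:
                raise ViolationError(
                    f"expected {self.item_type.__name__}, "
                    f"got {type(item).__name__}"
                )
            return  # discard the item silently
        super().put_nowait(item)


async def typed_demo() -> tuple:
    strict = CheckedQueue(int, raise_on_violation=True)
    lenient = CheckedQueue(int, raise_on_violation=False)
    for i in range(3):
        await strict.put(i)
        await lenient.put(i)
    await lenient.put("hello")  # dropped, queue size is unchanged
    try:
        await strict.put("hello")
        raised = False
    except ViolationError:
        raised = True
    return raised, lenient.qsize()


if __name__ == "__main__":
    print(asyncio.run(typed_demo()))
```

Raising surfaces bad producers immediately, while discarding keeps a pipeline running when stray messages are expected; which one fits depends on how much you trust the code feeding the queue.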

### Full example
```python
from queueplus import AioQueue
import asyncio

async def main():
    q = AioQueue()
    await q.put("hello")
    await q.put("world")
    
    async for item in q:
        print(item)

if __name__ == "__main__":
    asyncio.run(main())
```

            

Raw data

{
    "_id": null,
    "home_page": "https://github.com/mitchelllisle/queueplus",
    "name": "queueplus",
    "maintainer": "",
    "docs_url": null,
    "requires_python": "",
    "maintainer_email": "",
    "keywords": "queueplus",
    "author": "Mitchell Lisle",
    "author_email": "m.lisle90@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/5b/fe/340238fe2c0f9537d8768c5c9cd8e31de686e75d68bd68591019499e76b2/queueplus-0.7.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT license",
    "summary": "A Python library that adds functionality to asyncio queues",
    "version": "0.7.0",
    "project_urls": {
        "Homepage": "https://github.com/mitchelllisle/queueplus"
    },
    "split_keywords": [
        "queueplus"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "675cdc07bb9e7ed9160d416fed4bd335f80ae6db84930cf0524d1bd1e8183b47",
                "md5": "9702ce37cfdfe64c1a108c15fa2c4a05",
                "sha256": "2686adcb173def742acd4c7d764f262b6897bb7f2a9542fd79ae83290e4213bb"
            },
            "downloads": -1,
            "filename": "queueplus-0.7.0-py2.py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "9702ce37cfdfe64c1a108c15fa2c4a05",
            "packagetype": "bdist_wheel",
            "python_version": "py2.py3",
            "requires_python": null,
            "size": 4441,
            "upload_time": "2024-01-13T23:24:39",
            "upload_time_iso_8601": "2024-01-13T23:24:39.292961Z",
            "url": "https://files.pythonhosted.org/packages/67/5c/dc07bb9e7ed9160d416fed4bd335f80ae6db84930cf0524d1bd1e8183b47/queueplus-0.7.0-py2.py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "5bfe340238fe2c0f9537d8768c5c9cd8e31de686e75d68bd68591019499e76b2",
                "md5": "14118c58802d5872b566bbe7dafb702e",
                "sha256": "0211ca5d9e01ea1434ff26ad2a8f179fe754f3c8e285949febf26d002b078e95"
            },
            "downloads": -1,
            "filename": "queueplus-0.7.0.tar.gz",
            "has_sig": false,
            "md5_digest": "14118c58802d5872b566bbe7dafb702e",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 6614,
            "upload_time": "2024-01-13T23:24:40",
            "upload_time_iso_8601": "2024-01-13T23:24:40.821838Z",
            "url": "https://files.pythonhosted.org/packages/5b/fe/340238fe2c0f9537d8768c5c9cd8e31de686e75d68bd68591019499e76b2/queueplus-0.7.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-01-13 23:24:40",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "mitchelllisle",
    "github_project": "queueplus",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": true,
    "requirements": [],
    "lcname": "queueplus"
}
        