Name | aioqueueext |
Version | 0.2.1 |
Summary | Asyncio queues with extended functionality: peeking, setting put and get callbacks. |
upload_time | 2023-08-24 06:16:42 |
requires_python | <4,>=3.7 |
keywords | asyncio, queue |
requirements | No requirements were recorded. |
# aioqueueext
A package that provides asyncio Queues with additional functionality.
## Work-in-Progress
This repository contains modules that were extracted from another project of mine and refactored into a separate package.
Not all of the functions in the current version have been verified yet.
The additional functions I plan to implement are:
- `return_when_*()` - async functions to ease synchronization tasks
- `set_on_get_callback()`
- `set_on_put_callback()`
- `peek_nowait()` - returns the "up-next" item without removing it from the queue
- `peek_and_get()` - async peek and conditionally get (pop) an item from the queue
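
As a rough illustration of the put and get callbacks listed above, here is a minimal sketch built on the standard library's `asyncio.Queue`. It is not the aioqueueext implementation: the class name `CallbackQueue` is hypothetical, and the callback signature (a plain function called with the item) is an assumption, since the README does not specify it.

```python
import asyncio
from typing import Any, Callable, List, Optional, Tuple


class CallbackQueue(asyncio.Queue):
    """Hypothetical stand-in; NOT the aioqueueext implementation."""

    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        # Assumed callback signature: callback(item) -> None
        self._on_put: Optional[Callable[[Any], None]] = None
        self._on_get: Optional[Callable[[Any], None]] = None

    def set_on_put_callback(self, callback: Callable[[Any], None]) -> None:
        self._on_put = callback

    def set_on_get_callback(self, callback: Callable[[Any], None]) -> None:
        self._on_get = callback

    # _put and _get are the storage hooks that asyncio.Queue subclasses
    # (such as PriorityQueue) override; put()/get() end up calling them.
    def _put(self, item: Any) -> None:
        super()._put(item)
        if self._on_put is not None:
            self._on_put(item)

    def _get(self) -> Any:
        item = super()._get()
        if self._on_get is not None:
            self._on_get(item)
        return item


async def callback_demo() -> List[Tuple[str, Any]]:
    events: List[Tuple[str, Any]] = []
    queue = CallbackQueue()
    queue.set_on_put_callback(lambda item: events.append(("put", item)))
    queue.set_on_get_callback(lambda item: events.append(("get", item)))
    await queue.put("apple")
    await queue.get()
    return events


if __name__ == "__main__":
    print(asyncio.run(callback_demo()))
```

Hooking `_put` and `_get` rather than `put` and `get` means the callbacks also fire for `put_nowait()` and `get_nowait()`. Whether aioqueueext behaves the same way is not stated here.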
## Examples
### Sync Peeking
```
import asyncio

from aioqueueext import QueueExt  # import path assumed from the package name


async def sync_peeking_example() -> None:
    queue1 = QueueExt()
    await queue1.put("apple")

    # peeking returns the next item without removing it
    item = queue1.peek_nowait()
    print(f"first peek: {item}")  # "apple"

    await queue1.put("banana")
    item = queue1.peek_nowait()
    print(f"second peek: {item}")  # still "apple"

    item = await queue1.get()  # popped "apple"
    item = queue1.peek_nowait()
    print(f"third peek: {item}")  # "banana"

    item = await queue1.get()  # popped "banana"

    # the next peek raises the QueueEmpty exception
    try:
        print(f"fourth peek: {queue1.peek_nowait()}")
    except asyncio.QueueEmpty:
        print("fourth peek failed: QueueEmpty")
```
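
For readers who want to try the peeking behaviour without installing the package, the following sketch reproduces it on top of the standard library's `asyncio.Queue`. The class name `PeekableQueue` is hypothetical, and the sketch relies on the private `_queue` attribute (a `collections.deque` in CPython's FIFO queue), so treat it as an illustration under those assumptions rather than as how aioqueueext works internally.

```python
import asyncio
from typing import Any, List


class PeekableQueue(asyncio.Queue):
    """Hypothetical stand-in; NOT the aioqueueext implementation."""

    def peek_nowait(self) -> Any:
        # Mirror get_nowait(): raise QueueEmpty when nothing is queued.
        if self.empty():
            raise asyncio.QueueEmpty
        # Assumption: `_queue` is the private deque used by CPython's
        # FIFO asyncio.Queue; index 0 is the item get() would return next.
        return self._queue[0]


async def peek_demo() -> List[Any]:
    queue = PeekableQueue()
    await queue.put("apple")
    await queue.put("banana")
    seen = [queue.peek_nowait()]      # peek does not remove the item
    seen.append(await queue.get())    # get() removes the same item
    seen.append(queue.peek_nowait())  # the next item is now up front
    return seen


if __name__ == "__main__":
    print(asyncio.run(peek_demo()))
```

This only applies to the plain FIFO queue. `asyncio.PriorityQueue` and `asyncio.LifoQueue` store items in a list with a different ordering, so index 0 would not always be the next item there.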
Raw data
{
"_id": null,
"home_page": "",
"name": "aioqueueext",
"maintainer": "",
"docs_url": null,
"requires_python": "<4,>=3.7",
"maintainer_email": "",
"keywords": "asyncio,queue",
"author": "",
"author_email": "darkhaniop <darkhaniop@google.com>",
"download_url": "https://files.pythonhosted.org/packages/27/5f/7fcafafcfb3528a5265c3d72560b1b9fc6d411df0d18286ce76eab44c148/aioqueueext-0.2.1.tar.gz",
"platform": null,
"description": "# aioqueueext\n\nA package that provides asyncio Queues with additional functionality.\n\n## Work-in-Progress\n\nThe repository contains modules extracted from my other project and was refactored as a separate package.\n\nIn the current version, I have not verified all of the functions.\n\nAdditional functions I plan to implement are:\n- `return_when_*()` - async functions to ease synchronization tasks\n- `set_on_get_callback()`\n- `set_on_put_callback()`\n- `peek_nowait()` - returns the \"up-next\" item without removing it from the queue\n- `peek_and_get()` - async peek and conditionally get (pop) an item from the queue\n\n## Examples\n\n### Sync Peeking\n\n```\nasync def sync_peeking_example() -> None:\n queue1 = QueueExt()\n\n await queue1.put(\"apple\")\n\n item = queue1.peek_nowait()\n print(f\"first peek: {item}\") # \"apple\"\n\n await queue1.put(\"banana\")\n\n item = queue1.peek_nowait()\n print(f\"second peek: {item}\") # \"apple\"\n\n item = await queue1.get() # popped \"apple\"\n\n item = queue1.peek_nowait()\n print(f\"third peek: {item}\") # \"banana\"\n\n item = await queue1.get() # popped \"banana\"\n\n # the next peek raises the QueueEmpty exception\n try:\n print(f\"fourth peek: {queue1.peek_nowait()}\")\n except asyncio.QueueEmpty:\n print(\"fourth peek failed: QueueEmpty\")\n\n```\n",
"bugtrack_url": null,
"license": "",
"summary": "Asyncio queues with extended functionality: peeking, setting put and get callbacks.",
"version": "0.2.1",
"project_urls": {
"Bug Tracker": "https://github.com/gw-tools/aioqueueext/issues",
"Homepage": "https://github.com/gw-tools/aioqueueext"
},
"split_keywords": [
"asyncio",
"queue"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "f6424ab0f3050a2b7799779c278dde3b56d6e98f4e360f8b2572fba65268a4d2",
"md5": "c88a807d29aa7b2d8aa773e156155dda",
"sha256": "20ef95a2951482df38b742876c0ef1c769ec071d7aee1c5fbfbcc28456d28e2c"
},
"downloads": -1,
"filename": "aioqueueext-0.2.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "c88a807d29aa7b2d8aa773e156155dda",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4,>=3.7",
"size": 6550,
"upload_time": "2023-08-24T06:16:40",
"upload_time_iso_8601": "2023-08-24T06:16:40.659695Z",
"url": "https://files.pythonhosted.org/packages/f6/42/4ab0f3050a2b7799779c278dde3b56d6e98f4e360f8b2572fba65268a4d2/aioqueueext-0.2.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "275f7fcafafcfb3528a5265c3d72560b1b9fc6d411df0d18286ce76eab44c148",
"md5": "37a1ce1e8064274f8ae8f6de21bb9c34",
"sha256": "ce56ed86e88219437ed7cc072de5f0c3758c1a0e3024423e2add4bdd0834786e"
},
"downloads": -1,
"filename": "aioqueueext-0.2.1.tar.gz",
"has_sig": false,
"md5_digest": "37a1ce1e8064274f8ae8f6de21bb9c34",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4,>=3.7",
"size": 6348,
"upload_time": "2023-08-24T06:16:42",
"upload_time_iso_8601": "2023-08-24T06:16:42.534564Z",
"url": "https://files.pythonhosted.org/packages/27/5f/7fcafafcfb3528a5265c3d72560b1b9fc6d411df0d18286ce76eab44c148/aioqueueext-0.2.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-08-24 06:16:42",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "gw-tools",
"github_project": "aioqueueext",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "aioqueueext"
}