Name | queuelink JSON |
Version |
2.0.3
JSON |
| download |
home_page | None |
Summary | Connect and link one or more Queues together or to files |
upload_time | 2025-07-20 01:29:04 |
maintainer | None |
docs_url | None |
author | None |
requires_python | None |
license | MIT License
Copyright (c) 2023 Andy Robb
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. |
keywords |
utility
|
VCS |
 |
bugtrack_url |
|
requirements |
importlib_metadata
kitchen
processrunner-kitchenpatch
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
---------
QueueLink
---------
The QueueLink library simplifies several queue patterns including linking queues together with one-to-many or many-to-one relationships. "Adapters" support reading and writing text-based files.
Documentation: https://queuelink.readthedocs.io/en/latest/
Use
===
A QueueLink is a one-way process that connects queues together. When two or more queues are linked, a sub-process is started to read from the "source" queue and write into the "destination" queue.
Circular references are not allowed.
Users create each queue from the Queue or Multiprocessing libraries. Those queues can then be added to a QueueLink instance as either the source or destination.
With standard queues
--------------------
.. code-block:: python
from queue import Queue
from queuelink import QueueLink
# Source and destination queues
source_q = Queue()
dest_q = Queue()
# Create the QueueLink
queue_link = QueueLink(name="my link")
# Connect queues to the QueueLink
source_id = queue_link.read(q=source_q)
dest_id = queue_link.write(q=dest_q)
# Text to send
text_in = "a😂" * 10
# Add text to the source queue
source_q.put(text_in)
# Retrieve the text from the destination queue!
text_out = dest_q.get()
print(text_out)
With a process manager
----------------------
.. code-block:: python
from multiprocessing import Manager
from queuelink import QueueLink
# Create the multiprocessing.Manager
manager = Manager()
# Source and destination queues
source_q = manager.JoinableQueue()
dest_q = manager.JoinableQueue()
# Create the QueueLink
queue_link = QueueLink(name="my link")
# Connect queues to the QueueLink
source_id = queue_link.read(q=source_q)
dest_id = queue_link.write(q=dest_q)
# Text to send
text_in = "a😂" * 10
# Add text to the source queue
source_q.put(text_in)
# Retrieve the text from the destination queue!
text_out = dest_q.get()
print(text_out)
Methods
=======
Primary methods
---------------------
These methods are used most common use cases.
* ``register_queue(q: UNION_SUPPORTED_QUEUES, direction: str, start_method: str=None) -> client id: str``
* ``stop``
Secondary methods
-----------------
These methods are less common.
* ``destructive_audit(direction: str)``
* ``get_queue(queue_id: [str, int])``
* ``is_alive``
* ``is_drained``
* ``is_empty(queue_id:str =None)``
* ``unregister_queue(queue_id: str, direction: str, start_method: str=None)``
Queue Compatibility
===================
QueueLink is tested against multiple native Queue implementations. When a source or destination queue is thread-based, the link will be created as a Thread instance. When all involved queues are process-based, the link will also be a Process instance.
Note that in thread-based situations throughput might be limited by the `Python GIL <https://wiki.python.org/moin/GlobalInterpreterLock>`_.
Two thread-based queues in different processes cannot be bridged directly. They would require an intermediate multiprocessing queue that can be accessed across processes.
Tested against the following queue implementations:
* SyncManager.Queue (multiprocessing.Manager)
* SyncManager.JoinableQueue (multiprocessing.Manager)
* multiprocessing.Queue
* multiprocessing.JoinableQueue
* multiprocessing.SimpleQueue
* queue.Queue
* queue.LifoQueue
* queue.PriorityQueue
* queue.SimpleQueue
Implementation
==============
QueueLink creates a new thread or process for each source queue, regardless of the number of downstream queues. The linking thread/process gets each element of the source queue and iterates over and puts to the set of destination queues.
Multiprocessing
---------------
Start Method: QueueLink is tested against fork, forkserver, and spawn start methods. It defaults to the system preference, but can be overridden by passing the preferred start method name to the class "start_method" parameter.
Linking with other channels
===========================
QueueLink includes two "adapters" to link queues with inbound and outbound connections.
Inbound Connections
-------------------
To quickly link a pipe or handle with a queue, use ``QueueHandleAdapterReader``. The Reader Adapter is tested against Multiprocessing Connections and Subprocess pipes. It calls ``flush`` and ``readline`` to consume from handles, so it should work against any object implementing those methods, with ``readline`` returning a string or byte array. For Multiprocessing Connections, the adapter injects a no-op ``flush`` method and a custom ``readline`` method.
::
# Text to send
text_in = "a😂" * 10
# Destination queue
dest_q = multiprocessing.Queue() # Process-based
# Subprocess, simple example sending some text to stdout
# from subprocess import Popen, PIPE
proc = Popen(['echo', '-n', text_in], # -n prevents echo from adding a newline character
stdout=PIPE,
universal_newlines=True,
close_fds=True)
# Connect the reader
# from queuelink import QueueHandleAdapterReader
read_adapter = QueueHandleAdapterReader(queue=dest_q,
handle=proc.stdout)
# Get the text from the queue
text_out = dest_q.get()
print(text_out)
Other Notes
===========
Tuning link_timeout
-------------------
Under heavily loaded conditions the "publisher" process/thread can thrash when trying to retrieve records from the source queue. Tuning link_timeout higher (default 0.1 seconds) can improve responsiveness. Higher values might be less responsive to stop requests and throw warnings during shutdown.
Raw data
{
"_id": null,
"home_page": null,
"name": "queuelink",
"maintainer": null,
"docs_url": null,
"requires_python": null,
"maintainer_email": null,
"keywords": "utility",
"author": null,
"author_email": "Andy Robb <andy@andyrobb.com>",
"download_url": "https://files.pythonhosted.org/packages/71/1f/1a7879272b1e1bcb6a8560868131e2fc6f0eda140cc429c5f890c5616f23/queuelink-2.0.3.tar.gz",
"platform": null,
"description": "---------\nQueueLink\n---------\nThe QueueLink library simplifies several queue patterns including linking queues together with one-to-many or many-to-one relationships. \"Adapters\" support reading and writing text-based files.\n\nDocumentation: https://queuelink.readthedocs.io/en/latest/\n\nUse\n===\nA QueueLink is a one-way process that connects queues together. When two or more queues are linked, a sub-process is started to read from the \"source\" queue and write into the \"destination\" queue.\n\nCircular references are not allowed.\n\nUsers create each queue from the Queue or Multiprocessing libraries. Those queues can then be added to a QueueLink instance as either the source or destination.\n\nWith standard queues\n--------------------\n\n.. code-block:: python\n\n from queue import Queue\n from queuelink import QueueLink\n\n # Source and destination queues\n source_q = Queue()\n dest_q = Queue()\n\n # Create the QueueLink\n queue_link = QueueLink(name=\"my link\")\n\n # Connect queues to the QueueLink\n source_id = queue_link.read(q=source_q)\n dest_id = queue_link.write(q=dest_q)\n\n # Text to send\n text_in = \"a\ud83d\ude02\" * 10\n\n # Add text to the source queue\n source_q.put(text_in)\n\n # Retrieve the text from the destination queue!\n text_out = dest_q.get()\n print(text_out)\n\n\nWith a process manager\n----------------------\n\n.. code-block:: python\n\n from multiprocessing import Manager\n from queuelink import QueueLink\n\n # Create the multiprocessing.Manager\n manager = Manager()\n\n # Source and destination queues\n source_q = manager.JoinableQueue()\n dest_q = manager.JoinableQueue()\n\n # Create the QueueLink\n queue_link = QueueLink(name=\"my link\")\n\n # Connect queues to the QueueLink\n source_id = queue_link.read(q=source_q)\n dest_id = queue_link.write(q=dest_q)\n\n # Text to send\n text_in = \"a\ud83d\ude02\" * 10\n\n # Add text to the source queue\n source_q.put(text_in)\n\n # Retrieve the text from the destination queue!\n text_out = dest_q.get()\n print(text_out)\n\nMethods\n=======\n\nPrimary methods\n---------------------\nThese methods are used most common use cases.\n\n* ``register_queue(q: UNION_SUPPORTED_QUEUES, direction: str, start_method: str=None) -> client id: str``\n* ``stop``\n\nSecondary methods\n-----------------\nThese methods are less common.\n\n* ``destructive_audit(direction: str)``\n* ``get_queue(queue_id: [str, int])``\n* ``is_alive``\n* ``is_drained``\n* ``is_empty(queue_id:str =None)``\n* ``unregister_queue(queue_id: str, direction: str, start_method: str=None)``\n\nQueue Compatibility\n===================\nQueueLink is tested against multiple native Queue implementations. When a source or destination queue is thread-based, the link will be created as a Thread instance. When all involved queues are process-based, the link will also be a Process instance.\n\nNote that in thread-based situations throughput might be limited by the `Python GIL <https://wiki.python.org/moin/GlobalInterpreterLock>`_.\n\nTwo thread-based queues in different processes cannot be bridged directly. They would require an intermediate multiprocessing queue that can be accessed across processes.\n\nTested against the following queue implementations:\n\n* SyncManager.Queue (multiprocessing.Manager)\n* SyncManager.JoinableQueue (multiprocessing.Manager)\n* multiprocessing.Queue\n* multiprocessing.JoinableQueue\n* multiprocessing.SimpleQueue\n* queue.Queue\n* queue.LifoQueue\n* queue.PriorityQueue\n* queue.SimpleQueue\n\nImplementation\n==============\nQueueLink creates a new thread or process for each source queue, regardless of the number of downstream queues. The linking thread/process gets each element of the source queue and iterates over and puts to the set of destination queues.\n\nMultiprocessing\n---------------\nStart Method: QueueLink is tested against fork, forkserver, and spawn start methods. It defaults to the system preference, but can be overridden by passing the preferred start method name to the class \"start_method\" parameter.\n\nLinking with other channels\n===========================\nQueueLink includes two \"adapters\" to link queues with inbound and outbound connections.\n\nInbound Connections\n-------------------\nTo quickly link a pipe or handle with a queue, use ``QueueHandleAdapterReader``. The Reader Adapter is tested against Multiprocessing Connections and Subprocess pipes. It calls ``flush`` and ``readline`` to consume from handles, so it should work against any object implementing those methods, with ``readline`` returning a string or byte array. For Multiprocessing Connections, the adapter injects a no-op ``flush`` method and a custom ``readline`` method.\n\n::\n\n # Text to send\n text_in = \"a\ud83d\ude02\" * 10\n\n # Destination queue\n dest_q = multiprocessing.Queue() # Process-based\n\n # Subprocess, simple example sending some text to stdout\n # from subprocess import Popen, PIPE\n proc = Popen(['echo', '-n', text_in], # -n prevents echo from adding a newline character\n stdout=PIPE,\n universal_newlines=True,\n close_fds=True)\n\n # Connect the reader\n # from queuelink import QueueHandleAdapterReader\n read_adapter = QueueHandleAdapterReader(queue=dest_q,\n handle=proc.stdout)\n\n # Get the text from the queue\n text_out = dest_q.get()\n print(text_out)\n\nOther Notes\n===========\n\nTuning link_timeout\n-------------------\nUnder heavily loaded conditions the \"publisher\" process/thread can thrash when trying to retrieve records from the source queue. Tuning link_timeout higher (default 0.1 seconds) can improve responsiveness. Higher values might be less responsive to stop requests and throw warnings during shutdown.\n",
"bugtrack_url": null,
"license": "MIT License\n \n Copyright (c) 2023 Andy Robb\n \n Permission is hereby granted, free of charge, to any person obtaining a copy\n of this software and associated documentation files (the \"Software\"), to deal\n in the Software without restriction, including without limitation the rights\n to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n copies of the Software, and to permit persons to whom the Software is\n furnished to do so, subject to the following conditions:\n \n The above copyright notice and this permission notice shall be included in all\n copies or substantial portions of the Software.\n \n THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE\n SOFTWARE.",
"summary": "Connect and link one or more Queues together or to files",
"version": "2.0.3",
"project_urls": {
"Documentation": "https://queuelink.readthedocs.io/en/latest/",
"Homepage": "https://github.com/arobb/python-queuelink",
"Repository": "https://github.com/arobb/python-queuelink"
},
"split_keywords": [
"utility"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "711f1a7879272b1e1bcb6a8560868131e2fc6f0eda140cc429c5f890c5616f23",
"md5": "b829efd765d5eee4ef3ff03422ded3f1",
"sha256": "d08f019f6d1a8d83935b5e334510f4e724cb9e4901d05463d97e55169abb632d"
},
"downloads": -1,
"filename": "queuelink-2.0.3.tar.gz",
"has_sig": false,
"md5_digest": "b829efd765d5eee4ef3ff03422ded3f1",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 39453,
"upload_time": "2025-07-20T01:29:04",
"upload_time_iso_8601": "2025-07-20T01:29:04.670957Z",
"url": "https://files.pythonhosted.org/packages/71/1f/1a7879272b1e1bcb6a8560868131e2fc6f0eda140cc429c5f890c5616f23/queuelink-2.0.3.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-20 01:29:04",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "arobb",
"github_project": "python-queuelink",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"requirements": [
{
"name": "importlib_metadata",
"specs": [
[
">=",
"8.7.0"
]
]
},
{
"name": "kitchen",
"specs": [
[
">=",
"1.2.6"
]
]
},
{
"name": "processrunner-kitchenpatch",
"specs": [
[
">=",
"1.0.7"
]
]
}
],
"lcname": "queuelink"
}