persist-queue - A thread-safe, disk-based queue for Python
==========================================================
.. image:: https://img.shields.io/circleci/project/github/peter-wangxu/persist-queue/master.svg?label=Linux%20%26%20Mac
    :target: https://circleci.com/gh/peter-wangxu/persist-queue

.. image:: https://img.shields.io/appveyor/ci/peter-wangxu/persist-queue/master.svg?label=Windows
    :target: https://ci.appveyor.com/project/peter-wangxu/persist-queue

.. image:: https://img.shields.io/codecov/c/github/peter-wangxu/persist-queue/master.svg
    :target: https://codecov.io/gh/peter-wangxu/persist-queue

.. image:: https://img.shields.io/pypi/v/persist-queue.svg
    :target: https://pypi.python.org/pypi/persist-queue

``persist-queue`` implements a file-based queue and a series of sqlite3-based queues. The goal is to meet the following requirements:

* Disk-based: each queued item is stored on disk so that it survives a crash.
* Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
* Recoverable: Items can be read after process restart.
* Green-compatible: can be used in ``greenlet`` or ``eventlet`` environment.
*queuelib* and *python-pqueue* cannot fulfil all of the above. After some attempts, I found it hard to achieve these goals on top of their current
implementations without huge code changes. This is the motivation for starting this project.

By default, *persist-queue* uses the *pickle* object serialization module to support object instances.
Most built-in types, such as `int`, `dict` and `list`, can be persisted by `persist-queue` directly. To support customized objects,
please refer to `Pickling and unpickling extension types(Python2) <https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances>`_
and `Pickling Class Instances(Python3) <https://docs.python.org/3/library/pickle.html#pickling-class-instances>`_.
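
As a rough illustration of what "supporting customized objects" means, the sketch below uses only the standard ``pickle`` module. The ``Job`` class and its attributes are hypothetical; the point is that an object holding an unpicklable member (here a lock) can drop it in ``__getstate__`` and rebuild it in ``__setstate__``, after which it round-trips through pickle.

.. code-block:: python

    import pickle
    import threading


    class Job(object):
        """Hypothetical work item that carries an unpicklable lock."""

        def __init__(self, url, retries=0):
            self.url = url
            self.retries = retries
            self._lock = threading.Lock()  # locks cannot be pickled

        def __getstate__(self):
            # Copy the instance dict and leave the lock out of the payload.
            state = self.__dict__.copy()
            del state["_lock"]
            return state

        def __setstate__(self, state):
            # Restore the plain attributes and create a fresh lock.
            self.__dict__.update(state)
            self._lock = threading.Lock()


    payload = pickle.dumps(Job("http://example.com/a", retries=2))
    restored = pickle.loads(payload)
    print(restored.url, restored.retries)

An object that survives this round trip is the kind of item a pickle-based queue should be able to store.
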
This project is based on the achievements of `python-pqueue <https://github.com/balena/python-pqueue>`_
and `queuelib <https://github.com/scrapy/queuelib>`_.
Slack channels
^^^^^^^^^^^^^^
Join `persist-queue <https://join.slack.com/t/persist-queue/shared_invite/enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE>`_ channel
Requirements
------------
* Python 2.7 or Python 3.x (refer to `Deprecation`_ for future plan)
* Full support for Linux.
* Windows support (with `Caution`_ if ``persistqueue.Queue`` is used).
Features
--------
- Multiple platforms support: Linux, macOS, Windows
- Pure python
- Both file based queues and sqlite3 based queues are supported
- File based queue: multiple serialization protocol support: pickle(default), msgpack, cbor, json
Deprecation
-----------
- `Python 3.4 release has reached end of life <https://www.python.org/downloads/release/python-3410/>`_ and
  `DBUtils <https://webwareforpython.github.io/DBUtils/changelog.html>`_ ceased support for `Python 3.4`, so `persist queue` dropped support for Python 3.4 in version 0.8.0.
  Other queue implementations, such as the file based queue and the sqlite3 based queues, still work.
- `Python 2 was sunset on January 1, 2020 <https://www.python.org/doc/sunset-python-2/>`_. `persist-queue` will drop all Python 2 support in the future version `1.0.0`; no new features will be developed for Python 2.
Installation
------------
from pypi
^^^^^^^^^
.. code-block:: console
pip install persist-queue
# for msgpack, cbor and mysql support, use the following command
pip install persist-queue[extra]
from source code
^^^^^^^^^^^^^^^^
.. code-block:: console
git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
# for msgpack and cbor support, run 'pip install -r extra-requirements.txt' first
python setup.py install
Benchmark
---------
The tables below show the time spent (in seconds) writing and reading **1000** items
to and from disk, comparing the sqlite3 and file queues.
- Windows

  - OS: Windows 10
  - Disk: SATA3 SSD
  - RAM: 16 GiB

+---------------+---------+-------------------------+----------------------------+
| | Write | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+---------+-------------------------+----------------------------+
| SQLite3 Queue | 1.8880 | 2.0290 | 3.5940 |
+---------------+---------+-------------------------+----------------------------+
| File Queue | 4.9520 | 5.0560 | 8.4900 |
+---------------+---------+-------------------------+----------------------------+
**Windows note**
The performance of the file queue on Windows has improved dramatically (3-4X faster)
since `v0.4.1` thanks to atomic renaming support.
- Linux

  - OS: Ubuntu 16.04 (VM)
  - Disk: SATA3 SSD
  - RAM: 4 GiB

+---------------+--------+-------------------------+----------------------------+
| | Write | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+--------+-------------------------+----------------------------+
| SQLite3 Queue | 1.8282 | 1.8075 | 2.8639 |
+---------------+--------+-------------------------+----------------------------+
| File Queue | 0.9123 | 1.0411 | 2.5104 |
+---------------+--------+-------------------------+----------------------------+
- macOS

  - OS: 10.14 (macOS Mojave)
  - Disk: PCIe SSD
  - RAM: 16 GiB

+---------------+--------+-------------------------+----------------------------+
| | Write | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+--------+-------------------------+----------------------------+
| SQLite3 Queue | 0.1879 | 0.2115 | 0.3147 |
+---------------+--------+-------------------------+----------------------------+
| File Queue | 0.5158 | 0.5357 | 1.0446 |
+---------------+--------+-------------------------+----------------------------+
**note**

- The values above are in seconds for reading/writing *1000* items; lower is
  better.
- The results above were obtained with:
.. code-block:: console
python benchmark/run_benchmark.py 1000
To see the real performance on your host, run the script under ``benchmark/run_benchmark.py``:
.. code-block:: console
python benchmark/run_benchmark.py <COUNT, default to 100>
Examples
--------
Example usage with a SQLite3 based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python
>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q
Close the console, and then recreate the queue:
.. code-block:: python
>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.get()
'str2'
>>>
New functions:
*Available since v0.8.0*

- ``shrink_disk_usage``: performs a ``VACUUM`` against the SQLite database and rebuilds the database file. This usually takes a long time and can free a lot of disk space after ``get()``.
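
Why a ``VACUUM`` is needed can be seen with the standard ``sqlite3`` module alone. The sketch below is only an illustration under assumptions: the table name ``queue`` and its columns are made up and are not the schema persist-queue creates. It shows that deleting rows leaves the database file at its old size, and that the file only shrinks once ``VACUUM`` rebuilds it.

.. code-block:: python

    import os
    import sqlite3
    import tempfile

    db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE queue (_id INTEGER PRIMARY KEY, data BLOB)")

    # Fill the table with about 2 MB of payload.
    conn.executemany(
        "INSERT INTO queue (data) VALUES (?)",
        [(b"x" * 1024,) for _ in range(2000)],
    )
    conn.commit()

    # Deleting rows only marks their pages as free inside the file.
    conn.execute("DELETE FROM queue")
    conn.commit()
    size_after_delete = os.path.getsize(db_path)

    # VACUUM rewrites the database file and returns the free pages to the OS.
    conn.execute("VACUUM")
    size_after_vacuum = os.path.getsize(db_path)
    conn.close()

    print(size_after_delete, size_after_vacuum)

Because ``VACUUM`` rewrites the whole file, it is usually something to run occasionally rather than after every ``get()``.
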
Example usage of SQLite3 based ``UniqueQ``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This queue does not allow duplicate items.
.. code-block:: python
>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1')
>>> q.size
1
>>> q.put('str2')
>>> q.size
2
>>>
Example usage of SQLite3 based ``SQLiteAckQueue``/``UniqueAckQ``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The core functions:
- ``put``: add an item to the queue. Returns ``id``.
- ``get``: get an item from the queue and mark it as unack. Returns ``item``. Optional parameters (``block``, ``timeout``, ``id``, ``next_in_order``, ``raw``)
- ``update``: update an item. Returns ``id``. Parameters (``item``), optional parameter if the item is not in raw format (``id``)
- ``ack``: mark an item as acked. Returns ``id``. Parameters (``item`` or ``id``)
- ``nack``: there might be something wrong with the current consumer, so mark the item as ready and a new consumer will get it. Returns ``id``. Parameters (``item`` or ``id``)
- ``ack_failed``: there might be something wrong during processing, so just mark the item as failed. Returns ``id``. Parameters (``item`` or ``id``)
- ``clear_acked_data``: perform a SQL delete against the SQLite database. By default it removes up to 1000 items whose status is ``AckStatus.acked``, while keeping the 1000 most recent ones (note: this does not shrink the file size on disk). Optional parameters (``max_delete``, ``keep_latest``, ``clear_ack_failed``)
- ``shrink_disk_usage``: perform a ``VACUUM`` against the SQLite database and rebuild the database file. This usually takes a long time and can free a lot of disk space after ``clear_acked_data``.
- ``queue``: returns the database contents as a Python List[Dict]
- ``active_size``: the active size changes when an item is added (put) or completed (ack/ack_failed), unlike ``qsize``, which changes when an item is pulled (get) or returned (nack).
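
The status transitions listed above can be pictured with a small in-memory model. This is only an illustrative sketch: ``ToyAckQueue`` and its status strings are hypothetical names, nothing is written to disk, and it is not how ``SQLiteAckQueue`` is implemented. It exists to show how ``qsize`` and ``active_size`` move differently as items are pulled, returned and acknowledged.

.. code-block:: python

    from collections import OrderedDict

    READY, UNACK, ACKED, ACK_FAILED = "ready", "unack", "acked", "ack_failed"


    class ToyAckQueue(object):
        """In-memory model of the status transitions; not the library code."""

        def __init__(self):
            self._rows = OrderedDict()  # id -> [item, status]
            self._next_id = 1

        def put(self, item):
            row_id = self._next_id
            self._rows[row_id] = [item, READY]
            self._next_id += 1
            return row_id

        def get(self):
            # Hand out the first item that is still ready and mark it unack.
            for row_id, row in self._rows.items():
                if row[1] == READY:
                    row[1] = UNACK
                    return row_id, row[0]
            raise LookupError("no ready item")

        def _mark(self, row_id, status):
            self._rows[row_id][1] = status
            return row_id

        def ack(self, row_id):
            return self._mark(row_id, ACKED)

        def nack(self, row_id):
            return self._mark(row_id, READY)

        def ack_failed(self, row_id):
            return self._mark(row_id, ACK_FAILED)

        def qsize(self):
            # Items a consumer could still pull.
            return sum(1 for _, s in self._rows.values() if s == READY)

        def active_size(self):
            # Items that are not finished yet (ready or being processed).
            return sum(1 for _, s in self._rows.values() if s in (READY, UNACK))


    q = ToyAckQueue()
    q.put("a")
    q.put("b")
    row_id, item = q.get()                           # "a" becomes unack
    sizes_after_get = (q.qsize(), q.active_size())   # (1, 2)
    q.nack(row_id)                                   # "a" is ready again
    sizes_after_nack = (q.qsize(), q.active_size())  # (2, 2)
    row_id, item = q.get()                           # "a" is handed out again
    q.ack(row_id)                                    # "a" is finished
    sizes_after_ack = (q.qsize(), q.active_size())   # (1, 1)

In this model ``get`` and ``nack`` only move ``qsize``, while ``ack`` is what lowers ``active_size``.
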
.. code-block:: python

    >>> import persistqueue
    >>> ackq = persistqueue.SQLiteAckQueue('path')
    >>> ackq.put('str1')
    >>> item = ackq.get()
    >>> # Do something with the item
    >>> ackq.ack(item) # If done with the item
    >>> ackq.nack(item) # Else mark item as `nack` so that it can be processed again by any worker
    >>> ackq.ack_failed(item) # Or else mark item as `ack_failed` to discard this item

Parameters:
- ``clear_acked_data``
- ``max_delete`` (defaults to 1000): This is the LIMIT. How many items to delete.
- ``keep_latest`` (defaults to 1000): This is the OFFSET. How many recent items to keep.
- ``clear_ack_failed`` (defaults to False): Also clears items with status ``AckStatus.ack_failed``, in addition to ``AckStatus.acked``.
- ``get``
- ``raw`` (defaults to False): Returns the metadata along with the record, which includes the id (``pqid``) and timestamp. On the SQLiteAckQueue, the raw results can be passed to ``ack``, ``nack`` and ``ack_failed`` just like the normal return value.
- ``id`` (defaults to None): Accepts an `id` or a raw item containing ``pqid``. Selects the item based on the row id.
- ``next_in_order`` (defaults to False): Requires the ``id`` attribute. This option tells the SQLiteAckQueue/UniqueAckQ to get the next item based on ``id``, not the first available. This allows the user to get, nack, get, nack and progress down the queue, instead of getting the same nack'd item over and over again.
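
To make the LIMIT/OFFSET wording for ``max_delete`` and ``keep_latest`` concrete, here is a minimal sketch with the standard ``sqlite3`` module. It is an assumption about the shape of such a query, not the statement persist-queue actually runs: the ``ack_queue`` table, the numeric ``ACKED`` status and the ``clear_acked`` helper are all hypothetical.

.. code-block:: python

    import sqlite3

    ACKED = 5  # hypothetical numeric status for acknowledged items

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE ack_queue ("
        " _id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, status INTEGER)"
    )
    conn.executemany(
        "INSERT INTO ack_queue (data, status) VALUES (?, ?)",
        [("item%d" % i, ACKED) for i in range(1, 11)],
    )


    def clear_acked(conn, max_delete, keep_latest):
        """Delete up to max_delete acked rows, skipping the newest keep_latest."""
        conn.execute(
            "DELETE FROM ack_queue WHERE _id IN ("
            " SELECT _id FROM ack_queue WHERE status = ?"
            " ORDER BY _id DESC LIMIT ? OFFSET ?)",
            (ACKED, max_delete, keep_latest),
        )
        conn.commit()


    clear_acked(conn, max_delete=4, keep_latest=3)
    remaining = [row[0] for row in conn.execute(
        "SELECT _id FROM ack_queue ORDER BY _id")]
    print(remaining)  # [1, 2, 3, 8, 9, 10]

With ten acked rows, the newest three (8, 9, 10) are skipped by the OFFSET and the next four (7, 6, 5, 4) are removed by the LIMIT.
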
``raw`` example:
.. code-block:: python

    >>> q.put('val1')
    >>> d = q.get(raw=True)
    >>> print(d)
    {'pqid': 1, 'data': 'val1', 'timestamp': 1616719225.012912}
    >>> q.ack(d)

``next_in_order`` example:
.. code-block:: python

    >>> q.put("val1")
    >>> q.put("val2")
    >>> q.put("val3")
    >>> item = q.get()
    >>> id = q.nack(item)
    >>> item = q.get(id=id, next_in_order=True)
    >>> print(item)
    val2

Note:
1. The SQLiteAckQueue always uses ``auto_commit=True``.
2. The queue can be used in non-blocking style, e.g. ``SQLiteAckQueue.get(block=False, timeout=5)``.
3. ``UniqueAckQ`` only allows unique items.
Example usage with a file based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Parameters:
- ``path``: specifies the directory where enqueued data is persisted.
- ``maxsize``: indicates the maximum size of the queue; if maxsize<=0 the queue is unlimited.
- ``chunksize``: indicates how many entries should exist in each chunk file on disk. When all entries in a chunk file have been dequeued by get(), the file is removed from the filesystem.
- ``tempdir``: indicates where temporary files should be stored. The tempdir has to be located on the same disk as the enqueued data in order to obtain atomic operations.
- ``serializer``: controls how enqueued data is serialized.
- ``autosave``: `True` or `False`. By default, changes are only persisted when task_done() is called. If autosave is enabled, info data is persisted immediately when get() is called. Adding data to the queue with put() always persists immediately regardless of this setting.
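
The ``tempdir`` requirement above follows from how atomic file replacement generally works: a rename is only atomic when source and destination live on the same filesystem. The sketch below illustrates that general pattern with the standard library (``os.replace`` needs Python 3.3+). ``atomic_write`` and the ``info`` file name are hypothetical and are not taken from persist-queue's code.

.. code-block:: python

    import os
    import tempfile


    def atomic_write(path, payload, tempdir=None):
        """Write bytes to path by filling a temp file and renaming it."""
        # The temp file must be on the same filesystem as the target.
        tempdir = tempdir or os.path.dirname(os.path.abspath(path))
        fd, tmp_name = tempfile.mkstemp(dir=tempdir)
        try:
            with os.fdopen(fd, "wb") as tmp:
                tmp.write(payload)
                tmp.flush()
                os.fsync(tmp.fileno())  # make sure the bytes reach the disk
            os.replace(tmp_name, path)  # readers see the old or new file only
        except Exception:
            if os.path.exists(tmp_name):
                os.remove(tmp_name)
            raise


    queue_dir = tempfile.mkdtemp()
    info_path = os.path.join(queue_dir, "info")
    atomic_write(info_path, b"head=0")
    atomic_write(info_path, b"head=1")
    with open(info_path, "rb") as f:
        content = f.read()
    print(content)

If the temp file were on another disk, the final step would turn into a copy, and a crash in the middle could leave a half-written file behind.
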
.. code-block:: python
>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()
Close the Python console, and then restart the queue from the same path:
.. code-block:: python
>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'b'
>>> q.task_done()
Example usage with an auto-saving file based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*Available since: v0.5.0*
By default, items added to the queue are persisted during the ``put()`` call,
and items removed from a queue are only persisted when ``task_done()`` is
called.
.. code-block:: python
>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.get()
'a'
>>> q.get()
'b'
After exiting and restarting the queue from the same path, we see the items
remain in the queue, because ``task_done()`` wasn't called before.
.. code-block:: python
>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'a'
>>> q.get()
'b'
This can be advantageous. For example, if your program crashes before finishing
processing an item, it will remain in the queue after restarting. You can also
spread out the ``task_done()`` calls for performance reasons to avoid lots of
individual writes.
Using ``autosave=True`` on a file based queue will automatically save on every
call to ``get()``. Calling ``task_done()`` is not necessary, but may still be
used to ``join()`` against the queue.
.. code-block:: python
>>> from persistqueue import Queue
>>> q = Queue("mypath", autosave=True)
>>> q.put('a')
>>> q.put('b')
>>> q.get()
'a'
After exiting and restarting the queue from the same path, only the second item
remains:
.. code-block:: python
>>> from persistqueue import Queue
>>> q = Queue('mypath', autosave=True)
>>> q.get()
'b'
Example usage with a SQLite3 based dict
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python

    >>> from persistqueue import PDict
    >>> q = PDict("testpath", "testname")
    >>> q['key1'] = 123
    >>> q['key2'] = 321
    >>> q['key1']
    123
    >>> len(q)
    2
    >>> del q['key1']
    >>> q['key1']
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "persistqueue\pdict.py", line 58, in __getitem__
        raise KeyError('Key: {} not exists.'.format(item))
    KeyError: 'Key: key1 not exists.'

Close the console and restart the PDict
.. code-block:: python

    >>> from persistqueue import PDict
    >>> q = PDict("testpath", "testname")
    >>> q['key2']
    321

Multi-thread usage for **SQLite3** based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. code-block:: python

    from threading import Thread

    from persistqueue import FIFOSQLiteQueue

    q = FIFOSQLiteQueue(path="./test", multithreading=True)

    # do_work(), source() and num_worker_threads are placeholders
    # to be supplied by your application.
    def worker():
        while True:
            item = q.get()
            do_work(item)

    for i in range(num_worker_threads):
        t = Thread(target=worker)
        t.daemon = True
        t.start()

    for item in source():
        q.put(item)

Multi-thread usage for **Queue**
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
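.. code-block:: python

    from threading import Thread

    from persistqueue import Queue

    q = Queue("mypath")  # the file based queue needs a directory path

    # do_work(), source() and num_worker_threads are placeholders
    # to be supplied by your application.
    def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()

    for i in range(num_worker_threads):
        t = Thread(target=worker)
        t.daemon = True
        t.start()

    for item in source():
        q.put(item)

    q.join()       # block until all tasks are done
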
Example usage with a MySQL based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*Available since: v0.8.0*
.. code-block:: python

    >>> import persistqueue
    >>> db_conf = {
    ...     "host": "127.0.0.1",
    ...     "user": "user",
    ...     "passwd": "passw0rd",
    ...     "db_name": "testqueue",
    ...     # "name": "",
    ...     "port": 3306
    ... }
    >>> q = persistqueue.MySQLQueue(name="testtable", **db_conf)
    >>> q.put('str1')
    >>> q.put('str2')
    >>> q.put('str3')
    >>> q.get()
    'str1'
    >>> del q

Close the console, and then recreate the queue:
.. code-block:: python
>>> import persistqueue
>>> q = persistqueue.MySQLQueue(name="testtable", **db_conf)
>>> q.get()
'str2'
>>>
**note**

Due to the limitation of the file queue described in issue `#89 <https://github.com/peter-wangxu/persist-queue/issues/89>`_,
`task_done` in one thread may acknowledge items held by other threads, which it should not. Consider using `SQLiteAckQueue` if you have such a requirement.
Serialization via msgpack/cbor/json
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- v0.4.1: Currently only available for file based Queue
- v0.4.2: Also available for SQLite3 based Queues
.. code-block:: python

    >>> import persistqueue
    >>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)
    >>> # via cbor2
    >>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.cbor2)
    >>> # via json
    >>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.json)
    >>> q.get()
    'b'
    >>> q.task_done()

Explicit resource reclaim
^^^^^^^^^^^^^^^^^^^^^^^^^
For some reason, an application may need to explicitly reclaim file
handles or SQL connections before the end of execution. In these cases, the user can
simply call:

.. code-block:: python

    q = Queue('mypath') # or q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
    del q

to reclaim the related file handles or SQL connections.
Tips
----
``task_done`` is required for both the file based queue and the SQLite3 based queue (when ``auto_commit=False``)
to persist the cursor of the next ``get`` to disk.
Performance impact
------------------
- **WAL**

  Starting with v0.3.2, ``persistqueue`` leverages the sqlite3 built-in feature
  `WAL <https://www.sqlite.org/wal.html>`_, which can improve performance
  significantly. General testing indicates that ``persistqueue`` is 2-4 times
  faster than previous versions.

- **auto_commit=False**

  Since persistqueue v0.3.0, the ``auto_commit`` parameter can be used to tweak
  the performance of sqlite3 based queues as needed. When ``auto_commit=False`` is specified, the user
  needs to call ``queue.task_done()`` to persist to disk the changes made since the
  last ``task_done`` invocation.

- **pickle protocol selection**

  From v0.3.6, ``persistqueue`` selects ``Protocol version 2`` for Python 2 and ``Protocol version 4`` for Python 3.
  This selection only happens when the directory is not present when initializing the queue.
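
The first two points can be tried out with the standard ``sqlite3`` module. The sketch below is not persist-queue's code and assumes a made-up ``queue`` table: it switches a database file to WAL mode with a ``PRAGMA`` and then writes a batch of rows under a single commit, which is roughly the trade-off ``auto_commit=False`` offers (fewer commits, but nothing is durable until the commit happens).

.. code-block:: python

    import os
    import sqlite3
    import tempfile

    db_path = os.path.join(tempfile.mkdtemp(), "wal_demo.db")
    conn = sqlite3.connect(db_path)

    # Ask SQLite to use write-ahead logging; the pragma returns the mode in use.
    journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    print(journal_mode)

    conn.execute("CREATE TABLE queue (_id INTEGER PRIMARY KEY, data TEXT)")

    # Batch many writes into one transaction instead of committing each one.
    for i in range(100):
        conn.execute("INSERT INTO queue (data) VALUES (?)", ("item%d" % i,))
    conn.commit()  # one commit for the whole batch

    count = conn.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
    conn.close()

Timing the loop with a commit per insert against the single-commit version is a quick way to see the difference on your own disk.
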
Tests
-----
*persist-queue* uses ``tox`` to trigger tests.
- Unit test
.. code-block:: console
tox -e <PYTHON_VERSION>
Available ``<PYTHON_VERSION>``: ``py27``, ``py34``, ``py35``, ``py36``, ``py37``
- PEP8 check
.. code-block:: console
tox -e pep8
`pyenv <https://github.com/pyenv/pyenv>`_ is usually a helpful tool to manage multiple versions of Python.
Caution
-------
Currently, atomic operations are supported on Windows but are still experimental.
That is to say, the data in ``persistqueue.Queue`` could be left in an unreadable state if a failure occurs during ``Queue.task_done``.
**DO NOT put any critical data in persistqueue.Queue on Windows**.
Contribution
------------
Simply fork this repo and send a PR with your code change (and tests that cover it). Remember to give your PR a title and a description. I am willing to
enhance this project with you :).
License
-------
`BSD <LICENSE>`_
Contributors
------------
`Contributors <https://github.com/peter-wangxu/persist-queue/graphs/contributors>`_
FAQ
---
* ``sqlite3.OperationalError: database is locked`` is raised.

  persistqueue opens 2 connections to the database if ``multithreading=True``. While one connection holds a write transaction, the
  SQLite database is locked until that transaction is committed. The ``timeout``
  parameter specifies how long the connection should wait for the lock to go away
  before raising an exception. The default is **10** seconds; increase ``timeout``
  when creating the queue if the above error occurs.

* sqlite3 based queues are not thread-safe.

  The sqlite3 queues are heavily tested in multi-threaded environments. If you find that one is not thread-safe, please
  make sure you set ``multithreading=True`` when initializing the queue before submitting a new issue :).
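
The locking behaviour behind the first question can be reproduced with the standard ``sqlite3`` module alone. This is a sketch under assumptions, not persist-queue's code: the ``queue`` table is made up, and the short ``timeout=0.2`` is chosen only so the demonstration fails quickly. One connection holds a write transaction open, so a second connection gives up after its timeout and succeeds once the first one commits.

.. code-block:: python

    import os
    import sqlite3
    import tempfile

    db_path = os.path.join(tempfile.mkdtemp(), "locked_demo.db")

    # isolation_level=None lets us control the transaction by hand.
    writer = sqlite3.connect(db_path, isolation_level=None)
    writer.execute("CREATE TABLE queue (_id INTEGER PRIMARY KEY, data TEXT)")
    writer.execute("BEGIN IMMEDIATE")  # take the write lock and keep it
    writer.execute("INSERT INTO queue (data) VALUES ('held')")

    # The second connection waits at most 0.2 seconds for the lock.
    other = sqlite3.connect(db_path, timeout=0.2)
    try:
        other.execute("INSERT INTO queue (data) VALUES ('blocked')")
        locked = False
    except sqlite3.OperationalError as exc:
        locked = True
        print(exc)
    finally:
        other.rollback()  # drop the failed transaction and its locks

    writer.execute("COMMIT")  # releasing the lock unblocks other writers
    other.execute("INSERT INTO queue (data) VALUES ('now ok')")
    other.commit()
    rows = other.execute("SELECT COUNT(*) FROM queue").fetchone()[0]
    writer.close()
    other.close()

A larger ``timeout`` simply gives the waiting connection more time for the other transaction to finish before the error is raised.
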
Raw data
{
"_id": null,
"home_page": "http://github.com/peter-wangxu/persist-queue",
"name": "persist-queue",
"maintainer": "Peter Wang",
"docs_url": null,
"requires_python": "",
"maintainer_email": "wangxu198709@gmail.com",
"keywords": "",
"author": "Peter Wang",
"author_email": "wangxu198709@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/06/6f/78ff1a951f02cfcb5c0e17150428bf102eb984e003f1ced9c9164024a2e2/persist-queue-0.8.1.tar.gz",
"platform": "all",
"description": "persist-queue - A thread-safe, disk-based queue for Python\r\n==========================================================\r\n\r\n.. image:: https://img.shields.io/circleci/project/github/peter-wangxu/persist-queue/master.svg?label=Linux%20%26%20Mac\r\n :target: https://circleci.com/gh/peter-wangxu/persist-queue\r\n\r\n.. image:: https://img.shields.io/appveyor/ci/peter-wangxu/persist-queue/master.svg?label=Windows\r\n :target: https://ci.appveyor.com/project/peter-wangxu/persist-queue\r\n\r\n.. image:: https://img.shields.io/codecov/c/github/peter-wangxu/persist-queue/master.svg\r\n :target: https://codecov.io/gh/peter-wangxu/persist-queue\r\n\r\n.. image:: https://img.shields.io/pypi/v/persist-queue.svg\r\n :target: https://pypi.python.org/pypi/persist-queue\r\n\r\n``persist-queue`` implements a file-based queue and a serial of sqlite3-based queues. The goals is to achieve following requirements:\r\n\r\n* Disk-based: each queued item should be stored in disk in case of any crash.\r\n* Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.\r\n* Recoverable: Items can be read after process restart.\r\n* Green-compatible: can be used in ``greenlet`` or ``eventlet`` environment.\r\n\r\nWhile *queuelib* and *python-pqueue* cannot fulfil all of above. After some try, I found it's hard to achieve based on their current\r\nimplementation without huge code change. 
this is the motivation to start this project.\r\n\r\nBy default, *persist-queue* use *pickle* object serialization module to support object instances.\r\nMost built-in type, like `int`, `dict`, `list` are able to be persisted by `persist-queue` directly, to support customized objects,\r\nplease refer to `Pickling and unpickling extension types(Python2) <https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances>`_\r\nand `Pickling Class Instances(Python3) <https://docs.python.org/3/library/pickle.html#pickling-class-instances>`_\r\n\r\nThis project is based on the achievements of `python-pqueue <https://github.com/balena/python-pqueue>`_\r\nand `queuelib <https://github.com/scrapy/queuelib>`_\r\n\r\nSlack channels\r\n^^^^^^^^^^^^^^\r\n\r\nJoin `persist-queue <https://join.slack\r\n.com/t/persist-queue/shared_invite\r\n/enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE>`_ channel\r\n\r\n\r\nRequirements\r\n------------\r\n* Python 2.7 or Python 3.x (refer to `Deprecation`_ for future plan)\r\n* Full support for Linux.\r\n* Windows support (with `Caution`_ if ``persistqueue.Queue`` is used).\r\n\r\nFeatures\r\n--------\r\n\r\n- Multiple platforms support: Linux, macOS, Windows\r\n- Pure python\r\n- Both filed based queues and sqlite3 based queues are supported\r\n- Filed based queue: multiple serialization protocol support: pickle(default), msgpack, cbor, json\r\n\r\nDeprecation\r\n-----------\r\n- `Python 3.4 release has reached end of life <https://www.python.org/downloads/release/python-3410/>`_ and\r\n `DBUtils <https://webwareforpython.github.io/DBUtils/changelog.html>`_ ceased support for `Python 3.4`, `persist queue` drops the support for python 3.4 since version 0.8.0.\r\n other queue implementations such as file based queue and sqlite3 based queue are still workable.\r\n- `Python 2 was sunset on January 1, 2020 <https://www.python.org/doc/sunset-python-2/>`_, 
`persist-queue` will drop any Python 2 support in future version `1.0.0`, no new feature will be developed under Python 2.\r\n\r\nInstallation\r\n------------\r\n\r\nfrom pypi\r\n^^^^^^^^^\r\n\r\n.. code-block:: console\r\n\r\n pip install persist-queue\r\n # for msgpack, cbor and mysql support, use following command\r\n pip install persist-queue[extra]\r\n\r\n\r\nfrom source code\r\n^^^^^^^^^^^^^^^^\r\n\r\n.. code-block:: console\r\n\r\n git clone https://github.com/peter-wangxu/persist-queue\r\n cd persist-queue\r\n # for msgpack and cbor support, run 'pip install -r extra-requirements.txt' first\r\n python setup.py install\r\n\r\n\r\nBenchmark\r\n---------\r\n\r\nHere are the time spent(in seconds) for writing/reading **1000** items to the\r\ndisk comparing the sqlite3 and file queue.\r\n\r\n- Windows\r\n - OS: Windows 10\r\n - Disk: SATA3 SSD\r\n - RAM: 16 GiB\r\n\r\n+---------------+---------+-------------------------+----------------------------+\r\n| | Write | Write/Read(1 task_done) | Write/Read(many task_done) |\r\n+---------------+---------+-------------------------+----------------------------+\r\n| SQLite3 Queue | 1.8880 | 2.0290 | 3.5940 |\r\n+---------------+---------+-------------------------+----------------------------+\r\n| File Queue | 4.9520 | 5.0560 | 8.4900 |\r\n+---------------+---------+-------------------------+----------------------------+\r\n\r\n**windows note**\r\nPerformance of Windows File Queue has dramatic improvement since `v0.4.1` due to the\r\natomic renaming support(3-4X faster)\r\n\r\n- Linux\r\n - OS: Ubuntu 16.04 (VM)\r\n - Disk: SATA3 SSD\r\n - RAM: 4 GiB\r\n\r\n+---------------+--------+-------------------------+----------------------------+\r\n| | Write | Write/Read(1 task_done) | Write/Read(many task_done) |\r\n+---------------+--------+-------------------------+----------------------------+\r\n| SQLite3 Queue | 1.8282 | 1.8075 | 2.8639 
|\r\n+---------------+--------+-------------------------+----------------------------+\r\n| File Queue | 0.9123 | 1.0411 | 2.5104 |\r\n+---------------+--------+-------------------------+----------------------------+\r\n\r\n- Mac OS\r\n - OS: 10.14 (macOS Mojave)\r\n - Disk: PCIe SSD\r\n - RAM: 16 GiB\r\n\r\n+---------------+--------+-------------------------+----------------------------+\r\n| | Write | Write/Read(1 task_done) | Write/Read(many task_done) |\r\n+---------------+--------+-------------------------+----------------------------+\r\n| SQLite3 Queue | 0.1879 | 0.2115 | 0.3147 |\r\n+---------------+--------+-------------------------+----------------------------+\r\n| File Queue | 0.5158 | 0.5357 | 1.0446 |\r\n+---------------+--------+-------------------------+----------------------------+\r\n\r\n**note**\r\n\r\n- The value above is in seconds for reading/writing *1000* items, the less\r\n the better\r\n- Above result was got from:\r\n\r\n.. code-block:: console\r\n\r\n python benchmark/run_benchmark.py 1000\r\n\r\n\r\nTo see the real performance on your host, run the script under ``benchmark/run_benchmark.py``:\r\n\r\n.. code-block:: console\r\n\r\n python benchmark/run_benchmark.py <COUNT, default to 100>\r\n\r\n\r\nExamples\r\n--------\r\n\r\n\r\nExample usage with a SQLite3 based queue\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\n\r\n.. code-block:: python\r\n\r\n >>> import persistqueue\r\n >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)\r\n >>> q.put('str1')\r\n >>> q.put('str2')\r\n >>> q.put('str3')\r\n >>> q.get()\r\n 'str1'\r\n >>> del q\r\n\r\n\r\nClose the console, and then recreate the queue:\r\n\r\n.. 
code-block:: python\r\n\r\n >>> import persistqueue\r\n >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)\r\n >>> q.get()\r\n 'str2'\r\n >>>\r\n\r\nNew functions:\r\n*Available since v0.8.0*\r\n\r\n- ``shrink_disk_usage`` perform a ``VACUUM`` against the sqlite, and rebuild the database file, this usually takes long time and frees a lot of disk space after ``get()``\r\n\r\n\r\nExample usage of SQLite3 based ``UniqueQ``\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\nThis queue does not allow duplicate items.\r\n\r\n.. code-block:: python\r\n\r\n >>> import persistqueue\r\n >>> q = persistqueue.UniqueQ('mypath')\r\n >>> q.put('str1')\r\n >>> q.put('str1')\r\n >>> q.size\r\n 1\r\n >>> q.put('str2')\r\n >>> q.size\r\n 2\r\n >>>\r\n\r\nExample usage of SQLite3 based ``SQLiteAckQueue``/``UniqueAckQ``\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\nThe core functions:\r\n\r\n- ``put``: add item to the queue. Returns ``id``\r\n- ``get``: get item from queue and mark as unack. Returns ``item``, Optional paramaters (``block``, ``timeout``, ``id``, ``next_in_order``, ``raw``)\r\n- ``update``: update an item. Returns ``id``, Paramaters (``item``), Optional parameter if item not in raw format (``id``)\r\n- ``ack``: mark item as acked. Returns ``id``, Parameters (``item`` or ``id``)\r\n- ``nack``: there might be something wrong with current consumer, so mark item as ready and new consumer will get it. Returns ``id``, Parameters (``item`` or ``id``)\r\n- ``ack_failed``: there might be something wrong during process, so just mark item as failed. Returns ``id``, Parameters (``item`` or ``id``)\r\n- ``clear_acked_data``: perform a sql delete agaist sqlite. 
It removes 1000 items, while keeping 1000 of the most recent, whose status is ``AckStatus.acked`` (note: this does not shrink the file size on disk) Optional paramters (``max_delete``, ``keep_latest``, ``clear_ack_failed``)\r\n- ``shrink_disk_usage`` perform a ``VACUUM`` against the sqlite, and rebuild the database file, this usually takes long time and frees a lot of disk space after ``clear_acked_data``\r\n- ``queue``: returns the database contents as a Python List[Dict]\r\n- ``active_size``: The active size changes when an item is added (put) and completed (ack/ack_failed) unlike ``qsize`` which changes when an item is pulled (get) or returned (nack).\r\n\r\n.. code-block:: python\r\n\r\n >>> import persistqueue\r\n >>> ackq = persistqueue.SQLiteAckQueue('path')\r\n >>> ackq.put('str1')\r\n >>> item = ackq.get()\r\n >>> # Do something with the item\r\n >>> ackq.ack(item) # If done with the item\r\n >>> ackq.nack(item) # Else mark item as `nack` so that it can be proceeded again by any worker\r\n >>> ackq.ack_failed(item) # Or else mark item as `ack_failed` to discard this item\r\n\r\nParameters:\r\n\r\n- ``clear_acked_data``\r\n - ``max_delete`` (defaults to 1000): This is the LIMIT. How many items to delete.\r\n - ``keep_latest`` (defaults to 1000): This is the OFFSET. How many recent items to keep.\r\n - ``clear_ack_failed`` (defaults to False): Clears the ``AckStatus.ack_failed`` in addition to the ``AckStatus.ack``.\r\n\r\n- ``get``\r\n - ``raw`` (defaults to False): Returns the metadata along with the record, which includes the id (``pqid``) and timestamp. On the SQLiteAckQueue, the raw results can be ack, nack, ack_failed similar to the normal return.\r\n - ``id`` (defaults to None): Accepts an `id` or a raw item containing ``pqid``. Will select the item based on the row id.\r\n - ``next_in_order`` (defaults to False): Requires the ``id`` attribute. 
This option tells the SQLiteAckQueue/UniqueAckQ to get the next item based on ``id``, not the first available. This allows the user to get, nack, get, nack and progress down the queue, instead of continuing to get the same nack'd item over again.\r\n\r\n``raw`` example:\r\n\r\n.. code-block:: python\r\n\r\n >>> q.put('val1')\r\n >>> d = q.get(raw=True)\r\n >>> print(d)\r\n >>> {'pqid': 1, 'data': 'val1', 'timestamp': 1616719225.012912}\r\n >>> q.ack(d)\r\n\r\n``next_in_order`` example:\r\n\r\n.. code-block:: python\r\n\r\n >>> q.put(\"val1\")\r\n >>> q.put(\"val2\")\r\n >>> q.put(\"val3\")\r\n >>> item = q.get()\r\n >>> id = q.nack(item)\r\n >>> item = q.get(id=id, next_in_order=True)\r\n >>> print(item)\r\n >>> val2\r\n\r\n\r\nNote:\r\n\r\n1. The SQLiteAckQueue always uses \"auto_commit=True\".\r\n2. The Queue could be set in non-block style, e.g. \"SQLiteAckQueue.get(block=False, timeout=5)\".\r\n3. ``UniqueAckQ`` only allows for unique items\r\n\r\nExample usage with a file based queue\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\n\r\nParameters:\r\n\r\n- ``path``: specifies the directory wher enqueued data persisted.\r\n- ``maxsize``: indicates the maximum size stored in the queue, if maxsize<=0 the queue is unlimited.\r\n- ``chunksize``: indicates how many entries should exist in each chunk file on disk. When a all entries in a chunk file was dequeued by get(), the file would be removed from filesystem.\r\n- ``tempdir``: indicates where temporary files should be stored. The tempdir has to be located on the same disk as the enqueued data in order to obtain atomic operations.\r\n- ``serializer``: controls how enqueued data is serialized.\r\n- ``auto_save``: `True` or `False`. By default, the change is only persisted when task_done() is called. If autosave is enabled, info data is persisted immediately when get() is called. Adding data to the queue with put() will always persist immediately regardless of this setting.\r\n\r\n.. 
code-block:: python\r\n\r\n    >>> from persistqueue import Queue\r\n    >>> q = Queue(\"mypath\")\r\n    >>> q.put('a')\r\n    >>> q.put('b')\r\n    >>> q.put('c')\r\n    >>> q.get()\r\n    'a'\r\n    >>> q.task_done()\r\n\r\n\r\nClose the Python console, then restart the queue from the same path:\r\n\r\n.. code-block:: python\r\n\r\n    >>> from persistqueue import Queue\r\n    >>> q = Queue('mypath')\r\n    >>> q.get()\r\n    'b'\r\n    >>> q.task_done()\r\n\r\nExample usage with an auto-saving file based queue\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\n\r\n*Available since: v0.5.0*\r\n\r\nBy default, items added to the queue are persisted during the ``put()`` call,\r\nand items removed from a queue are only persisted when ``task_done()`` is\r\ncalled.\r\n\r\n.. code-block:: python\r\n\r\n    >>> from persistqueue import Queue\r\n    >>> q = Queue(\"mypath\")\r\n    >>> q.put('a')\r\n    >>> q.put('b')\r\n    >>> q.get()\r\n    'a'\r\n    >>> q.get()\r\n    'b'\r\n\r\nAfter exiting and restarting the queue from the same path, we see the items\r\nremain in the queue, because ``task_done()`` wasn't called before.\r\n\r\n.. code-block:: python\r\n\r\n    >>> from persistqueue import Queue\r\n    >>> q = Queue('mypath')\r\n    >>> q.get()\r\n    'a'\r\n    >>> q.get()\r\n    'b'\r\n\r\nThis can be advantageous. For example, if your program crashes before finishing\r\nprocessing an item, it will remain in the queue after restarting. You can also\r\nspread out the ``task_done()`` calls for performance reasons to avoid lots of\r\nindividual writes.\r\n\r\nUsing ``autosave=True`` on a file based queue will automatically save on every\r\ncall to ``get()``. Calling ``task_done()`` is not necessary, but may still be\r\nused to ``join()`` against the queue.\r\n\r\n.. 
code-block:: python\r\n\r\n    >>> from persistqueue import Queue\r\n    >>> q = Queue(\"mypath\", autosave=True)\r\n    >>> q.put('a')\r\n    >>> q.put('b')\r\n    >>> q.get()\r\n    'a'\r\n\r\nAfter exiting and restarting the queue from the same path, only the second item\r\nremains:\r\n\r\n.. code-block:: python\r\n\r\n    >>> from persistqueue import Queue\r\n    >>> q = Queue('mypath', autosave=True)\r\n    >>> q.get()\r\n    'b'\r\n\r\n\r\nExample usage with a SQLite3 based dict\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\n\r\n.. code-block:: python\r\n\r\n    >>> from persistqueue import PDict\r\n    >>> q = PDict(\"testpath\", \"testname\")\r\n    >>> q['key1'] = 123\r\n    >>> q['key2'] = 321\r\n    >>> q['key1']\r\n    123\r\n    >>> len(q)\r\n    2\r\n    >>> del q['key1']\r\n    >>> q['key1']\r\n    Traceback (most recent call last):\r\n      File \"<stdin>\", line 1, in <module>\r\n      File \"persistqueue\\pdict.py\", line 58, in __getitem__\r\n        raise KeyError('Key: {} not exists.'.format(item))\r\n    KeyError: 'Key: key1 not exists.'\r\n\r\nClose the console and restart the PDict:\r\n\r\n\r\n.. code-block:: python\r\n\r\n    >>> from persistqueue import PDict\r\n    >>> q = PDict(\"testpath\", \"testname\")\r\n    >>> q['key2']\r\n    321\r\n\r\n\r\nMulti-thread usage for **SQLite3** based queue\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\n\r\n.. code-block:: python\r\n\r\n    from threading import Thread\r\n\r\n    from persistqueue import FIFOSQLiteQueue\r\n\r\n    # do_work(), source() and num_worker_threads are placeholders\r\n    # supplied by the application.\r\n    q = FIFOSQLiteQueue(path=\"./test\", multithreading=True)\r\n\r\n    def worker():\r\n        while True:\r\n            item = q.get()\r\n            do_work(item)\r\n\r\n    for i in range(num_worker_threads):\r\n        t = Thread(target=worker)\r\n        t.daemon = True\r\n        t.start()\r\n\r\n    for item in source():\r\n        q.put(item)\r\n\r\n\r\nMulti-thread usage for **Queue**\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\n\r\n.. 
code-block:: python\r\n\r\n    from threading import Thread\r\n\r\n    from persistqueue import Queue\r\n\r\n    # do_work(), source() and num_worker_threads are placeholders\r\n    # supplied by the application.\r\n    q = Queue(\"mypath\")\r\n\r\n    def worker():\r\n        while True:\r\n            item = q.get()\r\n            do_work(item)\r\n            q.task_done()\r\n\r\n    for i in range(num_worker_threads):\r\n        t = Thread(target=worker)\r\n        t.daemon = True\r\n        t.start()\r\n\r\n    for item in source():\r\n        q.put(item)\r\n\r\n    q.join() # block until all tasks are done\r\n\r\nExample usage with a MySQL based queue\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\n\r\n*Available since: v0.8.0*\r\n\r\n.. code-block:: python\r\n\r\n    >>> import persistqueue\r\n    >>> db_conf = {\r\n    ...     \"host\": \"127.0.0.1\",\r\n    ...     \"user\": \"user\",\r\n    ...     \"passwd\": \"passw0rd\",\r\n    ...     \"db_name\": \"testqueue\",\r\n    ...     # \"name\": \"\",\r\n    ...     \"port\": 3306\r\n    ... }\r\n    >>> q = persistqueue.MySQLQueue(name=\"testtable\", **db_conf)\r\n    >>> q.put('str1')\r\n    >>> q.put('str2')\r\n    >>> q.put('str3')\r\n    >>> q.get()\r\n    'str1'\r\n    >>> del q\r\n\r\n\r\nClose the console, and then recreate the queue:\r\n\r\n.. code-block:: python\r\n\r\n    >>> import persistqueue\r\n    >>> # db_conf is the same dict as defined above\r\n    >>> q = persistqueue.MySQLQueue(name=\"testtable\", **db_conf)\r\n    >>> q.get()\r\n    'str2'\r\n\r\n\r\n\r\n**note**\r\n\r\nDue to the limitation of the file queue described in issue `#89 <https://github.com/peter-wangxu/persist-queue/issues/89>`_,\r\n``task_done`` in one thread may acknowledge items in other threads that it should not. Consider using ``SQLiteAckQueue`` if you have such a requirement.\r\n\r\n\r\nSerialization via msgpack/cbor/json\r\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\r\n- v0.4.1: Currently only available for file based Queue\r\n- v0.4.2: Also available for SQLite3 based Queues\r\n\r\n.. 
code-block:: python\r\n\r\n    >>> import persistqueue\r\n    >>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)\r\n    >>> # via cbor2\r\n    >>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.cbor2)\r\n    >>> # via json\r\n    >>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.json)\r\n    >>> q.get()\r\n    'b'\r\n    >>> q.task_done()\r\n\r\nExplicit resource reclaim\r\n^^^^^^^^^^^^^^^^^^^^^^^^^\r\n\r\nFor some reason, an application may require explicit reclamation of file\r\nhandles or sql connections before the end of execution. In these cases, the user can\r\nsimply call:\r\n\r\n.. code-block:: python\r\n\r\n    q = Queue('mypath') # or q = persistqueue.SQLiteQueue('mypath', auto_commit=True)\r\n    del q\r\n\r\n\r\nto reclaim related file handles or sql connections.\r\n\r\nTips\r\n----\r\n\r\n``task_done`` is required for both the file based queue and the SQLite3 based queue (when ``auto_commit=False``)\r\nto persist the cursor of the next ``get`` to the disk.\r\n\r\n\r\nPerformance impact\r\n------------------\r\n\r\n- **WAL**\r\n\r\n  Starting with v0.3.2, ``persistqueue`` leverages the sqlite3 built-in feature\r\n  `WAL <https://www.sqlite.org/wal.html>`_, which can improve performance\r\n  significantly. General testing indicates that ``persistqueue`` is 2-4 times\r\n  faster than the previous version.\r\n\r\n- **auto_commit=False**\r\n\r\n  Since persistqueue v0.3.0, a new parameter ``auto_commit`` has been available to tweak\r\n  the performance of sqlite3 based queues as needed. When ``auto_commit=False`` is specified, the user\r\n  needs to call ``queue.task_done()`` to persist the changes made since the\r\n  last ``task_done`` invocation to the disk.\r\n\r\n- **pickle protocol selection**\r\n\r\n  From v0.3.6, ``persistqueue`` selects ``Protocol version 2`` for Python 2 and ``Protocol version 4`` for Python 3\r\n  respectively. 
This selection only happens when the directory is not present when initializing the queue.\r\n\r\nTests\r\n-----\r\n\r\n*persist-queue* uses ``tox`` to trigger tests.\r\n\r\n- Unit test\r\n\r\n.. code-block:: console\r\n\r\n    tox -e <PYTHON_VERSION>\r\n\r\nAvailable ``<PYTHON_VERSION>``: ``py27``, ``py34``, ``py35``, ``py36``, ``py37``\r\n\r\n\r\n- PEP8 check\r\n\r\n.. code-block:: console\r\n\r\n    tox -e pep8\r\n\r\n\r\n`pyenv <https://github.com/pyenv/pyenv>`_ is usually a helpful tool to manage multiple versions of Python.\r\n\r\nCaution\r\n-------\r\n\r\nCurrently, atomic operation is supported on Windows but is still experimental.\r\nThat is, the data in ``persistqueue.Queue`` could be left in an unreadable state if an unexpected failure occurs during ``Queue.task_done``.\r\n\r\n**DO NOT put any critical data in persistqueue.Queue on Windows**.\r\n\r\n\r\nContribution\r\n------------\r\n\r\nSimply fork this repo and send a PR for your code change (along with tests covering it). Remember to give your PR a title and description. I am willing to\r\nenhance this project with you :).\r\n\r\n\r\nLicense\r\n-------\r\n\r\n`BSD <LICENSE>`_\r\n\r\nContributors\r\n------------\r\n\r\n`Contributors <https://github.com/peter-wangxu/persist-queue/graphs/contributors>`_\r\n\r\nFAQ\r\n---\r\n\r\n* ``sqlite3.OperationalError: database is locked`` is raised.\r\n\r\npersistqueue opens 2 connections to the db if ``multithreading=True``, and the\r\nSQLite database stays locked until the pending transaction is committed. The ``timeout``\r\nparameter specifies how long the connection should wait for the lock to go away\r\nbefore raising an exception. 
The default is **10**; increase ``timeout``\r\nwhen creating the queue if the above error occurs.\r\n\r\n* sqlite3 based queues are not thread-safe.\r\n\r\nThe sqlite3 queues are heavily tested in multi-threaded environments. If you find one that is not thread-safe, please\r\nmake sure you set ``multithreading=True`` when initializing the queue before submitting a new issue :).\r\n",
"bugtrack_url": null,
"license": "BSD",
"summary": "A thread-safe disk based persistent queue in Python.",
"version": "0.8.1",
"project_urls": {
"Homepage": "http://github.com/peter-wangxu/persist-queue"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "b5d8364215cf946929bdb92aff7bea8e0e9e3d8cdddbe18109e8b0797e57aa01",
"md5": "a468d047460bd70af1bfdbc33b3ca854",
"sha256": "9ea2c2fa80edc1ce9f683665c3ebe78bbe71d08868206d0c67a81135182a19f9"
},
"downloads": -1,
"filename": "persist_queue-0.8.1-py2.py3-none-any.whl",
"has_sig": false,
"md5_digest": "a468d047460bd70af1bfdbc33b3ca854",
"packagetype": "bdist_wheel",
"python_version": "py2.py3",
"requires_python": null,
"size": 38830,
"upload_time": "2023-06-03T11:01:36",
"upload_time_iso_8601": "2023-06-03T11:01:36.114836Z",
"url": "https://files.pythonhosted.org/packages/b5/d8/364215cf946929bdb92aff7bea8e0e9e3d8cdddbe18109e8b0797e57aa01/persist_queue-0.8.1-py2.py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "066f78ff1a951f02cfcb5c0e17150428bf102eb984e003f1ced9c9164024a2e2",
"md5": "12d747680774dd3d62ea25b1ab9dbc8d",
"sha256": "e1938d3ac6d9b61692c115f8ddcada8d9a31f22506782585e39588a41ecfa5aa"
},
"downloads": -1,
"filename": "persist-queue-0.8.1.tar.gz",
"has_sig": false,
"md5_digest": "12d747680774dd3d62ea25b1ab9dbc8d",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 29652,
"upload_time": "2023-06-03T11:01:38",
"upload_time_iso_8601": "2023-06-03T11:01:38.973225Z",
"url": "https://files.pythonhosted.org/packages/06/6f/78ff1a951f02cfcb5c0e17150428bf102eb984e003f1ced9c9164024a2e2/persist-queue-0.8.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-06-03 11:01:38",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "peter-wangxu",
"github_project": "persist-queue",
"travis_ci": false,
"coveralls": true,
"github_actions": false,
"circle": true,
"appveyor": true,
"requirements": [],
"tox": true,
"lcname": "persist-queue"
}