persist-queue


Namepersist-queue JSON
Version 1.0.0 PyPI version JSON
download
home_pagehttp://github.com/peter-wangxu/persist-queue
SummaryA thread-safe disk based persistent queue in Python.
upload_time2024-07-03 02:28:24
maintainerPeter Wang
docs_urlNone
authorPeter Wang
requires_pythonNone
licenseBSD
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage
            persist-queue - A thread-safe, disk-based queue for Python
==========================================================

.. image:: https://img.shields.io/circleci/project/github/peter-wangxu/persist-queue/master.svg?label=Linux%20%26%20Mac
    :target: https://circleci.com/gh/peter-wangxu/persist-queue

.. image:: https://img.shields.io/appveyor/ci/peter-wangxu/persist-queue/master.svg?label=Windows
    :target: https://ci.appveyor.com/project/peter-wangxu/persist-queue

.. image:: https://img.shields.io/codecov/c/github/peter-wangxu/persist-queue/master.svg
    :target: https://codecov.io/gh/peter-wangxu/persist-queue

.. image:: https://img.shields.io/pypi/v/persist-queue.svg
    :target: https://pypi.python.org/pypi/persist-queue

.. image:: https://img.shields.io/pypi/pyversions/persist-queue
   :alt: PyPI - Python Version

``persist-queue`` implements a file-based queue and a serial of sqlite3-based queues. The goals is to achieve following requirements:

* Disk-based: each queued item should be stored in disk in case of any crash.
* Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
* Recoverable: Items can be read after process restart.
* Green-compatible: can be used in ``greenlet`` or ``eventlet`` environment.

While *queuelib* and *python-pqueue* cannot fulfil all of above. After some try, I found it's hard to achieve based on their current
implementation without huge code change. this is the motivation to start this project.

By default, *persist-queue* use *pickle* object serialization module to support object instances.
Most built-in type, like `int`, `dict`, `list` are able to be persisted by `persist-queue` directly, to support customized objects,
please refer to `Pickling and unpickling extension types(Python2) <https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances>`_
and `Pickling Class Instances(Python3) <https://docs.python.org/3/library/pickle.html#pickling-class-instances>`_

This project is based on the achievements of `python-pqueue <https://github.com/balena/python-pqueue>`_
and `queuelib <https://github.com/scrapy/queuelib>`_

Slack channels
^^^^^^^^^^^^^^

Join `persist-queue <https://join.slack
.com/t/persist-queue/shared_invite
/enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE>`_ channel


Requirements
------------
* Python 3.5 or newer versions (refer to `Deprecation`_ for older Python versions)
* Full support for Linux and MacOS.
* Windows support (with `Caution`_ if ``persistqueue.Queue`` is used).

Features
--------

- Multiple platforms support: Linux, macOS, Windows
- Pure python
- Both filed based queues and sqlite3 based queues are supported
- Filed based queue: multiple serialization protocol support: pickle(default), msgpack, cbor, json

Deprecation
-----------
- `persist-queue` drops Python 2 support since version `1.0.0`, no new feature will be developed under Python 2 as `Python 2 was sunset on January 1, 2020 <https://www.python.org/doc/sunset-python-2/>`_.
- `Python 3.4 release has reached end of life <https://www.python.org/downloads/release/python-3410/>`_ and
  `DBUtils <https://webwareforpython.github.io/DBUtils/changelog.html>`_ ceased support for `Python 3.4`, `persist queue` drops MySQL based queue for python 3.4 since version 0.8.0.
  other queue implementations such as file based queue and sqlite3 based queue are still workable.

Installation
------------

from pypi
^^^^^^^^^

.. code-block:: console

    pip install persist-queue
    # for msgpack, cbor and mysql support, use following command
    pip install "persist-queue[extra]"


from source code
^^^^^^^^^^^^^^^^

.. code-block:: console

    git clone https://github.com/peter-wangxu/persist-queue
    cd persist-queue
    # for msgpack and cbor support, run 'pip install -r extra-requirements.txt' first
    python setup.py install


Benchmark
---------

Here are the time spent(in seconds) for writing/reading **1000** items to the
disk comparing the sqlite3 and file queue.

- Windows
    - OS: Windows 10
    - Disk: SATA3 SSD
    - RAM: 16 GiB

+---------------+---------+-------------------------+----------------------------+
|               | Write   | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+---------+-------------------------+----------------------------+
| SQLite3 Queue | 1.8880  | 2.0290                  | 3.5940                     |
+---------------+---------+-------------------------+----------------------------+
| File Queue    | 4.9520  | 5.0560                  | 8.4900                     |
+---------------+---------+-------------------------+----------------------------+

**windows note**
Performance of Windows File Queue has dramatic improvement since `v0.4.1` due to the
atomic renaming support(3-4X faster)

- Linux
    - OS: Ubuntu 16.04 (VM)
    - Disk: SATA3 SSD
    - RAM:  4 GiB

+---------------+--------+-------------------------+----------------------------+
|               | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+--------+-------------------------+----------------------------+
| SQLite3 Queue | 1.8282 | 1.8075                  | 2.8639                     |
+---------------+--------+-------------------------+----------------------------+
| File Queue    | 0.9123 | 1.0411                  | 2.5104                     |
+---------------+--------+-------------------------+----------------------------+

- Mac OS
    - OS: 10.14 (macOS Mojave)
    - Disk: PCIe SSD
    - RAM:  16 GiB

+---------------+--------+-------------------------+----------------------------+
|               | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |
+---------------+--------+-------------------------+----------------------------+
| SQLite3 Queue | 0.1879 | 0.2115                  | 0.3147                     |
+---------------+--------+-------------------------+----------------------------+
| File Queue    | 0.5158 | 0.5357                  | 1.0446                     |
+---------------+--------+-------------------------+----------------------------+

**note**

- The value above is in seconds for reading/writing *1000* items, the less
  the better
- Above result was got from:

.. code-block:: console

    python benchmark/run_benchmark.py 1000


To see the real performance on your host, run the script under ``benchmark/run_benchmark.py``:

.. code-block:: console

    python benchmark/run_benchmark.py <COUNT, default to 100>


Examples
--------


Example usage with a SQLite3 based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    >>> import persistqueue
    >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
    >>> q.put('str1')
    >>> q.put('str2')
    >>> q.put('str3')
    >>> q.get()
    'str1'
    >>> del q


Close the console, and then recreate the queue:

.. code-block:: python

   >>> import persistqueue
   >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
   >>> q.get()
   'str2'
   >>>

New functions:
*Available since v0.8.0*

- ``shrink_disk_usage`` perform a ``VACUUM`` against the sqlite, and rebuild the database file, this usually takes long time and frees a lot of disk space after ``get()``


Example usage of SQLite3 based ``UniqueQ``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This queue does not allow duplicate items.

.. code-block:: python

   >>> import persistqueue
   >>> q = persistqueue.UniqueQ('mypath')
   >>> q.put('str1')
   >>> q.put('str1')
   >>> q.size
   1
   >>> q.put('str2')
   >>> q.size
   2
   >>>

Example usage of SQLite3 based ``SQLiteAckQueue``/``UniqueAckQ``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The core functions:

- ``put``: add item to the queue. Returns ``id``
- ``get``: get item from queue and mark as unack.  Returns ``item``, Optional paramaters (``block``, ``timeout``, ``id``, ``next_in_order``, ``raw``)
- ``update``: update an item. Returns ``id``, Paramaters (``item``), Optional parameter if item not in raw format (``id``)
- ``ack``: mark item as acked. Returns ``id``, Parameters (``item`` or ``id``)
- ``nack``: there might be something wrong with current consumer, so mark item as ready and new consumer will get it.  Returns ``id``, Parameters (``item`` or ``id``)
- ``ack_failed``: there might be something wrong during process, so just mark item as failed. Returns ``id``, Parameters (``item`` or ``id``)
- ``clear_acked_data``: perform a sql delete agaist sqlite. It removes 1000 items, while keeping 1000 of the most recent, whose status is ``AckStatus.acked`` (note: this does not shrink the file size on disk) Optional paramters (``max_delete``, ``keep_latest``, ``clear_ack_failed``)
- ``shrink_disk_usage`` perform a ``VACUUM`` against the sqlite, and rebuild the database file, this usually takes long time and frees a lot of disk space after ``clear_acked_data``
- ``queue``: returns the database contents as a Python List[Dict]
- ``active_size``: The active size changes when an item is added (put) and completed (ack/ack_failed) unlike ``qsize`` which changes when an item is pulled (get) or returned (nack).

.. code-block:: python

   >>> import persistqueue
   >>> ackq = persistqueue.SQLiteAckQueue('path')
   >>> ackq.put('str1')
   >>> item = ackq.get()
   >>> # Do something with the item
   >>> ackq.ack(item) # If done with the item
   >>> ackq.nack(item) # Else mark item as `nack` so that it can be proceeded again by any worker
   >>> ackq.ack_failed(item) # Or else mark item as `ack_failed` to discard this item

Parameters:

- ``clear_acked_data``
    - ``max_delete`` (defaults to 1000): This is the LIMIT.  How many items to delete.
    - ``keep_latest`` (defaults to 1000): This is the OFFSET.  How many recent items to keep.
    - ``clear_ack_failed`` (defaults to False): Clears the ``AckStatus.ack_failed`` in addition to the ``AckStatus.ack``.

- ``get``
    - ``raw`` (defaults to False): Returns the metadata along with the record, which includes the id (``pqid``) and timestamp.  On the SQLiteAckQueue, the raw results can be ack, nack, ack_failed similar to the normal return.
    -  ``id`` (defaults to None): Accepts an `id` or a raw item containing ``pqid``.  Will select the item based on the row id.
    -  ``next_in_order`` (defaults to False): Requires the ``id`` attribute.  This option tells the SQLiteAckQueue/UniqueAckQ to get the next item based on  ``id``, not the first available.  This allows the user to get, nack, get, nack and progress down the queue, instead of continuing to get the same nack'd item over again.

``raw`` example:

.. code-block:: python

   >>> q.put('val1')
   >>> d = q.get(raw=True)
   >>> print(d)
   >>> {'pqid': 1, 'data': 'val1', 'timestamp': 1616719225.012912}
   >>> q.ack(d)

``next_in_order`` example:

.. code-block:: python

   >>> q.put("val1")
   >>> q.put("val2")
   >>> q.put("val3")
   >>> item = q.get()
   >>> id = q.nack(item)
   >>> item = q.get(id=id, next_in_order=True)
   >>> print(item)
   >>> val2


Note:

1. The SQLiteAckQueue always uses "auto_commit=True".
2. The Queue could be set in non-block style, e.g. "SQLiteAckQueue.get(block=False, timeout=5)".
3. ``UniqueAckQ`` only allows for unique items

Example usage with a file based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Parameters:

- ``path``: specifies the directory wher enqueued data persisted.
- ``maxsize``: indicates the maximum size stored in the queue, if maxsize<=0 the queue is unlimited.
- ``chunksize``: indicates how many entries should exist in each chunk file on disk. When a all entries in a chunk file was dequeued by get(), the file would be removed from filesystem.
- ``tempdir``: indicates where temporary files should be stored. The tempdir has to be located on the same disk as the enqueued data in order to obtain atomic operations.
- ``serializer``: controls how enqueued data is serialized.
- ``auto_save``: `True` or `False`. By default, the change is only persisted when task_done() is called. If autosave is enabled, info data is persisted immediately when get() is called. Adding data to the queue with put() will always persist immediately regardless of this setting.

.. code-block:: python

    >>> from persistqueue import Queue
    >>> q = Queue("mypath")
    >>> q.put('a')
    >>> q.put('b')
    >>> q.put('c')
    >>> q.get()
    'a'
    >>> q.task_done()


Close the python console, and then we restart the queue from the same path,

.. code-block:: python

    >>> from persistqueue import Queue
    >>> q = Queue('mypath')
    >>> q.get()
    'b'
    >>> q.task_done()

Example usage with an auto-saving file based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

*Available since: v0.5.0*

By default, items added to the queue are persisted during the ``put()`` call,
and items removed from a queue are only persisted when ``task_done()`` is
called.

.. code-block:: python

    >>> from persistqueue import Queue
    >>> q = Queue("mypath")
    >>> q.put('a')
    >>> q.put('b')
    >>> q.get()
    'a'
    >>> q.get()
    'b'

After exiting and restarting the queue from the same path, we see the items
remain in the queue, because ``task_done()`` wasn't called before.

.. code-block:: python

    >>> from persistqueue import Queue
    >>> q = Queue('mypath')
    >>> q.get()
    'a'
    >>> q.get()
    'b'

This can be advantageous. For example, if your program crashes before finishing
processing an item, it will remain in the queue after restarting. You can also
spread out the ``task_done()`` calls for performance reasons to avoid lots of
individual writes.

Using ``autosave=True`` on a file based queue will automatically save on every
call to ``get()``. Calling ``task_done()`` is not necessary, but may still be
used to ``join()`` against the queue.

.. code-block:: python

    >>> from persistqueue import Queue
    >>> q = Queue("mypath", autosave=True)
    >>> q.put('a')
    >>> q.put('b')
    >>> q.get()
    'a'

After exiting and restarting the queue from the same path, only the second item
remains:

.. code-block:: python

    >>> from persistqueue import Queue
    >>> q = Queue('mypath', autosave=True)
    >>> q.get()
    'b'


Example usage with a SQLite3 based dict
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    >>> from persisitqueue import PDict
    >>> q = PDict("testpath", "testname")
    >>> q['key1'] = 123
    >>> q['key2'] = 321
    >>> q['key1']
    123
    >>> len(q)
    2
    >>> del q['key1']
    >>> q['key1']
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "persistqueue\pdict.py", line 58, in __getitem__
        raise KeyError('Key: {} not exists.'.format(item))
    KeyError: 'Key: key1 not exists.'

Close the console and restart the PDict


.. code-block:: python

    >>> from persisitqueue import PDict
    >>> q = PDict("testpath", "testname")
    >>> q['key2']
    321


Multi-thread usage for **SQLite3** based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    from persistqueue import FIFOSQLiteQueue

    q = FIFOSQLiteQueue(path="./test", multithreading=True)

    def worker():
        while True:
            item = q.get()
            do_work(item)

    for i in range(num_worker_threads):
         t = Thread(target=worker)
         t.daemon = True
         t.start()

    for item in source():
        q.put(item)


multi-thread usage for **Queue**
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

.. code-block:: python

    from persistqueue import Queue

    q = Queue()

    def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()

    for i in range(num_worker_threads):
         t = Thread(target=worker)
         t.daemon = True
         t.start()

    for item in source():
        q.put(item)

    q.join()       # block until all tasks are done

Example usage with a MySQL based queue
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

*Available since: v0.8.0*

.. code-block:: python

    >>> import persistqueue
    >>> db_conf = {
    >>>     "host": "127.0.0.1",
    >>>     "user": "user",
    >>>     "passwd": "passw0rd",
    >>>     "db_name": "testqueue",
    >>>     # "name": "",
    >>>     "port": 3306
    >>> }
    >>> q = persistqueue.MySQLQueue(name="testtable", **db_conf)
    >>> q.put('str1')
    >>> q.put('str2')
    >>> q.put('str3')
    >>> q.get()
    'str1'
    >>> del q


Close the console, and then recreate the queue:

.. code-block:: python

   >>> import persistqueue
   >>> q = persistqueue.MySQLQueue(name="testtable", **db_conf)
   >>> q.get()
   'str2'
   >>>



**note**

Due to the limitation of file queue described in issue `#89 <https://github.com/peter-wangxu/persist-queue/issues/89>`_,
`task_done` in one thread may acknowledge items in other threads which should not be. Considering the `SQLiteAckQueue` if you have such requirement.


Serialization via msgpack/cbor/json
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- v0.4.1: Currently only available for file based Queue
- v0.4.2: Also available for SQLite3 based Queues

.. code-block:: python

    >>> from persistqueue
    >>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)
    >>> # via cbor2
    >>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.cbor2)
    >>> # via json
    >>> # q = Queue('mypath', serializer=persistqueue.serializers.json)
    >>> q.get()
    'b'
    >>> q.task_done()

Explicit resource reclaim
^^^^^^^^^^^^^^^^^^^^^^^^^

For some reasons, an application may require explicit reclamation for file
handles or sql connections before end of execution. In these cases, user can
simply call:
.. code-block:: python

    q = Queue() # or q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
    del q


to reclaim related file handles or sql connections.

Tips
----

``task_done`` is required both for file based queue and SQLite3 based queue (when ``auto_commit=False``)
to persist the cursor of next ``get`` to the disk.


Performance impact
------------------

- **WAL**

  Starting on v0.3.2, the ``persistqueue`` is leveraging the sqlite3 builtin feature
  `WAL <https://www.sqlite.org/wal.html>`_ which can improve the performance
  significantly, a general testing indicates that ``persistqueue`` is 2-4 times
  faster than previous version.

- **auto_commit=False**

  Since persistqueue v0.3.0, a new parameter ``auto_commit`` is introduced to tweak
  the performance for sqlite3 based queues as needed. When specify ``auto_commit=False``, user
  needs to perform ``queue.task_done()`` to persist the changes made to the disk since
  last ``task_done`` invocation.

- **pickle protocol selection**

  From v0.3.6, the ``persistqueue`` will select ``Protocol version 2`` for python2 and ``Protocol version 4`` for python3
  respectively. This selection only happens when the directory is not present when initializing the queue.

Tests
-----

*persist-queue* use ``tox`` to trigger tests.

- Unit test

.. code-block:: console

    tox -e <PYTHON_VERSION>

Available ``<PYTHON_VERSION>``: ``py27``, ``py34``, ``py35``, ``py36``, ``py37``


- PEP8 check

.. code-block:: console

   tox -e pep8


`pyenv <https://github.com/pyenv/pyenv>`_ is usually a helpful tool to manage multiple versions of Python.

Caution
-------

Currently, the atomic operation is supported on Windows while still in experimental,
That's saying, the data in ``persistqueue.Queue`` could be in unreadable state when an incidental failure occurs during ``Queue.task_done``.

**DO NOT put any critical data on persistqueue.queue on Windows**.


Contribution
------------

Simply fork this repo and send PR for your code change(also tests to cover your change), remember to give a title and description of your PR. I am willing to
enhance this project with you :).


License
-------

`BSD <LICENSE>`_

Contributors
------------

`Contributors <https://github.com/peter-wangxu/persist-queue/graphs/contributors>`_

FAQ
---

* ``sqlite3.OperationalError: database is locked`` is raised.

persistqueue open 2 connections for the db if ``multithreading=True``, the
SQLite database is locked until that transaction is committed. The ``timeout``
parameter specifies how long the connection should wait for the lock to go away
until raising an exception. Default time is **10**, increase ``timeout``
when creating the queue if above error occurs.

* sqlite3 based queues are not thread-safe.

The sqlite3 queues are heavily tested under multi-threading environment, if you find it's not thread-safe, please
make sure you set the ``multithreading=True`` when initializing the queue before submitting new issue:).

            

Raw data

            {
    "_id": null,
    "home_page": "http://github.com/peter-wangxu/persist-queue",
    "name": "persist-queue",
    "maintainer": "Peter Wang",
    "docs_url": null,
    "requires_python": null,
    "maintainer_email": "wangxu198709@gmail.com",
    "keywords": null,
    "author": "Peter Wang",
    "author_email": "wangxu198709@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/cc/1f/495e4d941c925bcfa934f113f0ca4a5565cc00db3ab29944807f09e15457/persist-queue-1.0.0.tar.gz",
    "platform": "all",
    "description": "persist-queue - A thread-safe, disk-based queue for Python\n==========================================================\n\n.. image:: https://img.shields.io/circleci/project/github/peter-wangxu/persist-queue/master.svg?label=Linux%20%26%20Mac\n    :target: https://circleci.com/gh/peter-wangxu/persist-queue\n\n.. image:: https://img.shields.io/appveyor/ci/peter-wangxu/persist-queue/master.svg?label=Windows\n    :target: https://ci.appveyor.com/project/peter-wangxu/persist-queue\n\n.. image:: https://img.shields.io/codecov/c/github/peter-wangxu/persist-queue/master.svg\n    :target: https://codecov.io/gh/peter-wangxu/persist-queue\n\n.. image:: https://img.shields.io/pypi/v/persist-queue.svg\n    :target: https://pypi.python.org/pypi/persist-queue\n\n.. image:: https://img.shields.io/pypi/pyversions/persist-queue\n   :alt: PyPI - Python Version\n\n``persist-queue`` implements a file-based queue and a serial of sqlite3-based queues. The goals is to achieve following requirements:\n\n* Disk-based: each queued item should be stored in disk in case of any crash.\n* Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.\n* Recoverable: Items can be read after process restart.\n* Green-compatible: can be used in ``greenlet`` or ``eventlet`` environment.\n\nWhile *queuelib* and *python-pqueue* cannot fulfil all of above. After some try, I found it's hard to achieve based on their current\nimplementation without huge code change. this is the motivation to start this project.\n\nBy default, *persist-queue* use *pickle* object serialization module to support object instances.\nMost built-in type, like `int`, `dict`, `list` are able to be persisted by `persist-queue` directly, to support customized objects,\nplease refer to `Pickling and unpickling extension types(Python2) <https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances>`_\nand `Pickling Class Instances(Python3) <https://docs.python.org/3/library/pickle.html#pickling-class-instances>`_\n\nThis project is based on the achievements of `python-pqueue <https://github.com/balena/python-pqueue>`_\nand `queuelib <https://github.com/scrapy/queuelib>`_\n\nSlack channels\n^^^^^^^^^^^^^^\n\nJoin `persist-queue <https://join.slack\n.com/t/persist-queue/shared_invite\n/enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE>`_ channel\n\n\nRequirements\n------------\n* Python 3.5 or newer versions (refer to `Deprecation`_ for older Python versions)\n* Full support for Linux and MacOS.\n* Windows support (with `Caution`_ if ``persistqueue.Queue`` is used).\n\nFeatures\n--------\n\n- Multiple platforms support: Linux, macOS, Windows\n- Pure python\n- Both filed based queues and sqlite3 based queues are supported\n- Filed based queue: multiple serialization protocol support: pickle(default), msgpack, cbor, json\n\nDeprecation\n-----------\n- `persist-queue` drops Python 2 support since version `1.0.0`, no new feature will be developed under Python 2 as `Python 2 was sunset on January 1, 2020 <https://www.python.org/doc/sunset-python-2/>`_.\n- `Python 3.4 release has reached end of life <https://www.python.org/downloads/release/python-3410/>`_ and\n  `DBUtils <https://webwareforpython.github.io/DBUtils/changelog.html>`_ ceased support for `Python 3.4`, `persist queue` drops MySQL based queue for python 3.4 since version 0.8.0.\n  other queue implementations such as file based queue and sqlite3 based queue are still workable.\n\nInstallation\n------------\n\nfrom pypi\n^^^^^^^^^\n\n.. code-block:: console\n\n    pip install persist-queue\n    # for msgpack, cbor and mysql support, use following command\n    pip install \"persist-queue[extra]\"\n\n\nfrom source code\n^^^^^^^^^^^^^^^^\n\n.. code-block:: console\n\n    git clone https://github.com/peter-wangxu/persist-queue\n    cd persist-queue\n    # for msgpack and cbor support, run 'pip install -r extra-requirements.txt' first\n    python setup.py install\n\n\nBenchmark\n---------\n\nHere are the time spent(in seconds) for writing/reading **1000** items to the\ndisk comparing the sqlite3 and file queue.\n\n- Windows\n    - OS: Windows 10\n    - Disk: SATA3 SSD\n    - RAM: 16 GiB\n\n+---------------+---------+-------------------------+----------------------------+\n|               | Write   | Write/Read(1 task_done) | Write/Read(many task_done) |\n+---------------+---------+-------------------------+----------------------------+\n| SQLite3 Queue | 1.8880  | 2.0290                  | 3.5940                     |\n+---------------+---------+-------------------------+----------------------------+\n| File Queue    | 4.9520  | 5.0560                  | 8.4900                     |\n+---------------+---------+-------------------------+----------------------------+\n\n**windows note**\nPerformance of Windows File Queue has dramatic improvement since `v0.4.1` due to the\natomic renaming support(3-4X faster)\n\n- Linux\n    - OS: Ubuntu 16.04 (VM)\n    - Disk: SATA3 SSD\n    - RAM:  4 GiB\n\n+---------------+--------+-------------------------+----------------------------+\n|               | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |\n+---------------+--------+-------------------------+----------------------------+\n| SQLite3 Queue | 1.8282 | 1.8075                  | 2.8639                     |\n+---------------+--------+-------------------------+----------------------------+\n| File Queue    | 0.9123 | 1.0411                  | 2.5104                     |\n+---------------+--------+-------------------------+----------------------------+\n\n- Mac OS\n    - OS: 10.14 (macOS Mojave)\n    - Disk: PCIe SSD\n    - RAM:  16 GiB\n\n+---------------+--------+-------------------------+----------------------------+\n|               | Write  | Write/Read(1 task_done) | Write/Read(many task_done) |\n+---------------+--------+-------------------------+----------------------------+\n| SQLite3 Queue | 0.1879 | 0.2115                  | 0.3147                     |\n+---------------+--------+-------------------------+----------------------------+\n| File Queue    | 0.5158 | 0.5357                  | 1.0446                     |\n+---------------+--------+-------------------------+----------------------------+\n\n**note**\n\n- The value above is in seconds for reading/writing *1000* items, the less\n  the better\n- Above result was got from:\n\n.. code-block:: console\n\n    python benchmark/run_benchmark.py 1000\n\n\nTo see the real performance on your host, run the script under ``benchmark/run_benchmark.py``:\n\n.. code-block:: console\n\n    python benchmark/run_benchmark.py <COUNT, default to 100>\n\n\nExamples\n--------\n\n\nExample usage with a SQLite3 based queue\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n.. code-block:: python\n\n    >>> import persistqueue\n    >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)\n    >>> q.put('str1')\n    >>> q.put('str2')\n    >>> q.put('str3')\n    >>> q.get()\n    'str1'\n    >>> del q\n\n\nClose the console, and then recreate the queue:\n\n.. code-block:: python\n\n   >>> import persistqueue\n   >>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)\n   >>> q.get()\n   'str2'\n   >>>\n\nNew functions:\n*Available since v0.8.0*\n\n- ``shrink_disk_usage`` perform a ``VACUUM`` against the sqlite, and rebuild the database file, this usually takes long time and frees a lot of disk space after ``get()``\n\n\nExample usage of SQLite3 based ``UniqueQ``\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\nThis queue does not allow duplicate items.\n\n.. code-block:: python\n\n   >>> import persistqueue\n   >>> q = persistqueue.UniqueQ('mypath')\n   >>> q.put('str1')\n   >>> q.put('str1')\n   >>> q.size\n   1\n   >>> q.put('str2')\n   >>> q.size\n   2\n   >>>\n\nExample usage of SQLite3 based ``SQLiteAckQueue``/``UniqueAckQ``\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\nThe core functions:\n\n- ``put``: add item to the queue. Returns ``id``\n- ``get``: get item from queue and mark as unack.  Returns ``item``, Optional paramaters (``block``, ``timeout``, ``id``, ``next_in_order``, ``raw``)\n- ``update``: update an item. Returns ``id``, Paramaters (``item``), Optional parameter if item not in raw format (``id``)\n- ``ack``: mark item as acked. Returns ``id``, Parameters (``item`` or ``id``)\n- ``nack``: there might be something wrong with current consumer, so mark item as ready and new consumer will get it.  Returns ``id``, Parameters (``item`` or ``id``)\n- ``ack_failed``: there might be something wrong during process, so just mark item as failed. Returns ``id``, Parameters (``item`` or ``id``)\n- ``clear_acked_data``: perform a sql delete agaist sqlite. It removes 1000 items, while keeping 1000 of the most recent, whose status is ``AckStatus.acked`` (note: this does not shrink the file size on disk) Optional paramters (``max_delete``, ``keep_latest``, ``clear_ack_failed``)\n- ``shrink_disk_usage`` perform a ``VACUUM`` against the sqlite, and rebuild the database file, this usually takes long time and frees a lot of disk space after ``clear_acked_data``\n- ``queue``: returns the database contents as a Python List[Dict]\n- ``active_size``: The active size changes when an item is added (put) and completed (ack/ack_failed) unlike ``qsize`` which changes when an item is pulled (get) or returned (nack).\n\n.. code-block:: python\n\n   >>> import persistqueue\n   >>> ackq = persistqueue.SQLiteAckQueue('path')\n   >>> ackq.put('str1')\n   >>> item = ackq.get()\n   >>> # Do something with the item\n   >>> ackq.ack(item) # If done with the item\n   >>> ackq.nack(item) # Else mark item as `nack` so that it can be proceeded again by any worker\n   >>> ackq.ack_failed(item) # Or else mark item as `ack_failed` to discard this item\n\nParameters:\n\n- ``clear_acked_data``\n    - ``max_delete`` (defaults to 1000): This is the LIMIT.  How many items to delete.\n    - ``keep_latest`` (defaults to 1000): This is the OFFSET.  How many recent items to keep.\n    - ``clear_ack_failed`` (defaults to False): Clears the ``AckStatus.ack_failed`` in addition to the ``AckStatus.ack``.\n\n- ``get``\n    - ``raw`` (defaults to False): Returns the metadata along with the record, which includes the id (``pqid``) and timestamp.  On the SQLiteAckQueue, the raw results can be ack, nack, ack_failed similar to the normal return.\n    -  ``id`` (defaults to None): Accepts an `id` or a raw item containing ``pqid``.  Will select the item based on the row id.\n    -  ``next_in_order`` (defaults to False): Requires the ``id`` attribute.  This option tells the SQLiteAckQueue/UniqueAckQ to get the next item based on  ``id``, not the first available.  This allows the user to get, nack, get, nack and progress down the queue, instead of continuing to get the same nack'd item over again.\n\n``raw`` example:\n\n.. code-block:: python\n\n   >>> q.put('val1')\n   >>> d = q.get(raw=True)\n   >>> print(d)\n   >>> {'pqid': 1, 'data': 'val1', 'timestamp': 1616719225.012912}\n   >>> q.ack(d)\n\n``next_in_order`` example:\n\n.. code-block:: python\n\n   >>> q.put(\"val1\")\n   >>> q.put(\"val2\")\n   >>> q.put(\"val3\")\n   >>> item = q.get()\n   >>> id = q.nack(item)\n   >>> item = q.get(id=id, next_in_order=True)\n   >>> print(item)\n   >>> val2\n\n\nNote:\n\n1. The SQLiteAckQueue always uses \"auto_commit=True\".\n2. The Queue could be set in non-block style, e.g. \"SQLiteAckQueue.get(block=False, timeout=5)\".\n3. ``UniqueAckQ`` only allows for unique items\n\nExample usage with a file based queue\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\nParameters:\n\n- ``path``: specifies the directory wher enqueued data persisted.\n- ``maxsize``: indicates the maximum size stored in the queue, if maxsize<=0 the queue is unlimited.\n- ``chunksize``: indicates how many entries should exist in each chunk file on disk. When a all entries in a chunk file was dequeued by get(), the file would be removed from filesystem.\n- ``tempdir``: indicates where temporary files should be stored. The tempdir has to be located on the same disk as the enqueued data in order to obtain atomic operations.\n- ``serializer``: controls how enqueued data is serialized.\n- ``auto_save``: `True` or `False`. By default, the change is only persisted when task_done() is called. If autosave is enabled, info data is persisted immediately when get() is called. Adding data to the queue with put() will always persist immediately regardless of this setting.\n\n.. code-block:: python\n\n    >>> from persistqueue import Queue\n    >>> q = Queue(\"mypath\")\n    >>> q.put('a')\n    >>> q.put('b')\n    >>> q.put('c')\n    >>> q.get()\n    'a'\n    >>> q.task_done()\n\n\nClose the python console, and then we restart the queue from the same path,\n\n.. code-block:: python\n\n    >>> from persistqueue import Queue\n    >>> q = Queue('mypath')\n    >>> q.get()\n    'b'\n    >>> q.task_done()\n\nExample usage with an auto-saving file based queue\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n*Available since: v0.5.0*\n\nBy default, items added to the queue are persisted during the ``put()`` call,\nand items removed from a queue are only persisted when ``task_done()`` is\ncalled.\n\n.. code-block:: python\n\n    >>> from persistqueue import Queue\n    >>> q = Queue(\"mypath\")\n    >>> q.put('a')\n    >>> q.put('b')\n    >>> q.get()\n    'a'\n    >>> q.get()\n    'b'\n\nAfter exiting and restarting the queue from the same path, we see the items\nremain in the queue, because ``task_done()`` wasn't called before.\n\n.. code-block:: python\n\n    >>> from persistqueue import Queue\n    >>> q = Queue('mypath')\n    >>> q.get()\n    'a'\n    >>> q.get()\n    'b'\n\nThis can be advantageous. For example, if your program crashes before finishing\nprocessing an item, it will remain in the queue after restarting. You can also\nspread out the ``task_done()`` calls for performance reasons to avoid lots of\nindividual writes.\n\nUsing ``autosave=True`` on a file based queue will automatically save on every\ncall to ``get()``. Calling ``task_done()`` is not necessary, but may still be\nused to ``join()`` against the queue.\n\n.. code-block:: python\n\n    >>> from persistqueue import Queue\n    >>> q = Queue(\"mypath\", autosave=True)\n    >>> q.put('a')\n    >>> q.put('b')\n    >>> q.get()\n    'a'\n\nAfter exiting and restarting the queue from the same path, only the second item\nremains:\n\n.. code-block:: python\n\n    >>> from persistqueue import Queue\n    >>> q = Queue('mypath', autosave=True)\n    >>> q.get()\n    'b'\n\n\nExample usage with a SQLite3 based dict\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n.. code-block:: python\n\n    >>> from persisitqueue import PDict\n    >>> q = PDict(\"testpath\", \"testname\")\n    >>> q['key1'] = 123\n    >>> q['key2'] = 321\n    >>> q['key1']\n    123\n    >>> len(q)\n    2\n    >>> del q['key1']\n    >>> q['key1']\n    Traceback (most recent call last):\n      File \"<stdin>\", line 1, in <module>\n      File \"persistqueue\\pdict.py\", line 58, in __getitem__\n        raise KeyError('Key: {} not exists.'.format(item))\n    KeyError: 'Key: key1 not exists.'\n\nClose the console and restart the PDict\n\n\n.. code-block:: python\n\n    >>> from persisitqueue import PDict\n    >>> q = PDict(\"testpath\", \"testname\")\n    >>> q['key2']\n    321\n\n\nMulti-thread usage for **SQLite3** based queue\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n.. code-block:: python\n\n    from persistqueue import FIFOSQLiteQueue\n\n    q = FIFOSQLiteQueue(path=\"./test\", multithreading=True)\n\n    def worker():\n        while True:\n            item = q.get()\n            do_work(item)\n\n    for i in range(num_worker_threads):\n         t = Thread(target=worker)\n         t.daemon = True\n         t.start()\n\n    for item in source():\n        q.put(item)\n\n\nmulti-thread usage for **Queue**\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n.. code-block:: python\n\n    from persistqueue import Queue\n\n    q = Queue()\n\n    def worker():\n        while True:\n            item = q.get()\n            do_work(item)\n            q.task_done()\n\n    for i in range(num_worker_threads):\n         t = Thread(target=worker)\n         t.daemon = True\n         t.start()\n\n    for item in source():\n        q.put(item)\n\n    q.join()       # block until all tasks are done\n\nExample usage with a MySQL based queue\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n*Available since: v0.8.0*\n\n.. code-block:: python\n\n    >>> import persistqueue\n    >>> db_conf = {\n    >>>     \"host\": \"127.0.0.1\",\n    >>>     \"user\": \"user\",\n    >>>     \"passwd\": \"passw0rd\",\n    >>>     \"db_name\": \"testqueue\",\n    >>>     # \"name\": \"\",\n    >>>     \"port\": 3306\n    >>> }\n    >>> q = persistqueue.MySQLQueue(name=\"testtable\", **db_conf)\n    >>> q.put('str1')\n    >>> q.put('str2')\n    >>> q.put('str3')\n    >>> q.get()\n    'str1'\n    >>> del q\n\n\nClose the console, and then recreate the queue:\n\n.. code-block:: python\n\n   >>> import persistqueue\n   >>> q = persistqueue.MySQLQueue(name=\"testtable\", **db_conf)\n   >>> q.get()\n   'str2'\n   >>>\n\n\n\n**note**\n\nDue to the limitation of file queue described in issue `#89 <https://github.com/peter-wangxu/persist-queue/issues/89>`_,\n`task_done` in one thread may acknowledge items in other threads which should not be. Considering the `SQLiteAckQueue` if you have such requirement.\n\n\nSerialization via msgpack/cbor/json\n^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n- v0.4.1: Currently only available for file based Queue\n- v0.4.2: Also available for SQLite3 based Queues\n\n.. code-block:: python\n\n    >>> from persistqueue\n    >>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)\n    >>> # via cbor2\n    >>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.cbor2)\n    >>> # via json\n    >>> # q = Queue('mypath', serializer=persistqueue.serializers.json)\n    >>> q.get()\n    'b'\n    >>> q.task_done()\n\nExplicit resource reclaim\n^^^^^^^^^^^^^^^^^^^^^^^^^\n\nFor some reasons, an application may require explicit reclamation for file\nhandles or sql connections before end of execution. In these cases, user can\nsimply call:\n.. code-block:: python\n\n    q = Queue() # or q = persistqueue.SQLiteQueue('mypath', auto_commit=True)\n    del q\n\n\nto reclaim related file handles or sql connections.\n\nTips\n----\n\n``task_done`` is required both for file based queue and SQLite3 based queue (when ``auto_commit=False``)\nto persist the cursor of next ``get`` to the disk.\n\n\nPerformance impact\n------------------\n\n- **WAL**\n\n  Starting on v0.3.2, the ``persistqueue`` is leveraging the sqlite3 builtin feature\n  `WAL <https://www.sqlite.org/wal.html>`_ which can improve the performance\n  significantly, a general testing indicates that ``persistqueue`` is 2-4 times\n  faster than previous version.\n\n- **auto_commit=False**\n\n  Since persistqueue v0.3.0, a new parameter ``auto_commit`` is introduced to tweak\n  the performance for sqlite3 based queues as needed. When specify ``auto_commit=False``, user\n  needs to perform ``queue.task_done()`` to persist the changes made to the disk since\n  last ``task_done`` invocation.\n\n- **pickle protocol selection**\n\n  From v0.3.6, the ``persistqueue`` will select ``Protocol version 2`` for python2 and ``Protocol version 4`` for python3\n  respectively. This selection only happens when the directory is not present when initializing the queue.\n\nTests\n-----\n\n*persist-queue* use ``tox`` to trigger tests.\n\n- Unit test\n\n.. code-block:: console\n\n    tox -e <PYTHON_VERSION>\n\nAvailable ``<PYTHON_VERSION>``: ``py27``, ``py34``, ``py35``, ``py36``, ``py37``\n\n\n- PEP8 check\n\n.. code-block:: console\n\n   tox -e pep8\n\n\n`pyenv <https://github.com/pyenv/pyenv>`_ is usually a helpful tool to manage multiple versions of Python.\n\nCaution\n-------\n\nCurrently, the atomic operation is supported on Windows while still in experimental,\nThat's saying, the data in ``persistqueue.Queue`` could be in unreadable state when an incidental failure occurs during ``Queue.task_done``.\n\n**DO NOT put any critical data on persistqueue.queue on Windows**.\n\n\nContribution\n------------\n\nSimply fork this repo and send PR for your code change(also tests to cover your change), remember to give a title and description of your PR. I am willing to\nenhance this project with you :).\n\n\nLicense\n-------\n\n`BSD <LICENSE>`_\n\nContributors\n------------\n\n`Contributors <https://github.com/peter-wangxu/persist-queue/graphs/contributors>`_\n\nFAQ\n---\n\n* ``sqlite3.OperationalError: database is locked`` is raised.\n\npersistqueue open 2 connections for the db if ``multithreading=True``, the\nSQLite database is locked until that transaction is committed. The ``timeout``\nparameter specifies how long the connection should wait for the lock to go away\nuntil raising an exception. Default time is **10**, increase ``timeout``\nwhen creating the queue if above error occurs.\n\n* sqlite3 based queues are not thread-safe.\n\nThe sqlite3 queues are heavily tested under multi-threading environment, if you find it's not thread-safe, please\nmake sure you set the ``multithreading=True`` when initializing the queue before submitting new issue:).\n",
    "bugtrack_url": null,
    "license": "BSD",
    "summary": "A thread-safe disk based persistent queue in Python.",
    "version": "1.0.0",
    "project_urls": {
        "Homepage": "http://github.com/peter-wangxu/persist-queue"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "c03e2b717ade1ead833033b3de44d907e25576f587e77be80154d4526f93d8ba",
                "md5": "c30347cc21a68b5b7ddfe41bddc61999",
                "sha256": "81bb20030b480fcacecc3abe6261480c818246f4d838fdf0217e36c2552a5f3a"
            },
            "downloads": -1,
            "filename": "persist_queue-1.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "c30347cc21a68b5b7ddfe41bddc61999",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 39349,
            "upload_time": "2024-07-03T02:28:21",
            "upload_time_iso_8601": "2024-07-03T02:28:21.533021Z",
            "url": "https://files.pythonhosted.org/packages/c0/3e/2b717ade1ead833033b3de44d907e25576f587e77be80154d4526f93d8ba/persist_queue-1.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "cc1f495e4d941c925bcfa934f113f0ca4a5565cc00db3ab29944807f09e15457",
                "md5": "770c027afd3812090539d7446bb7346c",
                "sha256": "3ffb746902d3023fd09eb46897609fdee6c77b1641f19e2fc8d98d744bdfc845"
            },
            "downloads": -1,
            "filename": "persist-queue-1.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "770c027afd3812090539d7446bb7346c",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 30262,
            "upload_time": "2024-07-03T02:28:24",
            "upload_time_iso_8601": "2024-07-03T02:28:24.026803Z",
            "url": "https://files.pythonhosted.org/packages/cc/1f/495e4d941c925bcfa934f113f0ca4a5565cc00db3ab29944807f09e15457/persist-queue-1.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-07-03 02:28:24",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "peter-wangxu",
    "github_project": "persist-queue",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": false,
    "circle": true,
    "appveyor": true,
    "requirements": [],
    "tox": true,
    "lcname": "persist-queue"
}
        
Elapsed time: 4.61690s