PQ
**

:Name: pq
:Version: 1.9.1
:Summary: PQ is a transactional queue for PostgreSQL.
:Home page: https://github.com/malthe/pq/
:Author: Malthe Borch <mborch@gmail.com>
:License: BSD
:Requires-Python: >=3.6
:Uploaded: 2023-04-04 05:55:07

A transactional queue system for PostgreSQL written in Python.

.. figure:: https://pq.readthedocs.org/en/latest/_static/intro.svg
   :alt: PQ does the job!

It allows you to push and pop items in and out of a queue in various
ways and also provides two scheduling options: delayed processing and
prioritization.

The system uses a single table that holds all jobs across queues; the
specifics are easy to customize.

The system currently supports only the `psycopg2
<https://pypi.python.org/pypi/psycopg2>`_ database driver, or
`psycopg2cffi <https://pypi.python.org/pypi/psycopg2cffi>`_ when running on PyPy.

The basic queue implementation is similar to Ryan Smith's
`queue_classic <https://github.com/ryandotsmith/queue_classic>`_
library written in Ruby, but uses `SKIP LOCKED
<https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/>`_
for concurrency control.
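
To illustrate the idea behind ``SKIP LOCKED``: a worker tries to lock candidate rows without waiting and moves past any row another worker already holds, so concurrent workers neither block on nor receive the same job. The in-memory sketch below mimics this with non-blocking ``threading.Lock`` acquisition. It is an analogy only, not pq's code; the ``Row`` class and ``claim_next`` function are hypothetical, and for brevity both "workers" run in one thread.

.. code-block:: python

    import threading


    class Row:
        """Hypothetical stand-in for one row of the queue table."""

        def __init__(self, row_id, data):
            self.row_id = row_id
            self.data = data
            self.lock = threading.Lock()  # plays the role of a row lock


    def claim_next(rows):
        """Lock and return the first row no other worker holds, else None.

        Locked rows are skipped instead of waited on, mirroring
        PostgreSQL's SELECT ... FOR UPDATE SKIP LOCKED. The caller releases
        row.lock when done (in PostgreSQL, when the transaction ends).
        """
        for row in rows:
            if row.lock.acquire(blocking=False):
                return row
        return None


    rows = [Row(1, {'kind': 'Cox'}), Row(2, {'kind': 'Arthur Turner'})]
    first = claim_next(rows)   # worker A locks row 1
    second = claim_next(rows)  # worker B skips row 1 and locks row 2
    print(first.row_id, second.row_id)  # 1 2
    print(claim_next(rows))  # None: every row is held by some worker

Because nobody waits on a held lock, adding workers adds throughput instead of contention on the head of the queue.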

In terms of performance, the implementation clocks in at about 1,000
operations per second. Using the `PyPy <http://pypy.org/>`_
interpreter, this scales linearly with the number of available cores.


Getting started
===============

All functionality is encapsulated in a single class ``PQ``.

     ``class PQ(conn=None, pool=None, table="queue", schema=None)``

The optional ``schema`` argument can be used to qualify the table with
a schema if necessary.

Example usage:

.. code-block:: python

    from psycopg2 import connect
    from pq import PQ

    conn = connect('dbname=example user=postgres')
    pq = PQ(conn)

For multi-threaded operation, use a connection pool such as
``psycopg2.pool.ThreadedConnectionPool``.

Your database should be created with the ``utf-8`` encoding.

To create and configure the queue table, call the ``create()`` method.

.. code-block:: python

    pq.create()


Queues
======

The ``pq`` object exposes queues through Python's dictionary
interface:

.. code-block:: python

    queue = pq['apples']

The ``queue`` object provides ``get`` and ``put`` methods, explained
below. It also works as a context manager that manages a transaction:

.. code-block:: python

    with queue as cursor:
        ...

The statements inside the context manager are either all committed as
one transaction or all rolled back. This is useful when a queue is
used to manage jobs, because it lets you retrieve a job from the
queue, perform it and write a result with transactional semantics.
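
The all-or-nothing behaviour can be illustrated without PostgreSQL. The sketch below uses the standard library's ``sqlite3`` purely as a stand-in: a ``sqlite3`` connection used as a context manager commits on success and rolls back if an exception escapes, so a job taken from the table and its result are either both recorded or both discarded. The ``jobs`` and ``results`` tables and the ``process_one`` helper are hypothetical; pq's own context manager yields a database cursor instead.

.. code-block:: python

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE jobs (id INTEGER PRIMARY KEY, kind TEXT)')
    conn.execute('CREATE TABLE results (job_id INTEGER, message TEXT)')
    conn.execute("INSERT INTO jobs (kind) VALUES ('Cox')")
    conn.commit()


    def process_one(conn, fail=False):
        """Take one job, do the work and store the result in one transaction."""
        with conn:  # commits on success, rolls back if an exception escapes
            row = conn.execute('SELECT id, kind FROM jobs LIMIT 1').fetchone()
            if row is None:
                return None
            job_id, kind = row
            conn.execute('DELETE FROM jobs WHERE id = ?', (job_id,))
            if fail:
                raise RuntimeError('worker crashed mid-job')
            conn.execute('INSERT INTO results VALUES (?, ?)',
                         (job_id, 'umm, %s apples taste good.' % kind))
            return job_id


    try:
        process_one(conn, fail=True)
    except RuntimeError:
        pass

    # The DELETE was rolled back, so the job is still queued.
    print(conn.execute('SELECT COUNT(*) FROM jobs').fetchone()[0])  # 1
    process_one(conn)
    print(conn.execute('SELECT COUNT(*) FROM results').fetchone()[0])  # 1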

Methods
=======

Use the ``put(data)`` method to insert an item into the queue. It
takes a JSON-compatible object such as a Python dictionary:

.. code-block:: python

    queue.put({'kind': 'Cox'})
    queue.put({'kind': 'Arthur Turner'})
    queue.put({'kind': 'Golden Delicious'})

Items are pulled out of the queue using ``get(block=True)``. By
default, the call blocks until an item is available, with a timeout of
one second, after which ``None`` is returned.

.. code-block:: python

    def eat(kind):
        print('umm, %s apples taste good.' % kind)

    job = queue.get()  # None if nothing arrived within the timeout
    if job is not None:
        eat(**job.data)

In addition to the ``data`` attribute, the ``job`` object provides
metadata, as its string representation shows:

    >>> job
    <pq.Job id=77709 size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>

The ``get`` operation is also available through iteration:

.. code-block:: python

    for job in queue:
        if job is None:
            break

        eat(**job.data)

The iterator blocks if no item is available. Again, there is a default
timeout of one second, after which the iterator yields a value of
``None``.

An application can then choose to break out of the loop, or wait again
for an item to be ready.

.. code-block:: python

    for job in queue:
        if job is not None:
            eat(**job.data)

        # This is an infinite loop!
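
The blocking-with-timeout iteration described above can be mimicked with the standard library's ``queue.Queue``. This is only an analogy for the documented behaviour; the ``iterate`` generator is hypothetical and not part of pq. It waits up to ``timeout`` seconds for an item and yields ``None`` if none arrives, leaving the caller to decide whether to stop or keep waiting.

.. code-block:: python

    import queue


    def iterate(q, timeout=1.0):
        """Yield items forever; yield None whenever `timeout` seconds pass idle."""
        while True:
            try:
                yield q.get(block=True, timeout=timeout)
            except queue.Empty:
                yield None


    q = queue.Queue()
    for kind in ('Cox', 'Arthur Turner'):
        q.put({'kind': kind})

    eaten = []
    for item in iterate(q, timeout=0.1):
        if item is None:
            break  # idle for 0.1 s: stop, like the first loop above
        eaten.append(item['kind'])

    print(eaten)  # ['Cox', 'Arthur Turner']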


Scheduling
==========

Items can be scheduled such that they're not pulled until a later
time:

.. code-block:: python

    queue.put({'kind': 'Cox'}, '5m')

In this example, the item is ready for work five minutes later. The
method also accepts ``datetime`` and ``timedelta`` objects.
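
For illustration only, here is one way a delay given as a short string, a ``timedelta`` or a ``datetime`` could be normalized to an absolute time. The ``resolve_schedule`` helper is hypothetical, and it accepts only the ``s``, ``m`` and ``h`` suffixes used in this README; how pq itself interprets such strings is not described here, so check its source before relying on other formats.

.. code-block:: python

    import re
    from datetime import datetime, timedelta, timezone

    # Hypothetical unit table covering the suffixes used in this README.
    _UNITS = {'s': 'seconds', 'm': 'minutes', 'h': 'hours'}


    def resolve_schedule(when, now=None):
        """Turn '5m', a timedelta or a datetime into an absolute datetime."""
        now = now or datetime.now(timezone.utc)
        if isinstance(when, datetime):
            return when  # already absolute
        if isinstance(when, timedelta):
            return now + when
        match = re.fullmatch(r'(\d+)([smh])', when)
        if match is None:
            raise ValueError('unsupported delay: %r' % (when,))
        amount, unit = match.groups()
        return now + timedelta(**{_UNITS[unit]: int(amount)})


    now = datetime(2014, 2, 21, 16, 22, 6, tzinfo=timezone.utc)
    print(resolve_schedule('5m', now))  # 2014-02-21 16:27:06+00:00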


Priority
========

If some items are more important than others, a time expectation can
be expressed:

.. code-block:: python

    queue.put({'kind': 'Cox'}, expected_at='5m')

This tells the queue processor to give priority to this item over an
item expected at a later time, and conversely, to prefer an item with
an earlier expected time. Note that items without a set priority are
pulled last.

The scheduling and priority options can be combined:

.. code-block:: python

    queue.put({'kind': 'Cox'}, '1h', '2h')

This item won't be pulled out until one hour has passed, and even then
it is processed according to its expected time of two hours.
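
The combined rule can be modelled in a few lines: a job is eligible once its ``schedule_at`` has passed (or if it has none), and eligible jobs are taken earliest ``expected_at`` first, with jobs lacking an ``expected_at`` last. This is a hypothetical in-memory model of the behaviour described above, not pq's SQL; the dict shape of a job is an assumption, and ties are simply left to Python's stable sort.

.. code-block:: python

    from datetime import datetime, timedelta


    def pick_next(jobs, now):
        """Return the job to process next, or None if nothing is ready.

        `jobs` is a list of dicts with optional 'schedule_at' and
        'expected_at' datetimes (hypothetical shape for this sketch).
        """
        ready = [j for j in jobs
                 if j.get('schedule_at') is None or j['schedule_at'] <= now]
        if not ready:
            return None
        # False sorts before True, so jobs with an expected_at come first;
        # among those, the earliest expected_at wins.
        ready.sort(key=lambda j: (j.get('expected_at') is None,
                                  j.get('expected_at') or now))
        return ready[0]


    now = datetime(2023, 4, 4, 12, 0)
    jobs = [
        {'kind': 'no priority'},
        {'kind': 'expected in 2h', 'expected_at': now + timedelta(hours=2)},
        {'kind': 'expected in 5m', 'expected_at': now + timedelta(minutes=5)},
        {'kind': 'not before 1h', 'schedule_at': now + timedelta(hours=1),
         'expected_at': now},
    ]
    print(pick_next(jobs, now)['kind'])  # expected in 5m

The last job has the earliest ``expected_at`` but is skipped until its ``schedule_at`` passes.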


Encoding and decoding
=====================

The task data is encoded to and decoded from JSON using the built-in
``json`` module. To use a different implementation or to configure
it, pass ``encode`` and/or ``decode`` arguments to the ``PQ``
constructor.
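
As an example of when this matters, the standard ``json`` module rejects ``datetime`` and ``Decimal`` values. The functions below sketch replacement callables that handle them. That pq calls ``encode(data)`` expecting a JSON string and ``decode(text)`` expecting the data back is an assumption, not something this README confirms, so check the ``PQ`` constructor before passing them in.

.. code-block:: python

    import json
    from datetime import datetime
    from decimal import Decimal


    def _default(obj):
        """Fallback for values the stock JSON encoder cannot handle."""
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, Decimal):
            return str(obj)  # keep full precision as a string
        raise TypeError('%r is not JSON serializable' % (obj,))


    def encode(data):
        return json.dumps(data, default=_default, sort_keys=True)


    def decode(text):
        return json.loads(text)


    payload = {'kind': 'Cox', 'picked': datetime(2023, 4, 4, 5, 55),
               'price': Decimal('0.35')}
    text = encode(payload)
    print(text)
    # {"kind": "Cox", "picked": "2023-04-04T05:55:00", "price": "0.35"}

In this sketch decoding is one-way: the ``datetime`` and ``Decimal`` values come back as plain strings, and converting them is left to the task code.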


Pickles
=======

If a queue name is provided as ``<name>/pickle``
(e.g. ``'jobs/pickle'``), items are automatically pickled and
unpickled using Python's built-in ``pickle`` module:

.. code-block:: python

    queue = pq['apples/pickle']

    class Apple:
        def __init__(self, kind):
            self.kind = kind

    queue.put(Apple('Cox'))

This allows you to store most objects without having to add any
further serialization code.

The old pickle protocol ``0`` is used to ensure the pickled data is
encoded as ``ascii`` which should be compatible with any database
encoding. Note that the pickle data is still wrapped as a JSON string at the
database level.
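
The following sketch shows the idea of this scheme: the protocol ``0`` pickle bytes are decoded as ASCII and stored inside a JSON string, then reversed on the way out. It reproduces the concept rather than pq's code; the ``to_json_string`` and ``from_json_string`` helpers are hypothetical. It pickles standard-library values (a ``Decimal`` and a tuple) instead of a custom class so that it runs anywhere, and its strict ASCII decoding raises ``UnicodeDecodeError`` for any pickle that is not pure ASCII.

.. code-block:: python

    import json
    import pickle
    from decimal import Decimal


    def to_json_string(obj):
        """Pickle `obj` with protocol 0 and wrap the bytes in a JSON string."""
        raw = pickle.dumps(obj, protocol=0)
        # Strict ASCII decoding: raises UnicodeDecodeError on non-ASCII bytes.
        return json.dumps(raw.decode('ascii'))


    def from_json_string(text):
        """Reverse of to_json_string()."""
        return pickle.loads(json.loads(text).encode('ascii'))


    # Values JSON alone cannot round-trip: a Decimal and a tuple.
    payload = {'kind': 'Cox', 'price': Decimal('0.35'), 'size': (7, 'cm')}
    stored = to_json_string(payload)
    print(from_json_string(stored) == payload)  # True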

While the pickle protocol is an easy way to serialize objects, advanced
users may prefer to apply JSON serialization to the objects directly,
for example using the object hook mechanism of the built-in ``json``
module or by subclassing
`JSONEncoder <https://docs.python.org/3/library/json.html#json.JSONEncoder>`_.
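
A minimal sketch of that JSON alternative uses a ``json.JSONEncoder`` subclass to write ``Apple`` instances and an ``object_hook`` to rebuild them. The ``__apple__`` marker key is a convention invented for this example, and wiring these functions into pq (for instance through the ``encode``/``decode`` arguments described under *Encoding and decoding*) is an assumption to verify.

.. code-block:: python

    import json


    class Apple:
        def __init__(self, kind):
            self.kind = kind


    class AppleEncoder(json.JSONEncoder):
        def default(self, obj):
            # Called only for objects the base encoder cannot serialize.
            if isinstance(obj, Apple):
                return {'__apple__': True, 'kind': obj.kind}
            return super().default(obj)  # raises TypeError


    def apple_hook(d):
        """Turn marked dicts back into Apple instances."""
        if d.get('__apple__'):
            return Apple(d['kind'])
        return d


    text = json.dumps({'basket': [Apple('Cox'), Apple('Golden Delicious')]},
                      cls=AppleEncoder)
    basket = json.loads(text, object_hook=apple_hook)['basket']
    print([apple.kind for apple in basket])  # ['Cox', 'Golden Delicious']

Unlike a pickle, the stored data stays readable and queryable as ordinary JSON in the database.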


Tasks
=====

``pq`` comes with a higher-level API that helps manage tasks:


.. code-block:: python

    from pq.tasks import PQ

    pq = PQ(...)

    queue = pq['default']

    @queue.task(schedule_at='1h')
    def eat(job_id, kind):
        print('umm, %s apples taste good.' % kind)

    eat('Cox')  # enqueues a job; the function runs later in a worker

    queue.work()  # start processing jobs
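
To show the mechanics in isolation, here is a hypothetical in-memory model of the pattern: decorating a function returns a wrapper that enqueues a call instead of running it, and ``work()`` later runs the queued calls, passing the job ID as the first argument, which mirrors pq's convention. ``TaskQueue`` and its methods are invented for this sketch and are not the ``pq.tasks`` API; scheduling options are omitted, so the decorator takes no arguments here.

.. code-block:: python

    import itertools
    from collections import deque


    class TaskQueue:
        """Hypothetical in-memory model of a task queue (not pq.tasks)."""

        def __init__(self):
            self._jobs = deque()
            self._ids = itertools.count(1)  # stands in for the table's primary key
            self._handlers = {}

        def task(self, func):
            """Register `func`; calling the result enqueues a job instead."""
            self._handlers[func.__name__] = func

            def enqueue(*args, **kwargs):
                job_id = next(self._ids)
                self._jobs.append((job_id, func.__name__, args, kwargs))
                return job_id

            return enqueue

        def work(self):
            """Run every queued job, passing the job ID as first argument."""
            while self._jobs:
                job_id, name, args, kwargs = self._jobs.popleft()
                self._handlers[name](job_id, *args, **kwargs)


    queue = TaskQueue()
    eaten = []


    @queue.task
    def eat(job_id, kind):
        eaten.append((job_id, kind))


    eat('Cox')
    eat('Arthur Turner')
    queue.work()
    print(eaten)  # [(1, 'Cox'), (2, 'Arthur Turner')]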


Task jobs can optionally be rescheduled on failure:

.. code-block:: python

    @queue.task(schedule_at='1h', max_retries=2, retry_in='10s')
    def eat(job_id, kind):
        ...  # task body
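
The retry options are not specified in detail here. One plausible reading, sketched below as an assumption, is that ``max_retries=2`` allows two further attempts after the first failure and ``retry_in='10s'`` delays each retry by ten seconds. The ``run_with_retries`` and ``flaky`` helpers are hypothetical, and the sketch records retry times instead of sleeping.

.. code-block:: python

    from datetime import datetime, timedelta


    def run_with_retries(func, now, max_retries=2, retry_in=timedelta(seconds=10)):
        """Call `func` until it succeeds or the retry budget is exhausted.

        Returns (succeeded, attempt_times). Each retry is scheduled
        `retry_in` after the previous attempt; times are recorded, not slept.
        """
        attempts = []
        when = now
        for _ in range(max_retries + 1):  # first try + max_retries retries
            attempts.append(when)
            try:
                func()
                return True, attempts
            except Exception:
                when = when + retry_in  # reschedule instead of failing outright
        return False, attempts


    def flaky(failures):
        """Build a callable that raises `failures` times, then succeeds."""
        state = {'left': failures}

        def call():
            if state['left'] > 0:
                state['left'] -= 1
                raise RuntimeError('temporary failure')
        return call


    start = datetime(2023, 4, 4, 12, 0, 0)
    ok, times = run_with_retries(flaky(2), start)
    print(ok, len(times))  # True 3
    ok, times = run_with_retries(flaky(5), start)
    print(ok, [t.second for t in times])  # False [0, 10, 20]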


Time expectations can be overridden when calling a task:

.. code-block:: python

    eat('Cox', _expected_at='2m', _schedule_at='1m')
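
A task call therefore mixes the function's own arguments with queue options marked by a leading underscore. A hypothetical helper that separates the two might look like this. The option names come from the example above, while the splitting rule is an assumption about how such a wrapper could work, not a description of pq's code.

.. code-block:: python

    QUEUE_OPTIONS = ('_expected_at', '_schedule_at')  # names from the example above


    def split_call_kwargs(kwargs):
        """Separate per-call queue options from the task's own keyword arguments."""
        options = {}
        task_kwargs = {}
        for key, value in kwargs.items():
            if key in QUEUE_OPTIONS:
                options[key[1:]] = value  # '_schedule_at' -> 'schedule_at'
            else:
                task_kwargs[key] = value
        return options, task_kwargs


    options, task_kwargs = split_call_kwargs(
        {'kind': 'Cox', '_expected_at': '2m', '_schedule_at': '1m'})
    print(options)      # {'expected_at': '2m', 'schedule_at': '1m'}
    print(task_kwargs)  # {'kind': 'Cox'}

Per-call values could then take precedence over the decorator's defaults, for example with ``{**defaults, **options}``.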


**Note:** The first positional argument passed to a task function is the job ID, which is the primary key of the job's record in PostgreSQL.

Thread-safety
=============

All objects are thread-safe as long as a connection pool is provided
where each thread receives its own database connection.
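
The one-connection-per-thread rule can be demonstrated with the standard library alone. In the sketch below, ``sqlite3`` stands in for psycopg2 and a ``threading.local`` hands each thread its own connection; with pq you would instead pass a pool such as ``psycopg2.pool.ThreadedConnectionPool``, as described under *Getting started*. The ``get_connection`` and ``release_connection`` helpers are hypothetical.

.. code-block:: python

    import sqlite3
    import threading

    _local = threading.local()


    def get_connection():
        """Return the calling thread's own connection, opening it on first use."""
        conn = getattr(_local, 'conn', None)
        if conn is None:
            conn = _local.conn = sqlite3.connect(':memory:')
        return conn


    def release_connection():
        """Close and forget the calling thread's connection, if any."""
        conn = getattr(_local, 'conn', None)
        if conn is not None:
            conn.close()
            del _local.conn


    connections = {}


    def worker(name):
        conn = get_connection()
        assert conn is get_connection()  # same object on every call in this thread
        conn.execute('SELECT 1')  # only ever used by the thread that opened it
        connections[name] = conn  # keep a reference so the objects can be compared
        release_connection()


    threads = [threading.Thread(target=worker, args=(name,)) for name in ('a', 'b')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(connections['a'] is not connections['b'])  # True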


Changes
=======

1.9.1 (2023-04-04)
------------------

- Add support for PostgreSQL 14 [kalekseev].

  See https://www.postgresql.org/docs/14/release-14.html#id-1.11.6.11.4.

- Add support for using a custom schema (issue #35).

1.9.0 (2020-09-29)
------------------

- The task executor now receives `job_id` as the first argument.

1.8.2 (2020-08-14)
------------------

- Added support for queue names longer than 63 characters.

  A database migration (dropping and recreating the `pq_notify`
  trigger) is required if you use names longer than this limit;
  otherwise, no migration is required.

- Return a connection to the pool if an exception is raised while retrieving it.

1.8.1 (2019-07-30)
------------------

- Added overridable `encode` and `decode` methods which are
  responsible for turning task data into `JSON` and vice-versa.

1.8.0 (2019-07-03)
------------------

- Change policy on task priority. Tasks with a null-value for
  `expected_at` are now processed after items that have a value set.

1.7.0 (2019-04-07)
------------------

- Use `SKIP LOCKED` instead of advisory lock mechanism (PostgreSQL 9.5+).

1.6.1 (2018-11-14)
------------------

- Fix queue class factory pattern.

1.6 (2018-11-12)
----------------

- Fix compatibility with `NamedTupleCursor`.

- Fix duplicate column name issue.

- Add option to specify own queue class.


1.5 (2017-04-18)
----------------

- Fixed Python 2 compatibility.


1.4 (2016-03-25)
----------------

- Added worker class and handler helper decorator.
  [jeanphix]


1.3 (2015-05-11)
----------------

- Python 3 compatibility.
  [migurski]

- Fix time zone issue.


1.2 (2014-10-21)
----------------

Improvements:

- Fixed concurrency issue where a large number of locks would be held
  as a queue grows in size.

- Fixed a database connection resource issue.


1.1 (2014-02-27)
----------------

Features:

- A queue is now also a context manager, providing transactional
  semantics.

- A queue now returns task objects which provide metadata and allow
  reading and writing task data.

Improvements:

- The same connection pool can now be used with different queues.

Bugs:

- The `Literal` string wrapper did not work correctly with `psycopg2`.

- The transaction manager now correctly returns connections to the
  pool.


1.0 (2013-11-20)
----------------

- Initial public release.

            
