:Name: pq
:Version: 1.9.1
:Home page: https://github.com/malthe/pq/
:Summary: PQ is a transactional queue for PostgreSQL.
:Upload time: 2023-04-04 05:55:07
:Author: Malthe Borch
:Requires Python: >=3.6
:License: BSD
PQ
**
A transactional queue system for PostgreSQL written in Python.

.. figure:: https://pq.readthedocs.org/en/latest/_static/intro.svg
   :alt: PQ does the job!

It allows you to push and pop items in and out of a queue in various
ways and also provides two scheduling options: delayed processing and
prioritization.
The system uses a single table that holds all jobs across queues; the
specifics are easy to customize.
The system currently supports only the `psycopg2
<https://pypi.python.org/pypi/psycopg2>`_ database driver, or
`psycopg2cffi <https://pypi.python.org/pypi/psycopg2cffi>`_ for PyPy.
The basic queue implementation is similar to Ryan Smith's
`queue_classic <https://github.com/ryandotsmith/queue_classic>`_
library written in Ruby, but uses `SKIP LOCKED
<https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/>`_
for concurrency control.
In terms of performance, the implementation clocks in at about 1,000
operations per second. Using the `PyPy <http://pypy.org/>`_
interpreter, this scales linearly with the number of cores available.
Getting started
===============
All functionality is encapsulated in a single class ``PQ``.

   ``class PQ(conn=None, pool=None, table="queue", schema=None)``

The optional ``schema`` argument can be used to qualify the table with
a schema if necessary.
Example usage:

.. code-block:: python

   from psycopg2 import connect
   from pq import PQ

   conn = connect('dbname=example user=postgres')
   pq = PQ(conn)
For multi-threaded operation, use a connection pool such as
``psycopg2.pool.ThreadedConnectionPool``.
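A pooled setup might look like the following sketch; the DSN and pool bounds here are placeholder values.

.. code-block:: python

   def make_pq(dsn, minconn=1, maxconn=10):
       # Build a PQ instance backed by a thread-safe connection pool
       # so that each thread can check out its own connection.
       # Imports are local only to keep the sketch self-contained.
       from psycopg2.pool import ThreadedConnectionPool
       from pq import PQ
       pool = ThreadedConnectionPool(minconn, maxconn, dsn)
       return PQ(pool=pool)

   # pq = make_pq('dbname=example user=postgres')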
You probably want to make sure your database is created with the
``utf-8`` encoding.
To create and configure the queue table, call the ``create()`` method.

.. code-block:: python

   pq.create()
Queues
======
The ``pq`` object exposes queues through Python's dictionary
interface:

.. code-block:: python

   queue = pq['apples']
The ``queue`` object provides ``get`` and ``put`` methods as explained
below, and in addition, it also works as a context manager where it
manages a transaction:

.. code-block:: python

   with queue as cursor:
       ...
The statements inside the context manager are either committed as a
transaction or rolled back, atomically. This is useful when a queue is
used to manage jobs because it allows you to retrieve a job from the
queue, perform the job and write a result, with transactional
semantics.
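That pattern can be sketched as follows. Note this is only a sketch: the ``process`` callable and the ``results`` table are hypothetical, and the exact cursor usage may vary.

.. code-block:: python

   def process_one(queue, process):
       # Fetch one job and record its result inside a single
       # transaction managed by the queue's context manager.
       with queue as cursor:
           job = queue.get(block=False)
           if job is None:
               return None
           result = process(job.data)
           # This write commits together with the job's removal, or
           # everything rolls back if an exception is raised.
           cursor.execute("INSERT INTO results (data) VALUES (%s)",
                          [result])
           return result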
Methods
=======
Use the ``put(data)`` method to insert an item into the queue. It
takes a JSON-compatible object such as a Python dictionary:

.. code-block:: python

   queue.put({'kind': 'Cox'})
   queue.put({'kind': 'Arthur Turner'})
   queue.put({'kind': 'Golden Delicious'})
Items are pulled out of the queue using ``get(block=True)``. The
default behavior is to block until an item is available, with a
default timeout of one second, after which ``None`` is returned.

.. code-block:: python

   def eat(kind):
       print('umm, %s apples taste good.' % kind)

   job = queue.get()
   eat(**job.data)
The ``job`` object provides metadata in addition to the ``data``
attribute, as illustrated by the string representation:

   >>> job
   <pq.Job id=77709 size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>
The ``get`` operation is also available through iteration:

.. code-block:: python

   for job in queue:
       if job is None:
           break

       eat(**job.data)
The iterator blocks if no item is available. Again, there is a default
timeout of one second, after which the iterator yields a value of
``None``.
An application can then choose to break out of the loop, or wait again
for an item to be ready.

.. code-block:: python

   for job in queue:
       if job is not None:
           eat(**job.data)

       # This is an infinite loop!
Scheduling
==========
Items can be scheduled such that they're not pulled until a later
time:

.. code-block:: python

   queue.put({'kind': 'Cox'}, '5m')
In this example, the item is ready for work five minutes later. The
method also accepts ``datetime`` and ``timedelta`` objects.
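For illustration, the equivalent forms are sketched below; the ``put`` calls are shown commented out since they assume the ``queue`` object from the earlier examples.

.. code-block:: python

   from datetime import datetime, timedelta

   delay = timedelta(minutes=5)
   eta = datetime.utcnow() + delay

   # Each of these schedules the item five minutes out:
   # queue.put({'kind': 'Cox'}, '5m')    # string shorthand
   # queue.put({'kind': 'Cox'}, delay)   # relative timedelta
   # queue.put({'kind': 'Cox'}, eta)     # absolute datetime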
Priority
========
If some items are more important than others, a time expectation can
be expressed:

.. code-block:: python

   queue.put({'kind': 'Cox'}, expected_at='5m')
This tells the queue processor to give priority to this item over an
item expected at a later time, and conversely, to prefer an item with
an earlier expected time. Note that items without a set priority are
pulled last.
The scheduling and priority options can be combined:

.. code-block:: python

   queue.put({'kind': 'Cox'}, '1h', '2h')
This item won't be pulled out until after one hour, and even then,
it's only processed subject to its expected time of two hours.
Encoding and decoding
=====================
The task data is encoded to and decoded from JSON using the built-in
`json` module. If you want to use a different implementation or need
to configure this, pass `encode` and/or `decode` arguments to the `PQ`
constructor.
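For example, one might swap in a pair with stable key order. This is a sketch; the ``default=str`` fallback is an illustrative choice, not ``pq``'s default behavior.

.. code-block:: python

   import json

   def encode(data):
       # Stable key order; fall back to str() for values the plain
       # json module cannot serialize (e.g. datetime, Decimal).
       return json.dumps(data, sort_keys=True, default=str)

   def decode(s):
       return json.loads(s)

   # Wired into the constructor (assumes a `conn` as in the earlier
   # example):
   #
   #     pq = PQ(conn, encode=encode, decode=decode)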
Pickles
=======
If a queue name is provided as ``<name>/pickle``
(e.g. ``'jobs/pickle'``), items are automatically pickled and
unpickled using Python's built-in ``cPickle`` module:

.. code-block:: python

   queue = pq['apples/pickle']

   class Apple(object):
       def __init__(self, kind):
           self.kind = kind

   queue.put(Apple('Cox'))
This allows you to store most objects without having to add any
further serialization code.
The old pickle protocol ``0`` is used to ensure the pickled data is
encoded as ``ascii`` which should be compatible with any database
encoding. Note that the pickle data is still wrapped as a JSON string at the
database level.
While using the pickle protocol is an easy way to serialize objects,
for advanced users it might be better to use JSON serialization
directly on the objects, for example using the object hook mechanism
in the built-in `json` module or subclassing
`JSONEncoder <https://docs.python.org/2/library/json.html#json.JSONEncoder>`_.
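A sketch of that approach, using the ``Apple`` class from above; the tag key and the hook wiring are illustrative, not part of ``pq``.

.. code-block:: python

   import json

   class Apple:
       def __init__(self, kind):
           self.kind = kind

   class AppleEncoder(json.JSONEncoder):
       def default(self, obj):
           # Serialize Apple instances as tagged dicts; defer to the
           # base class for anything else.
           if isinstance(obj, Apple):
               return {'__apple__': True, 'kind': obj.kind}
           return super().default(obj)

   def apple_hook(d):
       # Reverse the mapping when decoding.
       if d.get('__apple__'):
           return Apple(d['kind'])
       return d

   encoded = json.dumps(Apple('Cox'), cls=AppleEncoder)
   decoded = json.loads(encoded, object_hook=apple_hook)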
Tasks
=====
``pq`` comes with a higher-level API that helps manage ``tasks``.

.. code-block:: python

   from pq.tasks import PQ

   pq = PQ(...)

   queue = pq['default']

   @queue.task(schedule_at='1h')
   def eat(job_id, kind):
       print('umm, %s apples taste good.' % kind)

   eat('Cox')

   queue.work()
A ``task``'s jobs can optionally be re-scheduled on failure:

.. code-block:: python

   @queue.task(schedule_at='1h', max_retries=2, retry_in='10s')
   def eat(job_id, kind):
       # ...
Time expectations can be overridden when calling a ``task``:

.. code-block:: python

   eat('Cox', _expected_at='2m', _schedule_at='1m')
**Note:** The first positional argument is the job id, i.e. the primary key of the record in PostgreSQL.
Thread-safety
=============
All objects are thread-safe as long as a connection pool is provided
where each thread receives its own database connection.
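A minimal worker sketch along those lines follows; the handler, thread count, and daemon setting are illustrative choices.

.. code-block:: python

   from threading import Thread

   def worker(queue, handle):
       # Consume jobs; a None value means the one-second timeout
       # elapsed without work, so just keep waiting.
       for job in queue:
           if job is None:
               continue
           handle(job.data)

   def start_workers(queue, handle, n=4):
       # Each thread iterates the shared queue, drawing its own
       # connection from the pool on each operation.
       threads = [Thread(target=worker, args=(queue, handle), daemon=True)
                  for _ in range(n)]
       for t in threads:
           t.start()
       return threads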
Changes
=======
1.9.1 (2023-04-04)
------------------
- Add support for PostgreSQL 14 [kalekseev].
See https://www.postgresql.org/docs/14/release-14.html#id-1.11.6.11.4.
- Add support for using a custom schema (issue #35).
1.9.0 (2020-09-29)
------------------
- The task executor now receives `job_id` as the first argument.
1.8.2 (2020-08-14)
------------------
- Added support for queue names longer than 63 characters.

  A database migration (dropping and recreating the `pq_notify`
  trigger) is required when using names longer than this limit;
  otherwise, no migration is needed.

- Return connections to the pool if an exception is raised while a
  connection is being retrieved.
1.8.1 (2019-07-30)
------------------
- Added overridable `encode` and `decode` methods which are
responsible for turning task data into `JSON` and vice-versa.
1.8.0 (2019-07-03)
------------------
- Change policy on task priority. Tasks with a null-value for
`expected_at` are now processed after items that have a value set.
1.7.0 (2019-04-07)
------------------
- Use `SKIP LOCKED` instead of advisory lock mechanism (PostgreSQL 9.5+).
1.6.1 (2018-11-14)
------------------
- Fix queue class factory pattern.
1.6 (2018-11-12)
----------------
- Fix compatibility with `NamedTupleCursor`.
- Fix duplicate column name issue.
- Add option to specify own queue class.
1.5 (2017-04-18)
----------------
- Fixed Python 2 compatibility.
1.4 (2016-03-25)
----------------
- Added worker class and handler helper decorator.
[jeanphix]
1.3 (2015-05-11)
----------------
- Python 3 compatibility.
[migurski]
- Fix time zone issue.
1.2 (2014-10-21)
----------------
Improvements:
- Fixed concurrency issue where a large number of locks would be held
as a queue grows in size.
- Fixed a database connection resource issue.
1.1 (2014-02-27)
----------------
Features:
- A queue is now also a context manager, providing transactional
semantics.
- Queues now return task objects which provide metadata and allow
  reading and writing task data.
Improvements:
- The same connection pool can now be used with different queues.
Bugs:
- The `Literal` string wrapper did not work correctly with `psycopg2`.
- The transaction manager now correctly returns connections to the
pool.
1.0 (2013-11-20)
----------------
- Initial public release.