psq - Cloud Pub/Sub Task Queue for Python.
==========================================
|Build Status| |Coverage Status| |PyPI Version|
.. Note:: This is not an official Google product, experimental or otherwise,
   and it is provided without support. It is intended as a sample library
   demonstrating a set of use cases for Google Cloud Pub/Sub. The official
   Pub/Sub client library should be used for production applications.
``psq`` is an example Python implementation of a simple distributed task
queue using `Google Cloud Pub/Sub <https://cloud.google.com/pubsub/>`__.
``psq`` requires minimal configuration and relies on Cloud Pub/Sub to
provide scalable and reliable messaging.
``psq`` is intentionally similar to `rq <http://python-rq.org/>`__ and
`simpleq <https://github.com/rdegges/simpleq>`__, and takes some
inspiration from `celery <http://www.celeryproject.org/>`__ and `this
blog
post <http://jeffknupp.com/blog/2014/02/11/a-celerylike-python-task-queue-in-55-lines-of-code/>`__.
Installation
------------
Install via `pip <https://pypi.python.org/pypi/pip>`__:
::

    pip install psq
Prerequisites
-------------
- A project on the `Google Developers
Console <https://console.developers.google.com>`__.
- The `Google Cloud SDK <https://cloud.google.com/sdk>`__ installed
locally.
- You will need the `Cloud Pub/Sub API
enabled <https://console.developers.google.com/flows/enableapi?apiid=datastore,pubsub>`__
on your project. The link will walk you through enabling the API.
- You will need to run ``gcloud auth login`` before running these
  examples so that authentication to Google Cloud Platform services is
  handled transparently.
Usage
-----
First, create a task:
.. code:: python

    def adder(a, b):
        return a + b
Then, create a pubsub client and a queue:
.. code:: python

    from google.cloud import pubsub_v1
    import psq


    project = 'your-project-id'

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()

    q = psq.Queue(publisher, subscriber, project)
Now you can enqueue tasks:
.. code:: python

    from tasks import adder

    q.enqueue(adder)
In order to get task results, you have to configure storage:
.. code:: python

    from google.cloud import pubsub_v1
    from google.cloud import datastore
    import psq


    project = 'your-project-id'

    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    ds_client = datastore.Client()

    q = psq.Queue(
        publisher, subscriber, project,
        storage=psq.DatastoreStorage(ds_client))
With storage configured, you can get the result of a task:
.. code:: python

    r = q.enqueue(adder, 5, 6)
    r.result()  # -> 11
You can also define multiple queues:
.. code:: python

    fast = psq.Queue(publisher, subscriber, project, 'fast')
    slow = psq.Queue(publisher, subscriber, project, 'slow')
Things to note
--------------
Because ``psq`` is largely similar to ``rq``, similar rules around tasks
apply. You can put any Python function call on a queue, provided:
- The function is importable by the worker. This means the
  ``__module__`` that the function lives in must be importable.
  Notably, you can't enqueue functions that are declared in the
  ``__main__`` module - such as tasks defined in a file that is run
  directly with ``python`` or via the interactive interpreter.
- The function can be given as a string, but it must be the absolute
  importable path to a function that the worker can import. Otherwise,
  the task will fail.
- The worker and the applications queueing tasks must share exactly the
  same source code.
- The function can't depend on global context such as global variables,
  ``current_request``, etc. Pass any needed context into the worker at
  queue time.
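To illustrate the importable-path rule, here is a minimal sketch of how a
worker might resolve a dotted string path into a callable. ``resolve_task``
is a hypothetical helper for illustration only, not part of ``psq``:

.. code:: python

    import importlib

    def resolve_task(path):
        # Split 'package.module.function' into the module path and the
        # attribute name, import the module, and fetch the function.
        module_name, _, func_name = path.rpartition('.')
        module = importlib.import_module(module_name)
        return getattr(module, func_name)

    # Resolving a stdlib function by its absolute importable path:
    sqrt = resolve_task('math.sqrt')

If the module isn't importable from the worker's environment,
``import_module`` raises ``ImportError`` - which is why a task given as a
string fails unless the path is absolute and importable.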
Delivery guarantees
~~~~~~~~~~~~~~~~~~~
Pub/Sub guarantees your tasks will be delivered to the workers, but
``psq`` doesn't presently guarantee that a task completes execution,
nor does it provide exactly-once semantics, though it does allow you
to provide your own mechanisms for either. This is similar to Celery's
`default <http://celery.readthedocs.org/en/latest/faq.html#faq-acks-late-vs-retry>`__
configuration.

Task completion guarantees can be provided via late ack support. Late
ack is possible with Cloud Pub/Sub, but it is currently not implemented
in this library. See `CONTRIBUTING.md`_.
There are many approaches for exactly-once semantics, such as
distributed locks. This is possible in systems such as
`zookeeper <http://zookeeper.apache.org/doc/r3.1.2/recipes.html#sc_recipes_Locks>`__
and `redis <http://redis.io/topics/distlock>`__.
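To make the exactly-once discussion concrete, here is a minimal in-process
sketch of an idempotency guard that runs a task body at most once per task
id. This is illustrative only; a real deployment would back it with a shared
store (for example a Redis ``SETNX``-style lock) so that multiple workers
coordinate:

.. code:: python

    import threading

    class IdempotencyGuard:
        """Runs a task body at most once per task id (single process only)."""

        def __init__(self):
            self._seen = set()
            self._lock = threading.Lock()

        def run_once(self, task_id, fn, *args, **kwargs):
            # Atomically record the task id; skip the body on a
            # duplicate delivery and return None instead.
            with self._lock:
                if task_id in self._seen:
                    return None
                self._seen.add(task_id)
            return fn(*args, **kwargs)

    guard = IdempotencyGuard()
    guard.run_once('task-1', adder, 1, 2)  # executes adder
    guard.run_once('task-1', adder, 1, 2)  # duplicate; skipped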
Running a worker
----------------
Execute ``psqworker`` in the *same directory where your tasks are
defined*:

::

    psqworker.py config.q
``psqworker`` only operates on one queue at a time. If you want a server
to listen to multiple queues, use something like
`supervisord <http://supervisord.org/>`__ to run multiple ``psqworker``
processes.
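For example, a supervisord configuration along these lines (program names
and paths here are illustrative) keeps one worker per queue running:

.. code:: ini

    [program:psqworker-fast]
    command=psqworker.py config.fast
    directory=/path/to/your/app
    autorestart=true

    [program:psqworker-slow]
    command=psqworker.py config.slow
    directory=/path/to/your/app
    autorestart=true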
Broadcast queues
----------------
A normal queue will send a single task to a single worker, spreading
your tasks over all workers listening to the same queue. There are also
broadcast queues, which will deliver a copy of the task to *every*
worker. This is useful in situations where you want every worker to
execute the same task, such as installing or upgrading software on every
server.
.. code:: python

    from subprocess import call

    broadcast_q = psq.BroadcastQueue(publisher, subscriber, project)

    def restart_apache_task():
        call(["apachectl", "restart"])

    broadcast_q.enqueue(restart_apache_task)
Broadcast queues provide an implementation of the solution described in
`Reliable Task Scheduling on Google Compute
Engine <https://cloud.google.com/solutions/reliable-task-scheduling-compute-engine>`__.
*Note*: broadcast queues do not currently support any form of storage
and do not support return values.
Retries
-------
Raising ``psq.Retry`` in your task will cause it to be retried.
.. code:: python

    import logging

    import requests

    from psq import Retry

    def retry_if_fail():
        try:
            r = requests.get('http://some.flaky.service.com')
        except Exception as e:
            logging.error(e)
            raise Retry()
Flask & other contexts
----------------------
You can bind an extra context manager to the queue.
.. code:: python

    from flask import Flask
    import psq


    app = Flask(__name__)

    q = psq.Queue(extra_context=app.app_context)
This will ensure that the context is available in your tasks, which is
useful for things such as database connections:
.. code:: python

    from flask import current_app

    def flasky_task():
        backend = current_app.config['BACKEND']
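Conceptually, the worker enters ``extra_context`` around each task
invocation. A rough stand-in sketch - ``fake_app_context`` is hypothetical
and only mimics what ``app.app_context`` provides:

.. code:: python

    from contextlib import contextmanager

    @contextmanager
    def fake_app_context():
        # Set up per-task state before the task body runs,
        # and tear it down afterwards, even on failure.
        state = {'db': 'connected'}
        try:
            yield state
        finally:
            state['db'] = 'closed'

    # Roughly what the worker does around each task body:
    with fake_app_context():
        pass  # the task runs here, with the context active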
Bypassing workers for testing
-----------------------------
During unit tests you almost certainly don't want to spin up workers, but
instead execute the enqueued functions immediately and synchronously. To do
this, pass ``asynchronous=False`` to the Queue's constructor (the default is
``True``). In this case you also don't have to provide publisher, subscriber,
or project arguments; just pass ``None`` for all of them.
.. code:: python

    q = psq.Queue(None, None, project=None, asynchronous=False)
    r = q.enqueue(adder, 1, 2)  # Will be run immediately
Ideas for improvements
----------------------
- Some sort of storage solution for broadcast queues.
- Memcache/redis value store.
- ``@task`` decorator that adds a delay/defer function.
- Task chaining / groups / chords.
- Late ack.
- Gevent worker.
- Batch support for queueing.
Contributing changes
--------------------
- See `CONTRIBUTING.md`_
Licensing
---------
- Apache 2.0 - See `LICENSE`_
.. _LICENSE: https://github.com/GoogleCloudPlatform/psq/blob/master/LICENSE
.. _CONTRIBUTING.md: https://github.com/GoogleCloudPlatform/psq/blob/master/CONTRIBUTING.md
.. |Build Status| image:: https://travis-ci.org/GoogleCloudPlatform/psq.svg
:target: https://travis-ci.org/GoogleCloudPlatform/psq
.. |Coverage Status| image:: https://coveralls.io/repos/GoogleCloudPlatform/psq/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/GoogleCloudPlatform/psq?branch=master
.. |PyPI Version| image:: https://img.shields.io/pypi/v/psq.svg
:target: https://pypi.python.org/pypi/psq