===========
psycopg2_mq
===========
.. image:: https://img.shields.io/pypi/v/psycopg2_mq.svg
:target: https://pypi.org/pypi/psycopg2_mq
.. image:: https://github.com/mmerickel/psycopg2_mq/actions/workflows/ci-tests.yml/badge.svg?branch=main
:target: https://github.com/mmerickel/psycopg2_mq/actions/workflows/ci-tests.yml?query=branch%3Amain
:alt: main CI Status
``psycopg2_mq`` is a message queue implemented on top of
`PostgreSQL <https://www.postgresql.org/>`__,
`SQLAlchemy <https://www.sqlalchemy.org/>`__, and
`psycopg2 <http://initd.org/psycopg/>`__.
Currently the library provides only the low-level constructs that can be used
to build a multithreaded worker system. It is broken into two components:
- ``psycopg2_mq.MQWorker`` - a reusable worker object that manages a
single-threaded worker that can accept jobs and execute them. An application
should create worker per thread. It supports an API for thread-safe graceful
shutdown.
- ``psycopg2_mq.MQSource`` - a source object providing a client-side API for
invoking and querying job states.
It is expected that these core components are then wrapped into your own
application in any way that you see fit, without dictating a specific CLI
or framework.
Data Model
==========
Queues
------
Workers run jobs defined in queues. Currently each queue will run jobs
concurrently, while a future version may support serial execution on a
per-queue basis. Each registered queue should contain an ``execute_job(job)``
method.
Jobs
----
The ``execute_job`` method of a queue is passed a ``Job`` object containing
the following attributes:
- ``id``
- ``queue``
- ``method``
- ``args``
- ``cursor``
As a convenience, there is an ``extend(**kw)`` method which can be used to
add extra attributes to the object. This is useful in individual queues to
define a contract between a queue and its methods.
Cursors
~~~~~~~
By defining a cursor you can execute jobs sequentially instead of in parallel.
There can only be one running (or lost) job for any cursor at a time.
A ``Job`` can be scheduled with a ``cursor_key``.
A ``job.cursor`` dict is provided to the workers containing the cursor data,
and is saved back to the database when the job is completed. This effectively
gives jobs some persistent, shared memory between jobs on the cursor.
Because a "lost" job on the cursor counts as running, it will also block any other jobs from executing.
To resolve this situation it is necessary for the lost job to be either marked as failed or retried.
Look at ``MQSource.retry_job`` and ``MQSource.fail_lost_job`` APIs.
Collapsible
~~~~~~~~~~~
When ``collapse_on_cursor`` is set to ``True`` on a job, this is declaring the job as "collapsible".
There can only be one job in the "pending" state marked ``collapsible == True`` for the same ``queue``, ``method``, and ``cursor_key``.
This means that if there is an existing "pending" job with the same ``cursor_key``, ``queue``, and ``method`` with ``collapsible == True`` then the new job will be "collapsed" into the existing job.
By default, this typically means that the new job will be dropped because another job already exists.
This means that the ``args`` should be "constant", because the new job's args are ignored.
If the ``args`` are not constant, then you will likely want to pass a ``conflict_resolver`` callback.
This is a function that will be invoked when a job already exists.
You can then update the existing ``Job`` object in the "pending" state, adjusting the args etc.
Delayed Jobs
~~~~~~~~~~~~
A ``Job`` can be delayed to run in the future by providing a ``datetime`` object to the ``when`` argument.
When this feature is used with collapsible jobs you can create a highly efficient throttle greatly reducing the number of jobs that run in the background.
For example, schedule 20 jobs all with the same ``cursor_key``, and ``collapse_on_cursor=True``, and ``when=timedelta(seconds=30)``.
You will see only 1 job created and it will execute in 30 seconds.
All of the other jobs are collapsed into this one instead of creating separate jobs.
Schedules
---------
A ``JobSchedule`` can be defined which supports
`RFC 5545 <https://tools.ietf.org/html/rfc5545>`__ ``RRULE`` schedules. These
are powerful and can support timezones, frequencies based on calendars as well
as simple recurring rules from an anchor time using ``DTSTART``. Cron jobs
can be converted to this syntax for simpler scenarios.
``psycopg2-mq`` workers will automatically negotiate which worker is responsible
for managing schedules so clustered workers should operate as expected.
To register a new schedule, look at the ``MQSource.add_schedule(queue, method, args, *, rrule)`` API.
If you set ``collapse_on_cursor`` is ``True`` on the schedule and there is already a job pending then the firing of the schedule is effectively a no-op.
Events / Listeners
------------------
A ``JobListener`` can be defined which supports creating new jobs when events are
emitted. When an event is emitted via ``MQSource.emit_event`` then any listeners
matching this event will be used to create a new job in the system.
To register a listener, look at the ``MQSource.add_listener(event, queue, method, args, ...)`` API.
There is a default event emitted every time a job moved to a finished state. It has the format::
mq.job_finished.<state><queue>.<method>
For example, ``mq.job_finished.completed.dummy.echo``.
You are free to emit your own custom events as well if you need different dimensions!
When ``collapse_on_cursor`` is ``False`` then the listener receives an ``event`` arg containing the ``name``, ``listener_id``, and ``data`` keys.
If ``collapse_on_cursor`` is ``True`` on the listener then the resulting job will receive an ``events`` arg containing a list of all of the emitted events that occured while the job was in the "pending" state.
Example Worker
==============
.. code-block:: python
from psycopg2_mq import (
MQWorker,
make_default_model,
)
from sqlalchemy import (
MetaData,
create_engine,
)
import sys
class EchoQueue:
def execute_job(self, job):
return f'hello, {job.args["name"]} from method="{job.method}"'
if __name__ == '__main__':
engine = create_engine(sys.argv[1])
metadata = MetaData()
model = make_default_model(metadata)
worker = MQWorker(
engine=engine,
queues={
'echo': EchoQueue(),
},
model=model,
)
worker.run()
Example Source
==============
.. code-block:: python
engine = create_engine('postgresql+psycopg2://...')
metadata = MetaData()
model = make_default_model(metadata)
metadata.create_all(engine)
session_factory = sessionmaker(engine)
with session_factory.begin():
mq = MQSource(
dbsession=dbsession,
model=model,
)
job_id = mq.call('echo', 'hello', {'name': 'Andy'})
print(f'queued job={job_id}')
Changes
=======
0.12.6 (2024-11-14)
-------------------
- Fix linting.
0.12.5 (2024-11-14)
-------------------
- Change the format to put the state in the middle of the emitted event.
- Emit an event when moving from lost to failed via ``MQSource.fail_lost_job``.
- Add the ``queue`` and ``method`` to the event args on ``mq.job_finished`` events
0.12.4 (2024-11-13)
-------------------
- Emit an event any time a job hits a terminal state. The format has changed from
``mq_job_complete.<queue>.<method>`` to ``mq.job_finished.<queue>.<method>.<state>``.
This includes ``completed``, ``failed``, and ``lost``.
0.12.3 (2024-10-29)
-------------------
- Add ``job.schedule_ids`` and ``job.listener_ids`` to the job context passed to the
background workers.
0.12.2 (2024-10-29)
-------------------
- Prevent canceling jobs in the lost state.
- Allow retrying jobs in any state.
- Allow setting a result on a lost job when moving it to failed.
0.12.1 (2024-10-27)
-------------------
- Support SQLAlchemy 2.x.
- Must mark a lost job as failed prior to being able to retry it.
0.12 (2024-10-27)
-----------------
- Support a job being linked properly to multiple schedule and listener sources such
that provenance is properly tracked on retries.
- Add a new ``CANCELED`` job state that can be used to manually mark any pending,
failed, or lost jobs as canceled. Jobs do not enter this state automatically - theyt
must be manually marked but will be useful to disambiguate failed from canceled.
- [breaking] ``job.schedule_id`` is removed from the job object passed to background
workers.
- [model migration] Moved the ``schedule_id`` and ``listener_id`` foreign keys from
the ``Job`` table to many-to-many link tables to support tracking the source properly
when collapsing occurs. Possible migration::
insert into mq_job_schedule_link (job_id, schedule_id)
select id, schedule_id from mq_job where schedule_id is not null;
insert into mq_job_listener_link (job_id, listener_id)
select id, listener_id from mq_job where listener_id is not null;
alter table mq_job drop column schedule_id;
alter table mq_job drop column listener_id;
- [model migration] Add a new ``CANCELED`` state to the ``mq_job_state`` enum.
Possible migration::
alter type mq_job_state add value 'canceled';
0.11 (2024-10-27)
-----------------
- Add support for Python 3.13.
- [breaking] Modified the ``MQSource.call``, and ``MQSource.add_schedule`` APIs such
that when a cursor is used ``collapse_on_cursor`` defaults to ``False`` instead of
``True``. You must explicitly set it to ``True`` in scenarios in which that is
desired as it is no longer the default behavior.
- [model migration] Add ``collapse_on_cursor`` attribute to
the ``JobSchedule`` model. A bw-compat migration would set this value to ``False``
if ``cursor_key`` is ``NULL`` and ``True`` on everything else.
- [model migration] Add a new ``JobListener`` model.
- [model migration] Add ``listener_id`` foreign key to the ``Job`` model.
- Fix a bug in which NOTIFY events were missed in some cases causing jobs to wait
until the maintenance window to execute.
- Add the concept of pub/sub event listeners. Listeners can be registered that act as a
job factory, creating a new job when an event is emitted.
It is possible to emit events manually as needed via the ``MQSource.emit_event`` API.
Events are emitted automatically when a job is completed. Every job when it is
completed successfully emits a new ``mq_job_complete:<queue>.<method>`` event.
This event contains the result of the job.
- The ``MQSource`` that is used by the ``MQWorker`` can now be overridden via the
``mq_source_factory`` option.
0.10 (2024-08-06)
------------------
- Add support for Python 3.12.
- Drop support for Python 3.7, and 3.8.
- Fix a race condition on shutdown where the job fails to cleanup because the triggers
are gone while the pool is still shutting down.
0.9 (2023-04-21)
----------------
- Add support for Python 3.10, and 3.11.
- [breaking] Prevent retrying of collapsible jobs. Require them to be invoked
using ``call`` instead for an opportunity to specify a ``conflict_resolver``.
- [model migration] Fix a bug in the default model schema in which the
collapsible database index was not marked unique.
- Copy trace info when retrying a job.
- Capture the stringified exception to the job result in the ``message`` key,
alongside the existing ``tb``, ``exc``, and ``args`` keys.
- The worker was not recognizing ``capture_signals=False``, causing problems
when running the event loop in other threads.
- Blackify the codebase and add some real tests. Yay!
0.8.3 (2022-04-15)
------------------
- [breaking] Remove ``MQWorker.make_job_context``.
0.8.2 (2022-04-15)
------------------
- Drop Python 3.6 support.
- [breaking] Require SQLAlchemy 1.4+ and resolve deprecation warnings related to
SQLAlchemy 2.0.
- [model migration] Rename ``update_job_id`` to ``updated_job_id`` in the
``JobCursor`` model.
0.8.1 (2022-04-15)
------------------
- Ensure the ``trace`` attribute is populated on the ``JobContext``.
- Add ``MQWorker.make_job_context`` which can be defined to completely override
the ``JobContext`` factory using the ``Job`` object and open database session.
0.8.0 (2022-04-15)
------------------
- [model migration] Add ``update_job_id`` foreign key to the ``JobCursor`` model to
make it possible to know which job last updated the value in the cursor.
- [model migration] Add ``trace`` json blob to the ``Job`` model.
- Support a ``trace`` json blob when creating new jobs. This value is available
on the running job context and can be used when creating sub-jobs or when
making requests to external systems to pass through tracing metadata.
See ``MQSource.call``'s new ``trace`` parameter when creating jobs.
See ``JobContext.trace`` attribute when handling jobs.
- Add a standard ``FailedJobError`` exception which can be raised by jobs to
mark a failure with a custom result object. This is different from unhandled
exceptions that cause the ``MQWorker.result_from_error`` method to be invoked.
0.7.0 (2022-03-03)
------------------
- Fix a corner case with lost jobs attached to cursors. In scenarios where
multiple workers are running, if one loses a database connection then the
other is designed to notice and mark jobs lost. However, it's possible the
job is not actually lost and the worker can then recover after resuming
its connection, and marking the job running again. In this situation, we
do not want another job to begin on the same cursor. To fix this issue,
new jobs will not be run if another job is marked lost on the same cursor.
You will be required to recover the job by marking it as not lost (probably
failed) first to unblock the rest of the jobs on the cursor.
0.6.2 (2022-03-01)
------------------
- Prioritize maintenance work higher than running new jobs.
There was a chicken-and-egg issue where a job would be marked running
but needs to be marked lost. However marking it lost is lower priority than
trying to start new jobs. In the case where a lot of jobs were scheduled
at the same time, the worker always tried to start new jobs and didn't
run the maintenance so the job never got marked lost, effectively blocking
the queue.
0.6.1 (2022-01-15)
------------------
- Fix a bug introduced in the 0.6.0 release when scheduling new jobs.
0.6.0 (2022-01-14)
------------------
- [model migration] Add model changes to mark jobs as collapsible.
- [model migration] Add model changes to the cursor index.
- Allow multiple pending jobs to be scheduled on the same cursor if either:
1. The queue or method are different from existing pending jobs on the cursor.
2. ``collapse_on_cursor`` is set to ``False`` when scheduling the job.
0.5.7 (2021-03-07)
------------------
- Add a ``schedule_id`` attribute to the job context for use in jobs that want
to know whether they were executed from a schedule or not.
0.5.6 (2021-02-28)
------------------
- Some UnicodeDecodeError exceptions raised from jobs could trigger a
serialization failure (UntranslatableCharacter) because it would contain
the sequence ``\u0000``` which, while valid in Python, is not allowed
in postgres. So when dealing with the raw bytes, we'll decode it with
the replacement character that can be properly stored. Not ideal, but
better than failing to store the error at all.
0.5.5 (2021-01-22)
------------------
- Fixed some old code causing the worker lock to release after a job
completed.
0.5.4 (2021-01-20)
------------------
- Log at the error level when marking a job as lost.
0.5.3 (2021-01-11)
------------------
- Copy the ``schedule_id`` information to retried jobs.
0.5.2 (2021-01-11)
------------------
- [breaking] Require ``call_schedule`` to accept an id instead of an object.
0.5.1 (2021-01-09)
------------------
- [model migration] Drop the ``UNIQUE`` constraint on the background job
``lock_id`` column.
0.5 (2021-01-09)
----------------
- [model migration] Add a scheduler model with support for emitting periodic
jobs based on RRULE syntax.
See https://github.com/mmerickel/psycopg2_mq/pull/11
- Enable the workers to coordinate on a per-queue basis who is in control
of scheduling jobs.
See https://github.com/mmerickel/psycopg2_mq/pull/12
- Reduce the number of advisory locks held from one per job to one per worker.
See https://github.com/mmerickel/psycopg2_mq/pull/12
0.4.5 (2020-12-22)
------------------
- Use column objects in the insert statement to support ORM-level synonyms,
enabling the schema to have columns with different names.
0.4.4 (2019-11-07)
------------------
- Ensure the advisory locks are released when a job completes.
0.4.3 (2019-10-31)
------------------
- Ensure maintenance (finding lost jobs) always runs at set intervals defined
by the ``timeout`` parameter.
0.4.2 (2019-10-30)
------------------
- Recover active jobs when the connection is lost by re-locking them
and ensuring they are marked running.
0.4.1 (2019-10-30)
------------------
- Attempt to reconnect to the database after losing the connection.
If the reconnect attempt fails then crash.
0.4 (2019-10-28)
----------------
- [model migration] Add a ``worker`` column to the ``Job`` model to track what
worker is handling a job.
- Add an optional ``name`` argument to ``MQWorker`` to name the worker -
the value will be recorded in each job.
- Add a ``threads`` argument (default=``1``) to ``MQWorker`` to support
handling multiple jobs from the same worker instance instead of making a
worker per thread.
- Add ``capture_signals`` argument (default=``True``) to ``MQWorker`` which
will capture ``SIGTERM``, ``SIGINT`` and ``SIGUSR1``. The first two will
trigger graceful shutdown - they will make the process stop handling new
jobs while finishing active jobs. The latter will dump to ``stderr`` a
JSON dump of the current status of the worker.
0.3.3 (2019-10-23)
------------------
- Only save a cursor update if the job is completed successfully.
0.3.2 (2019-10-22)
------------------
- Mark lost jobs during timeouts instead of just when a worker starts in order
to catch them earlier.
0.3.1 (2019-10-17)
------------------
- When attempting to schedule a job with a cursor and a ``scheduled_time``
earlier than a pending job on the same cursor, the job will be updated to
run at the earlier time.
- When attempting to schedule a job with a cursor and a pending job already
exists on the same cursor, a ``conflict_resolver`` function may be
supplied to ``MQSource.call`` to update the job properties, merging the
arguments however the user wishes.
0.3 (2019-10-15)
----------------
- [model migration] Add a new column ``cursor_snapshot`` to the ``Job`` model which
will contain the value of the cursor when the job begins.
0.2 (2019-10-09)
----------------
- [model migration] Add cursor support for jobs. This requires a schema migration to
add a ``cursor_key`` column, a new ``JobCursor`` model, and some new indices.
0.1.6 (2019-10-07)
------------------
- Support passing custom kwargs to the job in ``psycopg2_mq.MQSource.call``
to allow custom columns on the job table.
0.1.5 (2019-05-17)
------------------
- Fix a regression when serializing errors with strings or cycles.
0.1.4 (2019-05-09)
------------------
- More safely serialize exception objects when jobs fail.
0.1.3 (2018-09-04)
------------------
- Rename the thread to contain the job id while it's handling a job.
0.1.2 (2018-09-04)
------------------
- [model migration] Rename ``Job.params`` to ``Job.args``.
0.1.1 (2018-09-04)
------------------
- Make ``psycopg2`` an optional dependency in order to allow apps to depend
on ``psycopg2-binary`` if they wish.
0.1 (2018-09-04)
----------------
- Initial release.
Raw data
{
"_id": null,
"home_page": "https://github.com/mmerickel/psycopg2_mq",
"name": "psycopg2-mq",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": "psycopg2, postgres, postgresql, message queue, background jobs",
"author": "Michael Merickel",
"author_email": "oss@m.merickel.org",
"download_url": "https://files.pythonhosted.org/packages/54/e6/a517812483e025d4395312339243f78261bd52f66591f2727b9e98b5cdf4/psycopg2_mq-0.12.6.tar.gz",
"platform": null,
"description": "===========\npsycopg2_mq\n===========\n\n.. image:: https://img.shields.io/pypi/v/psycopg2_mq.svg\n :target: https://pypi.org/pypi/psycopg2_mq\n\n.. image:: https://github.com/mmerickel/psycopg2_mq/actions/workflows/ci-tests.yml/badge.svg?branch=main\n :target: https://github.com/mmerickel/psycopg2_mq/actions/workflows/ci-tests.yml?query=branch%3Amain\n :alt: main CI Status\n\n``psycopg2_mq`` is a message queue implemented on top of\n`PostgreSQL <https://www.postgresql.org/>`__,\n`SQLAlchemy <https://www.sqlalchemy.org/>`__, and\n`psycopg2 <http://initd.org/psycopg/>`__.\n\nCurrently the library provides only the low-level constructs that can be used\nto build a multithreaded worker system. It is broken into two components:\n\n- ``psycopg2_mq.MQWorker`` - a reusable worker object that manages a\n single-threaded worker that can accept jobs and execute them. An application\n should create worker per thread. It supports an API for thread-safe graceful\n shutdown.\n\n- ``psycopg2_mq.MQSource`` - a source object providing a client-side API for\n invoking and querying job states.\n\nIt is expected that these core components are then wrapped into your own\napplication in any way that you see fit, without dictating a specific CLI\nor framework.\n\nData Model\n==========\n\nQueues\n------\n\nWorkers run jobs defined in queues. Currently each queue will run jobs\nconcurrently, while a future version may support serial execution on a\nper-queue basis. Each registered queue should contain an ``execute_job(job)``\nmethod.\n\nJobs\n----\n\nThe ``execute_job`` method of a queue is passed a ``Job`` object containing\nthe following attributes:\n\n- ``id``\n- ``queue``\n- ``method``\n- ``args``\n- ``cursor``\n\nAs a convenience, there is an ``extend(**kw)`` method which can be used to\nadd extra attributes to the object. This is useful in individual queues to\ndefine a contract between a queue and its methods.\n\nCursors\n~~~~~~~\n\nBy defining a cursor you can execute jobs sequentially instead of in parallel.\nThere can only be one running (or lost) job for any cursor at a time.\n\nA ``Job`` can be scheduled with a ``cursor_key``.\n\nA ``job.cursor`` dict is provided to the workers containing the cursor data,\nand is saved back to the database when the job is completed. This effectively\ngives jobs some persistent, shared memory between jobs on the cursor.\n\nBecause a \"lost\" job on the cursor counts as running, it will also block any other jobs from executing.\nTo resolve this situation it is necessary for the lost job to be either marked as failed or retried.\nLook at ``MQSource.retry_job`` and ``MQSource.fail_lost_job`` APIs.\n\nCollapsible\n~~~~~~~~~~~\n\nWhen ``collapse_on_cursor`` is set to ``True`` on a job, this is declaring the job as \"collapsible\".\nThere can only be one job in the \"pending\" state marked ``collapsible == True`` for the same ``queue``, ``method``, and ``cursor_key``.\nThis means that if there is an existing \"pending\" job with the same ``cursor_key``, ``queue``, and ``method`` with ``collapsible == True`` then the new job will be \"collapsed\" into the existing job.\nBy default, this typically means that the new job will be dropped because another job already exists.\nThis means that the ``args`` should be \"constant\", because the new job's args are ignored.\n\nIf the ``args`` are not constant, then you will likely want to pass a ``conflict_resolver`` callback.\nThis is a function that will be invoked when a job already exists.\nYou can then update the existing ``Job`` object in the \"pending\" state, adjusting the args etc.\n\nDelayed Jobs\n~~~~~~~~~~~~\n\nA ``Job`` can be delayed to run in the future by providing a ``datetime`` object to the ``when`` argument.\nWhen this feature is used with collapsible jobs you can create a highly efficient throttle greatly reducing the number of jobs that run in the background.\nFor example, schedule 20 jobs all with the same ``cursor_key``, and ``collapse_on_cursor=True``, and ``when=timedelta(seconds=30)``.\nYou will see only 1 job created and it will execute in 30 seconds.\nAll of the other jobs are collapsed into this one instead of creating separate jobs.\n\nSchedules\n---------\n\nA ``JobSchedule`` can be defined which supports\n`RFC 5545 <https://tools.ietf.org/html/rfc5545>`__ ``RRULE`` schedules. These\nare powerful and can support timezones, frequencies based on calendars as well\nas simple recurring rules from an anchor time using ``DTSTART``. Cron jobs\ncan be converted to this syntax for simpler scenarios.\n\n``psycopg2-mq`` workers will automatically negotiate which worker is responsible\nfor managing schedules so clustered workers should operate as expected.\n\nTo register a new schedule, look at the ``MQSource.add_schedule(queue, method, args, *, rrule)`` API.\n\nIf you set ``collapse_on_cursor`` is ``True`` on the schedule and there is already a job pending then the firing of the schedule is effectively a no-op.\n\nEvents / Listeners\n------------------\n\nA ``JobListener`` can be defined which supports creating new jobs when events are\nemitted. When an event is emitted via ``MQSource.emit_event`` then any listeners\nmatching this event will be used to create a new job in the system.\n\nTo register a listener, look at the ``MQSource.add_listener(event, queue, method, args, ...)`` API.\n\nThere is a default event emitted every time a job moved to a finished state. It has the format::\n\n mq.job_finished.<state><queue>.<method>\n\nFor example, ``mq.job_finished.completed.dummy.echo``.\n\nYou are free to emit your own custom events as well if you need different dimensions!\n\nWhen ``collapse_on_cursor`` is ``False`` then the listener receives an ``event`` arg containing the ``name``, ``listener_id``, and ``data`` keys.\n\nIf ``collapse_on_cursor`` is ``True`` on the listener then the resulting job will receive an ``events`` arg containing a list of all of the emitted events that occured while the job was in the \"pending\" state.\n\nExample Worker\n==============\n\n.. code-block:: python\n\n from psycopg2_mq import (\n MQWorker,\n make_default_model,\n )\n from sqlalchemy import (\n MetaData,\n create_engine,\n )\n import sys\n\n class EchoQueue:\n def execute_job(self, job):\n return f'hello, {job.args[\"name\"]} from method=\"{job.method}\"'\n\n if __name__ == '__main__':\n engine = create_engine(sys.argv[1])\n metadata = MetaData()\n model = make_default_model(metadata)\n worker = MQWorker(\n engine=engine,\n queues={\n 'echo': EchoQueue(),\n },\n model=model,\n )\n worker.run()\n\nExample Source\n==============\n\n.. code-block:: python\n\n engine = create_engine('postgresql+psycopg2://...')\n metadata = MetaData()\n model = make_default_model(metadata)\n metadata.create_all(engine)\n session_factory = sessionmaker(engine)\n\n with session_factory.begin():\n mq = MQSource(\n dbsession=dbsession,\n model=model,\n )\n job_id = mq.call('echo', 'hello', {'name': 'Andy'})\n print(f'queued job={job_id}')\n\nChanges\n=======\n\n0.12.6 (2024-11-14)\n-------------------\n\n- Fix linting.\n\n0.12.5 (2024-11-14)\n-------------------\n\n- Change the format to put the state in the middle of the emitted event.\n\n- Emit an event when moving from lost to failed via ``MQSource.fail_lost_job``.\n\n- Add the ``queue`` and ``method`` to the event args on ``mq.job_finished`` events\n\n0.12.4 (2024-11-13)\n-------------------\n\n- Emit an event any time a job hits a terminal state. The format has changed from\n ``mq_job_complete.<queue>.<method>`` to ``mq.job_finished.<queue>.<method>.<state>``.\n This includes ``completed``, ``failed``, and ``lost``.\n\n0.12.3 (2024-10-29)\n-------------------\n\n- Add ``job.schedule_ids`` and ``job.listener_ids`` to the job context passed to the\n background workers.\n\n0.12.2 (2024-10-29)\n-------------------\n\n- Prevent canceling jobs in the lost state.\n\n- Allow retrying jobs in any state.\n\n- Allow setting a result on a lost job when moving it to failed.\n\n0.12.1 (2024-10-27)\n-------------------\n\n- Support SQLAlchemy 2.x.\n\n- Must mark a lost job as failed prior to being able to retry it.\n\n0.12 (2024-10-27)\n-----------------\n\n- Support a job being linked properly to multiple schedule and listener sources such\n that provenance is properly tracked on retries.\n\n- Add a new ``CANCELED`` job state that can be used to manually mark any pending,\n failed, or lost jobs as canceled. Jobs do not enter this state automatically - theyt\n must be manually marked but will be useful to disambiguate failed from canceled.\n\n- [breaking] ``job.schedule_id`` is removed from the job object passed to background\n workers.\n\n- [model migration] Moved the ``schedule_id`` and ``listener_id`` foreign keys from\n the ``Job`` table to many-to-many link tables to support tracking the source properly\n when collapsing occurs. Possible migration::\n\n insert into mq_job_schedule_link (job_id, schedule_id)\n select id, schedule_id from mq_job where schedule_id is not null;\n\n insert into mq_job_listener_link (job_id, listener_id)\n select id, listener_id from mq_job where listener_id is not null;\n\n alter table mq_job drop column schedule_id;\n alter table mq_job drop column listener_id;\n\n- [model migration] Add a new ``CANCELED`` state to the ``mq_job_state`` enum.\n Possible migration::\n\n alter type mq_job_state add value 'canceled';\n\n0.11 (2024-10-27)\n-----------------\n\n- Add support for Python 3.13.\n\n- [breaking] Modified the ``MQSource.call``, and ``MQSource.add_schedule`` APIs such\n that when a cursor is used ``collapse_on_cursor`` defaults to ``False`` instead of\n ``True``. You must explicitly set it to ``True`` in scenarios in which that is\n desired as it is no longer the default behavior.\n\n- [model migration] Add ``collapse_on_cursor`` attribute to\n the ``JobSchedule`` model. A bw-compat migration would set this value to ``False``\n if ``cursor_key`` is ``NULL`` and ``True`` on everything else.\n\n- [model migration] Add a new ``JobListener`` model.\n\n- [model migration] Add ``listener_id`` foreign key to the ``Job`` model.\n\n- Fix a bug in which NOTIFY events were missed in some cases causing jobs to wait\n until the maintenance window to execute.\n\n- Add the concept of pub/sub event listeners. Listeners can be registered that act as a\n job factory, creating a new job when an event is emitted.\n\n It is possible to emit events manually as needed via the ``MQSource.emit_event`` API.\n\n Events are emitted automatically when a job is completed. Every job when it is\n completed successfully emits a new ``mq_job_complete:<queue>.<method>`` event.\n This event contains the result of the job.\n\n- The ``MQSource`` that is used by the ``MQWorker`` can now be overridden via the\n ``mq_source_factory`` option.\n\n0.10 (2024-08-06)\n------------------\n\n- Add support for Python 3.12.\n\n- Drop support for Python 3.7, and 3.8.\n\n- Fix a race condition on shutdown where the job fails to cleanup because the triggers\n are gone while the pool is still shutting down.\n\n0.9 (2023-04-21)\n----------------\n\n- Add support for Python 3.10, and 3.11.\n\n- [breaking] Prevent retrying of collapsible jobs. Require them to be invoked\n using ``call`` instead for an opportunity to specify a ``conflict_resolver``.\n\n- [model migration] Fix a bug in the default model schema in which the\n collapsible database index was not marked unique.\n\n- Copy trace info when retrying a job.\n\n- Capture the stringified exception to the job result in the ``message`` key,\n alongside the existing ``tb``, ``exc``, and ``args`` keys.\n\n- The worker was not recognizing ``capture_signals=False``, causing problems\n when running the event loop in other threads.\n\n- Blackify the codebase and add some real tests. Yay!\n\n0.8.3 (2022-04-15)\n------------------\n\n- [breaking] Remove ``MQWorker.make_job_context``.\n\n0.8.2 (2022-04-15)\n------------------\n\n- Drop Python 3.6 support.\n\n- [breaking] Require SQLAlchemy 1.4+ and resolve deprecation warnings related to\n SQLAlchemy 2.0.\n\n- [model migration] Rename ``update_job_id`` to ``updated_job_id`` in the\n ``JobCursor`` model.\n\n0.8.1 (2022-04-15)\n------------------\n\n- Ensure the ``trace`` attribute is populated on the ``JobContext``.\n\n- Add ``MQWorker.make_job_context`` which can be defined to completely override\n the ``JobContext`` factory using the ``Job`` object and open database session.\n\n0.8.0 (2022-04-15)\n------------------\n\n- [model migration] Add ``update_job_id`` foreign key to the ``JobCursor`` model to\n make it possible to know which job last updated the value in the cursor.\n\n- [model migration] Add ``trace`` json blob to the ``Job`` model.\n\n- Support a ``trace`` json blob when creating new jobs. This value is available\n on the running job context and can be used when creating sub-jobs or when\n making requests to external systems to pass through tracing metadata.\n\n See ``MQSource.call``'s new ``trace`` parameter when creating jobs.\n See ``JobContext.trace`` attribute when handling jobs.\n\n- Add a standard ``FailedJobError`` exception which can be raised by jobs to\n mark a failure with a custom result object. This is different from unhandled\n exceptions that cause the ``MQWorker.result_from_error`` method to be invoked.\n\n0.7.0 (2022-03-03)\n------------------\n\n- Fix a corner case with lost jobs attached to cursors. In scenarios where\n multiple workers are running, if one loses a database connection then the\n other is designed to notice and mark jobs lost. However, it's possible the\n job is not actually lost and the worker can then recover after resuming\n its connection, and marking the job running again. In this situation, we\n do not want another job to begin on the same cursor. To fix this issue,\n new jobs will not be run if another job is marked lost on the same cursor.\n You will be required to recover the job by marking it as not lost (probably\n failed) first to unblock the rest of the jobs on the cursor.\n\n0.6.2 (2022-03-01)\n------------------\n\n- Prioritize maintenance work higher than running new jobs.\n There was a chicken-and-egg issue where a job would be marked running\n but needs to be marked lost. However marking it lost is lower priority than\n trying to start new jobs. In the case where a lot of jobs were scheduled\n at the same time, the worker always tried to start new jobs and didn't\n run the maintenance so the job never got marked lost, effectively blocking\n the queue.\n\n0.6.1 (2022-01-15)\n------------------\n\n- Fix a bug introduced in the 0.6.0 release when scheduling new jobs.\n\n0.6.0 (2022-01-14)\n------------------\n\n- [model migration] Add model changes to mark jobs as collapsible.\n\n- [model migration] Add model changes to the cursor index.\n\n- Allow multiple pending jobs to be scheduled on the same cursor if either:\n\n 1. The queue or method are different from existing pending jobs on the cursor.\n\n 2. ``collapse_on_cursor`` is set to ``False`` when scheduling the job.\n\n0.5.7 (2021-03-07)\n------------------\n\n- Add a ``schedule_id`` attribute to the job context for use in jobs that want\n to know whether they were executed from a schedule or not.\n\n0.5.6 (2021-02-28)\n------------------\n\n- Some UnicodeDecodeError exceptions raised from jobs could trigger a\n serialization failure (UntranslatableCharacter) because it would contain\n the sequence ``\\u0000``` which, while valid in Python, is not allowed\n in postgres. So when dealing with the raw bytes, we'll decode it with\n the replacement character that can be properly stored. Not ideal, but\n better than failing to store the error at all.\n\n0.5.5 (2021-01-22)\n------------------\n\n- Fixed some old code causing the worker lock to release after a job\n completed.\n\n0.5.4 (2021-01-20)\n------------------\n\n- Log at the error level when marking a job as lost.\n\n0.5.3 (2021-01-11)\n------------------\n\n- Copy the ``schedule_id`` information to retried jobs.\n\n0.5.2 (2021-01-11)\n------------------\n\n- [breaking] Require ``call_schedule`` to accept an id instead of an object.\n\n0.5.1 (2021-01-09)\n------------------\n\n- [model migration] Drop the ``UNIQUE`` constraint on the background job\n ``lock_id`` column.\n\n0.5 (2021-01-09)\n----------------\n\n- [model migration] Add a scheduler model with support for emitting periodic\n jobs based on RRULE syntax.\n See https://github.com/mmerickel/psycopg2_mq/pull/11\n\n- Enable the workers to coordinate on a per-queue basis who is in control\n of scheduling jobs.\n See https://github.com/mmerickel/psycopg2_mq/pull/12\n\n- Reduce the number of advisory locks held from one per job to one per worker.\n See https://github.com/mmerickel/psycopg2_mq/pull/12\n\n0.4.5 (2020-12-22)\n------------------\n\n- Use column objects in the insert statement to support ORM-level synonyms,\n enabling the schema to have columns with different names.\n\n0.4.4 (2019-11-07)\n------------------\n\n- Ensure the advisory locks are released when a job completes.\n\n0.4.3 (2019-10-31)\n------------------\n\n- Ensure maintenance (finding lost jobs) always runs at set intervals defined\n by the ``timeout`` parameter.\n\n0.4.2 (2019-10-30)\n------------------\n\n- Recover active jobs when the connection is lost by re-locking them\n and ensuring they are marked running.\n\n0.4.1 (2019-10-30)\n------------------\n\n- Attempt to reconnect to the database after losing the connection.\n If the reconnect attempt fails then crash.\n\n0.4 (2019-10-28)\n----------------\n\n- [model migration] Add a ``worker`` column to the ``Job`` model to track what\n worker is handling a job.\n\n- Add an optional ``name`` argument to ``MQWorker`` to name the worker -\n the value will be recorded in each job.\n\n- Add a ``threads`` argument (default=``1``) to ``MQWorker`` to support\n handling multiple jobs from the same worker instance instead of making a\n worker per thread.\n\n- Add ``capture_signals`` argument (default=``True``) to ``MQWorker`` which\n will capture ``SIGTERM``, ``SIGINT`` and ``SIGUSR1``. The first two will\n trigger graceful shutdown - they will make the process stop handling new\n jobs while finishing active jobs. The latter will dump to ``stderr`` a\n JSON dump of the current status of the worker.\n\n0.3.3 (2019-10-23)\n------------------\n\n- Only save a cursor update if the job is completed successfully.\n\n0.3.2 (2019-10-22)\n------------------\n\n- Mark lost jobs during timeouts instead of just when a worker starts in order\n to catch them earlier.\n\n0.3.1 (2019-10-17)\n------------------\n\n- When attempting to schedule a job with a cursor and a ``scheduled_time``\n earlier than a pending job on the same cursor, the job will be updated to\n run at the earlier time.\n\n- When attempting to schedule a job with a cursor and a pending job already\n exists on the same cursor, a ``conflict_resolver`` function may be\n supplied to ``MQSource.call`` to update the job properties, merging the\n arguments however the user wishes.\n\n0.3 (2019-10-15)\n----------------\n\n- [model migration] Add a new column ``cursor_snapshot`` to the ``Job`` model which\n will contain the value of the cursor when the job begins.\n\n0.2 (2019-10-09)\n----------------\n\n- [model migration] Add cursor support for jobs. This requires a schema migration to\n add a ``cursor_key`` column, a new ``JobCursor`` model, and some new indices.\n\n0.1.6 (2019-10-07)\n------------------\n\n- Support passing custom kwargs to the job in ``psycopg2_mq.MQSource.call``\n to allow custom columns on the job table.\n\n0.1.5 (2019-05-17)\n------------------\n\n- Fix a regression when serializing errors with strings or cycles.\n\n0.1.4 (2019-05-09)\n------------------\n\n- More safely serialize exception objects when jobs fail.\n\n0.1.3 (2018-09-04)\n------------------\n\n- Rename the thread to contain the job id while it's handling a job.\n\n0.1.2 (2018-09-04)\n------------------\n\n- [model migration] Rename ``Job.params`` to ``Job.args``.\n\n0.1.1 (2018-09-04)\n------------------\n\n- Make ``psycopg2`` an optional dependency in order to allow apps to depend\n on ``psycopg2-binary`` if they wish.\n\n0.1 (2018-09-04)\n----------------\n\n- Initial release.\n",
"bugtrack_url": null,
"license": null,
"summary": "A message queue written around PostgreSQL.",
"version": "0.12.6",
"project_urls": {
"Homepage": "https://github.com/mmerickel/psycopg2_mq"
},
"split_keywords": [
"psycopg2",
" postgres",
" postgresql",
" message queue",
" background jobs"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "04d29ad78e7cc36ebd5b915139dc55576d9db9bbbfcb1a97b6f482572dc10418",
"md5": "8af80b9fcfdda01da7cebe451120b13d",
"sha256": "c7a8eef41b4e53206f82974f8afafb5aa9ce5d4cb67c9ad2cb1072ddbe2d9a26"
},
"downloads": -1,
"filename": "psycopg2_mq-0.12.6-py3-none-any.whl",
"has_sig": false,
"md5_digest": "8af80b9fcfdda01da7cebe451120b13d",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 24888,
"upload_time": "2024-11-14T21:47:49",
"upload_time_iso_8601": "2024-11-14T21:47:49.362011Z",
"url": "https://files.pythonhosted.org/packages/04/d2/9ad78e7cc36ebd5b915139dc55576d9db9bbbfcb1a97b6f482572dc10418/psycopg2_mq-0.12.6-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "54e6a517812483e025d4395312339243f78261bd52f66591f2727b9e98b5cdf4",
"md5": "54d1d149bd8bad52ff6b96f8f8cbcbc3",
"sha256": "a4eec09f79d60af25382e3e99234005375974b252bac124a93700a8ea7b238a4"
},
"downloads": -1,
"filename": "psycopg2_mq-0.12.6.tar.gz",
"has_sig": false,
"md5_digest": "54d1d149bd8bad52ff6b96f8f8cbcbc3",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9",
"size": 37154,
"upload_time": "2024-11-14T21:47:50",
"upload_time_iso_8601": "2024-11-14T21:47:50.623550Z",
"url": "https://files.pythonhosted.org/packages/54/e6/a517812483e025d4395312339243f78261bd52f66591f2727b9e98b5cdf4/psycopg2_mq-0.12.6.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-11-14 21:47:50",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "mmerickel",
"github_project": "psycopg2_mq",
"travis_ci": false,
"coveralls": true,
"github_actions": true,
"tox": true,
"lcname": "psycopg2-mq"
}