:Name: psycopg2-mq
:Version: 0.13.2
:Home page: https://github.com/mmerickel/psycopg2_mq
:Summary: A message queue written around PostgreSQL.
:Upload time: 2025-01-16 23:16:04
:Author: Michael Merickel
:Requires Python: >=3.9
:Keywords: psycopg2, postgres, postgresql, message queue, background jobs

===========
psycopg2_mq
===========

.. image:: https://img.shields.io/pypi/v/psycopg2_mq.svg
    :target: https://pypi.org/pypi/psycopg2_mq

.. image:: https://github.com/mmerickel/psycopg2_mq/actions/workflows/ci-tests.yml/badge.svg?branch=main
    :target: https://github.com/mmerickel/psycopg2_mq/actions/workflows/ci-tests.yml?query=branch%3Amain
    :alt: main CI Status

``psycopg2_mq`` is a message queue implemented on top of
`PostgreSQL <https://www.postgresql.org/>`__,
`SQLAlchemy <https://www.sqlalchemy.org/>`__, and
`psycopg2 <http://initd.org/psycopg/>`__.

Currently the library provides only the low-level constructs that can be used
to build a multithreaded worker system. It is broken into two components:

- ``psycopg2_mq.MQWorker`` - a reusable worker object that manages a
  single-threaded worker that can accept jobs and execute them. An application
  should create one worker per thread. It supports an API for thread-safe
  graceful shutdown.

- ``psycopg2_mq.MQSource`` - a source object providing a client-side API for
  invoking and querying job states.

It is expected that these core components are then wrapped into your own
application in any way that you see fit, without dictating a specific CLI
or framework.

Data Model
==========

Queues
------

Workers run jobs defined in queues. Currently each queue will run jobs
concurrently, while a future version may support serial execution on a
per-queue basis. Each registered queue should contain an ``execute_job(job)``
method.

Jobs
----

The ``execute_job`` method of a queue is passed a ``Job`` object containing
the following attributes:

- ``id``
- ``queue``
- ``method``
- ``args``
- ``cursor``

As a convenience, there is an ``extend(**kw)`` method which can be used to
add extra attributes to the object. This is useful in individual queues to
define a contract between a queue and its methods.
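
As a rough illustration of that contract, the sketch below dispatches on
``job.method`` inside ``execute_job``. The ``MathQueue`` class and its
``do_*`` handlers are hypothetical, and ``SimpleNamespace`` is only a
stand-in that mimics the attributes listed above so the example runs without
a database; it is not the object the library passes in.

.. code-block:: python

    from types import SimpleNamespace


    class MathQueue:
        """Hypothetical queue that routes each job by ``job.method``."""

        def execute_job(self, job):
            handler = getattr(self, f'do_{job.method}', None)
            if handler is None:
                raise ValueError(f'unknown method={job.method!r}')
            return handler(**job.args)

        def do_add(self, a, b):
            return a + b

        def do_double(self, value):
            return value * 2


    # Stand-in for the job a worker would hand to the queue.
    fake_job = SimpleNamespace(
        id=1, queue='math', method='add', args={'a': 2, 'b': 3}, cursor=None
    )
    result = MathQueue().execute_job(fake_job)

Routing by method name keeps each queue a plain class, which is one way to
express the queue/method contract mentioned above.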

Cursors
~~~~~~~

By defining a cursor you can execute jobs sequentially instead of in parallel.
There can only be one running (or lost) job for any cursor at a time.

A ``Job`` can be scheduled with a ``cursor_key``.

A ``job.cursor`` dict is provided to the workers containing the cursor data,
and is saved back to the database when the job is completed. This effectively
gives jobs some persistent, shared memory between jobs on the cursor.

Because a "lost" job on the cursor counts as running, it will also block any other jobs from executing.
To resolve this situation it is necessary for the lost job to be either marked as failed or retried.
See the ``MQSource.retry_job`` and ``MQSource.fail_lost_job`` APIs.
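
To make the "persistent, shared memory" idea concrete, here is a minimal
sketch of a queue that keeps a high-water mark in ``job.cursor``. It assumes
the cursor can be mutated in place by the job; ``SyncQueue``, the
``last_seen_id`` key, and the ``ids`` argument are hypothetical, and a plain
dict stands in for the cursor data that would be loaded from and saved back
to the database.

.. code-block:: python

    from types import SimpleNamespace


    class SyncQueue:
        """Hypothetical queue that tracks progress in the cursor."""

        def execute_job(self, job):
            last_seen = job.cursor.get('last_seen_id', 0)
            new_ids = [i for i in job.args['ids'] if i > last_seen]
            if new_ids:
                # Remember how far we got for the next job on this cursor.
                job.cursor['last_seen_id'] = max(new_ids)
            return {'processed': new_ids}


    # Simulate two sequential jobs sharing one cursor.
    shared_cursor = {}
    queue = SyncQueue()
    first = queue.execute_job(
        SimpleNamespace(args={'ids': [1, 2, 3]}, cursor=shared_cursor)
    )
    second = queue.execute_job(
        SimpleNamespace(args={'ids': [2, 3, 4]}, cursor=shared_cursor)
    )

Because only one job runs on a cursor at a time, the second job can rely on
the state left behind by the first without extra locking.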

Collapsible
~~~~~~~~~~~

When ``collapse_on_cursor`` is set to ``True`` on a job, this is declaring the job as "collapsible".
There can only be one job in the "pending" state marked ``collapsible == True`` for the same ``queue``, ``method``, and ``cursor_key``.
This means that if there is an existing "pending" job with the same ``cursor_key``, ``queue``, and ``method`` with ``collapsible == True`` then the new job will be "collapsed" into the existing job.
By default, the new job is simply dropped because an equivalent job already exists.
The ``args`` should therefore be "constant", because the new job's args are ignored.

If the ``args`` are not constant, then you will likely want to pass a ``conflict_resolver`` callback.
This is a function that will be invoked when a job already exists.
You can then update the existing ``Job`` object in the "pending" state, adjusting the args etc.
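
A resolver of this kind might look like the sketch below. It assumes the
callback receives the existing pending job as its only argument; that
signature, the ``make_merge_resolver`` helper, and the ``ids`` key are
assumptions made for illustration, and ``SimpleNamespace`` stands in for the
real pending job.

.. code-block:: python

    from types import SimpleNamespace


    def make_merge_resolver(new_args):
        """Build a resolver that merges ``new_args['ids']`` into a job."""

        def resolver(existing_job):
            merged = set(existing_job.args.get('ids', [])) | set(new_args['ids'])
            # Reassign instead of mutating so change tracking can notice.
            existing_job.args = {**existing_job.args, 'ids': sorted(merged)}

        return resolver


    # Stand-in for the pending job that the new call collapses into.
    pending = SimpleNamespace(args={'ids': [1, 2]})
    make_merge_resolver({'ids': [2, 5]})(pending)

In an application the returned function would be supplied as the
``conflict_resolver`` argument when scheduling the job, so the new job's
args are merged rather than discarded.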

Delayed Jobs
~~~~~~~~~~~~

A ``Job`` can be delayed to run in the future by providing a ``datetime`` object to the ``when`` argument.
When this feature is used with collapsible jobs you can create a highly efficient throttle, greatly reducing the number of jobs that run in the background.
For example, schedule 20 jobs all with the same ``cursor_key``, ``collapse_on_cursor=True``, and ``when=timedelta(seconds=30)``.
You will see only 1 job created and it will execute in 30 seconds.
All of the other jobs are collapsed into this one instead of creating separate jobs.
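
The throttling effect can be modeled without a database. The sketch below is
a toy in-memory model, not the library's implementation: the ``pending``
dict and ``call_collapsible`` function are hypothetical, and the rule that a
collapsed call keeps the earlier run time follows the behavior described in
the 0.3.1 changelog entry.

.. code-block:: python

    from datetime import datetime, timedelta, timezone

    pending = {}  # (queue, method, cursor_key) -> scheduled run time


    def call_collapsible(queue, method, cursor_key, delay, now):
        """Toy model of scheduling a delayed, collapsible job."""
        key = (queue, method, cursor_key)
        run_at = now + delay
        if key in pending:
            # Collapse into the pending job, keeping the earlier run time.
            pending[key] = min(pending[key], run_at)
            return False
        pending[key] = run_at
        return True


    start = datetime(2025, 1, 1, tzinfo=timezone.utc)
    created = [
        call_collapsible(
            'reports', 'rebuild', 'account:42',
            timedelta(seconds=30), start + timedelta(seconds=i),
        )
        for i in range(20)
    ]

Only the first of the 20 calls creates a job; the remaining 19 collapse into
it, and it stays scheduled 30 seconds after the first call.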

Schedules
---------

A ``JobSchedule`` can be defined which supports
`RFC 5545 <https://tools.ietf.org/html/rfc5545>`__ ``RRULE`` schedules. These
are powerful and can support timezones, frequencies based on calendars as well
as simple recurring rules from an anchor time using ``DTSTART``. Cron jobs
can be converted to this syntax for simpler scenarios.
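
As a sketch of such a conversion, the helper below maps a small subset of
cron expressions (fixed minute and hour, optional comma-separated weekdays)
to an ``RRULE`` string. The ``cron_to_rrule`` function is hypothetical and
not part of the library; whether the resulting string needs a ``DTSTART``
or other decoration before being passed as the ``rrule`` argument is not
covered here.

.. code-block:: python

    CRON_DAY_TO_RRULE = {
        0: 'SU', 1: 'MO', 2: 'TU', 3: 'WE', 4: 'TH', 5: 'FR', 6: 'SA', 7: 'SU',
    }


    def cron_to_rrule(expr):
        """Convert 'MIN HOUR * * DOW' cron syntax to an RFC 5545 RRULE."""
        minute, hour, dom, month, dow = expr.split()
        if dom != '*' or month != '*':
            raise ValueError('only "*" is supported for day-of-month and month')
        if not (minute.isdigit() and hour.isdigit()):
            raise ValueError('minute and hour must be plain integers')
        time_part = f'BYHOUR={int(hour)};BYMINUTE={int(minute)};BYSECOND=0'
        if dow == '*':
            return f'FREQ=DAILY;{time_part}'
        # Ranges such as "1-5" are intentionally unsupported in this sketch.
        days = ','.join(CRON_DAY_TO_RRULE[int(d)] for d in dow.split(','))
        return f'FREQ=WEEKLY;BYDAY={days};{time_part}'


    nightly = cron_to_rrule('0 3 * * *')
    standup = cron_to_rrule('30 9 * * 1,3,5')

Anything beyond this subset (steps, ranges, day-of-month rules) is better
written as an ``RRULE`` directly.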

``psycopg2-mq`` workers will automatically negotiate which worker is responsible
for managing schedules so clustered workers should operate as expected.

To register a new schedule, look at the ``MQSource.add_schedule(queue, method, args, *, rrule)`` API.

If ``collapse_on_cursor`` is ``True`` on the schedule and there is already a job pending then the firing of the schedule is effectively a no-op.

Events / Listeners
------------------

A ``JobListener`` can be defined which supports creating new jobs when events are
emitted. When an event is emitted via ``MQSource.emit_event`` then any listeners
matching this event will be used to create a new job in the system.

To register a listener, look at the ``MQSource.add_listener(event, queue, method, args, ...)`` API.

There is a default event emitted every time a job moves to a finished state. It has the format::

  mq.job_finished.<state>.<queue>.<method>

For example, ``mq.job_finished.completed.dummy.echo``.

You are free to emit your own custom events as well if you need different dimensions!

When ``collapse_on_cursor`` is ``False`` then the listener receives an ``event`` arg containing the ``name``, ``listener_id``, and ``data`` keys.

If ``collapse_on_cursor`` is ``True`` on the listener then the resulting job will receive an ``events`` arg containing a list of all of the emitted events that occurred while the job was in the "pending" state.
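
A job created by a listener may therefore need to handle both shapes. The
sketch below assumes the ``event`` or ``events`` value arrives inside
``job.args``; that placement, the ``AuditQueue`` class, and the
``job_finished_event`` helper are assumptions for illustration, and
``SimpleNamespace`` stands in for the real job object.

.. code-block:: python

    from types import SimpleNamespace


    def job_finished_event(state, queue, method):
        """Build a name shaped like the default job-finished event."""
        return f'mq.job_finished.{state}.{queue}.{method}'


    class AuditQueue:
        """Hypothetical queue whose jobs are created by a listener."""

        def execute_job(self, job):
            # Collapsed listeners deliver a list under "events"; otherwise
            # a single dict arrives under "event". Normalize to a list.
            events = job.args.get('events') or [job.args['event']]
            return [event['name'] for event in events]


    name = job_finished_event('completed', 'dummy', 'echo')
    payload = {'name': name, 'listener_id': 1, 'data': {}}
    seen_single = AuditQueue().execute_job(
        SimpleNamespace(args={'event': payload})
    )
    seen_collapsed = AuditQueue().execute_job(
        SimpleNamespace(args={'events': [payload, payload]})
    )

Normalizing to a list up front lets the same handler serve listeners with
either ``collapse_on_cursor`` setting.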

Example Worker
==============

.. code-block:: python

    from psycopg2_mq import (
        MQWorker,
        make_default_model,
    )
    from sqlalchemy import (
        MetaData,
        create_engine,
    )
    import sys

    class EchoQueue:
        def execute_job(self, job):
            return f'hello, {job.args["name"]} from method="{job.method}"'

    if __name__ == '__main__':
        engine = create_engine(sys.argv[1])
        metadata = MetaData()
        model = make_default_model(metadata)
        worker = MQWorker(
            engine=engine,
            queues={
                'echo': EchoQueue(),
            },
            model=model,
        )
        worker.run()

Example Source
==============

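.. code-block:: python

    from psycopg2_mq import (
        MQSource,
        make_default_model,
    )
    from sqlalchemy import (
        MetaData,
        create_engine,
    )
    from sqlalchemy.orm import sessionmaker

    engine = create_engine('postgresql+psycopg2://...')
    metadata = MetaData()
    model = make_default_model(metadata)
    metadata.create_all(engine)
    session_factory = sessionmaker(engine)

    # begin() yields a session and commits when the block exits cleanly
    with session_factory.begin() as dbsession:
        mq = MQSource(
            dbsession=dbsession,
            model=model,
        )
        job_id = mq.call('echo', 'hello', {'name': 'Andy'})
        print(f'queued job={job_id}')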

Changes
=======

0.13.2 (2025-01-16)
-------------------

- Output a nicer log message containing the cursor key when creating a new job.

0.13.1 (2025-01-16)
-------------------

- Modify the maintenance loop to run an explicit ``select 1`` on the listener
  connection periodically during idle periods to ensure the listener is still alive
  to any TCP proxies like Istio when jobs are sparse.

0.13 (2025-01-14)
-----------------

- The ``job.trace`` is now populated automatically with ``mq_schedule_ids`` and
  ``mq_listener_ids`` which indicate where the job came from. These values are used
  to populate the ``JobContext.schedule_ids`` and ``JobContext.listener_ids`` fields
  going forward which is more performant than querying the relationships directly.

- [breaking] The above change means that jobs that were scheduled for the future may
  be missing this metadata unless you add shims to your code to find the
  ``schedule_ids`` and ``listener_ids`` from the database tables or populate the
  ``trace`` via a migration yourself. New jobs created post-update will be fine.

- [breaking] The ``mq_lock`` table has undergone a small change that will require
  turning off workers (but not callers) temporarily.

- [model migration] Dropped the ``queue`` column from ``mq_lock`` and added a new
  ``ns`` column. Again we can drop the table because all workers should be disabled
  first. Possible migration::

    drop table mq_lock;
    create table mq_lock (
      ns text not null,
      key text not null,
      lock_id int not null,
      worker text not null,
      primary key (ns, key)
    );

- [model migration] Changed an index on the ``mq_job`` table to include lost jobs
  which the query optimizer prefers. Possible migration::

    drop index uq_mq_job_running_cursor_key;
    create unique index uq_mq_job_active_cursor_key on mq_job (cursor_key)
      where cursor_key is not null and state in ('running', 'lost');

- Fix deprecation warnings on Python 3.13 about ``datetime.utcnow()`` usage.

0.12.12 (2025-01-13)
--------------------

- Improve the retry logic to use an exponential backoff.

- Fix a crash introduced in 0.12.11 when connections failed.

0.12.11 (2025-01-13)
--------------------

- Improve recovery when database connections are lost by trying to reconnect multiple
  times and only logging warn/error level messages once retries completely fail.

0.12.10 (2025-01-07)
--------------------

- Improve log output if a job is blocking a cursor.

- Simplify the connection rotation logic and stale lock logic to avoid race conditions.

0.12.9 (2025-01-06)
-------------------

- Improve log output when a job is marked lost.

0.12.8 (2025-01-06)
-------------------

- Fix a critical bug causing the advisory lock to be acquired on the wrong database
  connection. If there was an issue with the database connections it could result in
  weird behavior where jobs were marked lost when they weren't, etc.

0.12.7 (2024-12-05)
-------------------

- Fixed an issue when claiming jobs if the SQLAlchemy Job model passed in to MQModel is
  not named ``Job`` in the SQLAlchemy metadata.

0.12.6 (2024-11-14)
-------------------

- Fix linting.

0.12.5 (2024-11-14)
-------------------

- Change the format to put the state in the middle of the emitted event.

- Emit an event when moving from lost to failed via ``MQSource.fail_lost_job``.

- Add the ``queue`` and ``method`` to the event args on ``mq.job_finished`` events.

0.12.4 (2024-11-13)
-------------------

- Emit an event any time a job hits a terminal state. The format has changed from
  ``mq_job_complete.<queue>.<method>`` to ``mq.job_finished.<queue>.<method>.<state>``.
  This includes ``completed``, ``failed``, and ``lost``.

0.12.3 (2024-10-29)
-------------------

- Add ``job.schedule_ids`` and ``job.listener_ids`` to the job context passed to the
  background workers.

0.12.2 (2024-10-29)
-------------------

- Prevent canceling jobs in the lost state.

- Allow retrying jobs in any state.

- Allow setting a result on a lost job when moving it to failed.

0.12.1 (2024-10-27)
-------------------

- Support SQLAlchemy 2.x.

- Must mark a lost job as failed prior to being able to retry it.

0.12 (2024-10-27)
-----------------

- Support a job being linked properly to multiple schedule and listener sources such
  that provenance is properly tracked on retries.

- Add a new ``CANCELED`` job state that can be used to manually mark any pending,
  failed, or lost jobs as canceled. Jobs do not enter this state automatically - they
  must be manually marked but will be useful to disambiguate failed from canceled.

- [breaking] ``job.schedule_id`` is removed from the job object passed to background
  workers.

- [model migration] Moved the ``schedule_id`` and ``listener_id`` foreign keys from
  the ``Job`` table to many-to-many link tables to support tracking the source properly
  when collapsing occurs. Possible migration::

    insert into mq_job_schedule_link (job_id, schedule_id)
      select id, schedule_id from mq_job where schedule_id is not null;

    insert into mq_job_listener_link (job_id, listener_id)
      select id, listener_id from mq_job where listener_id is not null;

    alter table mq_job drop column schedule_id;
    alter table mq_job drop column listener_id;

- [model migration] Add a new ``CANCELED`` state to the ``mq_job_state`` enum.
  Possible migration::

    alter type mq_job_state add value 'canceled';

0.11 (2024-10-27)
-----------------

- Add support for Python 3.13.

- [breaking] Modified the ``MQSource.call`` and ``MQSource.add_schedule`` APIs such
  that when a cursor is used ``collapse_on_cursor`` defaults to ``False`` instead of
  ``True``. You must explicitly set it to ``True`` in scenarios in which that is
  desired as it is no longer the default behavior.

- [model migration] Add ``collapse_on_cursor`` attribute to
  the ``JobSchedule`` model. A bw-compat migration would set this value to ``False``
  if ``cursor_key`` is ``NULL`` and ``True`` on everything else.

- [model migration] Add a new ``JobListener`` model.

- [model migration] Add ``listener_id`` foreign key to the ``Job`` model.

- Fix a bug in which NOTIFY events were missed in some cases causing jobs to wait
  until the maintenance window to execute.

- Add the concept of pub/sub event listeners. Listeners can be registered that act as a
  job factory, creating a new job when an event is emitted.

  It is possible to emit events manually as needed via the ``MQSource.emit_event`` API.

  Events are emitted automatically when a job is completed. Every job when it is
  completed successfully emits a new ``mq_job_complete:<queue>.<method>`` event.
  This event contains the result of the job.

- The ``MQSource`` that is used by the ``MQWorker`` can now be overridden via the
  ``mq_source_factory`` option.

0.10 (2024-08-06)
------------------

- Add support for Python 3.12.

- Drop support for Python 3.7 and 3.8.

- Fix a race condition on shutdown where the job fails to cleanup because the triggers
  are gone while the pool is still shutting down.

0.9 (2023-04-21)
----------------

- Add support for Python 3.10 and 3.11.

- [breaking] Prevent retrying of collapsible jobs. Require them to be invoked
  using ``call`` instead for an opportunity to specify a ``conflict_resolver``.

- [model migration] Fix a bug in the default model schema in which the
  collapsible database index was not marked unique.

- Copy trace info when retrying a job.

- Capture the stringified exception to the job result in the ``message`` key,
  alongside the existing ``tb``, ``exc``, and ``args`` keys.

- The worker was not recognizing ``capture_signals=False``, causing problems
  when running the event loop in other threads.

- Blackify the codebase and add some real tests. Yay!

0.8.3 (2022-04-15)
------------------

- [breaking] Remove ``MQWorker.make_job_context``.

0.8.2 (2022-04-15)
------------------

- Drop Python 3.6 support.

- [breaking] Require SQLAlchemy 1.4+ and resolve deprecation warnings related to
  SQLAlchemy 2.0.

- [model migration] Rename ``update_job_id`` to ``updated_job_id`` in the
  ``JobCursor`` model.

0.8.1 (2022-04-15)
------------------

- Ensure the ``trace`` attribute is populated on the ``JobContext``.

- Add ``MQWorker.make_job_context`` which can be defined to completely override
  the ``JobContext`` factory using the ``Job`` object and open database session.

0.8.0 (2022-04-15)
------------------

- [model migration] Add ``update_job_id`` foreign key to the ``JobCursor`` model to
  make it possible to know which job last updated the value in the cursor.

- [model migration] Add ``trace`` json blob to the ``Job`` model.

- Support a ``trace`` json blob when creating new jobs. This value is available
  on the running job context and can be used when creating sub-jobs or when
  making requests to external systems to pass through tracing metadata.

  See ``MQSource.call``'s new ``trace`` parameter when creating jobs.
  See ``JobContext.trace`` attribute when handling jobs.

- Add a standard ``FailedJobError`` exception which can be raised by jobs to
  mark a failure with a custom result object. This is different from unhandled
  exceptions that cause the ``MQWorker.result_from_error`` method to be invoked.

0.7.0 (2022-03-03)
------------------

- Fix a corner case with lost jobs attached to cursors. In scenarios where
  multiple workers are running, if one loses a database connection then the
  other is designed to notice and mark jobs lost. However, it's possible the
  job is not actually lost and the worker can then recover after resuming
  its connection, and marking the job running again. In this situation, we
  do not want another job to begin on the same cursor. To fix this issue,
  new jobs will not be run if another job is marked lost on the same cursor.
  You will be required to recover the job by marking it as not lost (probably
  failed) first to unblock the rest of the jobs on the cursor.

0.6.2 (2022-03-01)
------------------

- Prioritize maintenance work higher than running new jobs.
  There was a chicken-and-egg issue where a job would be marked running
  but needs to be marked lost. However marking it lost is lower priority than
  trying to start new jobs. In the case where a lot of jobs were scheduled
  at the same time, the worker always tried to start new jobs and didn't
  run the maintenance so the job never got marked lost, effectively blocking
  the queue.

0.6.1 (2022-01-15)
------------------

- Fix a bug introduced in the 0.6.0 release when scheduling new jobs.

0.6.0 (2022-01-14)
------------------

- [model migration] Add model changes to mark jobs as collapsible.

- [model migration] Add model changes to the cursor index.

- Allow multiple pending jobs to be scheduled on the same cursor if either:

  1. The queue or method are different from existing pending jobs on the cursor.

  2. ``collapse_on_cursor`` is set to ``False`` when scheduling the job.

0.5.7 (2021-03-07)
------------------

- Add a ``schedule_id`` attribute to the job context for use in jobs that want
  to know whether they were executed from a schedule or not.

0.5.6 (2021-02-28)
------------------

- Some UnicodeDecodeError exceptions raised from jobs could trigger a
  serialization failure (UntranslatableCharacter) because it would contain
  the sequence ``\u0000`` which, while valid in Python, is not allowed
  in postgres. So when dealing with the raw bytes, we'll decode it with
  the replacement character that can be properly stored. Not ideal, but
  better than failing to store the error at all.

0.5.5 (2021-01-22)
------------------

- Fixed some old code causing the worker lock to release after a job
  completed.

0.5.4 (2021-01-20)
------------------

- Log at the error level when marking a job as lost.

0.5.3 (2021-01-11)
------------------

- Copy the ``schedule_id`` information to retried jobs.

0.5.2 (2021-01-11)
------------------

- [breaking] Require ``call_schedule`` to accept an id instead of an object.

0.5.1 (2021-01-09)
------------------

- [model migration] Drop the ``UNIQUE`` constraint on the background job
  ``lock_id`` column.

0.5 (2021-01-09)
----------------

- [model migration] Add a scheduler model with support for emitting periodic
  jobs based on RRULE syntax.
  See https://github.com/mmerickel/psycopg2_mq/pull/11

- Enable the workers to coordinate on a per-queue basis who is in control
  of scheduling jobs.
  See https://github.com/mmerickel/psycopg2_mq/pull/12

- Reduce the number of advisory locks held from one per job to one per worker.
  See https://github.com/mmerickel/psycopg2_mq/pull/12

0.4.5 (2020-12-22)
------------------

- Use column objects in the insert statement to support ORM-level synonyms,
  enabling the schema to have columns with different names.

0.4.4 (2019-11-07)
------------------

- Ensure the advisory locks are released when a job completes.

0.4.3 (2019-10-31)
------------------

- Ensure maintenance (finding lost jobs) always runs at set intervals defined
  by the ``timeout`` parameter.

0.4.2 (2019-10-30)
------------------

- Recover active jobs when the connection is lost by re-locking them
  and ensuring they are marked running.

0.4.1 (2019-10-30)
------------------

- Attempt to reconnect to the database after losing the connection.
  If the reconnect attempt fails then crash.

0.4 (2019-10-28)
----------------

- [model migration] Add a ``worker`` column to the ``Job`` model to track what
  worker is handling a job.

- Add an optional ``name`` argument to ``MQWorker`` to name the worker -
  the value will be recorded in each job.

- Add a ``threads`` argument (default=``1``) to ``MQWorker`` to support
  handling multiple jobs from the same worker instance instead of making a
  worker per thread.

- Add ``capture_signals`` argument (default=``True``) to ``MQWorker`` which
  will capture ``SIGTERM``, ``SIGINT`` and ``SIGUSR1``. The first two will
  trigger graceful shutdown - they will make the process stop handling new
  jobs while finishing active jobs. The latter will dump to ``stderr`` a
  JSON dump of the current status of the worker.

0.3.3 (2019-10-23)
------------------

- Only save a cursor update if the job is completed successfully.

0.3.2 (2019-10-22)
------------------

- Mark lost jobs during timeouts instead of just when a worker starts in order
  to catch them earlier.

0.3.1 (2019-10-17)
------------------

- When attempting to schedule a job with a cursor and a ``scheduled_time``
  earlier than a pending job on the same cursor, the job will be updated to
  run at the earlier time.

- When attempting to schedule a job with a cursor and a pending job already
  exists on the same cursor, a ``conflict_resolver`` function may be
  supplied to ``MQSource.call`` to update the job properties, merging the
  arguments however the user wishes.

0.3 (2019-10-15)
----------------

- [model migration] Add a new column ``cursor_snapshot`` to the ``Job`` model which
  will contain the value of the cursor when the job begins.

0.2 (2019-10-09)
----------------

- [model migration] Add cursor support for jobs. This requires a schema migration to
  add a ``cursor_key`` column, a new ``JobCursor`` model, and some new indices.

0.1.6 (2019-10-07)
------------------

- Support passing custom kwargs to the job in ``psycopg2_mq.MQSource.call``
  to allow custom columns on the job table.

0.1.5 (2019-05-17)
------------------

- Fix a regression when serializing errors with strings or cycles.

0.1.4 (2019-05-09)
------------------

- More safely serialize exception objects when jobs fail.

0.1.3 (2018-09-04)
------------------

- Rename the thread to contain the job id while it's handling a job.

0.1.2 (2018-09-04)
------------------

- [model migration] Rename ``Job.params`` to ``Job.args``.

0.1.1 (2018-09-04)
------------------

- Make ``psycopg2`` an optional dependency in order to allow apps to depend
  on ``psycopg2-binary`` if they wish.

0.1 (2018-09-04)
----------------

- Initial release.

            

Jobs do not enter this state automatically - they\n  must be marked manually, but the state is useful to disambiguate failed from canceled.\n\n- [breaking] ``job.schedule_id`` is removed from the job object passed to background\n  workers.\n\n- [model migration] Moved the ``schedule_id`` and ``listener_id`` foreign keys from\n  the ``Job`` table to many-to-many link tables to support tracking the source properly\n  when collapsing occurs. Possible migration::\n\n    insert into mq_job_schedule_link (job_id, schedule_id)\n      select id, schedule_id from mq_job where schedule_id is not null;\n\n    insert into mq_job_listener_link (job_id, listener_id)\n      select id, listener_id from mq_job where listener_id is not null;\n\n    alter table mq_job drop column schedule_id;\n    alter table mq_job drop column listener_id;\n\n- [model migration] Add a new ``CANCELED`` state to the ``mq_job_state`` enum.\n  Possible migration::\n\n    alter type mq_job_state add value 'canceled';\n\n0.11 (2024-10-27)\n-----------------\n\n- Add support for Python 3.13.\n\n- [breaking] Modified the ``MQSource.call`` and ``MQSource.add_schedule`` APIs such\n  that when a cursor is used ``collapse_on_cursor`` defaults to ``False`` instead of\n  ``True``. You must explicitly set it to ``True`` in scenarios in which that is\n  desired as it is no longer the default behavior.\n\n- [model migration] Add ``collapse_on_cursor`` attribute to\n  the ``JobSchedule`` model. A backward-compatible migration would set this value to ``False``\n  if ``cursor_key`` is ``NULL`` and ``True`` on everything else.\n\n- [model migration] Add a new ``JobListener`` model.\n\n- [model migration] Add ``listener_id`` foreign key to the ``Job`` model.\n\n- Fix a bug in which NOTIFY events were missed in some cases causing jobs to wait\n  until the maintenance window to execute.\n\n- Add the concept of pub/sub event listeners. 
Listeners can be registered that act as a\n  job factory, creating a new job when an event is emitted.\n\n  It is possible to emit events manually as needed via the ``MQSource.emit_event`` API.\n\n  Events are emitted automatically when a job is completed. Every job when it is\n  completed successfully emits a new ``mq_job_complete:<queue>.<method>`` event.\n  This event contains the result of the job.\n\n- The ``MQSource`` that is used by the ``MQWorker`` can now be overridden via the\n  ``mq_source_factory`` option.\n\n0.10 (2024-08-06)\n------------------\n\n- Add support for Python 3.12.\n\n- Drop support for Python 3.7, and 3.8.\n\n- Fix a race condition on shutdown where the job fails to cleanup because the triggers\n  are gone while the pool is still shutting down.\n\n0.9 (2023-04-21)\n----------------\n\n- Add support for Python 3.10, and 3.11.\n\n- [breaking] Prevent retrying of collapsible jobs. Require them to be invoked\n  using ``call`` instead for an opportunity to specify a ``conflict_resolver``.\n\n- [model migration] Fix a bug in the default model schema in which the\n  collapsible database index was not marked unique.\n\n- Copy trace info when retrying a job.\n\n- Capture the stringified exception to the job result in the ``message`` key,\n  alongside the existing ``tb``, ``exc``, and ``args`` keys.\n\n- The worker was not recognizing ``capture_signals=False``, causing problems\n  when running the event loop in other threads.\n\n- Blackify the codebase and add some real tests. 
Yay!\n\n0.8.3 (2022-04-15)\n------------------\n\n- [breaking] Remove ``MQWorker.make_job_context``.\n\n0.8.2 (2022-04-15)\n------------------\n\n- Drop Python 3.6 support.\n\n- [breaking] Require SQLAlchemy 1.4+ and resolve deprecation warnings related to\n  SQLAlchemy 2.0.\n\n- [model migration] Rename ``update_job_id`` to ``updated_job_id`` in the\n  ``JobCursor`` model.\n\n0.8.1 (2022-04-15)\n------------------\n\n- Ensure the ``trace`` attribute is populated on the ``JobContext``.\n\n- Add ``MQWorker.make_job_context`` which can be defined to completely override\n  the ``JobContext`` factory using the ``Job`` object and open database session.\n\n0.8.0 (2022-04-15)\n------------------\n\n- [model migration] Add ``update_job_id`` foreign key to the ``JobCursor`` model to\n  make it possible to know which job last updated the value in the cursor.\n\n- [model migration] Add ``trace`` json blob to the ``Job`` model.\n\n- Support a ``trace`` json blob when creating new jobs. This value is available\n  on the running job context and can be used when creating sub-jobs or when\n  making requests to external systems to pass through tracing metadata.\n\n  See ``MQSource.call``'s new ``trace`` parameter when creating jobs.\n  See ``JobContext.trace`` attribute when handling jobs.\n\n- Add a standard ``FailedJobError`` exception which can be raised by jobs to\n  mark a failure with a custom result object. This is different from unhandled\n  exceptions that cause the ``MQWorker.result_from_error`` method to be invoked.\n\n0.7.0 (2022-03-03)\n------------------\n\n- Fix a corner case with lost jobs attached to cursors. In scenarios where\n  multiple workers are running, if one loses a database connection then the\n  other is designed to notice and mark jobs lost. However, it's possible the\n  job is not actually lost and the worker can then recover after resuming\n  its connection, and marking the job running again. 
In this situation, we\n  do not want another job to begin on the same cursor. To fix this issue,\n  new jobs will not be run if another job is marked lost on the same cursor.\n  You will be required to recover the job by marking it as not lost (probably\n  failed) first to unblock the rest of the jobs on the cursor.\n\n0.6.2 (2022-03-01)\n------------------\n\n- Prioritize maintenance work higher than running new jobs.\n  There was a chicken-and-egg issue where a job would be marked running\n  but needed to be marked lost. However, marking it lost was lower priority than\n  trying to start new jobs. In the case where a lot of jobs were scheduled\n  at the same time, the worker always tried to start new jobs and didn't\n  run the maintenance so the job never got marked lost, effectively blocking\n  the queue.\n\n0.6.1 (2022-01-15)\n------------------\n\n- Fix a bug introduced in the 0.6.0 release when scheduling new jobs.\n\n0.6.0 (2022-01-14)\n------------------\n\n- [model migration] Add model changes to mark jobs as collapsible.\n\n- [model migration] Add model changes to the cursor index.\n\n- Allow multiple pending jobs to be scheduled on the same cursor if either:\n\n  1. The queue or method are different from existing pending jobs on the cursor.\n\n  2. ``collapse_on_cursor`` is set to ``False`` when scheduling the job.\n\n0.5.7 (2021-03-07)\n------------------\n\n- Add a ``schedule_id`` attribute to the job context for use in jobs that want\n  to know whether they were executed from a schedule or not.\n\n0.5.6 (2021-02-28)\n------------------\n\n- Some UnicodeDecodeError exceptions raised from jobs could trigger a\n  serialization failure (UntranslatableCharacter) because the error would contain\n  the sequence ``\\u0000`` which, while valid in Python, is not allowed\n  in postgres. So when dealing with the raw bytes, we'll decode it with\n  the replacement character that can be properly stored. 
Not ideal, but\n  better than failing to store the error at all.\n\n0.5.5 (2021-01-22)\n------------------\n\n- Fixed some old code causing the worker lock to release after a job\n  completed.\n\n0.5.4 (2021-01-20)\n------------------\n\n- Log at the error level when marking a job as lost.\n\n0.5.3 (2021-01-11)\n------------------\n\n- Copy the ``schedule_id`` information to retried jobs.\n\n0.5.2 (2021-01-11)\n------------------\n\n- [breaking] Require ``call_schedule`` to accept an id instead of an object.\n\n0.5.1 (2021-01-09)\n------------------\n\n- [model migration] Drop the ``UNIQUE`` constraint on the background job\n  ``lock_id`` column.\n\n0.5 (2021-01-09)\n----------------\n\n- [model migration] Add a scheduler model with support for emitting periodic\n  jobs based on RRULE syntax.\n  See https://github.com/mmerickel/psycopg2_mq/pull/11\n\n- Enable the workers to coordinate on a per-queue basis who is in control\n  of scheduling jobs.\n  See https://github.com/mmerickel/psycopg2_mq/pull/12\n\n- Reduce the number of advisory locks held from one per job to one per worker.\n  See https://github.com/mmerickel/psycopg2_mq/pull/12\n\n0.4.5 (2020-12-22)\n------------------\n\n- Use column objects in the insert statement to support ORM-level synonyms,\n  enabling the schema to have columns with different names.\n\n0.4.4 (2019-11-07)\n------------------\n\n- Ensure the advisory locks are released when a job completes.\n\n0.4.3 (2019-10-31)\n------------------\n\n- Ensure maintenance (finding lost jobs) always runs at set intervals defined\n  by the ``timeout`` parameter.\n\n0.4.2 (2019-10-30)\n------------------\n\n- Recover active jobs when the connection is lost by re-locking them\n  and ensuring they are marked running.\n\n0.4.1 (2019-10-30)\n------------------\n\n- Attempt to reconnect to the database after losing the connection.\n  If the reconnect attempt fails then crash.\n\n0.4 (2019-10-28)\n----------------\n\n- [model migration] Add a ``worker`` column 
to the ``Job`` model to track what\n  worker is handling a job.\n\n- Add an optional ``name`` argument to ``MQWorker`` to name the worker -\n  the value will be recorded in each job.\n\n- Add a ``threads`` argument (default=``1``) to ``MQWorker`` to support\n  handling multiple jobs from the same worker instance instead of making a\n  worker per thread.\n\n- Add ``capture_signals`` argument (default=``True``) to ``MQWorker`` which\n  will capture ``SIGTERM``, ``SIGINT`` and ``SIGUSR1``. The first two will\n  trigger graceful shutdown - they will make the process stop handling new\n  jobs while finishing active jobs. The latter will dump to ``stderr`` a\n  JSON dump of the current status of the worker.\n\n0.3.3 (2019-10-23)\n------------------\n\n- Only save a cursor update if the job is completed successfully.\n\n0.3.2 (2019-10-22)\n------------------\n\n- Mark lost jobs during timeouts instead of just when a worker starts in order\n  to catch them earlier.\n\n0.3.1 (2019-10-17)\n------------------\n\n- When attempting to schedule a job with a cursor and a ``scheduled_time``\n  earlier than a pending job on the same cursor, the job will be updated to\n  run at the earlier time.\n\n- When attempting to schedule a job with a cursor and a pending job already\n  exists on the same cursor, a ``conflict_resolver`` function may be\n  supplied to ``MQSource.call`` to update the job properties, merging the\n  arguments however the user wishes.\n\n0.3 (2019-10-15)\n----------------\n\n- [model migration] Add a new column ``cursor_snapshot`` to the ``Job`` model which\n  will contain the value of the cursor when the job begins.\n\n0.2 (2019-10-09)\n----------------\n\n- [model migration] Add cursor support for jobs. 
This requires a schema migration to\n  add a ``cursor_key`` column, a new ``JobCursor`` model, and some new indices.\n\n0.1.6 (2019-10-07)\n------------------\n\n- Support passing custom kwargs to the job in ``psycopg2_mq.MQSource.call``\n  to allow custom columns on the job table.\n\n0.1.5 (2019-05-17)\n------------------\n\n- Fix a regression when serializing errors with strings or cycles.\n\n0.1.4 (2019-05-09)\n------------------\n\n- More safely serialize exception objects when jobs fail.\n\n0.1.3 (2018-09-04)\n------------------\n\n- Rename the thread to contain the job id while it's handling a job.\n\n0.1.2 (2018-09-04)\n------------------\n\n- [model migration] Rename ``Job.params`` to ``Job.args``.\n\n0.1.1 (2018-09-04)\n------------------\n\n- Make ``psycopg2`` an optional dependency in order to allow apps to depend\n  on ``psycopg2-binary`` if they wish.\n\n0.1 (2018-09-04)\n----------------\n\n- Initial release.\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "A message queue written around PostgreSQL.",
    "version": "0.13.2",
    "project_urls": {
        "Homepage": "https://github.com/mmerickel/psycopg2_mq"
    },
    "split_keywords": [
        "psycopg2",
        " postgres",
        " postgresql",
        " message queue",
        " background jobs"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "dca2da571e174a68ac1474d5e1c719370b971addbf24692893c86a437e73bff3",
                "md5": "9101a77156ea3a99a14e2af61fa6889a",
                "sha256": "b9711f08a03bd17b93fddd2447afd6b0d8a9cfa93e6b9de6b156475e97580ec8"
            },
            "downloads": -1,
            "filename": "psycopg2_mq-0.13.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "9101a77156ea3a99a14e2af61fa6889a",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 27116,
            "upload_time": "2025-01-16T23:16:02",
            "upload_time_iso_8601": "2025-01-16T23:16:02.593723Z",
            "url": "https://files.pythonhosted.org/packages/dc/a2/da571e174a68ac1474d5e1c719370b971addbf24692893c86a437e73bff3/psycopg2_mq-0.13.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "8cc5ac7c9932b483b65a2565cd67e7059b3d182ac3ce58e05617a54fbfc02680",
                "md5": "6ccf0cbf35a6d56fc37c6c4bdbd01b50",
                "sha256": "20bab4ac688d94b7e0b0f584058d18e3f84e993fff9b3bc65a890c7edbbcaf0f"
            },
            "downloads": -1,
            "filename": "psycopg2_mq-0.13.2.tar.gz",
            "has_sig": false,
            "md5_digest": "6ccf0cbf35a6d56fc37c6c4bdbd01b50",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 40588,
            "upload_time": "2025-01-16T23:16:04",
            "upload_time_iso_8601": "2025-01-16T23:16:04.657595Z",
            "url": "https://files.pythonhosted.org/packages/8c/c5/ac7c9932b483b65a2565cd67e7059b3d182ac3ce58e05617a54fbfc02680/psycopg2_mq-0.13.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-01-16 23:16:04",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "mmerickel",
    "github_project": "psycopg2_mq",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": true,
    "tox": true,
    "lcname": "psycopg2-mq"
}
        