:Name: dvc-task
:Version: 0.4.0
:Summary: Extensible task queue used in DVC.
:Upload time: 2024-03-25 12:22:56
:Requires Python: >=3.8
:License: Apache-2.0
dvc-task
========

|PyPI| |Status| |Python Version| |License|

|Tests| |Codecov| |pre-commit| |Black|

.. |PyPI| image:: https://img.shields.io/pypi/v/dvc-task.svg
   :target: https://pypi.org/project/dvc-task/
   :alt: PyPI
.. |Status| image:: https://img.shields.io/pypi/status/dvc-task.svg
   :target: https://pypi.org/project/dvc-task/
   :alt: Status
.. |Python Version| image:: https://img.shields.io/pypi/pyversions/dvc-task
   :target: https://pypi.org/project/dvc-task
   :alt: Python Version
.. |License| image:: https://img.shields.io/pypi/l/dvc-task
   :target: https://opensource.org/licenses/Apache-2.0
   :alt: License
.. |Tests| image:: https://github.com/iterative/dvc-task/workflows/Tests/badge.svg
   :target: https://github.com/iterative/dvc-task/actions?workflow=Tests
   :alt: Tests
.. |Codecov| image:: https://codecov.io/gh/iterative/dvc-task/branch/main/graph/badge.svg
   :target: https://app.codecov.io/gh/iterative/dvc-task
   :alt: Codecov
.. |pre-commit| image:: https://img.shields.io/badge/pre--commit-enabled-green?logo=pre-commit&logoColor=white
   :target: https://github.com/pre-commit/pre-commit
   :alt: pre-commit
.. |Black| image:: https://img.shields.io/badge/code%20style-black-000000.svg
   :target: https://github.com/psf/black
   :alt: Black


dvc-task is a library for queuing, running and managing background jobs
(processes) from standalone Python applications. dvc-task is built on Celery_,
but does not require a full AMQP messaging server (or any other "heavy" servers
which are traditionally used as Celery brokers).


Features
--------

* ``dvc_task.proc`` module for running and managing background processes in
  Celery tasks
* Preconfigured Celery app intended for use in standalone desktop
  applications

  * Uses Kombu_ filesystem transport as the message broker, and the standard
    filesystem Celery results backend
  * Allows standalone applications to make use of Celery without the use of
    additional messaging and results backend servers
* Preconfigured "temporary" Celery worker which will automatically terminate
  itself when the Celery queue is empty

  * Allows standalone applications to start Celery workers as needed directly
    from Python code (as opposed to requiring a "run-forever" daemonized
    CLI ``celery`` worker)


Requirements
------------

* Celery 5.3 or later
* Kombu 5.3 or later

Note: Windows is not officially supported in Celery, but dvc-task is tested on
Windows (and used in DVC on Windows).


Installation
------------

You can install *dvc-task* via pip_ from PyPI_:

.. code:: console

   $ pip install dvc-task


Usage
-----

Processes (``dvc_task.proc``)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The process module provides a simple API for running and managing background
processes. Background processes are run in Celery tasks, but process
state is stored separately from Celery, so information about managed processes
can be accessed from outside of the Celery producer or consumer application.

After you have configured a Celery application, jobs can be queued (and run) via
``ProcessManager.run`` (which returns a signature for the ``proc.tasks.run``
Celery task):

.. code-block:: python

    from dvc_task.proc import ProcessManager

    @app.task
    def my_task():
        manager = ProcessManager(wdir=".")
        manager.run(["echo", "hello world"], name="foo").delay()

The ``ProcessManager`` will create a subdirectory in ``wdir`` for each managed process.

.. code-block:: none

    $ tree .
    .
    └── 25mYD6MyLNewXXdMVYCCr3
        ├── 25mYD6MyLNewXXdMVYCCr3.json
        ├── 25mYD6MyLNewXXdMVYCCr3.out
        └── 25mYD6MyLNewXXdMVYCCr3.pid
    1 directory, 3 files

At a minimum, the directory will contain ``<id>.pid`` and ``<id>.json`` files.

* ``<id>.json``: A JSON file describing the process, containing the following keys:
    * ``pid``: Process PID
    * ``stdout``: Redirected stdout file path for the process (redirected to
      ``<id>.out`` by default)
    * ``stderr``: Redirected stderr file path for the process (stderr is
      redirected to ``stdout`` by default)
    * ``stdin``: Redirected stdin file path for the process (interactive
      processes are not yet supported, stdin is currently always ``null``)
    * ``returncode``: Return code for the process (``null`` if the process
      has not exited)
* ``<id>.pid``: A standard pidfile containing only the process PID
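
Since the info file is plain JSON, external tools can inspect a managed
process without importing dvc-task at all. A minimal sketch, using a
hand-made example file rather than one produced by a real ``ProcessManager``
(the directory name and field values here are illustrative):

.. code-block:: python

    import json
    import tempfile
    from pathlib import Path

    # Build a process directory matching the layout described above.
    wdir = Path(tempfile.mkdtemp())
    proc_dir = wdir / "25mYD6MyLNewXXdMVYCCr3"
    proc_dir.mkdir()
    info_file = proc_dir / "25mYD6MyLNewXXdMVYCCr3.json"
    info_file.write_text(json.dumps({
        "pid": 12345,
        "stdout": str(proc_dir / "25mYD6MyLNewXXdMVYCCr3.out"),
        "stderr": None,
        "stdin": None,
        "returncode": 0,
    }))

    # Read it back the way a monitoring tool might.
    info = json.loads(info_file.read_text())
    finished = info["returncode"] is not None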

``ProcessManager`` instances can be created outside of a Celery task to manage
and monitor processes as needed:

.. code-block:: python

    >>> from dvc_task.proc import ProcessManager
    >>> manager = ProcessManager(wdir=".")
    >>> names = [name for name, _info in manager.processes()]
    >>> names
    ['25mYD6MyLNewXXdMVYCCr3']
    >>> for line in manager.follow(names[0]):
    ...     print(line)
    ...
    hello world
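
Because process state lives on disk, the same information can also be read
directly from the filesystem when convenient. A sketch under the layout shown
above, again with a hand-made directory standing in for one created by
dvc-task:

.. code-block:: python

    import tempfile
    from pathlib import Path

    # Illustrative stand-in for a ProcessManager working directory.
    wdir = Path(tempfile.mkdtemp())
    proc_dir = wdir / "25mYD6MyLNewXXdMVYCCr3"
    proc_dir.mkdir()
    (proc_dir / "25mYD6MyLNewXXdMVYCCr3.out").write_text("hello world\n")

    # Enumerate managed process IDs and read one process's redirected stdout.
    names = [p.name for p in wdir.iterdir() if p.is_dir()]
    lines = (proc_dir / f"{names[0]}.out").read_text().splitlines()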

Celery Workers (``dvc_task.worker``)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

dvc-task includes a pre-configured Celery worker (``TemporaryWorker``) which
can be started from Python code. The ``TemporaryWorker`` will consume Celery
tasks until the queue is empty. Once the queue is empty, the worker will wait
up to a specified timeout for new tasks to be added to the queue. If the
queue remains empty after the timeout expires, the worker will exit.

To instantiate a worker with a 60-second timeout and the Celery worker name
``my-worker-1``:

.. code-block:: python

    >>> from dvc_task.worker import TemporaryWorker
    >>> worker = TemporaryWorker(my_app, timeout=60)
    >>> worker.start("my-worker-1")

Note that ``worker.start`` runs the Celery worker within the calling thread.
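
Because the call blocks, applications that need to keep doing other work
typically run it in a background thread. A sketch of that pattern, where
``start_worker`` is a hypothetical stand-in for a real, blocking call to
``worker.start``:

.. code-block:: python

    import threading

    results = []

    def start_worker(name):
        # In real code this would be: worker.start(name), which blocks
        # until the TemporaryWorker's queue timeout expires and it exits.
        results.append(name)

    t = threading.Thread(target=start_worker, args=("my-worker-1",), daemon=True)
    t.start()
    t.join(timeout=5)  # wait for the (stand-in) worker to finish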

Celery Applications (``dvc_task.app``)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

dvc-task includes a pre-configured Celery application (``FSApp``) which uses
the Kombu filesystem transport as the Celery broker along with the Celery
filesystem results storage backend. ``FSApp`` is intended to be used in
standalone Python applications where a traditional Celery producer/consumer
setup (with the appropriate messaging and storage backends) is unavailable.

.. code-block:: python

    >>> from dvc_task.app import FSApp
    >>> my_app = FSApp(wdir=".")

``FSApp`` provides iterators for accessing Kombu messages which are either
waiting in the queue or have already been processed. This allows the caller
to access Celery task information without using the Celery ``inspect`` API
(which is only functional when a Celery worker is actively running).

.. code-block:: python

    >>> for msg in my_app.iter_processed():
    ...     msg
    <Message object at 0x102e7f0d0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '0244c11a-1bcc-47fc-8587-66909a55fdc6', ...}>
    <Message object at 0x1027fd4c0 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '491415d1-9527-493a-a5d7-88ed355da77c', ...}>
    <Message object at 0x102e6f160 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': 'ea6ab7a4-0398-42ab-9f12-8da1f8e12a8a', ...}>
    <Message object at 0x102e6f310 with details {'state': 'RECEIVED', 'content_type': 'application/json', 'delivery_tag': '77c4a335-2102-4bee-9cb8-ef4d8ef9713f', ...}>

Contributing
------------

Contributions are very welcome.
To learn more, see the `Contributor Guide`_.


License
-------

Distributed under the terms of the `Apache 2.0 license`_,
*dvc-task* is free and open source software.


Issues
------

If you encounter any problems,
please `file an issue`_ along with a detailed description.


.. _Apache 2.0 license: https://opensource.org/licenses/Apache-2.0
.. _Celery: https://github.com/celery/celery
.. _Kombu: https://github.com/celery/kombu
.. _PyPI: https://pypi.org/
.. _file an issue: https://github.com/iterative/dvc-task/issues
.. _pip: https://pip.pypa.io/
.. github-only
.. _Contributor Guide: CONTRIBUTING.rst
