- Name: xqute
- Version: 0.7.0
- Home page: https://github.com/pwwang/xqute
- Summary: A job management system for python
- Upload time: 2025-02-06 18:58:43
- Author: pwwang
- Requires Python: <4.0,>=3.9
- License: MIT

# xqute

A job management system for Python

## Features

- Written in async
- Plugin system
- Scheduler adaptor
- Job retrying/pipeline halting when failed

## Installation

```shell
pip install xqute
```

## A toy example

```python
import asyncio
from xqute import Xqute

async def main():
    # 3 jobs allowed to run at the same time
    xqute = Xqute(forks=3)
    for _ in range(10):
        await xqute.put('sleep 1')
    await xqute.run_until_complete()

if __name__ == '__main__':
    asyncio.run(main())
```

![xqute](./xqute.png)

## API

<https://pwwang.github.io/xqute/>

## Usage

### Xqute object

An `Xqute` object is initialized by:

```python
xqute = Xqute(...)
```

Available arguments are:

- scheduler: The scheduler class or name
- plugins: The plugins to enable/disable for this session
- workdir: The job meta directory (Default: `./.xqute/`)
- forks: The number of jobs allowed to run at the same time
- error_strategy: The strategy to apply when a job fails
- num_retries: Maximum number of retries when `error_strategy` is `retry`
- submission_batch: The number of consumers to submit jobs
- scheduler_opts: Additional keyword arguments for scheduler
- jobname_prefix: The prefix of the job name

Note that the `Xqute` object (the producer) must be initialized inside an event loop.
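To make `forks` and `num_retries` more concrete, here is a minimal standard-library sketch of the same idea: a semaphore caps how many commands run at once, and a failed command is retried a limited number of times. It is only an illustration and not xqute's implementation; `run_with_limit` is a hypothetical name.

```python
import asyncio


async def run_with_limit(cmds, forks=3, num_retries=1):
    """Run shell commands with at most `forks` of them running at once.

    Illustration only: this is not how xqute schedules jobs internally.
    """
    slots = asyncio.Semaphore(forks)

    async def run_one(cmd):
        # One initial attempt plus up to `num_retries` retries on failure
        for _ in range(num_retries + 1):
            async with slots:
                proc = await asyncio.create_subprocess_shell(cmd)
                rc = await proc.wait()
            if rc == 0:
                break
        return rc

    # Return codes are gathered in the same order as `cmds`
    return await asyncio.gather(*(run_one(cmd) for cmd in cmds))
```

Calling `asyncio.run(run_with_limit(["exit 0", "exit 1"], forks=2))` yields one return code per command, with the failing command attempted twice before its non-zero code is reported.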

To push a job into the queue:

```python
await xqute.put(['echo', 1])
```

### Using SGE scheduler

```python
xqute = Xqute(
    'sge',
    forks=100,
    scheduler_opts=dict(
        qsub='path to qsub',
        qdel='path to qdel',
        qstat='path to qstat',
        q='1-day',  # or qsub_q='1-day'
    ),
    # ...
)
```

Keyword arguments with names starting with `sge_` will be interpreted as `qsub` options. `list` or `tuple` option values will be expanded. For example, `l=['h_vmem=2G', 'gpu=1']` will be expanded in the wrapped script like this:

```shell
# ...

#$ -l h_vmem=2G
#$ -l gpu=1

# ...
```
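The expansion described above can be sketched in a few lines of plain Python. `expand_sge_options` is a hypothetical helper written for illustration; it is not part of xqute's API, and the real scheduler may render directives differently.

```python
def expand_sge_options(options):
    """Render an options mapping as `#$ -key value` directive lines.

    Hypothetical helper for illustration, not part of xqute.
    """
    lines = []
    for key, value in options.items():
        # list/tuple values produce one directive line per element
        values = value if isinstance(value, (list, tuple)) else [value]
        for item in values:
            lines.append(f"#$ -{key} {item}")
    return lines


print("\n".join(expand_sge_options({"q": "1-day", "l": ["h_vmem=2G", "gpu=1"]})))
```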

### Using Slurm scheduler

```python
xqute = Xqute(
    'slurm',
    forks=100,
    scheduler_opts={
        "sbatch": 'path to sbatch',
        "scancel": 'path to scancel',
        "squeue": 'path to squeue',
        "partition": '1-day',  # or partition='1-day'
        "time": '01:00:00',
        # ...
    },
    },
)
```

### Using ssh scheduler

```python
xqute = Xqute(
    'ssh',
    forks=100,
    scheduler_opts={
        "ssh": 'path to ssh',
        "servers": {
            "server1": {
                "user": ...,
                "port": 22,
                "keyfile": ...,
                # How long to keep the ssh connection alive
                "ctrl_persist": 600,
                # Where to store the control socket
                "ctrl_dir": "/tmp",
            },
            # ...
        }
    },
    # ...
)
```

SSH servers must share the same filesystem and use keyfile authentication.
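The `ctrl_persist` and `ctrl_dir` settings suggest OpenSSH connection sharing. The sketch below shows one way such a server entry could be turned into an `ssh` command line using the standard `ControlMaster`, `ControlPath` and `ControlPersist` options. The mapping is an assumption, `build_ssh_command` is a hypothetical helper, and xqute may build its command differently.

```python
def build_ssh_command(ssh, host, conf):
    """Build an ssh argv list with connection sharing enabled.

    Hypothetical helper; the mapping from config keys to ssh options
    is an assumption made for illustration.
    """
    ctrl_dir = conf.get("ctrl_dir", "/tmp")
    cmd = [
        ssh,
        "-o", "ControlMaster=auto",
        # %r, %h and %p are expanded by ssh to remote user, host and port
        "-o", f"ControlPath={ctrl_dir}/ssh-%r@%h:%p",
        "-o", f"ControlPersist={conf.get('ctrl_persist', 600)}",
        "-p", str(conf.get("port", 22)),
    ]
    if conf.get("keyfile"):
        cmd += ["-i", str(conf["keyfile"])]
    user = conf.get("user")
    cmd.append(f"{user}@{host}" if user else host)
    return cmd
```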

### Plugins

To write a plugin for `xqute`, you will need to implement the following hooks:

- `def on_init(scheduler)`: Right after the scheduler object is initialized
- `def on_shutdown(scheduler, sig)`: When scheduler is shutting down
- `async def on_job_init(scheduler, job)`: When the job is initialized
- `async def on_job_queued(scheduler, job)`: When the job is queued
- `async def on_job_submitted(scheduler, job)`: When the job is submitted
- `async def on_job_started(scheduler, job)`: When the job is started (when status changed to running)
- `async def on_job_polling(scheduler, job)`: When job status is being polled
- `async def on_job_killing(scheduler, job)`: When the job is being killed
- `async def on_job_killed(scheduler, job)`: When the job is killed
- `async def on_job_failed(scheduler, job)`: When the job has failed
- `async def on_job_succeeded(scheduler, job)`: When the job has succeeded
- `def on_jobcmd_init(scheduler, job) -> str`: When the job command wrapper script is initialized before the prescript is run. This will replace the placeholder `{jobcmd_init}` in the wrapper script.
- `def on_jobcmd_prep(scheduler, job) -> str`: When the job command is right about to run in the wrapper script. This will replace the placeholder `{jobcmd_prep}` in the wrapper script.
- `def on_jobcmd_end(scheduler, job) -> str`: When the job command wrapper script is about to end and after the postscript is run. This will replace the placeholder `{jobcmd_end}` in the wrapper script.

Note that all hooks are coroutines except `on_init`, `on_shutdown` and `on_jobcmd_*`, which means you should also implement them as coroutines (sync implementations are allowed but will trigger a warning).
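To picture how the `on_jobcmd_*` hooks feed the wrapper script, the sketch below fills the three placeholders in a made-up template. The template layout and the newline-joining of snippets are assumptions for illustration; xqute's actual wrapper script may differ, and `render_wrapper` is a hypothetical helper.

```python
# Made-up wrapper layout for illustration; the real template may differ.
WRAPPER_TEMPLATE = """\
#!/usr/bin/env bash
{jobcmd_init}
# prescript would run here
{jobcmd_prep}
{cmd}
# postscript would run here
{jobcmd_end}
"""


def render_wrapper(cmd, init=(), prep=(), end=()):
    """Fill the three placeholders with snippets returned by plugin hooks."""
    return WRAPPER_TEMPLATE.format(
        cmd=cmd,
        # Joining several plugins' snippets with newlines is an assumption
        jobcmd_init="\n".join(init),
        jobcmd_prep="\n".join(prep),
        jobcmd_end="\n".join(end),
    )


print(
    render_wrapper(
        "sleep 1",
        init=["export FOO=1"],
        prep=["echo starting"],
        end=["echo finished"],
    )
)
```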

You may also check where the hooks are called in the following diagram:

![xqute-design](./xqute-design.png)

To implement a hook, you have to fetch the plugin manager:

```python
from simplug import Simplug
pm = Simplug('xqute')

# or
from xqute import simplug as pm
```

and then use the decorator `pm.impl`:

```python
@pm.impl
def on_init(scheduler):
    ...
```

### Implementing a scheduler

Currently there are a few built-in schedulers: `local`, `slurm`, `gbatch`, `ssh` and `sge`.

One can implement a scheduler by subclassing the `Scheduler` abstract class. There are three abstract methods that have to be implemented in the subclass:

```python
from xqute import Scheduler


class MyScheduler(Scheduler):
    name = 'mysched'

    async def submit_job(self, job):
        """How to submit a job, return a unique id in the scheduler system
        (the pid for local scheduler for example)
        """

    async def kill_job(self, job):
        """How to kill a job"""

    async def job_is_running(self, job):
        """Check if a job is running"""
```
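As a rough idea of what the three method bodies might contain, here is a standalone sketch that runs jobs as local subprocesses. It deliberately does not import xqute so that it can run on its own: `DemoJob` and `DemoLocalScheduler` are hypothetical stand-ins, and storing the returned id on the job as `jid` is an assumption about how the id is used.

```python
import asyncio
import sys
from dataclasses import dataclass
from typing import Optional


@dataclass
class DemoJob:
    """Hypothetical stand-in for a job object."""

    cmd: list
    jid: Optional[int] = None


class DemoLocalScheduler:
    """Mirrors the three abstract methods without importing xqute."""

    name = "demolocal"

    def __init__(self):
        self._procs = {}  # pid -> asyncio subprocess handle

    async def submit_job(self, job):
        # Start the command and use the pid as the unique job id
        proc = await asyncio.create_subprocess_exec(*job.cmd)
        self._procs[proc.pid] = proc
        return proc.pid

    async def kill_job(self, job):
        proc = self._procs.get(job.jid)
        if proc is not None and proc.returncode is None:
            proc.terminate()
            await proc.wait()  # reap the process so its state is final

    async def job_is_running(self, job):
        proc = self._procs.get(job.jid)
        return proc is not None and proc.returncode is None


async def demo():
    sched = DemoLocalScheduler()
    job = DemoJob([sys.executable, "-c", "import time; time.sleep(30)"])
    job.jid = await sched.submit_job(job)
    before = await sched.job_is_running(job)
    await sched.kill_job(job)
    after = await sched.job_is_running(job)
    return before, after
```

Running `asyncio.run(demo())` reports the job as running right after submission and as not running once it has been killed.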

            

Raw data

{
    "_id": null,
    "home_page": "https://github.com/pwwang/xqute",
    "name": "xqute",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.9",
    "maintainer_email": null,
    "keywords": null,
    "author": "pwwang",
    "author_email": "pwwang@pwwang.com",
    "download_url": "https://files.pythonhosted.org/packages/67/7b/9225d144da0913b9535a1947c9c43b17e4bd95c859c685126eb6a8676fc2/xqute-0.7.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "A job management system for python",
    "version": "0.7.0",
    "project_urls": {
        "Homepage": "https://github.com/pwwang/xqute",
        "Repository": "https://github.com/pwwang/xqute"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "17fa154ad63419e6616167c9cd53cf0a25c729d9ce0fed49d026ecb2c3dd3760",
                "md5": "a08c4c702ae28a1c969db3bbb81a44c3",
                "sha256": "bbd4d3b2b333ddf88d9247207c88f2e9c37a14d25f0fd3f4962b6f11a867d69c"
            },
            "downloads": -1,
            "filename": "xqute-0.7.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a08c4c702ae28a1c969db3bbb81a44c3",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.9",
            "size": 27203,
            "upload_time": "2025-02-06T18:58:41",
            "upload_time_iso_8601": "2025-02-06T18:58:41.304731Z",
            "url": "https://files.pythonhosted.org/packages/17/fa/154ad63419e6616167c9cd53cf0a25c729d9ce0fed49d026ecb2c3dd3760/xqute-0.7.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "677b9225d144da0913b9535a1947c9c43b17e4bd95c859c685126eb6a8676fc2",
                "md5": "c9c0023624b9b36cf1a7966f59cb9120",
                "sha256": "585a268cdeab5c6f59235a578cea9736a32d2b8fd2093fd7b9076bf437d20458"
            },
            "downloads": -1,
            "filename": "xqute-0.7.0.tar.gz",
            "has_sig": false,
            "md5_digest": "c9c0023624b9b36cf1a7966f59cb9120",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.9",
            "size": 21683,
            "upload_time": "2025-02-06T18:58:43",
            "upload_time_iso_8601": "2025-02-06T18:58:43.059454Z",
            "url": "https://files.pythonhosted.org/packages/67/7b/9225d144da0913b9535a1947c9c43b17e4bd95c859c685126eb6a8676fc2/xqute-0.7.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-02-06 18:58:43",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "pwwang",
    "github_project": "xqute",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "tox": true,
    "lcname": "xqute"
}
        