xqute


- Name: xqute
- Version: 0.5.3
- Home page: https://github.com/pwwang/xqute
- Summary: A job management system for python
- Upload time: 2024-08-16 15:43:40
- Maintainer: None
- Docs URL: None
- Author: pwwang
- Requires Python: `<4.0,>=3.8`
- License: MIT
- Requirements: No requirements were recorded.
- Travis-CI: No Travis.
- Coveralls test coverage: No coveralls.

# xqute

A job management system for python

## Features

- Written with `asyncio` (async/await)
- Plugin system
- Scheduler adaptors
- Job retrying or pipeline halting on failure

## Installation

```shell
pip install xqute
```

## A toy example

```python
import asyncio
from xqute import Xqute

async def main():
    # 3 jobs allowed to run at the same time
    xqute = Xqute(scheduler_forks=3)
    for _ in range(10):
        await xqute.put('sleep 1')
    await xqute.run_until_complete()

if __name__ == '__main__':
    asyncio.run(main())
```
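The `scheduler_forks=3` argument caps how many jobs run at once. As a rough mental model only (this is not xqute's implementation), the same throttling can be sketched with the standard library's `asyncio.Semaphore`. The names `run_jobs` and `fake_job` are hypothetical, and each job is simulated with `asyncio.sleep` rather than a real shell command.

```python
import asyncio


async def run_jobs(n_jobs: int, forks: int, duration: float = 0.05) -> int:
    """Run `n_jobs` simulated jobs, at most `forks` at a time.

    Hypothetical helper (not part of xqute). Returns the peak number of
    jobs that were running concurrently.
    """
    slots = asyncio.Semaphore(forks)  # one slot per allowed fork
    running = 0
    peak = 0

    async def fake_job() -> None:
        nonlocal running, peak
        async with slots:  # wait here until a slot is free
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(duration)  # stands in for `sleep 1`
            running -= 1

    await asyncio.gather(*(fake_job() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_jobs(10, forks=3)))  # 3
```

With ten jobs and three slots, the first three start immediately and the rest wait for a slot to free up, so the observed peak never exceeds three.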

![xqute](./xqute.png)

## API

<https://pwwang.github.io/xqute/>

## Usage

### Xqute object

An `Xqute` object is created with:

```python
xqute = Xqute(...)
```

Available arguments are:

- `scheduler`: The scheduler class or name (e.g. `local`, `sge`, `slurm`, `ssh`)
- `plugins`: The plugins to enable/disable for this session
- `job_metadir`: The job meta directory (Default: `./.xqute/`)
- `job_error_strategy`: What to do when a job fails (for example, `retry` it)
- `job_num_retries`: The maximum number of retries when `job_error_strategy` is `retry`
- `job_submission_batch`: The number of consumers that submit jobs concurrently
- `scheduler_forks`: The maximum number of jobs allowed to run at the same time
- `**scheduler_opts`: Additional keyword arguments passed to the scheduler
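The interplay between `job_error_strategy` and `job_num_retries` can be pictured with the small, self-contained sketch below. It is an illustration under assumptions, not xqute's code: the strategy names `"retry"` and `"halt"`, the `run_with_strategy` function, the `PipelineHalted` exception, and what happens once retries are exhausted are all hypothetical. A job is modeled as a coroutine function that returns an exit code.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical model: a job is a coroutine function returning an exit code.
JobFn = Callable[[], Awaitable[int]]


class PipelineHalted(Exception):
    """Hypothetical error signalling that a failure stopped the whole run."""


async def run_with_strategy(job: JobFn, strategy: str, num_retries: int) -> int:
    """Run `job` under a hypothetical error strategy.

    "retry": rerun a failing job up to `num_retries` extra times and
    return the last exit code. "halt": raise on the first failure.
    Any other value: run once and return the exit code.
    """
    attempts = 1 + (num_retries if strategy == "retry" else 0)
    rc = -1
    for _ in range(attempts):
        rc = await job()
        if rc == 0:
            return rc
    if strategy == "halt":
        raise PipelineHalted(f"job failed with exit code {rc}")
    return rc


def flaky(fail_times: int) -> JobFn:
    """Build a fake job that fails `fail_times` times, then succeeds."""
    calls = 0

    async def job() -> int:
        nonlocal calls
        calls += 1
        return 1 if calls <= fail_times else 0

    return job


if __name__ == "__main__":
    # Fails twice, succeeds on the third attempt, so this prints 0.
    print(asyncio.run(run_with_strategy(flaky(2), "retry", num_retries=3)))
```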

Note that the `Xqute` object (the producer) must be created inside a running event loop, for example within an `async` function as in the toy example above.

To push a job into the queue:

```python
await xqute.put(['echo', 1])
```

### Using SGE scheduler

```python
xqute = Xqute(
    'sge',
    scheduler_forks=100,
    qsub='path to qsub',
    qdel='path to qdel',
    qstat='path to qstat',
    sge_q='1-day',  # or qsub_q='1-day'
    # ... other options
)
```

Keyword arguments whose names start with `sge_` are interpreted as `qsub` options. `list` or `tuple` values are expanded into one option per element. For example,
`sge_l=['h_vmem=2G', 'gpu=1']` is expanded in the wrapper script like this:

```shell
# ...

#$ -l h_vmem=2G
#$ -l gpu=1

# ...
```
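To make the expansion rule concrete, here is a minimal sketch that turns `sge_`-prefixed keyword arguments into `#$` directive lines. It mirrors the behavior described above but is not taken from xqute; `sge_directives` is a hypothetical helper.

```python
from typing import Any, Dict, List


def sge_directives(opts: Dict[str, Any], prefix: str = "sge_") -> List[str]:
    """Turn prefixed keyword arguments into SGE `#$` directive lines.

    Hypothetical helper illustrating the documented rule: keys without
    the prefix are skipped, and list/tuple values yield one line each.
    """
    lines = []
    for key, value in opts.items():
        if not key.startswith(prefix):
            continue  # e.g. `qsub='path to qsub'` is not a directive
        name = key[len(prefix):]  # "sge_l" -> "l"
        values = value if isinstance(value, (list, tuple)) else [value]
        for item in values:
            lines.append(f"#$ -{name} {item}")
    return lines


if __name__ == "__main__":
    opts = {"qsub": "qsub", "sge_q": "1-day", "sge_l": ["h_vmem=2G", "gpu=1"]}
    print("\n".join(sge_directives(opts)))
```

Passing `prefix="qsub_"` covers the alternative `qsub_q` spelling shown in the comment of the SGE example.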

### Using Slurm scheduler

```python
xqute = Xqute(
    'slurm',
    scheduler_forks=100,
    sbatch='path to sbatch',
    scancel='path to scancel',
    squeue='path to squeue',
    sbatch_partition='1-day',  # or slurm_partition='1-day'
    sbatch_time='01:00:00',
    # ... other options
)
```
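How the `sbatch_`-prefixed options end up in the job script is not spelled out here. Assuming they follow the same prefix convention as SGE and are rendered as `#SBATCH` directives (single-letter names as short options, longer names as `--name=value` with underscores turned into dashes), `sbatch_partition='1-day'` and `sbatch_time='01:00:00'` would look like the output of this hypothetical helper. Check the generated wrapper script before relying on the exact form.

```python
from typing import Any, Dict, List


def sbatch_directives(opts: Dict[str, Any], prefix: str = "sbatch_") -> List[str]:
    """Hypothetical rendering of prefixed kwargs as `#SBATCH` lines."""
    lines = []
    for key, value in opts.items():
        if not key.startswith(prefix):
            continue  # e.g. `sbatch='path to sbatch'` is not a directive
        # sbatch long options use dashes: mem_per_cpu -> mem-per-cpu
        name = key[len(prefix):].replace("_", "-")
        values = value if isinstance(value, (list, tuple)) else [value]
        for item in values:
            if len(name) == 1:
                lines.append(f"#SBATCH -{name} {item}")  # short option
            else:
                lines.append(f"#SBATCH --{name}={item}")  # long option
    return lines


if __name__ == "__main__":
    opts = {"sbatch": "sbatch", "sbatch_partition": "1-day", "sbatch_time": "01:00:00"}
    print("\n".join(sbatch_directives(opts)))
```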

### Using ssh scheduler

```python
xqute = Xqute(
    'ssh',
    scheduler_forks=100,
    ssh='path to ssh',
    ssh_servers={
        "server1": {
            "user": ...,
            "port": 22,
            "keyfile": ...,
            # How long to keep the ssh connection alive
            "ctrl_persist": 600,
            # Where to store the control socket
            "ctrl_dir": "/tmp",
        },
        # ... more servers
    },
    # ... other options
)
```

SSH servers must share the same filesystem and use keyfile authentication.
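The `ctrl_persist` and `ctrl_dir` settings correspond to OpenSSH connection multiplexing. The sketch below shows one plausible way a server entry could be turned into an `ssh` argument list using the real OpenSSH options `ControlMaster=auto`, `ControlPath=...`, and `ControlPersist=...`. The helper name `ssh_command`, the socket file name, and the exact mapping of keys to flags are assumptions, not xqute's internals.

```python
import posixpath
import shlex
from typing import Any, Dict, List


def ssh_command(
    host: str, conf: Dict[str, Any], remote_cmd: List[str], ssh: str = "ssh"
) -> List[str]:
    """Hypothetical: build an ssh argv from a server entry like the one above."""
    args = [ssh]
    if conf.get("port"):
        args += ["-p", str(conf["port"])]
    if conf.get("keyfile"):
        args += ["-i", str(conf["keyfile"])]  # keyfile authentication
    if conf.get("ctrl_persist"):
        # Share one master connection; OpenSSH expands %C to a hash of
        # the local host, remote host, port and user.
        ctrl_path = posixpath.join(conf.get("ctrl_dir", "/tmp"), "ssh-%C")
        args += [
            "-o", "ControlMaster=auto",
            "-o", f"ControlPath={ctrl_path}",
            "-o", f"ControlPersist={conf['ctrl_persist']}",
        ]
    target = f"{conf['user']}@{host}" if conf.get("user") else host
    # The remote side receives one command string, so quote arguments here.
    args += [target, shlex.join(remote_cmd)]
    return args


if __name__ == "__main__":
    server = {"user": "alice", "port": 22, "keyfile": "~/.ssh/id_rsa",
              "ctrl_persist": 600, "ctrl_dir": "/tmp"}
    print(ssh_command("server1", server, ["echo", "hello world"]))
```

Reusing a persistent master connection avoids a fresh SSH handshake every time a job is submitted or polled.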

### Plugins

To write a plugin for `xqute`, implement one or more of the following hooks:

- `def on_init(scheduler)`: Right after the scheduler object is initialized
- `def on_shutdown(scheduler, sig)`: When the scheduler is shutting down
- `async def on_job_init(scheduler, job)`: When the job is initialized
- `async def on_job_queued(scheduler, job)`: When the job is queued
- `async def on_job_submitted(scheduler, job)`: When the job is submitted
- `async def on_job_started(scheduler, job)`: When the job has started (its status changed to running)
- `async def on_job_polling(scheduler, job)`: When the job status is being polled
- `async def on_job_killing(scheduler, job)`: When the job is being killed
- `async def on_job_killed(scheduler, job)`: When the job has been killed
- `async def on_job_failed(scheduler, job)`: When the job has failed
- `async def on_job_succeeded(scheduler, job)`: When the job has succeeded
- `def on_jobcmd_init(scheduler, job) -> str`: When the job command wrapper script is initialized before the prescript is run. This will replace the placeholder `#![jobcmd_init]` in the wrapper script.
- `def on_jobcmd_prep(scheduler, job) -> str`: When the job command is right about to run in the wrapper script. This will replace the placeholder `#![jobcmd_prep]` in the wrapper script.
- `def on_jobcmd_end(scheduler, job) -> str`: When the job command wrapper script is about to end and after the postscript is run. This will replace the placeholder `#![jobcmd_end]` in the wrapper script.
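The three `on_jobcmd_*` hooks work by text substitution in the wrapper script. A minimal sketch of that idea follows. Only the placeholder names come from the list above; the template lines, the `render_wrapper` helper, and joining several plugins' outputs with newlines are assumptions.

```python
from typing import Dict, List

# Hypothetical wrapper template: only the three placeholders come from the
# documentation; the surrounding lines are illustrative.
TEMPLATE = """\
#!/usr/bin/env bash
#![jobcmd_init]
# ... prescript runs here ...
#![jobcmd_prep]
{cmd}
# ... postscript runs here ...
#![jobcmd_end]
"""


def render_wrapper(cmd: str, hook_outputs: Dict[str, List[str]]) -> str:
    """Replace each `#![name]` placeholder with the code the hooks returned.

    `hook_outputs` maps a placeholder name to the strings returned by every
    plugin implementing that hook.
    """
    script = TEMPLATE.format(cmd=cmd)
    for name in ("jobcmd_init", "jobcmd_prep", "jobcmd_end"):
        code = "\n".join(hook_outputs.get(name, []))
        script = script.replace(f"#![{name}]", code)
    return script


if __name__ == "__main__":
    print(render_wrapper("echo hi", {"jobcmd_prep": ["export FOO=1"]}))
```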

Note that all hooks are coroutines except `on_init`, `on_shutdown`, and the three `on_jobcmd_*` hooks, as their signatures above show. Implement the coroutine hooks with `async def`; synchronous implementations are allowed but trigger a warning.
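The rule about synchronous implementations of coroutine hooks can be illustrated as follows: the caller checks whether an implementation is a coroutine function, awaits it if so, and otherwise calls it directly and emits a warning. This is a hypothetical sketch of that behavior; `call_async_hook` is not an xqute or simplug function.

```python
import asyncio
import inspect
import warnings
from typing import Any, Callable


async def call_async_hook(impl: Callable[..., Any], *args: Any) -> Any:
    """Call a hook implementation that is expected to be a coroutine.

    Hypothetical dispatcher: coroutine functions are awaited; plain
    functions still run, but a RuntimeWarning is emitted first.
    """
    if inspect.iscoroutinefunction(impl):
        return await impl(*args)
    warnings.warn(
        f"hook {impl.__name__!r} should be defined with `async def`",
        RuntimeWarning,
        stacklevel=2,
    )
    return impl(*args)


async def on_job_queued(scheduler: Any, job: Any) -> str:
    return f"queued {job}"  # preferred: coroutine implementation


def on_job_failed(scheduler: Any, job: Any) -> str:
    return f"failed {job}"  # allowed, but triggers the warning


if __name__ == "__main__":
    print(asyncio.run(call_async_hook(on_job_queued, None, "job-0")))
    print(asyncio.run(call_async_hook(on_job_failed, None, "job-1")))
```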

To implement a hook, first obtain the plugin manager:

```python
from simplug import Simplug
pm = Simplug('xqute')

# or
from xqute import simplug as pm
```

and then use the decorator `pm.impl`:

```python
@pm.impl
def on_init(scheduler):
    ...
```

### Implementing a scheduler

Currently there are four builtin schedulers: `local`, `sge`, `slurm`, and `ssh`.

To implement your own scheduler, subclass the abstract `Scheduler` class and implement its three abstract methods:

```python
from xqute import Scheduler

class MyScheduler(Scheduler):
    name = 'my'
    job_class = MyJob  # MyJob (see below) must be defined before this class

    async def submit_job(self, job):
        """How to submit a job, return a unique id in the scheduler system
        (the pid for local scheduler for example)
        """

    async def kill_job(self, job):
        """How to kill a job"""

    async def job_is_running(self, job):
        """Check if a job is running"""
```

As the `job_class` attribute shows, you also need to implement a job class before `MyScheduler`. The only abstract method to implement is `shebang`:

```python
from xqute import Job, Scheduler  # Scheduler is used in the type hint below

class MyJob(Job):

    def shebang(self, scheduler: Scheduler) -> str:
        """The shebang and the options for the job script"""
```
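To see how the three scheduler methods fit together without a real cluster, the sketch below implements them for local processes with `asyncio` subprocesses. It deliberately does not import xqute: `ToyScheduler` and `ToyJob` are hypothetical stand-ins that only mirror the method names above, so treat this as an illustration of the contract rather than a drop-in `Scheduler` subclass.

```python
import asyncio
import sys
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ToyJob:
    """Hypothetical stand-in for an xqute Job: a command plus its id."""

    cmd: List[str]
    jid: Optional[int] = None  # set by the caller after submission


class ToyScheduler:
    """Local-process sketch mirroring the three abstract methods above."""

    name = "toy"

    def __init__(self) -> None:
        # Keep process handles so jobs can be polled and killed later.
        self._procs: Dict[int, asyncio.subprocess.Process] = {}

    async def submit_job(self, job: ToyJob) -> int:
        """Start the command and return its pid as the unique job id."""
        proc = await asyncio.create_subprocess_exec(*job.cmd)
        self._procs[proc.pid] = proc
        return proc.pid

    async def kill_job(self, job: ToyJob) -> None:
        """Terminate the process if it is still alive, then reap it."""
        proc = self._procs[job.jid]
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # exited between the check and the signal
        await proc.wait()

    async def job_is_running(self, job: ToyJob) -> bool:
        """Running means the process has not reported an exit code yet."""
        return self._procs[job.jid].returncode is None


async def demo() -> None:
    sched = ToyScheduler()
    job = ToyJob([sys.executable, "-c", "import time; time.sleep(30)"])
    job.jid = await sched.submit_job(job)
    print("running:", await sched.job_is_running(job))
    await sched.kill_job(job)
    print("running:", await sched.job_is_running(job))


if __name__ == "__main__":
    asyncio.run(demo())
```

A cluster scheduler would follow the same shape but call out to its own tools, for example `qsub`, `qdel`, and `qstat` for SGE, parsing the job id that the submission command prints.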

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/pwwang/xqute",
    "name": "xqute",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.8",
    "maintainer_email": null,
    "keywords": null,
    "author": "pwwang",
    "author_email": "pwwang@pwwang.com",
    "download_url": "https://files.pythonhosted.org/packages/d8/03/7fad1535dbe42f1bbdfbb1343ca49bbb1328425fadbbcfc93514ac72e816/xqute-0.5.3.tar.gz",
    "platform": null,
    "description": "# xqute\n\nA job management system for python\n\n## Features\n\n- Written in async\n- Plugin system\n- Scheduler adaptor\n- Job retrying/pipeline halting when failed\n\n## Installation\n\n```shell\npip install xqute\n```\n\n## A toy example\n\n```python\nimport asyncio\nfrom xqute import Xqute\n\nasync def main():\n    # 3 jobs allowed to run at the same time\n    xqute = Xqute(scheduler_forks=3)\n    for _ in range(10):\n        await xqute.put('sleep 1')\n    await xqute.run_until_complete()\n\nif __name__ == '__main__':\n    asyncio.run(main())\n```\n\n![xqute](./xqute.png)\n\n## API\n\n<https://pwwang.github.io/xqute/>\n\n## Usage\n\n### Xqute object\n\nAn xqute is initialized by:\n\n```python\nxqute = Xqute(...)\n```\n\nAvailable arguments are:\n\n- scheduler: The scheduler class or name\n- plugins: The plugins to enable/disable for this session\n- job_metadir: The job meta directory (Default: `./.xqute/`)\n- job_error_strategy: The strategy when there is error happened\n- job_num_retries: Max number of retries when job_error_strategy is retry\n- job_submission_batch: The number of consumers to submit jobs\n- scheduler_forks: Max number of job forks\n- **scheduler_opts: Additional keyword arguments for scheduler\n\nNote that the producer must be initialized in an event loop.\n\nTo push a job into the queue:\n\n```python\nawait xqute.put(['echo', 1])\n```\n\n### Using SGE scheduler\n\n```python\nxqute = Xqute(\n    'sge',\n    scheduler_forks=100,\n    qsub='path to qsub',\n    qdel='path to qdel',\n    qstat='path to qstat',\n    sge_q='1-day',  # or qsub_q='1-day'\n    ...\n)\n```\n\nKeyword-arguments with names starting with `sge_` will be interpreted as `qsub` options. `list` or `tuple` option values will be expanded. 
For example:\n`sge_l=['h_vmem=2G', 'gpu=1']` will be expanded in wrapped script like this:\n\n```shell\n# ...\n\n#$ -l h_vmem=2G\n#$ -l gpu=1\n\n# ...\n```\n\n### Using Slurm scheduler\n\n```python\nxqute = Xqute(\n    'slurm',\n    scheduler_forks=100,\n    sbatch='path to sbatch',\n    scancel='path to scancel',\n    squeue='path to squeue',\n    sbatch_partition='1-day',  # or slurm_partition='1-day'\n    sbatch_time='01:00:00',\n    ...\n)\n```\n\n### Using ssh scheduler\n\n```python\nxqute = Xqute(\n    'ssh',\n    scheduler_forks=100,\n    ssh='path to ssh',\n    ssh_servers={\n        \"server1\": {\n            \"user\": ...,\n            \"port\": 22,\n            \"keyfile\": ...,\n            # How long to keep the ssh connection alive\n            \"ctrl_persist\": 600,\n            # Where to store the control socket\n            \"ctrl_dir\": \"/tmp\",\n        },\n        ...\n    }\n    ...\n)\n```\n\nSSH servers must share the same filesystem and using keyfile authentication.\n\n### Plugins\n\nTo write a plugin for `xqute`, you will need to implement the following hooks:\n\n- `def on_init(scheduler)`: Right after scheduler object is initialized\n- `def on_shutdown(scheduler, sig)`: When scheduler is shutting down\n- `async def on_job_init(scheduler, job)`: When the job is initialized\n- `async def on_job_queued(scheduler, job)`: When the job is queued\n- `async def on_job_submitted(scheduler, job)`: When the job is submitted\n- `async def on_job_started(scheduler, job)`: When the job is started (when status changed to running)\n- `async def on_job_polling(scheduler, job)`: When job status is being polled\n- `async def on_job_killing(scheduler, job)`: When the job is being killed\n- `async def on_job_killed(scheduler, job)`: When the job is killed\n- `async def on_job_failed(scheduler, job)`: When the job is failed\n- `async def on_job_succeeded(scheduler, job)`: When the job is succeeded\n- `def on_jobcmd_init(scheduler, job) -> str`: When the job 
command wrapper script is initialized before the prescript is run. This will replace the placeholder `#![jobcmd_init]` in the wrapper script.\n- `def on_jobcmd_prep(scheduler, job) -> str`: When the job command is right about to run in the wrapper script. This will replace the placeholder `#![jobcmd_prep]` in the wrapper script.\n- `def on_jobcmd_end(scheduler, job) -> str`: When the job command wrapper script is about to end and after the postscript is run. This will replace the placeholder `#![jobcmd_end]` in the wrapper script.\n\nNote that all hooks are corotines except `on_init` and `on_shutdown`, that means you should also implement them as corotines (sync implementations are allowed but will be warned).\n\nTo implement a hook, you have to fetch the plugin manager:\n\n```python\nfrom simplug import Simplug\npm = Simplug('xqute')\n\n# or\nfrom xqute import simplug as pm\n```\n\nand then use the decorator `pm.impl`:\n\n```python\n@pm.impl\ndef on_init(scheduler):\n    ...\n```\n\n### Implementing a scheduler\n\nCurrently there are only 2 builtin schedulers: `local` and `sge`.\n\nOne can implement a scheduler by subclassing the `Scheduler` abstract class. There are three abstract methods that have to be implemented in the subclass:\n\n```python\nfrom xqute import Scheduer\n\nclass MyScheduler(Scheduler):\n    name = 'my'\n    job_class: MyJob\n\n    async def submit_job(self, job):\n        \"\"\"How to submit a job, return a unique id in the scheduler system\n        (the pid for local scheduler for example)\n        \"\"\"\n\n    async def kill_job(self, job):\n        \"\"\"How to kill a job\"\"\"\n\n    async def job_is_running(self, job):\n        \"\"\"Check if a job is running\"\"\"\n```\n\nAs you may see, we may also need to implement a job class before `MyScheduler`. 
The only abstract method to be implemented is `wrap_cmd`:\n\n```python\nfrom xqute import Job\n\nclass MyJob(Job):\n\n    def shebang(self, scheduler: Scheduler) -> str:\n        \"\"\"The shebang and the options for the job script\"\"\"\n```\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "A job management system for python",
    "version": "0.5.3",
    "project_urls": {
        "Homepage": "https://github.com/pwwang/xqute",
        "Repository": "https://github.com/pwwang/xqute"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "b2d61f9bdabc6078afd4c3adfd2c8f4a7de5f8ef20ab0fa1c5b974fdf8c7bbed",
                "md5": "c05d3134ba3243ac94a9beb3a202a2c5",
                "sha256": "0172bf092132ea6a83db64c7c502a49fa7a8e2e07f56c5d2cce83e56ddf5395b"
            },
            "downloads": -1,
            "filename": "xqute-0.5.3-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "c05d3134ba3243ac94a9beb3a202a2c5",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.8",
            "size": 24150,
            "upload_time": "2024-08-16T15:43:39",
            "upload_time_iso_8601": "2024-08-16T15:43:39.633088Z",
            "url": "https://files.pythonhosted.org/packages/b2/d6/1f9bdabc6078afd4c3adfd2c8f4a7de5f8ef20ab0fa1c5b974fdf8c7bbed/xqute-0.5.3-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "d8037fad1535dbe42f1bbdfbb1343ca49bbb1328425fadbbcfc93514ac72e816",
                "md5": "7d20aac1735ce250773c4383cfa29dc2",
                "sha256": "c7189a089a6f1c89755c7505cead669597b89ff2451cb3f2bf2b53decfa9f152"
            },
            "downloads": -1,
            "filename": "xqute-0.5.3.tar.gz",
            "has_sig": false,
            "md5_digest": "7d20aac1735ce250773c4383cfa29dc2",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.8",
            "size": 20275,
            "upload_time": "2024-08-16T15:43:40",
            "upload_time_iso_8601": "2024-08-16T15:43:40.569165Z",
            "url": "https://files.pythonhosted.org/packages/d8/03/7fad1535dbe42f1bbdfbb1343ca49bbb1328425fadbbcfc93514ac72e816/xqute-0.5.3.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-08-16 15:43:40",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "pwwang",
    "github_project": "xqute",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "tox": true,
    "lcname": "xqute"
}
        