Name | xqute JSON |
Version |
0.7.0
JSON |
| download |
home_page | https://github.com/pwwang/xqute |
Summary | A job management system for python |
upload_time | 2025-02-06 18:58:43 |
maintainer | None |
docs_url | None |
author | pwwang |
requires_python | <4.0,>=3.9 |
license | MIT |
keywords |
|
VCS |
 |
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# xqute
A job management system for python
## Features
- Written in async
- Plugin system
- Scheduler adaptor
- Job retrying/pipeline halting when failed
## Installation
```shell
pip install xqute
```
## A toy example
```python
import asyncio
from xqute import Xqute
async def main():
# 3 jobs allowed to run at the same time
xqute = Xqute(forks=3)
for _ in range(10):
await xqute.put('sleep 1')
await xqute.run_until_complete()
if __name__ == '__main__':
asyncio.run(main())
```

## API
<https://pwwang.github.io/xqute/>
## Usage
### Xqute object
An xqute is initialized by:
```python
xqute = Xqute(...)
```
Available arguments are:
- scheduler: The scheduler class or name
- plugins: The plugins to enable/disable for this session
- workdir: The job meta directory (Default: `./.xqute/`)
- forks: The number of jobs allowed to run at the same time
- error_strategy: The strategy when there is error happened
- num_retries: Max number of retries when job_error_strategy is retry
- submission_batch: The number of consumers to submit jobs
- scheduler_opts: Additional keyword arguments for scheduler
- jobname_prefix: The prefix of the job name
Note that the producer must be initialized in an event loop.
To push a job into the queue:
```python
await xqute.put(['echo', 1])
```
### Using SGE scheduler
```python
xqute = Xqute(
'sge',
forks=100,
scheduler_opts=dict(
qsub='path to qsub',
qdel='path to qdel',
qstat='path to qstat',
q='1-day', # or qsub_q='1-day'
)
...
)
```
Keyword-arguments with names starting with `sge_` will be interpreted as `qsub` options. `list` or `tuple` option values will be expanded. For example:
`l=['h_vmem=2G', 'gpu=1']` will be expanded in wrapped script like this:
```shell
# ...
#$ -l h_vmem=2G
#$ -l gpu=1
# ...
```
### Using Slurm scheduler
```python
xqute = Xqute(
'slurm',
forks=100,
scheduler_opts = {
"sbatch": 'path to sbatch',
"scancel": 'path to scancel',
"squeue": 'path to squeue',
"partition": '1-day', # or partition='1-day'
"time": '01:00:00',
...
},
)
```
### Using ssh scheduler
```python
xqute = Xqute(
'ssh',
forks=100,
scheduler_opts={
"ssh": 'path to ssh',
"servers": {
"server1": {
"user": ...,
"port": 22,
"keyfile": ...,
# How long to keep the ssh connection alive
"ctrl_persist": 600,
# Where to store the control socket
"ctrl_dir": "/tmp",
},
...
}
},
...
)
```
SSH servers must share the same filesystem and using keyfile authentication.
### Plugins
To write a plugin for `xqute`, you will need to implement the following hooks:
- `def on_init(scheduler)`: Right after scheduler object is initialized
- `def on_shutdown(scheduler, sig)`: When scheduler is shutting down
- `async def on_job_init(scheduler, job)`: When the job is initialized
- `async def on_job_queued(scheduler, job)`: When the job is queued
- `async def on_job_submitted(scheduler, job)`: When the job is submitted
- `async def on_job_started(scheduler, job)`: When the job is started (when status changed to running)
- `async def on_job_polling(scheduler, job)`: When job status is being polled
- `async def on_job_killing(scheduler, job)`: When the job is being killed
- `async def on_job_killed(scheduler, job)`: When the job is killed
- `async def on_job_failed(scheduler, job)`: When the job is failed
- `async def on_job_succeeded(scheduler, job)`: When the job is succeeded
- `def on_jobcmd_init(scheduler, job) -> str`: When the job command wrapper script is initialized before the prescript is run. This will replace the placeholder `{jobcmd_init}` in the wrapper script.
- `def on_jobcmd_prep(scheduler, job) -> str`: When the job command is right about to run in the wrapper script. This will replace the placeholder `{jobcmd_prep}` in the wrapper script.
- `def on_jobcmd_end(scheduler, job) -> str`: When the job command wrapper script is about to end and after the postscript is run. This will replace the placeholder `{jobcmd_end}` in the wrapper script.
Note that all hooks are corotines except `on_init`, `on_shutdown` and `on_jobcmd_*`, that means you should also implement them as corotines (sync implementations are allowed but will be warned).
You may also check where the hooks are called in the following diagram:

To implement a hook, you have to fetch the plugin manager:
```python
from simplug import Simplug
pm = Simplug('xqute')
# or
from xqute import simplug as pm
```
and then use the decorator `pm.impl`:
```python
@pm.impl
def on_init(scheduler):
...
```
### Implementing a scheduler
Currently there are a few builtin schedulers: `local`, `slurm`, `gbatch` and `sge`.
One can implement a scheduler by subclassing the `Scheduler` abstract class. There are three abstract methods that have to be implemented in the subclass:
```python
from xqute import Scheduer
class MyScheduler(Scheduler):
name = 'mysched'
async def submit_job(self, job):
"""How to submit a job, return a unique id in the scheduler system
(the pid for local scheduler for example)
"""
async def kill_job(self, job):
"""How to kill a job"""
async def job_is_running(self, job):
"""Check if a job is running"""
```
Raw data
{
"_id": null,
"home_page": "https://github.com/pwwang/xqute",
"name": "xqute",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.9",
"maintainer_email": null,
"keywords": null,
"author": "pwwang",
"author_email": "pwwang@pwwang.com",
"download_url": "https://files.pythonhosted.org/packages/67/7b/9225d144da0913b9535a1947c9c43b17e4bd95c859c685126eb6a8676fc2/xqute-0.7.0.tar.gz",
"platform": null,
"description": "# xqute\n\nA job management system for python\n\n## Features\n\n- Written in async\n- Plugin system\n- Scheduler adaptor\n- Job retrying/pipeline halting when failed\n\n## Installation\n\n```shell\npip install xqute\n```\n\n## A toy example\n\n```python\nimport asyncio\nfrom xqute import Xqute\n\nasync def main():\n # 3 jobs allowed to run at the same time\n xqute = Xqute(forks=3)\n for _ in range(10):\n await xqute.put('sleep 1')\n await xqute.run_until_complete()\n\nif __name__ == '__main__':\n asyncio.run(main())\n```\n\n\n\n## API\n\n<https://pwwang.github.io/xqute/>\n\n## Usage\n\n### Xqute object\n\nAn xqute is initialized by:\n\n```python\nxqute = Xqute(...)\n```\n\nAvailable arguments are:\n\n- scheduler: The scheduler class or name\n- plugins: The plugins to enable/disable for this session\n- workdir: The job meta directory (Default: `./.xqute/`)\n- forks: The number of jobs allowed to run at the same time\n- error_strategy: The strategy when there is error happened\n- num_retries: Max number of retries when job_error_strategy is retry\n- submission_batch: The number of consumers to submit jobs\n- scheduler_opts: Additional keyword arguments for scheduler\n- jobname_prefix: The prefix of the job name\n\nNote that the producer must be initialized in an event loop.\n\nTo push a job into the queue:\n\n```python\nawait xqute.put(['echo', 1])\n```\n\n### Using SGE scheduler\n\n```python\nxqute = Xqute(\n 'sge',\n forks=100,\n scheduler_opts=dict(\n qsub='path to qsub',\n qdel='path to qdel',\n qstat='path to qstat',\n q='1-day', # or qsub_q='1-day'\n )\n ...\n)\n```\n\nKeyword-arguments with names starting with `sge_` will be interpreted as `qsub` options. `list` or `tuple` option values will be expanded. For example:\n`l=['h_vmem=2G', 'gpu=1']` will be expanded in wrapped script like this:\n\n```shell\n# ...\n\n#$ -l h_vmem=2G\n#$ -l gpu=1\n\n# ...\n```\n\n### Using Slurm scheduler\n\n```python\nxqute = Xqute(\n 'slurm',\n forks=100,\n scheduler_opts = {\n \"sbatch\": 'path to sbatch',\n \"scancel\": 'path to scancel',\n \"squeue\": 'path to squeue',\n \"partition\": '1-day', # or partition='1-day'\n \"time\": '01:00:00',\n ...\n },\n)\n```\n\n### Using ssh scheduler\n\n```python\nxqute = Xqute(\n 'ssh',\n forks=100,\n scheduler_opts={\n \"ssh\": 'path to ssh',\n \"servers\": {\n \"server1\": {\n \"user\": ...,\n \"port\": 22,\n \"keyfile\": ...,\n # How long to keep the ssh connection alive\n \"ctrl_persist\": 600,\n # Where to store the control socket\n \"ctrl_dir\": \"/tmp\",\n },\n ...\n }\n },\n ...\n)\n```\n\nSSH servers must share the same filesystem and using keyfile authentication.\n\n### Plugins\n\nTo write a plugin for `xqute`, you will need to implement the following hooks:\n\n- `def on_init(scheduler)`: Right after scheduler object is initialized\n- `def on_shutdown(scheduler, sig)`: When scheduler is shutting down\n- `async def on_job_init(scheduler, job)`: When the job is initialized\n- `async def on_job_queued(scheduler, job)`: When the job is queued\n- `async def on_job_submitted(scheduler, job)`: When the job is submitted\n- `async def on_job_started(scheduler, job)`: When the job is started (when status changed to running)\n- `async def on_job_polling(scheduler, job)`: When job status is being polled\n- `async def on_job_killing(scheduler, job)`: When the job is being killed\n- `async def on_job_killed(scheduler, job)`: When the job is killed\n- `async def on_job_failed(scheduler, job)`: When the job is failed\n- `async def on_job_succeeded(scheduler, job)`: When the job is succeeded\n- `def on_jobcmd_init(scheduler, job) -> str`: When the job command wrapper script is initialized before the prescript is run. This will replace the placeholder `{jobcmd_init}` in the wrapper script.\n- `def on_jobcmd_prep(scheduler, job) -> str`: When the job command is right about to run in the wrapper script. This will replace the placeholder `{jobcmd_prep}` in the wrapper script.\n- `def on_jobcmd_end(scheduler, job) -> str`: When the job command wrapper script is about to end and after the postscript is run. This will replace the placeholder `{jobcmd_end}` in the wrapper script.\n\nNote that all hooks are corotines except `on_init`, `on_shutdown` and `on_jobcmd_*`, that means you should also implement them as corotines (sync implementations are allowed but will be warned).\n\nYou may also check where the hooks are called in the following diagram:\n\n\n\nTo implement a hook, you have to fetch the plugin manager:\n\n```python\nfrom simplug import Simplug\npm = Simplug('xqute')\n\n# or\nfrom xqute import simplug as pm\n```\n\nand then use the decorator `pm.impl`:\n\n```python\n@pm.impl\ndef on_init(scheduler):\n ...\n```\n\n### Implementing a scheduler\n\nCurrently there are a few builtin schedulers: `local`, `slurm`, `gbatch` and `sge`.\n\nOne can implement a scheduler by subclassing the `Scheduler` abstract class. There are three abstract methods that have to be implemented in the subclass:\n\n```python\nfrom xqute import Scheduer\n\n\nclass MyScheduler(Scheduler):\n name = 'mysched'\n\n async def submit_job(self, job):\n \"\"\"How to submit a job, return a unique id in the scheduler system\n (the pid for local scheduler for example)\n \"\"\"\n\n async def kill_job(self, job):\n \"\"\"How to kill a job\"\"\"\n\n async def job_is_running(self, job):\n \"\"\"Check if a job is running\"\"\"\n```\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "A job management system for python",
"version": "0.7.0",
"project_urls": {
"Homepage": "https://github.com/pwwang/xqute",
"Repository": "https://github.com/pwwang/xqute"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "17fa154ad63419e6616167c9cd53cf0a25c729d9ce0fed49d026ecb2c3dd3760",
"md5": "a08c4c702ae28a1c969db3bbb81a44c3",
"sha256": "bbd4d3b2b333ddf88d9247207c88f2e9c37a14d25f0fd3f4962b6f11a867d69c"
},
"downloads": -1,
"filename": "xqute-0.7.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a08c4c702ae28a1c969db3bbb81a44c3",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.9",
"size": 27203,
"upload_time": "2025-02-06T18:58:41",
"upload_time_iso_8601": "2025-02-06T18:58:41.304731Z",
"url": "https://files.pythonhosted.org/packages/17/fa/154ad63419e6616167c9cd53cf0a25c729d9ce0fed49d026ecb2c3dd3760/xqute-0.7.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "677b9225d144da0913b9535a1947c9c43b17e4bd95c859c685126eb6a8676fc2",
"md5": "c9c0023624b9b36cf1a7966f59cb9120",
"sha256": "585a268cdeab5c6f59235a578cea9736a32d2b8fd2093fd7b9076bf437d20458"
},
"downloads": -1,
"filename": "xqute-0.7.0.tar.gz",
"has_sig": false,
"md5_digest": "c9c0023624b9b36cf1a7966f59cb9120",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.9",
"size": 21683,
"upload_time": "2025-02-06T18:58:43",
"upload_time_iso_8601": "2025-02-06T18:58:43.059454Z",
"url": "https://files.pythonhosted.org/packages/67/7b/9225d144da0913b9535a1947c9c43b17e4bd95c859c685126eb6a8676fc2/xqute-0.7.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-02-06 18:58:43",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "pwwang",
"github_project": "xqute",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"tox": true,
"lcname": "xqute"
}