Name | xqute |
Version | 0.5.3 |
home_page | https://github.com/pwwang/xqute |
Summary | A job management system for python |
upload_time | 2024-08-16 15:43:40 |
maintainer | None |
docs_url | None |
author | pwwang |
requires_python | <4.0,>=3.8 |
license | MIT |
requirements | No requirements were recorded. |
# xqute
A job management system for python
## Features
- Written in async
- Plugin system
- Scheduler adaptor
- Job retrying/pipeline halting when failed
## Installation
```shell
pip install xqute
```
## A toy example
```python
import asyncio
from xqute import Xqute

async def main():
    # 3 jobs allowed to run at the same time
    xqute = Xqute(scheduler_forks=3)
    for _ in range(10):
        await xqute.put('sleep 1')
    await xqute.run_until_complete()

if __name__ == '__main__':
    asyncio.run(main())
```
![xqute](./xqute.png)
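To make the effect of `scheduler_forks=3` more concrete, here is a minimal sketch in plain `asyncio` of capping how many jobs run at once with a semaphore. It only illustrates the idea and is not how xqute is implemented; `run_limited` and `fake_job` are hypothetical names, and `asyncio.sleep` stands in for a real command.

```python
import asyncio


async def run_limited(n_jobs: int, forks: int, duration: float = 0.01) -> int:
    """Run n_jobs fake jobs, at most `forks` at once; return the observed peak."""
    sem = asyncio.Semaphore(forks)
    running = 0
    peak = 0

    async def fake_job() -> None:
        nonlocal running, peak
        async with sem:  # waits while `forks` jobs are already running
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(duration)  # stands in for the real command
            running -= 1

    await asyncio.gather(*(fake_job() for _ in range(n_jobs)))
    return peak


if __name__ == "__main__":
    # 10 jobs, but never more than 3 in flight at the same time
    print(asyncio.run(run_limited(n_jobs=10, forks=3)))
```

The returned peak never exceeds `forks`, which is the behavior the toy example above relies on.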
## API
<https://pwwang.github.io/xqute/>
## Usage
### Xqute object
An xqute is initialized by:
```python
xqute = Xqute(...)
```
Available arguments are:
- scheduler: The scheduler class or name
- plugins: The plugins to enable/disable for this session
- job_metadir: The job meta directory (Default: `./.xqute/`)
- job_error_strategy: The strategy to apply when a job fails
- job_num_retries: Max number of retries when job_error_strategy is retry
- job_submission_batch: The number of consumers to submit jobs
- scheduler_forks: Max number of job forks
- **scheduler_opts: Additional keyword arguments for scheduler
Note that the producer must be initialized in an event loop.
To push a job into the queue:
```python
await xqute.put(['echo', 1])
```
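The `job_error_strategy` and `job_num_retries` arguments listed above describe retrying a failed job a bounded number of times. A rough sketch of that idea in plain `asyncio` follows. It is not xqute's code: `run_with_retries` is a hypothetical helper, and raising `RuntimeError` once the retries are exhausted is an assumption made for the example.

```python
import asyncio
import sys
from typing import List


async def run_with_retries(cmd: List[str], num_retries: int) -> int:
    """Run cmd; on a non-zero exit code, rerun it up to num_retries more times.

    Returns the number of attempts made; raises RuntimeError if all of them fail.
    """
    for attempt in range(1, num_retries + 2):  # first try plus the retries
        proc = await asyncio.create_subprocess_exec(*cmd)
        if await proc.wait() == 0:
            return attempt
    raise RuntimeError(f"command failed after {num_retries} retries")


if __name__ == "__main__":
    # A command that succeeds on the first attempt
    print(asyncio.run(run_with_retries([sys.executable, "-c", "pass"], num_retries=3)))
```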
### Using SGE scheduler
```python
xqute = Xqute(
    'sge',
    scheduler_forks=100,
    qsub='path to qsub',
    qdel='path to qdel',
    qstat='path to qstat',
    sge_q='1-day',  # or qsub_q='1-day'
    ...
)
```
Keyword arguments whose names start with `sge_` are interpreted as `qsub` options. `list` or `tuple` option values are expanded into one line per item. For example,
`sge_l=['h_vmem=2G', 'gpu=1']` is expanded in the wrapped script like this:
```shell
# ...
#$ -l h_vmem=2G
#$ -l gpu=1
# ...
```
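The expansion rule can be sketched in a few lines of Python. This is an illustration only, not xqute's actual code: `expand_sge_options` is a hypothetical helper, and rendering a scalar value as a single `#$ -<name> <value>` line is an assumption based on the example above.

```python
from typing import Any, Dict, List


def expand_sge_options(opts: Dict[str, Any], prefix: str = "sge_") -> List[str]:
    """Turn prefixed keyword options into `#$ -<name> <value>` header lines."""
    lines = []
    for key, value in opts.items():
        if not key.startswith(prefix):
            continue  # not a qsub option, e.g. scheduler_forks
        name = key[len(prefix):]
        # list/tuple values are expanded into one line per item
        values = value if isinstance(value, (list, tuple)) else [value]
        for item in values:
            lines.append(f"#$ -{name} {item}")
    return lines


if __name__ == "__main__":
    print("\n".join(expand_sge_options({"sge_l": ["h_vmem=2G", "gpu=1"]})))
```

Options that take no value (plain flags) are not handled by this sketch.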
### Using Slurm scheduler
```python
xqute = Xqute(
    'slurm',
    scheduler_forks=100,
    sbatch='path to sbatch',
    scancel='path to scancel',
    squeue='path to squeue',
    sbatch_partition='1-day',  # or slurm_partition='1-day'
    sbatch_time='01:00:00',
    ...
)
```
### Using ssh scheduler
```python
xqute = Xqute(
    'ssh',
    scheduler_forks=100,
    ssh='path to ssh',
    ssh_servers={
        "server1": {
            "user": ...,
            "port": 22,
            "keyfile": ...,
            # How long to keep the ssh connection alive
            "ctrl_persist": 600,
            # Where to store the control socket
            "ctrl_dir": "/tmp",
        },
        ...
    },
    ...
)
```
SSH servers must share the same filesystem and use keyfile authentication.
### Plugins
To write a plugin for `xqute`, you will need to implement the following hooks:
- `def on_init(scheduler)`: Right after scheduler object is initialized
- `def on_shutdown(scheduler, sig)`: When scheduler is shutting down
- `async def on_job_init(scheduler, job)`: When the job is initialized
- `async def on_job_queued(scheduler, job)`: When the job is queued
- `async def on_job_submitted(scheduler, job)`: When the job is submitted
- `async def on_job_started(scheduler, job)`: When the job is started (when status changed to running)
- `async def on_job_polling(scheduler, job)`: When job status is being polled
- `async def on_job_killing(scheduler, job)`: When the job is being killed
- `async def on_job_killed(scheduler, job)`: When the job is killed
- `async def on_job_failed(scheduler, job)`: When the job has failed
- `async def on_job_succeeded(scheduler, job)`: When the job has succeeded
- `def on_jobcmd_init(scheduler, job) -> str`: When the job command wrapper script is initialized before the prescript is run. This will replace the placeholder `#![jobcmd_init]` in the wrapper script.
- `def on_jobcmd_prep(scheduler, job) -> str`: When the job command is right about to run in the wrapper script. This will replace the placeholder `#![jobcmd_prep]` in the wrapper script.
- `def on_jobcmd_end(scheduler, job) -> str`: When the job command wrapper script is about to end and after the postscript is run. This will replace the placeholder `#![jobcmd_end]` in the wrapper script.
Note that all hooks are coroutines except `on_init` and `on_shutdown`; that means you should also implement them as coroutines (sync implementations are allowed but will trigger a warning).
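The three `on_jobcmd_*` hooks return strings that replace placeholders in the wrapper script. The substitution can be pictured with the sketch below. The template and the `fill_placeholders` helper are hypothetical and are not xqute's real wrapper script; only the placeholder names and their relative order come from the hook descriptions above.

```python
from typing import Dict, List

# Hypothetical wrapper template; only the placeholder names are taken from the README
TEMPLATE = """#!/bin/bash
#![jobcmd_init]
echo "prescript"
#![jobcmd_prep]
echo "the job command"
echo "postscript"
#![jobcmd_end]
"""


def fill_placeholders(template: str, snippets: Dict[str, List[str]]) -> str:
    """Replace each `#![name]` placeholder with the joined hook outputs."""
    for name, parts in snippets.items():
        template = template.replace(f"#![{name}]", "\n".join(parts))
    return template


script = fill_placeholders(
    TEMPLATE,
    {
        "jobcmd_init": ["export FOO=1"],  # e.g. returned by one plugin
        "jobcmd_prep": [],                # no plugin contributed anything
        "jobcmd_end": ["echo done"],
    },
)
print(script)
```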
To implement a hook, you have to fetch the plugin manager:
```python
from simplug import Simplug
pm = Simplug('xqute')
# or
from xqute import simplug as pm
```
and then use the decorator `pm.impl`:
```python
@pm.impl
def on_init(scheduler):
    ...
```
### Implementing a scheduler
The builtin schedulers are `local`, `sge`, `slurm` and `ssh`.
One can implement a scheduler by subclassing the `Scheduler` abstract class. There are three abstract methods that have to be implemented in the subclass:
```python
from xqute import Scheduler

class MyScheduler(Scheduler):
    name = 'my'
    job_class = MyJob

    async def submit_job(self, job):
        """How to submit a job, return a unique id in the scheduler system
        (the pid for local scheduler for example)
        """

    async def kill_job(self, job):
        """How to kill a job"""

    async def job_is_running(self, job):
        """Check if a job is running"""
```
As you can see, a job class (`MyJob`) also needs to be implemented before `MyScheduler`. The only method to be implemented is `shebang`:
```python
from xqute import Job, Scheduler

class MyJob(Job):

    def shebang(self, scheduler: Scheduler) -> str:
        """The shebang and the options for the job script"""
```
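To give a feel for what the three scheduler methods might contain, here is a stand-alone sketch built on `asyncio` subprocesses. It is deliberately not a subclass of xqute's `Scheduler`: `ToyLocalBackend` is a hypothetical class, and its methods take a command list or a pid instead of xqute `Job` objects, so treat it as an outline of the logic rather than a drop-in implementation.

```python
import asyncio
import sys
from typing import Dict, List


class ToyLocalBackend:
    """Stand-in for the submit/kill/is-running trio of a local-style scheduler."""

    def __init__(self) -> None:
        self._procs: Dict[int, asyncio.subprocess.Process] = {}

    async def submit_job(self, cmd: List[str]) -> int:
        """Start the command and return its pid as the unique job id."""
        proc = await asyncio.create_subprocess_exec(*cmd)
        self._procs[proc.pid] = proc
        return proc.pid

    async def kill_job(self, pid: int) -> None:
        """Terminate the process if it has not exited yet."""
        proc = self._procs[pid]
        if proc.returncode is None:
            proc.terminate()
            await proc.wait()

    async def job_is_running(self, pid: int) -> bool:
        """A job is running while its process has no return code."""
        proc = self._procs.get(pid)
        return proc is not None and proc.returncode is None


async def demo() -> None:
    backend = ToyLocalBackend()
    pid = await backend.submit_job(
        [sys.executable, "-c", "import time; time.sleep(30)"]
    )
    print(await backend.job_is_running(pid))
    await backend.kill_job(pid)
    print(await backend.job_is_running(pid))


if __name__ == "__main__":
    asyncio.run(demo())
```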
Raw data
{
"_id": null,
"home_page": "https://github.com/pwwang/xqute",
"name": "xqute",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.8",
"maintainer_email": null,
"keywords": null,
"author": "pwwang",
"author_email": "pwwang@pwwang.com",
"download_url": "https://files.pythonhosted.org/packages/d8/03/7fad1535dbe42f1bbdfbb1343ca49bbb1328425fadbbcfc93514ac72e816/xqute-0.5.3.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "A job management system for python",
"version": "0.5.3",
"project_urls": {
"Homepage": "https://github.com/pwwang/xqute",
"Repository": "https://github.com/pwwang/xqute"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "b2d61f9bdabc6078afd4c3adfd2c8f4a7de5f8ef20ab0fa1c5b974fdf8c7bbed",
"md5": "c05d3134ba3243ac94a9beb3a202a2c5",
"sha256": "0172bf092132ea6a83db64c7c502a49fa7a8e2e07f56c5d2cce83e56ddf5395b"
},
"downloads": -1,
"filename": "xqute-0.5.3-py3-none-any.whl",
"has_sig": false,
"md5_digest": "c05d3134ba3243ac94a9beb3a202a2c5",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.8",
"size": 24150,
"upload_time": "2024-08-16T15:43:39",
"upload_time_iso_8601": "2024-08-16T15:43:39.633088Z",
"url": "https://files.pythonhosted.org/packages/b2/d6/1f9bdabc6078afd4c3adfd2c8f4a7de5f8ef20ab0fa1c5b974fdf8c7bbed/xqute-0.5.3-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "d8037fad1535dbe42f1bbdfbb1343ca49bbb1328425fadbbcfc93514ac72e816",
"md5": "7d20aac1735ce250773c4383cfa29dc2",
"sha256": "c7189a089a6f1c89755c7505cead669597b89ff2451cb3f2bf2b53decfa9f152"
},
"downloads": -1,
"filename": "xqute-0.5.3.tar.gz",
"has_sig": false,
"md5_digest": "7d20aac1735ce250773c4383cfa29dc2",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.8",
"size": 20275,
"upload_time": "2024-08-16T15:43:40",
"upload_time_iso_8601": "2024-08-16T15:43:40.569165Z",
"url": "https://files.pythonhosted.org/packages/d8/03/7fad1535dbe42f1bbdfbb1343ca49bbb1328425fadbbcfc93514ac72e816/xqute-0.5.3.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-08-16 15:43:40",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "pwwang",
"github_project": "xqute",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"tox": true,
"lcname": "xqute"
}