Name | xqute JSON |
Version |
0.10.12
JSON |
| download |
home_page | https://github.com/pwwang/xqute |
Summary | A job management system for python |
upload_time | 2025-09-10 22:34:51 |
maintainer | None |
docs_url | None |
author | pwwang |
requires_python | <4.0,>=3.9 |
license | MIT |
keywords |
|
VCS |
 |
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
|
# xqute
A job management system for Python, designed to simplify job scheduling and execution with support for multiple schedulers and plugins.
## Features
- Written in async for high performance
- Plugin system for extensibility
- Scheduler adaptor for various backends
- Job retrying and pipeline halting on failure
- Support for cloud-based working directories
- Built-in support for Google Batch Jobs, Slurm, SGE, SSH, and container schedulers
## Installation
```shell
pip install xqute
```
## A Toy Example
```python
import asyncio
from xqute import Xqute
async def main():
# Initialize Xqute with 3 jobs allowed to run concurrently
xqute = Xqute(forks=3)
for _ in range(10):
await xqute.put(['sleep', '1'])
await xqute.run_until_complete()
if __name__ == '__main__':
asyncio.run(main())
```

## API Documentation
Full API documentation is available at: <https://pwwang.github.io/xqute/>
## Usage
### Xqute Object
An `Xqute` object is initialized as follows:
```python
xqute = Xqute(...)
```
Available arguments are:
- `scheduler`: The scheduler class or name (default: `local`)
- `plugins`: Plugins to enable/disable for this session
- `workdir`: Directory for job metadata (default: `./.xqute/`)
- `forks`: Number of jobs allowed to run concurrently
- `error_strategy`: Strategy for handling errors (e.g., `halt`, `retry`)
- `num_retries`: Maximum number of retries when `error_strategy` is set to `retry`
- `submission_batch`: Number of jobs to submit in a batch
- `scheduler_opts`: Additional keyword arguments for the scheduler
- `jobname_prefix`: Prefix for job names
- `recheck_interval`: Interval (in seconds) to recheck job status
**Note:** The producer must be initialized within an event loop.
To add a job to the queue:
```python
await xqute.put(['echo', 'Hello, World!'])
```
### Using SGE Scheduler
```python
xqute = Xqute(
scheduler='sge',
forks=100,
scheduler_opts={
'qsub': '/path/to/qsub',
'qdel': '/path/to/qdel',
'qstat': '/path/to/qstat',
'q': '1-day', # or qsub_q='1-day'
}
)
```
Keyword arguments starting with `sge_` are interpreted as `qsub` options. For example:
```python
'l': ['h_vmem=2G', 'gpu=1']
```
will be expanded in the job script as:
```shell
#$ -l h_vmem=2G
#$ -l gpu=1
```
### Using Slurm Scheduler
```python
xqute = Xqute(
scheduler='slurm',
forks=100,
scheduler_opts={
'sbatch': '/path/to/sbatch',
'scancel': '/path/to/scancel',
'squeue': '/path/to/squeue',
'partition': '1-day',
'time': '01:00:00',
}
)
```
### Using SSH Scheduler
```python
xqute = Xqute(
scheduler='ssh',
forks=100,
scheduler_opts={
'ssh': '/path/to/ssh',
'servers': {
'server1': {
'user': 'username',
'port': 22,
'keyfile': '/path/to/keyfile',
'ctrl_persist': 600,
'ctrl_dir': '/tmp',
}
}
}
)
```
**Note:** SSH servers must share the same filesystem and use keyfile authentication.
### Using Google Batch Jobs Scheduler
```python
xqute = Xqute(
scheduler='gbatch',
forks=100,
scheduler_opts={
'project': 'your-gcp-project-id',
'location': 'us-central1',
'gcloud': '/path/to/gcloud',
'taskGroups': [ ... ],
}
)
```
### Using Container Scheduler
```python
xqute = Xqute(
scheduler='container',
forks=100,
scheduler_opts={
'image': 'docker://bash:latest',
'entrypoint': '/usr/local/bin/bash',
'bin': 'docker',
'volumes': '/host/path:/container/path',
'envs': {'MY_ENV_VAR': 'value'},
'remove': True,
'bin_args': ['--hostname', 'xqute-container'],
}
)
```
### Plugins
To create a plugin for `xqute`, implement the following hooks:
- `def on_init(scheduler)`: Called after the scheduler is initialized
- `def on_shutdown(scheduler, sig)`: Called when the scheduler shuts down
- `async def on_job_init(scheduler, job)`: Called when a job is initialized
- `async def on_job_queued(scheduler, job)`: Called when a job is queued
- `async def on_job_submitted(scheduler, job)`: Called when a job is submitted
- `async def on_job_started(scheduler, job)`: Called when a job starts running
- `async def on_job_polling(scheduler, job, counter)`: Called during job status polling
- `async def on_job_killing(scheduler, job)`: Called when a job is being killed
- `async def on_job_killed(scheduler, job)`: Called when a job is killed
- `async def on_job_failed(scheduler, job)`: Called when a job fails
- `async def on_job_succeeded(scheduler, job)`: Called when a job succeeds
- `def on_jobcmd_init(scheduler, job) -> str`: Called during job command initialization
- `def on_jobcmd_prep(scheduler, job) -> str`: Called before the job command runs
- `def on_jobcmd_end(scheduler, job) -> str`: Called after the job command completes
To implement a hook, use the `simplug` plugin manager:
```python
from xqute import simplug as pm
@pm.impl
def on_init(scheduler):
...
```
### Implementing a Scheduler
To create a custom scheduler, subclass the `Scheduler` abstract class and implement the following methods:
```python
from xqute import Scheduler
class MyScheduler(Scheduler):
name = 'mysched'
async def submit_job(self, job):
"""Submit a job and return its unique ID."""
async def kill_job(self, job):
"""Kill a job."""
async def job_is_running(self, job):
"""Check if a job is running."""
```
Raw data
{
"_id": null,
"home_page": "https://github.com/pwwang/xqute",
"name": "xqute",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.9",
"maintainer_email": null,
"keywords": null,
"author": "pwwang",
"author_email": "pwwang@pwwang.com",
"download_url": "https://files.pythonhosted.org/packages/2d/75/62da8571fa875d6b7a0a48c04c19a5da91501a7c1d934024a4ce97154987/xqute-0.10.12.tar.gz",
"platform": null,
"description": "# xqute\n\nA job management system for Python, designed to simplify job scheduling and execution with support for multiple schedulers and plugins.\n\n## Features\n\n- Written in async for high performance\n- Plugin system for extensibility\n- Scheduler adaptor for various backends\n- Job retrying and pipeline halting on failure\n- Support for cloud-based working directories\n- Built-in support for Google Batch Jobs, Slurm, SGE, SSH, and container schedulers\n\n## Installation\n\n```shell\npip install xqute\n```\n\n## A Toy Example\n\n```python\nimport asyncio\nfrom xqute import Xqute\n\nasync def main():\n # Initialize Xqute with 3 jobs allowed to run concurrently\n xqute = Xqute(forks=3)\n for _ in range(10):\n await xqute.put(['sleep', '1'])\n await xqute.run_until_complete()\n\nif __name__ == '__main__':\n asyncio.run(main())\n```\n\n\n\n## API Documentation\n\nFull API documentation is available at: <https://pwwang.github.io/xqute/>\n\n## Usage\n\n### Xqute Object\n\nAn `Xqute` object is initialized as follows:\n\n```python\nxqute = Xqute(...)\n```\n\nAvailable arguments are:\n\n- `scheduler`: The scheduler class or name (default: `local`)\n- `plugins`: Plugins to enable/disable for this session\n- `workdir`: Directory for job metadata (default: `./.xqute/`)\n- `forks`: Number of jobs allowed to run concurrently\n- `error_strategy`: Strategy for handling errors (e.g., `halt`, `retry`)\n- `num_retries`: Maximum number of retries when `error_strategy` is set to `retry`\n- `submission_batch`: Number of jobs to submit in a batch\n- `scheduler_opts`: Additional keyword arguments for the scheduler\n- `jobname_prefix`: Prefix for job names\n- `recheck_interval`: Interval (in seconds) to recheck job status\n\n**Note:** The producer must be initialized within an event loop.\n\nTo add a job to the queue:\n\n```python\nawait xqute.put(['echo', 'Hello, World!'])\n```\n\n### Using SGE Scheduler\n\n```python\nxqute = Xqute(\n scheduler='sge',\n forks=100,\n scheduler_opts={\n 'qsub': '/path/to/qsub',\n 'qdel': '/path/to/qdel',\n 'qstat': '/path/to/qstat',\n 'q': '1-day', # or qsub_q='1-day'\n }\n)\n```\n\nKeyword arguments starting with `sge_` are interpreted as `qsub` options. For example:\n\n```python\n'l': ['h_vmem=2G', 'gpu=1']\n```\nwill be expanded in the job script as:\n\n```shell\n#$ -l h_vmem=2G\n#$ -l gpu=1\n```\n\n### Using Slurm Scheduler\n\n```python\nxqute = Xqute(\n scheduler='slurm',\n forks=100,\n scheduler_opts={\n 'sbatch': '/path/to/sbatch',\n 'scancel': '/path/to/scancel',\n 'squeue': '/path/to/squeue',\n 'partition': '1-day',\n 'time': '01:00:00',\n }\n)\n```\n\n### Using SSH Scheduler\n\n```python\nxqute = Xqute(\n scheduler='ssh',\n forks=100,\n scheduler_opts={\n 'ssh': '/path/to/ssh',\n 'servers': {\n 'server1': {\n 'user': 'username',\n 'port': 22,\n 'keyfile': '/path/to/keyfile',\n 'ctrl_persist': 600,\n 'ctrl_dir': '/tmp',\n }\n }\n }\n)\n```\n\n**Note:** SSH servers must share the same filesystem and use keyfile authentication.\n\n### Using Google Batch Jobs Scheduler\n\n```python\nxqute = Xqute(\n scheduler='gbatch',\n forks=100,\n scheduler_opts={\n 'project': 'your-gcp-project-id',\n 'location': 'us-central1',\n 'gcloud': '/path/to/gcloud',\n 'taskGroups': [ ... ],\n }\n)\n```\n\n### Using Container Scheduler\n\n```python\nxqute = Xqute(\n scheduler='container',\n forks=100,\n scheduler_opts={\n 'image': 'docker://bash:latest',\n 'entrypoint': '/usr/local/bin/bash',\n 'bin': 'docker',\n 'volumes': '/host/path:/container/path',\n 'envs': {'MY_ENV_VAR': 'value'},\n 'remove': True,\n 'bin_args': ['--hostname', 'xqute-container'],\n }\n)\n```\n\n### Plugins\n\nTo create a plugin for `xqute`, implement the following hooks:\n\n- `def on_init(scheduler)`: Called after the scheduler is initialized\n- `def on_shutdown(scheduler, sig)`: Called when the scheduler shuts down\n- `async def on_job_init(scheduler, job)`: Called when a job is initialized\n- `async def on_job_queued(scheduler, job)`: Called when a job is queued\n- `async def on_job_submitted(scheduler, job)`: Called when a job is submitted\n- `async def on_job_started(scheduler, job)`: Called when a job starts running\n- `async def on_job_polling(scheduler, job, counter)`: Called during job status polling\n- `async def on_job_killing(scheduler, job)`: Called when a job is being killed\n- `async def on_job_killed(scheduler, job)`: Called when a job is killed\n- `async def on_job_failed(scheduler, job)`: Called when a job fails\n- `async def on_job_succeeded(scheduler, job)`: Called when a job succeeds\n- `def on_jobcmd_init(scheduler, job) -> str`: Called during job command initialization\n- `def on_jobcmd_prep(scheduler, job) -> str`: Called before the job command runs\n- `def on_jobcmd_end(scheduler, job) -> str`: Called after the job command completes\n\nTo implement a hook, use the `simplug` plugin manager:\n\n```python\nfrom xqute import simplug as pm\n\n@pm.impl\ndef on_init(scheduler):\n ...\n```\n\n### Implementing a Scheduler\n\nTo create a custom scheduler, subclass the `Scheduler` abstract class and implement the following methods:\n\n```python\nfrom xqute import Scheduler\n\nclass MyScheduler(Scheduler):\n name = 'mysched'\n\n async def submit_job(self, job):\n \"\"\"Submit a job and return its unique ID.\"\"\"\n\n async def kill_job(self, job):\n \"\"\"Kill a job.\"\"\"\n\n async def job_is_running(self, job):\n \"\"\"Check if a job is running.\"\"\"\n```\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "A job management system for python",
"version": "0.10.12",
"project_urls": {
"Homepage": "https://github.com/pwwang/xqute",
"Repository": "https://github.com/pwwang/xqute"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "753b961c5116b5c09400aa3af46a09c4491d2d58396e22171a219217b5debfe1",
"md5": "0a256d69030d6e136eb0e396e7d3a597",
"sha256": "10dcd097cf20d41f28e4458c642a2464757c775a482333791a1e8689e97ee80e"
},
"downloads": -1,
"filename": "xqute-0.10.12-py3-none-any.whl",
"has_sig": false,
"md5_digest": "0a256d69030d6e136eb0e396e7d3a597",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.9",
"size": 38888,
"upload_time": "2025-09-10T22:34:50",
"upload_time_iso_8601": "2025-09-10T22:34:50.477568Z",
"url": "https://files.pythonhosted.org/packages/75/3b/961c5116b5c09400aa3af46a09c4491d2d58396e22171a219217b5debfe1/xqute-0.10.12-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "2d7562da8571fa875d6b7a0a48c04c19a5da91501a7c1d934024a4ce97154987",
"md5": "f393633f8a425165e0a0a480016023c1",
"sha256": "6736230c02e1fe3e1d93be8fd909966e059fb088ba44bf713a088d3209207feb"
},
"downloads": -1,
"filename": "xqute-0.10.12.tar.gz",
"has_sig": false,
"md5_digest": "f393633f8a425165e0a0a480016023c1",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.9",
"size": 32493,
"upload_time": "2025-09-10T22:34:51",
"upload_time_iso_8601": "2025-09-10T22:34:51.696191Z",
"url": "https://files.pythonhosted.org/packages/2d/75/62da8571fa875d6b7a0a48c04c19a5da91501a7c1d934024a4ce97154987/xqute-0.10.12.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-09-10 22:34:51",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "pwwang",
"github_project": "xqute",
"travis_ci": false,
"coveralls": true,
"github_actions": true,
"tox": true,
"lcname": "xqute"
}