Package metadata:

- Name: xqute
- Version: 0.10.12
- Home page: https://github.com/pwwang/xqute
- Summary: A job management system for python
- Upload time: 2025-09-10 22:34:51
- Author: pwwang
- Requires Python: `<4.0,>=3.9`
- License: MIT

# xqute

A job management system for Python, designed to simplify job scheduling and execution with support for multiple schedulers and plugins.

## Features

- Built on `asyncio` for concurrent, non-blocking job management
- Plugin system for extensibility
- Scheduler adaptor for various backends
- Job retrying and pipeline halting on failure
- Support for cloud-based working directories
- Built-in support for Google Batch Jobs, Slurm, SGE, SSH, and container schedulers

## Installation

```shell
pip install xqute
```

## A Toy Example

```python
import asyncio
from xqute import Xqute

async def main():
    # Initialize Xqute with 3 jobs allowed to run concurrently
    xqute = Xqute(forks=3)
    for _ in range(10):
        await xqute.put(['sleep', '1'])
    await xqute.run_until_complete()

if __name__ == '__main__':
    asyncio.run(main())
```

![xqute](./xqute.png)
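
To make the effect of `forks=3` concrete, here is a minimal standard-library sketch of bounded concurrency. It is not how `xqute` is implemented internally; `run_with_limit` and its parameters are hypothetical names used only for illustration, and the dummy jobs sleep instead of running a command.

```python
import asyncio


async def run_with_limit(n_jobs: int, forks: int, duration: float = 0.01) -> int:
    """Run n_jobs dummy jobs, at most `forks` at a time; return the peak concurrency."""
    semaphore = asyncio.Semaphore(forks)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:  # blocks while `forks` jobs are already running
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(duration)  # stand-in for the real work
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


peak = asyncio.run(run_with_limit(n_jobs=10, forks=3))
print(f"peak concurrency: {peak}")
```

All ten jobs are scheduled at once, but the semaphore keeps the number of simultaneously running jobs at or below the limit.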

## API Documentation

Full API documentation is available at: <https://pwwang.github.io/xqute/>

## Usage

### Xqute Object

An `Xqute` object is initialized as follows:

```python
xqute = Xqute(...)
```

Available arguments are:

- `scheduler`: The scheduler class or name (default: `local`)
- `plugins`: Plugins to enable/disable for this session
- `workdir`: Directory for job metadata (default: `./.xqute/`)
- `forks`: Number of jobs allowed to run concurrently
- `error_strategy`: Strategy for handling errors (e.g., `halt`, `retry`)
- `num_retries`: Maximum number of retries when `error_strategy` is set to `retry`
- `submission_batch`: Number of jobs to submit in a batch
- `scheduler_opts`: Additional keyword arguments for the scheduler
- `jobname_prefix`: Prefix for job names
- `recheck_interval`: Interval (in seconds) to recheck job status

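**Note:** The `Xqute` object (the job producer) must be initialized within a running event loop, for example inside an `async def main()` that is started with `asyncio.run()`.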

To add a job to the queue:

```python
await xqute.put(['echo', 'Hello, World!'])
```

### Using SGE Scheduler

```python
xqute = Xqute(
    scheduler='sge',
    forks=100,
    scheduler_opts={
        'qsub': '/path/to/qsub',
        'qdel': '/path/to/qdel',
        'qstat': '/path/to/qstat',
        'q': '1-day',  # or qsub_q='1-day'
    }
)
```

Keyword arguments starting with `sge_` are interpreted as `qsub` options, and list values are expanded into one directive per item. For example, this entry in `scheduler_opts`:

```python
'l': ['h_vmem=2G', 'gpu=1']
```

will be expanded in the job script as:

```shell
#$ -l h_vmem=2G
#$ -l gpu=1
```
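
The expansion shown above can be reproduced with a short sketch. The function `render_sge_directives` is a hypothetical helper, not part of `xqute`; it only assumes that each option becomes a `#$ -<key> <value>` line and that list values produce one line per item, as in the example.

```python
from typing import Mapping, Sequence, Union

OptionValue = Union[str, Sequence[str]]


def render_sge_directives(options: Mapping[str, OptionValue]) -> list[str]:
    """Turn qsub options into '#$' directive lines, one line per value."""
    lines = []
    for key, value in options.items():
        # A plain string is a single value; lists and tuples are expanded.
        values = [value] if isinstance(value, str) else list(value)
        for item in values:
            lines.append(f"#$ -{key} {item}")
    return lines


directives = render_sge_directives({"q": "1-day", "l": ["h_vmem=2G", "gpu=1"]})
print("\n".join(directives))
```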

### Using Slurm Scheduler

```python
xqute = Xqute(
    scheduler='slurm',
    forks=100,
    scheduler_opts={
        'sbatch': '/path/to/sbatch',
        'scancel': '/path/to/scancel',
        'squeue': '/path/to/squeue',
        'partition': '1-day',
        'time': '01:00:00',
    }
)
```

### Using SSH Scheduler

```python
xqute = Xqute(
    scheduler='ssh',
    forks=100,
    scheduler_opts={
        'ssh': '/path/to/ssh',
        'servers': {
            'server1': {
                'user': 'username',
                'port': 22,
                'keyfile': '/path/to/keyfile',
                'ctrl_persist': 600,
                'ctrl_dir': '/tmp',
            }
        }
    }
)
```

**Note:** SSH servers must share the same filesystem and use keyfile authentication.
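
As a rough illustration of what a per-server entry describes, the sketch below turns one `servers` entry into an OpenSSH command line. The mapping is an assumption, not `xqute`'s implementation: `ctrl_persist` is treated as the `ControlPersist` timeout, `ctrl_dir` as the directory for the `ControlPath` socket, and `build_ssh_command` is a hypothetical helper.

```python
import shlex


def build_ssh_command(ssh: str, host: str, conf: dict, remote_cmd: list) -> list:
    """Build an argv list for running remote_cmd on host over ssh."""
    args = [ssh, "-p", str(conf.get("port", 22))]
    if "keyfile" in conf:
        args += ["-i", conf["keyfile"]]  # key-based authentication
    if "ctrl_persist" in conf:
        # Reuse one master connection for repeated commands (assumed mapping).
        ctrl_dir = conf.get("ctrl_dir", "/tmp")
        args += [
            "-o", "ControlMaster=auto",
            "-o", f"ControlPersist={conf['ctrl_persist']}",
            "-o", f"ControlPath={ctrl_dir}/ssh-%r@%h:%p",
        ]
    target = f"{conf['user']}@{host}" if "user" in conf else host
    args.append(target)
    args.append(shlex.join(remote_cmd))  # quote the command for the remote shell
    return args


server_conf = {
    "user": "username",
    "port": 22,
    "keyfile": "/path/to/keyfile",
    "ctrl_persist": 600,
    "ctrl_dir": "/tmp",
}
command = build_ssh_command("/path/to/ssh", "server1", server_conf, ["echo", "Hello, World!"])
print(command)
```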

### Using Google Batch Jobs Scheduler

```python
xqute = Xqute(
    scheduler='gbatch',
    forks=100,
    scheduler_opts={
        'project': 'your-gcp-project-id',
        'location': 'us-central1',
        'gcloud': '/path/to/gcloud',
        'taskGroups': [ ... ],
    }
)
```

### Using Container Scheduler

```python
xqute = Xqute(
    scheduler='container',
    forks=100,
    scheduler_opts={
        'image': 'docker://bash:latest',
        'entrypoint': '/usr/local/bin/bash',
        'bin': 'docker',
        'volumes': '/host/path:/container/path',
        'envs': {'MY_ENV_VAR': 'value'},
        'remove': True,
        'bin_args': ['--hostname', 'xqute-container'],
    }
)
```

### Plugins

To create a plugin for `xqute`, implement the following hooks:

- `def on_init(scheduler)`: Called after the scheduler is initialized
- `def on_shutdown(scheduler, sig)`: Called when the scheduler shuts down
- `async def on_job_init(scheduler, job)`: Called when a job is initialized
- `async def on_job_queued(scheduler, job)`: Called when a job is queued
- `async def on_job_submitted(scheduler, job)`: Called when a job is submitted
- `async def on_job_started(scheduler, job)`: Called when a job starts running
- `async def on_job_polling(scheduler, job, counter)`: Called during job status polling
- `async def on_job_killing(scheduler, job)`: Called when a job is being killed
- `async def on_job_killed(scheduler, job)`: Called when a job is killed
- `async def on_job_failed(scheduler, job)`: Called when a job fails
- `async def on_job_succeeded(scheduler, job)`: Called when a job succeeds
- `def on_jobcmd_init(scheduler, job) -> str`: Called during job command initialization
- `def on_jobcmd_prep(scheduler, job) -> str`: Called before the job command runs
- `def on_jobcmd_end(scheduler, job) -> str`: Called after the job command completes

To implement a hook, use the `simplug` plugin manager:

```python
from xqute import simplug as pm

@pm.impl
def on_init(scheduler):
    ...
```
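
The hook list mixes plain functions, coroutines, and hooks that return a string. The toy registry below sketches how such hooks could be registered and invoked; it is a stand-in written with the standard library only, not `simplug`'s API, and `HookRegistry`, `call_sync`, and `call_async` are hypothetical names.

```python
import asyncio
from collections import defaultdict


class HookRegistry:
    """Toy hook registry (hypothetical; simplug works differently)."""

    def __init__(self):
        self._impls = defaultdict(list)

    def impl(self, func):
        """Decorator: register func under its own name."""
        self._impls[func.__name__].append(func)
        return func

    def call_sync(self, name, *args):
        """Call every synchronous implementation and collect the results."""
        return [func(*args) for func in self._impls[name]]

    async def call_async(self, name, *args):
        """Await every coroutine implementation in registration order."""
        return [await func(*args) for func in self._impls[name]]


pm = HookRegistry()
events = []


@pm.impl
def on_init(scheduler):
    events.append(("init", scheduler))


@pm.impl
async def on_job_failed(scheduler, job):
    events.append(("failed", job))


@pm.impl
def on_jobcmd_prep(scheduler, job) -> str:
    # Hooks returning str contribute a shell snippet.
    return "echo preparing"


pm.call_sync("on_init", "local")
asyncio.run(pm.call_async("on_job_failed", "local", "job-0"))
snippets = pm.call_sync("on_jobcmd_prep", "local", "job-0")
print(events, snippets)
```

The practical point is that `async def` hooks must be awaited by the caller, while the `on_jobcmd_*` hooks are synchronous and their return values are collected.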

### Implementing a Scheduler

To create a custom scheduler, subclass the `Scheduler` abstract class and implement the following methods:

```python
from xqute import Scheduler

class MyScheduler(Scheduler):
    name = 'mysched'

    async def submit_job(self, job):
        """Submit a job and return its unique ID."""

    async def kill_job(self, job):
        """Kill a job."""

    async def job_is_running(self, job):
        """Check if a job is running."""
```
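
To show what the three methods might contain, here is a self-contained sketch that runs jobs as local subprocesses. So that it runs without `xqute` installed, it uses a stand-in base class instead of `xqute.Scheduler`; `SchedulerBase`, `DemoJob`, and the `cmd`/`jid` attributes are hypothetical, and the real job object may expose different attributes.

```python
import asyncio
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional


class SchedulerBase(ABC):
    """Stand-in for xqute's Scheduler so this sketch runs on its own."""

    @abstractmethod
    async def submit_job(self, job): ...

    @abstractmethod
    async def kill_job(self, job): ...

    @abstractmethod
    async def job_is_running(self, job): ...


@dataclass
class DemoJob:
    """Hypothetical job record: a command plus the ID assigned on submission."""
    cmd: list
    jid: Optional[int] = None


class SubprocessScheduler(SchedulerBase):
    name = "subprocess-demo"

    def __init__(self):
        self._procs = {}  # pid -> asyncio subprocess handle

    async def submit_job(self, job):
        """Start the command and return the process ID as the unique job ID."""
        proc = await asyncio.create_subprocess_exec(*job.cmd)
        self._procs[proc.pid] = proc
        return proc.pid

    async def kill_job(self, job):
        """Terminate the process if it is still alive and wait for it to exit."""
        proc = self._procs.get(job.jid)
        if proc is not None and proc.returncode is None:
            proc.terminate()
            await proc.wait()

    async def job_is_running(self, job):
        """A job is running while its process has no return code yet."""
        proc = self._procs.get(job.jid)
        return proc is not None and proc.returncode is None


async def demo():
    scheduler = SubprocessScheduler()
    job = DemoJob([sys.executable, "-c", "import time; time.sleep(30)"])
    job.jid = await scheduler.submit_job(job)
    before = await scheduler.job_is_running(job)
    await scheduler.kill_job(job)
    after = await scheduler.job_is_running(job)
    return before, after


result = asyncio.run(demo())
print(result)
```

A real scheduler would replace the subprocess calls with the backend's own submit, cancel, and status commands.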

            
