# Slurpy (SLUrm Rest api PYthon client)
Slurpy is a Python client for the [Slurm REST API](https://slurm.schedmd.com/rest.html).
Slurm is an open-source job scheduler for high performance compute environments.
Its REST API is a set of HTTP endpoints for submitting, monitoring, and managing compute jobs.
This Python client is a convenience library for interacting with that API.
## Features
- โจ **Multiple API versions** - Support for Slurm REST API v0.0.40, v0.0.41, and v0.0.42
- ๐ **JWT Authentication** - Built-in support for Slurm JWT token authentication
- ๐ **Sync & Async** - Both synchronous and asynchronous client support
- ๐ **Type Hints** - Complete type annotations for better IDE support
- ๐งช **Well Tested** - Comprehensive integration tests with real Slurm clusters
- ๐ฆ **Easy Installation** - Simple pip installation with minimal dependencies
## Installation
```bash
pip install ebi-slurpy
```
## Quick Start
Slurpy supports both **synchronous** and **asynchronous** clients. Choose the one that fits your application:
### Synchronous Client
```python
import slurpy.v0040 as slurpy
# Configure the client
configuration = slurpy.Configuration(
host="http://your-slurm-rest-api:6820"
)
# Set up authentication
configuration.api_key['user'] = "your-username"
configuration.api_key['token'] = "your-jwt-token"
# Create API client
with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
# Test connectivity
response = api.get_ping()
print(f"Cluster status: {response.to_dict()}")
# List all jobs
jobs_response = api.get_jobs()
jobs = jobs_response.to_dict()
print(f"Found {len(jobs['jobs'])} jobs")
```
### Asynchronous Client
```python
import asyncio
import slurpy.v0040.asyncio as slurpy
async def main():
# Configure the client
configuration = slurpy.Configuration(
host="http://your-slurm-rest-api:6820"
)
# Set up authentication
configuration.api_key['user'] = "your-username"
configuration.api_key['token'] = "your-jwt-token"
# Create API client
async with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
# Test connectivity
response = await api.get_ping()
print(f"Cluster status: {response.to_dict()}")
# List all jobs
jobs_response = await api.get_jobs()
jobs = jobs_response.to_dict()
print(f"Found {len(jobs['jobs'])} jobs")
if __name__ == "__main__":
asyncio.run(main())
```
### Environment Variables
You can use environment variables for configuration with both sync and async clients:
**Sync version:**
```python
import os
import slurpy.v0040 as slurpy
configuration = slurpy.Configuration(
host=os.getenv("SLURM_REST_URL", "http://localhost:6820")
)
configuration.api_key['user'] = os.getenv("SLURM_USER_NAME")
configuration.api_key['token'] = os.getenv("SLURM_USER_TOKEN")
```
**Async version:**
```python
import os
import slurpy.v0040.asyncio as slurpy
configuration = slurpy.Configuration(
host=os.getenv("SLURM_REST_URL", "http://localhost:6820")
)
configuration.api_key['user'] = os.getenv("SLURM_USER_NAME")
configuration.api_key['token'] = os.getenv("SLURM_USER_TOKEN")
```
## API Versions
Slurpy supports multiple Slurm REST API versions, each available in both sync and async variants:
### v0.0.40
```python
# Synchronous
import slurpy.v0040 as slurpy
# Asynchronous
import slurpy.v0040.asyncio as slurpy
```
### v0.0.41
```python
# Synchronous
import slurpy.v0041 as slurpy
# Asynchronous
import slurpy.v0041.asyncio as slurpy
```
### v0.0.42
```python
# Synchronous
import slurpy.v0042 as slurpy
# Asynchronous
import slurpy.v0042.asyncio as slurpy
```
> **Note:** The API interface is consistent across versions and between sync/async variants, but some features and response formats may differ between Slurm versions. Check the Slurm documentation for version-specific differences.
## Common Operations
### List Jobs
**Sync version:**
```python
def list_jobs():
with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
response = api.get_jobs()
jobs = response.to_dict()
for job in jobs['jobs']:
print(f"Job {job['job_id']}: {job['name']} ({job['job_state']})")
```
**Async version:**
```python
async def list_jobs():
async with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
response = await api.get_jobs()
jobs = response.to_dict()
for job in jobs['jobs']:
print(f"Job {job['job_id']}: {job['name']} ({job['job_state']})")
```
### Submit a Job
**Sync version:**
```python
def submit_job():
job_spec = {
"script": "#!/bin/bash\\nsleep 60\\necho 'Job completed'",
"job": {
"name": "my_job",
"current_working_directory": "/home/user",
"environment": ["PATH=/bin:/usr/bin"],
"ntasks": 1,
"time_limit": {"set": True, "number": 300}, # 5 minutes
}
}
with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
job_request = slurpy.JobSubmitReq.from_dict(job_spec)
response = api.post_job_submit(job_submit_req=job_request)
result = response.to_dict()
print(f"Job submitted with ID: {result['job_id']}")
```
**Async version:**
```python
async def submit_job():
job_spec = {
"script": "#!/bin/bash\\nsleep 60\\necho 'Job completed'",
"job": {
"name": "my_job",
"current_working_directory": "/home/user",
"environment": ["PATH=/bin:/usr/bin"],
"ntasks": 1,
"time_limit": {"set": True, "number": 300}, # 5 minutes
}
}
async with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
job_request = slurpy.JobSubmitReq.from_dict(job_spec)
response = await api.post_job_submit(job_submit_req=job_request)
result = response.to_dict()
print(f"Job submitted with ID: {result['job_id']}")
```
### Get Job Details
**Sync version:**
```python
def get_job(job_id: str):
with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
response = api.get_job(job_id=job_id)
job = response.to_dict()
print(f"Job {job_id} status: {job['jobs'][0]['job_state']}")
```
**Async version:**
```python
async def get_job(job_id: str):
async with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
response = await api.get_job(job_id=job_id)
job = response.to_dict()
print(f"Job {job_id} status: {job['jobs'][0]['job_state']}")
```
### Cancel a Job
**Sync version:**
```python
def cancel_job(job_id: str):
with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
response = api.delete_job(job_id)
print(f"Job {job_id} cancellation requested")
```
**Async version:**
```python
async def cancel_job(job_id: str):
async with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
response = await api.delete_job(job_id)
print(f"Job {job_id} cancellation requested")
```
## Error Handling
Error handling works the same for both sync and async clients:
**Sync version:**
```python
from slurpy.v0040.rest import ApiException
def robust_job_operation():
try:
with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
response = api.get_jobs()
return response.to_dict()
except ApiException as e:
print(f"API Error: {e.status} - {e.reason}")
print(f"Response body: {e.body}")
except Exception as e:
print(f"Unexpected error: {e}")
```
**Async version:**
```python
from slurpy.v0040.asyncio.rest import ApiException
async def robust_job_operation():
try:
async with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
response = await api.get_jobs()
return response.to_dict()
except ApiException as e:
print(f"API Error: {e.status} - {e.reason}")
print(f"Response body: {e.body}")
except Exception as e:
print(f"Unexpected error: {e}")
```
## Configuration Options
### SSL/TLS Configuration
```python
configuration = slurpy.Configuration(
host="https://secure-slurm-api:6820",
ssl_ca_cert="/path/to/ca-cert.pem", # CA certificate
cert_file="/path/to/client-cert.pem", # Client certificate
key_file="/path/to/client-key.pem", # Client private key
verify_ssl=True
)
```
## Complete Example
Here's a complete workflow example available in both sync and async versions:
### Synchronous Version
```python
#!/usr/bin/env python3
import os
from typing import Optional
import slurpy.v0040 as slurpy
from slurpy.v0040.rest import ApiException
def slurm_workflow():
"""Complete workflow: ping, submit job, monitor, cleanup."""
# Configuration
configuration = slurpy.Configuration(
host=os.getenv("SLURM_REST_URL", "http://localhost:6820")
)
configuration.api_key['user'] = os.getenv("SLURM_USER_NAME", "slurm")
configuration.api_key['token'] = os.getenv("SLURM_USER_TOKEN")
if not configuration.api_key['token']:
print("Error: SLURM_USER_TOKEN environment variable is required")
return
job_id: Optional[str] = None
try:
with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
# 1. Test connectivity
print("๐ Testing cluster connectivity...")
ping_response = api.get_ping()
print(f"โ
Cluster is responsive: {ping_response.to_dict()}")
# 2. Submit a job
print("\\n๐ค Submitting job...")
job_spec = {
"script": "#!/bin/bash\\nsleep 30\\necho 'Hello from Slurm!'",
"job": {
"name": "slurpy_demo",
"current_working_directory": "/tmp",
"environment": ["PATH=/bin:/usr/bin"],
"ntasks": 1,
"time_limit": {"set": True, "number": 120},
}
}
job_request = slurpy.JobSubmitReq.from_dict(job_spec)
submit_response = api.post_job_submit(job_submit_req=job_request)
result = submit_response.to_dict()
job_id = str(result['job_id'])
print(f"โ
Job submitted successfully with ID: {job_id}")
# 3. Monitor job
print(f"\\n๐ Monitoring job {job_id}...")
job_response = api.get_job(job_id=job_id)
job_data = job_response.to_dict()
job_info = job_data['jobs'][0]
print(f"๐ Job status: {job_info['job_state']}")
print(f"๐ Job details: {job_info['name']} on {job_info.get('nodes', 'pending')}")
# 4. List all current jobs
print("\\n๐ Current cluster jobs:")
jobs_response = api.get_jobs()
jobs = jobs_response.to_dict()
print(f"Found {len(jobs['jobs'])} total jobs in the cluster")
except ApiException as e:
print(f"โ Slurm API error: {e.status} - {e.reason}")
if e.body:
print(f"Response: {e.body}")
except Exception as e:
print(f"โ Unexpected error: {e}")
finally:
# Cleanup: Cancel the job if it was created
if job_id:
try:
with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
api.delete_job(job_id)
print(f"\\n๐งน Cleanup: Job {job_id} cancellation requested")
except Exception as cleanup_error:
print(f"โ ๏ธ Cleanup warning: Could not cancel job {job_id}: {cleanup_error}")
if __name__ == "__main__":
slurm_workflow()
```
### Asynchronous Version
```python
#!/usr/bin/env python3
import asyncio
import os
from typing import Optional
import slurpy.v0040.asyncio as slurpy
from slurpy.v0040.asyncio.rest import ApiException
async def slurm_workflow():
"""Complete workflow: ping, submit job, monitor, cleanup."""
# Configuration
configuration = slurpy.Configuration(
host=os.getenv("SLURM_REST_URL", "http://localhost:6820")
)
configuration.api_key['user'] = os.getenv("SLURM_USER_NAME", "slurm")
configuration.api_key['token'] = os.getenv("SLURM_USER_TOKEN")
if not configuration.api_key['token']:
print("Error: SLURM_USER_TOKEN environment variable is required")
return
job_id: Optional[str] = None
try:
async with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
# 1. Test connectivity
print("๐ Testing cluster connectivity...")
ping_response = await api.get_ping()
print(f"โ
Cluster is responsive: {ping_response.to_dict()}")
# 2. Submit a job
print("\\n๐ค Submitting job...")
job_spec = {
"script": "#!/bin/bash\\nsleep 30\\necho 'Hello from Slurm!'",
"job": {
"name": "slurpy_demo",
"current_working_directory": "/tmp",
"environment": ["PATH=/bin:/usr/bin"],
"ntasks": 1,
"time_limit": {"set": True, "number": 120},
}
}
job_request = slurpy.JobSubmitReq.from_dict(job_spec)
submit_response = await api.post_job_submit(job_submit_req=job_request)
result = submit_response.to_dict()
job_id = str(result['job_id'])
print(f"โ
Job submitted successfully with ID: {job_id}")
# 3. Monitor job
print(f"\\n๐ Monitoring job {job_id}...")
job_response = await api.get_job(job_id=job_id)
job_data = job_response.to_dict()
job_info = job_data['jobs'][0]
print(f"๐ Job status: {job_info['job_state']}")
print(f"๐ Job details: {job_info['name']} on {job_info.get('nodes', 'pending')}")
# 4. List all current jobs
print("\\n๐ Current cluster jobs:")
jobs_response = await api.get_jobs()
jobs = jobs_response.to_dict()
print(f"Found {len(jobs['jobs'])} total jobs in the cluster")
except ApiException as e:
print(f"โ Slurm API error: {e.status} - {e.reason}")
if e.body:
print(f"Response: {e.body}")
except Exception as e:
print(f"โ Unexpected error: {e}")
finally:
# Cleanup: Cancel the job if it was created
if job_id:
try:
async with slurpy.ApiClient(configuration) as client:
api = slurpy.SlurmApi(client)
await api.delete_job(job_id)
print(f"\\n๐งน Cleanup: Job {job_id} cancellation requested")
except Exception as cleanup_error:
print(f"โ ๏ธ Cleanup warning: Could not cancel job {job_id}: {cleanup_error}")
if __name__ == "__main__":
asyncio.run(slurm_workflow())
```
## License
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
## Support
- ๐ **Documentation**: Check the docstrings and type hints
- ๐ **Issues**: Report bugs on GitHub Issues
- ๐ฌ **Discussions**: Ask questions in GitHub Discussions
- ๐ง **Contact**: Reach out to the maintainers
## Related Projects
- [Slurm](https://slurm.schedmd.com/) - The original Slurm Workload Manager
- [Slurm REST API Documentation](https://slurm.schedmd.com/rest_api.html) - Official REST API docs
Raw data
{
"_id": null,
"home_page": "https://github.com/EBI-Metagenomics/slurpy",
"name": "ebi-slurpy",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.11",
"maintainer_email": null,
"keywords": "slurm, hpc, rest-api, job-scheduler",
"author": "Sandy Rogers",
"author_email": "sandyr@ebi.ac.uk",
"download_url": "https://files.pythonhosted.org/packages/ab/a0/0f651e0fc9d9e07d3309212f89a71aa6ca74ab8b39e8e30c1a0366b73fff/ebi_slurpy-0.1.1.tar.gz",
"platform": null,
"description": "# Slurpy (SLUrm Rest api PYthon client)\n\nSlurpy is a Python client for the [Slurm REST API](https://slurm.schedmd.com/rest.html).\nSlurm is an open-source job scheduler for high performance compute environments.\nIts REST API is a set of HTTP endpoints for submitting, monitoring, and managing compute jobs.\nThis Python client is a convenience library for interacting with that API.\n\n## Features\n\n- \u2728 **Multiple API versions** - Support for Slurm REST API v0.0.40, v0.0.41, and v0.0.42\n- \ud83d\udd10 **JWT Authentication** - Built-in support for Slurm JWT token authentication\n- \ud83d\ude80 **Sync & Async** - Both synchronous and asynchronous client support\n- \ud83d\udc0d **Type Hints** - Complete type annotations for better IDE support\n- \ud83e\uddea **Well Tested** - Comprehensive integration tests with real Slurm clusters\n- \ud83d\udce6 **Easy Installation** - Simple pip installation with minimal dependencies\n\n## Installation\n\n```bash\npip install ebi-slurpy\n```\n\n## Quick Start\n\nSlurpy supports both **synchronous** and **asynchronous** clients. Choose the one that fits your application:\n\n### Synchronous Client\n\n```python\nimport slurpy.v0040 as slurpy\n\n# Configure the client\nconfiguration = slurpy.Configuration(\n host=\"http://your-slurm-rest-api:6820\"\n)\n\n# Set up authentication\nconfiguration.api_key['user'] = \"your-username\"\nconfiguration.api_key['token'] = \"your-jwt-token\"\n\n# Create API client\nwith slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n # Test connectivity\n response = api.get_ping()\n print(f\"Cluster status: {response.to_dict()}\")\n \n # List all jobs\n jobs_response = api.get_jobs()\n jobs = jobs_response.to_dict()\n print(f\"Found {len(jobs['jobs'])} jobs\")\n```\n\n### Asynchronous Client\n\n```python\nimport asyncio\nimport slurpy.v0040.asyncio as slurpy\n\nasync def main():\n # Configure the client\n configuration = slurpy.Configuration(\n host=\"http://your-slurm-rest-api:6820\"\n )\n \n # Set up authentication\n configuration.api_key['user'] = \"your-username\"\n configuration.api_key['token'] = \"your-jwt-token\"\n \n # Create API client\n async with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n # Test connectivity\n response = await api.get_ping()\n print(f\"Cluster status: {response.to_dict()}\")\n \n # List all jobs\n jobs_response = await api.get_jobs()\n jobs = jobs_response.to_dict()\n print(f\"Found {len(jobs['jobs'])} jobs\")\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n### Environment Variables\n\nYou can use environment variables for configuration with both sync and async clients:\n\n**Sync version:**\n```python\nimport os\nimport slurpy.v0040 as slurpy\n\nconfiguration = slurpy.Configuration(\n host=os.getenv(\"SLURM_REST_URL\", \"http://localhost:6820\")\n)\n\nconfiguration.api_key['user'] = os.getenv(\"SLURM_USER_NAME\")\nconfiguration.api_key['token'] = os.getenv(\"SLURM_USER_TOKEN\")\n```\n\n**Async version:**\n```python\nimport os\nimport slurpy.v0040.asyncio as slurpy\n\nconfiguration = slurpy.Configuration(\n host=os.getenv(\"SLURM_REST_URL\", \"http://localhost:6820\")\n)\n\nconfiguration.api_key['user'] = os.getenv(\"SLURM_USER_NAME\")\nconfiguration.api_key['token'] = os.getenv(\"SLURM_USER_TOKEN\")\n```\n\n## API Versions\n\nSlurpy supports multiple Slurm REST API versions, each available in both sync and async variants:\n\n### v0.0.40\n```python\n# Synchronous\nimport slurpy.v0040 as slurpy\n\n# Asynchronous \nimport slurpy.v0040.asyncio as slurpy\n```\n\n### v0.0.41\n```python\n# Synchronous\nimport slurpy.v0041 as slurpy\n\n# Asynchronous\nimport slurpy.v0041.asyncio as slurpy\n```\n\n### v0.0.42\n```python\n# Synchronous\nimport slurpy.v0042 as slurpy\n\n# Asynchronous\nimport slurpy.v0042.asyncio as slurpy\n```\n\n> **Note:** The API interface is consistent across versions and between sync/async variants, but some features and response formats may differ between Slurm versions. Check the Slurm documentation for version-specific differences.\n\n## Common Operations\n\n### List Jobs\n\n**Sync version:**\n```python\ndef list_jobs():\n with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n response = api.get_jobs()\n jobs = response.to_dict()\n \n for job in jobs['jobs']:\n print(f\"Job {job['job_id']}: {job['name']} ({job['job_state']})\")\n```\n\n**Async version:**\n```python\nasync def list_jobs():\n async with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n response = await api.get_jobs()\n jobs = response.to_dict()\n \n for job in jobs['jobs']:\n print(f\"Job {job['job_id']}: {job['name']} ({job['job_state']})\")\n```\n\n### Submit a Job\n\n**Sync version:**\n```python\ndef submit_job():\n job_spec = {\n \"script\": \"#!/bin/bash\\\\nsleep 60\\\\necho 'Job completed'\",\n \"job\": {\n \"name\": \"my_job\",\n \"current_working_directory\": \"/home/user\",\n \"environment\": [\"PATH=/bin:/usr/bin\"],\n \"ntasks\": 1,\n \"time_limit\": {\"set\": True, \"number\": 300}, # 5 minutes\n }\n }\n \n with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n job_request = slurpy.JobSubmitReq.from_dict(job_spec)\n response = api.post_job_submit(job_submit_req=job_request)\n \n result = response.to_dict()\n print(f\"Job submitted with ID: {result['job_id']}\")\n```\n\n**Async version:**\n```python\nasync def submit_job():\n job_spec = {\n \"script\": \"#!/bin/bash\\\\nsleep 60\\\\necho 'Job completed'\",\n \"job\": {\n \"name\": \"my_job\",\n \"current_working_directory\": \"/home/user\",\n \"environment\": [\"PATH=/bin:/usr/bin\"],\n \"ntasks\": 1,\n \"time_limit\": {\"set\": True, \"number\": 300}, # 5 minutes\n }\n }\n \n async with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n job_request = slurpy.JobSubmitReq.from_dict(job_spec)\n response = await api.post_job_submit(job_submit_req=job_request)\n \n result = response.to_dict()\n print(f\"Job submitted with ID: {result['job_id']}\")\n```\n\n### Get Job Details\n\n**Sync version:**\n```python\ndef get_job(job_id: str):\n with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n response = api.get_job(job_id=job_id)\n job = response.to_dict()\n \n print(f\"Job {job_id} status: {job['jobs'][0]['job_state']}\")\n```\n\n**Async version:**\n```python\nasync def get_job(job_id: str):\n async with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n response = await api.get_job(job_id=job_id)\n job = response.to_dict()\n \n print(f\"Job {job_id} status: {job['jobs'][0]['job_state']}\")\n```\n\n### Cancel a Job\n\n**Sync version:**\n```python\ndef cancel_job(job_id: str):\n with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n response = api.delete_job(job_id)\n print(f\"Job {job_id} cancellation requested\")\n```\n\n**Async version:**\n```python\nasync def cancel_job(job_id: str):\n async with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n response = await api.delete_job(job_id)\n print(f\"Job {job_id} cancellation requested\")\n```\n\n## Error Handling\n\nError handling works the same for both sync and async clients:\n\n**Sync version:**\n```python\nfrom slurpy.v0040.rest import ApiException\n\ndef robust_job_operation():\n try:\n with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n response = api.get_jobs()\n return response.to_dict()\n \n except ApiException as e:\n print(f\"API Error: {e.status} - {e.reason}\")\n print(f\"Response body: {e.body}\")\n \n except Exception as e:\n print(f\"Unexpected error: {e}\")\n```\n\n**Async version:**\n```python\nfrom slurpy.v0040.asyncio.rest import ApiException\n\nasync def robust_job_operation():\n try:\n async with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n response = await api.get_jobs()\n return response.to_dict()\n \n except ApiException as e:\n print(f\"API Error: {e.status} - {e.reason}\")\n print(f\"Response body: {e.body}\")\n \n except Exception as e:\n print(f\"Unexpected error: {e}\")\n```\n\n## Configuration Options\n\n### SSL/TLS Configuration\n\n```python\nconfiguration = slurpy.Configuration(\n host=\"https://secure-slurm-api:6820\",\n ssl_ca_cert=\"/path/to/ca-cert.pem\", # CA certificate\n cert_file=\"/path/to/client-cert.pem\", # Client certificate\n key_file=\"/path/to/client-key.pem\", # Client private key\n verify_ssl=True\n)\n```\n\n## Complete Example\n\nHere's a complete workflow example available in both sync and async versions:\n\n### Synchronous Version\n\n```python\n#!/usr/bin/env python3\nimport os\nfrom typing import Optional\n\nimport slurpy.v0040 as slurpy\nfrom slurpy.v0040.rest import ApiException\n\n\ndef slurm_workflow():\n \"\"\"Complete workflow: ping, submit job, monitor, cleanup.\"\"\"\n \n # Configuration\n configuration = slurpy.Configuration(\n host=os.getenv(\"SLURM_REST_URL\", \"http://localhost:6820\")\n )\n configuration.api_key['user'] = os.getenv(\"SLURM_USER_NAME\", \"slurm\")\n configuration.api_key['token'] = os.getenv(\"SLURM_USER_TOKEN\")\n \n if not configuration.api_key['token']:\n print(\"Error: SLURM_USER_TOKEN environment variable is required\")\n return\n \n job_id: Optional[str] = None\n \n try:\n with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n # 1. Test connectivity\n print(\"\ud83d\udd0d Testing cluster connectivity...\")\n ping_response = api.get_ping()\n print(f\"\u2705 Cluster is responsive: {ping_response.to_dict()}\")\n \n # 2. Submit a job\n print(\"\\\\n\ud83d\udce4 Submitting job...\")\n job_spec = {\n \"script\": \"#!/bin/bash\\\\nsleep 30\\\\necho 'Hello from Slurm!'\",\n \"job\": {\n \"name\": \"slurpy_demo\",\n \"current_working_directory\": \"/tmp\",\n \"environment\": [\"PATH=/bin:/usr/bin\"],\n \"ntasks\": 1,\n \"time_limit\": {\"set\": True, \"number\": 120},\n }\n }\n \n job_request = slurpy.JobSubmitReq.from_dict(job_spec)\n submit_response = api.post_job_submit(job_submit_req=job_request)\n result = submit_response.to_dict()\n job_id = str(result['job_id'])\n print(f\"\u2705 Job submitted successfully with ID: {job_id}\")\n \n # 3. Monitor job\n print(f\"\\\\n\ud83d\udc40 Monitoring job {job_id}...\")\n job_response = api.get_job(job_id=job_id)\n job_data = job_response.to_dict()\n job_info = job_data['jobs'][0]\n print(f\"\ud83d\udcca Job status: {job_info['job_state']}\")\n print(f\"\ud83d\udccb Job details: {job_info['name']} on {job_info.get('nodes', 'pending')}\")\n \n # 4. List all current jobs\n print(\"\\\\n\ud83d\udccb Current cluster jobs:\")\n jobs_response = api.get_jobs()\n jobs = jobs_response.to_dict()\n print(f\"Found {len(jobs['jobs'])} total jobs in the cluster\")\n \n except ApiException as e:\n print(f\"\u274c Slurm API error: {e.status} - {e.reason}\")\n if e.body:\n print(f\"Response: {e.body}\")\n \n except Exception as e:\n print(f\"\u274c Unexpected error: {e}\")\n \n finally:\n # Cleanup: Cancel the job if it was created\n if job_id:\n try:\n with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n api.delete_job(job_id)\n print(f\"\\\\n\ud83e\uddf9 Cleanup: Job {job_id} cancellation requested\")\n except Exception as cleanup_error:\n print(f\"\u26a0\ufe0f Cleanup warning: Could not cancel job {job_id}: {cleanup_error}\")\n\n\nif __name__ == \"__main__\":\n slurm_workflow()\n```\n\n### Asynchronous Version\n\n```python\n#!/usr/bin/env python3\nimport asyncio\nimport os\nfrom typing import Optional\n\nimport slurpy.v0040.asyncio as slurpy\nfrom slurpy.v0040.asyncio.rest import ApiException\n\n\nasync def slurm_workflow():\n \"\"\"Complete workflow: ping, submit job, monitor, cleanup.\"\"\"\n \n # Configuration\n configuration = slurpy.Configuration(\n host=os.getenv(\"SLURM_REST_URL\", \"http://localhost:6820\")\n )\n configuration.api_key['user'] = os.getenv(\"SLURM_USER_NAME\", \"slurm\")\n configuration.api_key['token'] = os.getenv(\"SLURM_USER_TOKEN\")\n \n if not configuration.api_key['token']:\n print(\"Error: SLURM_USER_TOKEN environment variable is required\")\n return\n \n job_id: Optional[str] = None\n \n try:\n async with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n \n # 1. Test connectivity\n print(\"\ud83d\udd0d Testing cluster connectivity...\")\n ping_response = await api.get_ping()\n print(f\"\u2705 Cluster is responsive: {ping_response.to_dict()}\")\n \n # 2. Submit a job\n print(\"\\\\n\ud83d\udce4 Submitting job...\")\n job_spec = {\n \"script\": \"#!/bin/bash\\\\nsleep 30\\\\necho 'Hello from Slurm!'\",\n \"job\": {\n \"name\": \"slurpy_demo\",\n \"current_working_directory\": \"/tmp\",\n \"environment\": [\"PATH=/bin:/usr/bin\"],\n \"ntasks\": 1,\n \"time_limit\": {\"set\": True, \"number\": 120},\n }\n }\n \n job_request = slurpy.JobSubmitReq.from_dict(job_spec)\n submit_response = await api.post_job_submit(job_submit_req=job_request)\n result = submit_response.to_dict()\n job_id = str(result['job_id'])\n print(f\"\u2705 Job submitted successfully with ID: {job_id}\")\n \n # 3. Monitor job\n print(f\"\\\\n\ud83d\udc40 Monitoring job {job_id}...\")\n job_response = await api.get_job(job_id=job_id)\n job_data = job_response.to_dict()\n job_info = job_data['jobs'][0]\n print(f\"\ud83d\udcca Job status: {job_info['job_state']}\")\n print(f\"\ud83d\udccb Job details: {job_info['name']} on {job_info.get('nodes', 'pending')}\")\n \n # 4. List all current jobs\n print(\"\\\\n\ud83d\udccb Current cluster jobs:\")\n jobs_response = await api.get_jobs()\n jobs = jobs_response.to_dict()\n print(f\"Found {len(jobs['jobs'])} total jobs in the cluster\")\n \n except ApiException as e:\n print(f\"\u274c Slurm API error: {e.status} - {e.reason}\")\n if e.body:\n print(f\"Response: {e.body}\")\n \n except Exception as e:\n print(f\"\u274c Unexpected error: {e}\")\n \n finally:\n # Cleanup: Cancel the job if it was created\n if job_id:\n try:\n async with slurpy.ApiClient(configuration) as client:\n api = slurpy.SlurmApi(client)\n await api.delete_job(job_id)\n print(f\"\\\\n\ud83e\uddf9 Cleanup: Job {job_id} cancellation requested\")\n except Exception as cleanup_error:\n print(f\"\u26a0\ufe0f Cleanup warning: Could not cancel job {job_id}: {cleanup_error}\")\n\n\nif __name__ == \"__main__\":\n asyncio.run(slurm_workflow())\n```\n\n## License\n\nThis project is licensed under the Apache 2.0 License - see the LICENSE file for details.\n\n## Support\n\n- \ud83d\udcd6 **Documentation**: Check the docstrings and type hints\n- \ud83d\udc1b **Issues**: Report bugs on GitHub Issues\n- \ud83d\udcac **Discussions**: Ask questions in GitHub Discussions\n- \ud83d\udce7 **Contact**: Reach out to the maintainers\n\n## Related Projects\n\n- [Slurm](https://slurm.schedmd.com/) - The original Slurm Workload Manager\n- [Slurm REST API Documentation](https://slurm.schedmd.com/rest_api.html) - Official REST API docs",
"bugtrack_url": null,
"license": "Apache-2.0",
"summary": "Python client for the Slurm REST API",
"version": "0.1.1",
"project_urls": {
"Homepage": "https://github.com/EBI-Metagenomics/slurpy",
"Repository": "https://github.com/EBI-Metagenomics/slurpy"
},
"split_keywords": [
"slurm",
" hpc",
" rest-api",
" job-scheduler"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "e03bdf338fe2a3c3a9a7fe9fd8739cb45175074299d90db24ff8411100e9be86",
"md5": "253c1896ccf9c324bc52f131e49b23f6",
"sha256": "65e1904abcd8ab8e02bbae49027ff53478ef8ab94f97022cee4ca790ec7b9886"
},
"downloads": -1,
"filename": "ebi_slurpy-0.1.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "253c1896ccf9c324bc52f131e49b23f6",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.11",
"size": 2734372,
"upload_time": "2025-09-01T14:40:47",
"upload_time_iso_8601": "2025-09-01T14:40:47.013322Z",
"url": "https://files.pythonhosted.org/packages/e0/3b/df338fe2a3c3a9a7fe9fd8739cb45175074299d90db24ff8411100e9be86/ebi_slurpy-0.1.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "aba00f651e0fc9d9e07d3309212f89a71aa6ca74ab8b39e8e30c1a0366b73fff",
"md5": "0b4d65483689ad1c26ad07389cf95a22",
"sha256": "58bd29918c91d1eca809986b42f7241089c2491aaf526abbf9fc6086d4a44a42"
},
"downloads": -1,
"filename": "ebi_slurpy-0.1.1.tar.gz",
"has_sig": false,
"md5_digest": "0b4d65483689ad1c26ad07389cf95a22",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.11",
"size": 799359,
"upload_time": "2025-09-01T14:40:48",
"upload_time_iso_8601": "2025-09-01T14:40:48.895765Z",
"url": "https://files.pythonhosted.org/packages/ab/a0/0f651e0fc9d9e07d3309212f89a71aa6ca74ab8b39e8e30c1a0366b73fff/ebi_slurpy-0.1.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-09-01 14:40:48",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "EBI-Metagenomics",
"github_project": "slurpy",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "ebi-slurpy"
}