just-jobs


- Name: just-jobs
- Version: 2.1.0
- Home page: https://justjobs.thearchitector.dev
- Repository: https://github.com/thearchitector/just-jobs
- Author: Elias Gabriel <me@eliasfgabriel.com>
- Summary: A friendly and lightweight wrapper for arq.
- Upload time: 2023-07-13 04:10:31
- Requires Python: <4.0,>=3.7
- License: BSD-3-Clause
- Keywords: jobs, arq, tasks, celery, redis

# just-jobs

![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/thearchitector/just-jobs/ci.yaml?label=tests&style=flat-square)
![PyPI - Downloads](https://img.shields.io/pypi/dw/just-jobs?style=flat-square)
![GitHub](https://img.shields.io/github/license/thearchitector/just-jobs?style=flat-square)
[![Buy a tree](https://img.shields.io/badge/Treeware-%F0%9F%8C%B3-lightgreen?style=flat-square)](https://ecologi.com/eliasgabriel?r=6128126916bfab8bd051026c)

A friendly and lightweight wrapper for [arq](https://arq-docs.helpmanual.io). just-jobs provides a simple interface on top of arq that implements additional functionality like synchronous job types (IO-bound vs. CPU-bound) and signed and secure task serialization.

Documentation: <https://justjobs.thearchitector.dev>.

Tested on Python 3.7, 3.8, 3.9, 3.10, and 3.11.

```sh
$ pdm add just-jobs
# or
$ pip install --user just-jobs
```

## Features

just-jobs doesn't aim to replace the invocations that arq provides, only wrap some of them to make job creation and execution easier and better. It lets you:

- Define and run non-async jobs. A non-async `@job` function passed to arq runs properly. Non-async jobs can also be defined as either IO-bound or CPU-bound, which changes how the job is executed so that it does not block the asyncio event loop.
- Use the arq `Context` parameter much like [FastAPI's `Request`](https://fastapi.tiangolo.com/advanced/using-request-directly/). It is no longer a required parameter, but if it is present, it will be set. It doesn't have to be named `ctx` either; it only needs the type `Context`.
- Specify a single `RedisSettings` within your `WorkerSettings` from which you can create a pool using `Settings.create_pool()`.
- Run jobs either immediately or via normal arq enqueueing.
- Use non-picklable job arguments and kwargs (supported by the [dill](http://dill.rtfd.io/) library).
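
To illustrate why synchronous jobs need special handling, here is a minimal standard-library sketch of keeping a blocking call off the event loop. It is not just-jobs's implementation; `run_sync`, `blocking_fetch`, and `demo` are hypothetical names used only for this example.

```python
import asyncio
import time
from concurrent.futures import Executor, ThreadPoolExecutor


def blocking_fetch(delay: float) -> str:
    # stand-in for blocking IO such as a network call
    time.sleep(delay)
    return "fetched"


async def run_sync(executor: Executor, func, *args):
    # hand the blocking call to the executor so the event loop stays responsive
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, func, *args)


async def demo() -> list:
    with ThreadPoolExecutor(max_workers=2) as threads:
        # both blocking calls overlap because each runs in its own worker thread
        return await asyncio.gather(
            run_sync(threads, blocking_fetch, 0.05),
            run_sync(threads, blocking_fetch, 0.05),
        )
```

Passing a `concurrent.futures.ProcessPoolExecutor` instead would run the function in a separate process, which is the usual choice for CPU-bound work.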

## Usage

Using just-jobs is pretty straightforward:

### Add `@job()` to any function to make it a delayable job.

If the job is synchronous, specify its job type so just-jobs knows how to optimally run it. If you don't, you'll get an error. This helps encourage thoughtful and intentional job design while ensuring that the event loop is never blocked.

```python
@job(job_type=JobType.CPU_BOUND)
def complex_math(i: int, j: int, k: int):
    ...
```

If it's a coroutine function, you don't need to specify a job type (and will get a warning if you do).

```python
@job()
async def poll_reddit(subr: str):
    ...
```
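
The two rules above (an error for a synchronous job without a job type, a warning for a coroutine with one) can be sketched with the standard library. This is an illustration only, not the library's code; `checked_job` and the local `JobType` enum are hypothetical stand-ins.

```python
import enum
import inspect
import warnings


class JobType(enum.Enum):
    # local stand-in for illustration; not imported from just-jobs
    IO_BOUND = "io"
    CPU_BOUND = "cpu"


def checked_job(job_type=None):
    """Hypothetical decorator that only validates the job type rules."""

    def decorator(func):
        is_coro = inspect.iscoroutinefunction(func)
        if not is_coro and job_type is None:
            # synchronous jobs must say how they should be executed
            raise ValueError(f"{func.__name__} is synchronous and needs a job type")
        if is_coro and job_type is not None:
            # coroutines already cooperate with the event loop
            warnings.warn(f"{func.__name__} is a coroutine; a job type is unnecessary")
        return func

    return decorator
```

Running the checks at decoration time means a misconfigured job fails when its module is imported rather than when it is first enqueued.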

### Invoke a job normally if you want to run it immediately.

Invoking a job as a regular function allows you to run a job as if it were one. If you have logic that you only want to execute when enqueued, include a parameter with type `Context` and check if it exists at runtime (functions with a `Context` that are run immediately will have that argument set to `None`).

```python
@job()
async def context_aware(ctx: Context, msg: str):
    if ctx:
        # enqueued then run by arq
        return f"hello {msg}"
    else:
        # invoked manually
        return f"bye {msg}"

assert await context_aware("world") == "bye world"

j = await pool.enqueue_job("context_aware", "world")
assert await j.result() == "hello world"
```
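
One way such type-based injection could work is to inspect the function signature for a parameter annotated with `Context` and fill it in when the caller leaves it out. The sketch below is an assumption about the mechanism, not the library's implementation; `Context` here is a local placeholder class, and `inject_context` and its `_ctx` keyword are hypothetical.

```python
import functools
import inspect


class Context(dict):
    """Local placeholder for illustration; not the just-jobs Context type."""


def inject_context(func):
    params = inspect.signature(func).parameters
    # find the first parameter annotated with Context, whatever its name
    ctx_name = next(
        (name for name, p in params.items() if p.annotation is Context), None
    )
    others = [name for name in params if name != ctx_name]

    @functools.wraps(func)
    def wrapper(*args, _ctx=None, **kwargs):
        # positional arguments fill the non-Context parameters in order
        kwargs.update(zip(others, args))
        if ctx_name is not None:
            # direct calls get None; a worker would pass its context via _ctx
            kwargs[ctx_name] = _ctx
        return func(**kwargs)

    return wrapper


@inject_context
def greet(context: Context, msg: str) -> str:
    return f"hello {msg}" if context else f"bye {msg}"
```

Calling `greet("world")` takes the "invoked manually" branch, while `greet("world", _ctx=Context(job_id="1"))` simulates a worker supplying a context.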

### Define WorkerSettings using the `BaseSettings` metaclass.

The execution logic that `@job` provides depends on additional worker configuration. When defining your WorkerSettings, you must declare `BaseSettings` as its metaclass to ensure that configuration exists.

```python
class Settings(metaclass=BaseSettings):
    redis_settings = ...
```

### Use `Settings.create_pool()`.

While you may elect to use `arq.connections.create_pool` as you would normally, using the `create_pool` function provided by your `Settings` class ensures the pool it creates always matches your worker's Redis and serialization settings (it will be less of a headache). It also lets you take advantage of additional functionality, namely that it can be used as an auto-closing context manager.

```python
# manually
pool = await Settings.create_pool()
await pool.close(close_connection_pool=True)

# or as an async context manager
async with Settings.create_pool() as pool:
    ...
```
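
An object that can be both awaited and used with `async with`, as `Settings.create_pool()` is above, can be built by implementing `__await__`, `__aenter__`, and `__aexit__`. The sketch below shows that general pattern with placeholder classes; `FakePool` and `PoolFactory` are hypothetical and say nothing about how just-jobs implements it.

```python
import asyncio


class FakePool:
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


class PoolFactory:
    """Awaitable that also works as an auto-closing async context manager."""

    def __init__(self):
        self._pool = None

    async def _create(self):
        self._pool = FakePool()
        return self._pool

    def __await__(self):
        # `await PoolFactory()` returns the pool; closing is the caller's job
        return self._create().__await__()

    async def __aenter__(self):
        return await self._create()

    async def __aexit__(self, exc_type, exc, tb):
        # `async with PoolFactory()` closes the pool on exit
        await self._pool.close()


async def demo():
    manual = await PoolFactory()
    async with PoolFactory() as managed:
        open_inside = not managed.closed
    return manual.closed, open_inside, managed.closed
```

In `demo`, the awaited pool stays open until closed explicitly, while the pool from `async with` is closed as soon as the block exits.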

### Enqueue your job.

just-jobs doesn't change the way in which you enqueue your jobs. Just use `await pool.enqueue_job(...)`.

```python
await pool.enqueue_job('complex_math', 2, 1, 3)
```

## Caveats

1. `arq.func()` and `@job()` are mutually exclusive. If you want to configure a job in the same way, pass the settings you would have passed to `func()` to `@job()` instead.

   ```python
   @job(job_type=JobType.CPU_BOUND, keep_result_forever=True, max_tries=10)
   def task(a: int, b: int):
       return a + b
   ```

2. There isn't support for asynchronous CPU-bound tasks. Currently, job types only configure the execution behavior of synchronous tasks (not coroutines). However, there are some valid cases for CPU-bound tasks that also need to be run in an asyncio context.

   At the moment, the best way to achieve this will be to create a synchronous CPU-bound task (so it runs in a separate process) that then invokes a coroutine via `asyncio.run`. If you intend on running the task in the current context from time to time, just return the coroutine instead and it will get automatically executed in the current event loop.

   ```python
   async def _async_task(a: int, b: int, c: int):
       ab = await add(a, b)
       return await add(ab, c)

   @job(job_type=JobType.CPU_BOUND)
   def wrapper_cpu_bound(ctx: Context, a: int, b: int, c: int):
       task = _async_task(a, b, c)
       return asyncio.run(task) if ctx else task
   ```
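
The same pattern can be tried without arq or Redis. The following self-contained sketch uses a worker thread as a stand-in for the separate process, a boolean `in_worker` flag in place of the `Context` check, and a trivial `add` coroutine; all three are assumptions made so the example runs on its own.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # pretend to do asynchronous work
    return a + b


async def _async_task(a: int, b: int, c: int) -> int:
    return await add(await add(a, b), c)


def wrapper(in_worker: bool, a: int, b: int, c: int):
    task = _async_task(a, b, c)
    # off the main loop there is no running event loop, so asyncio.run is safe;
    # otherwise hand the coroutine back for the caller to await
    return asyncio.run(task) if in_worker else task


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as executor:
        from_worker = await loop.run_in_executor(executor, wrapper, True, 1, 2, 3)
    direct = await wrapper(False, 1, 2, 3)
    return from_worker, direct
```

Both paths produce the same value: the worker path drives the coroutine with its own event loop, and the direct path lets the caller's loop await it.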

## Example

The complete example is available at [docs/example.py](https://github.com/thearchitector/just-jobs/blob/main/docs/example.py) and should work out of the box. The snippet below is just an excerpt to show the features described above:

```python
from arq.connections import RedisSettings
from just_jobs import BaseSettings, Context, JobType, job

@job()
async def async_task(url: str):
    return url

@job(job_type=JobType.IO_BOUND)
def sync_task(ctx: Context, url: str):
    # if the context is present, this is being run from the arq listener
    if ctx:
        print(url)
    return url

class Settings(metaclass=BaseSettings):
    functions = [async_task, sync_task]
    redis_settings = RedisSettings(host="redis")

async def main():
    # create a Redis pool using the Settings already defined
    pool = await Settings.create_pool()
    # run sync_task right now and return the url
    url = sync_task("https://www.theglassfiles.com")

    await pool.enqueue_job("async_task", "https://www.eliasfgabriel.com")
    await pool.enqueue_job("sync_task", "https://gianturl.net")

    await pool.close(close_connection_pool=True)
```

## License

This software is licensed under the [3-Clause BSD License](LICENSE).

This package is [Treeware](https://treeware.earth). If you use it in production, consider [**buying the world a tree**](https://ecologi.com/eliasgabriel?r=6128126916bfab8bd051026c) to thank me for my work. By contributing to my forest, you’ll be creating employment for local families and restoring wildlife habitats.

            
