classwork


Nameclasswork JSON
Version 0.0.7 PyPI version JSON
download
home_page
SummaryA simple Python module to help distribute you tasks across multiple brokers as microservices
upload_time2023-06-05 02:28:50
maintainer
docs_urlNone
authorAnthony Mugendi
requires_python>=3.8,<4.0
licenseMIT
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            <!--
 Copyright (c) 2023 Anthony Mugendi
 
 This software is released under the MIT License.
 https://opensource.org/licenses/MIT
-->

# Classwork

This is a simple Python module to help distribute you tasks across multiple brokers as microservices.

Classwork uses [NATS](https://nats.io/) to manage communication between job schedulers and workers. Nats does most of the heavy lifting, allowing us to keep the rest of the code simple and concise.

## What is in name?
So why ClassWork? 
Well, this is simply because workers are simply **"python classes"** (The Class) whose **"methods"** become individual workers (The Students). 

## Get started

First let us create a simple worker

```python
import asyncio
from classwork import ClassWork


class WorkerClass:
    # Your worker...
    # Notice that we expect workers to be async
    async def add(self, a, b):
        # the work!
        resp = a + b
        # simulate a long process
        await asyncio.sleep(3)
        # return results
        return resp


# this is our nats server url
nats_url = "nats://127.0.0.1:4222"
# initialize ClassWork
class_work = ClassWork(nats_url=nats_url)
# init worker class
worker = WorkerClass()

# Ok, let us register things with asyncio
# notice the method 'class_work.register' is async!
asyncio.run(class_work.register(name="my-worker", worker_class=worker))

```

This is all the code we need to set up the worker.
It is important to note the following:

1. Worker Class methods should be asynchronous
2. class_work.register is also async
3. You will need NATS running with [JetStream](https://docs.nats.io/nats-concepts/jetstream) enabled!

## The Job Scheduler

Now we need to create the job scheduler. Below is the code we need

```python
import asyncio
import numpy as np
from pprint import pprint
from classwork import ClassWork

# init ClassWork
nats_url = "nats://127.0.0.1:4222"
class_work = ClassWork(nats_url=nats_url)

# Our callback function
# This is where complete work gets reported
async def report_callback(report_card):
    print("We have a report!")
    pprint(report_card)


# We need this function to create our job schedules
async def schedules():
    # Assign a job
    await class_work.assign(
        # the task name
        task="my_worker.add",
        # arguments
        args=[1, 2],
        # the callback to report our results
        report_callback=report_callback,
    )


# Ok, let us create the schedules now
asyncio.run(schedules())
```

This code will create a *task* into NATS and the job workers (attentive students ๐Ÿ˜‚) already listening will pick the task and run it, then publish their reports which is routed via NATS back to the scheduler (teacher?).

Take note of the following:
1. `class_work.assign` must be run in async mode. So we have wrapped it in an async method. You can also use `asyncio.run` directly.
2. Naming your task is very important. This naming convection is unashamedly borrowed from [moleculer](https://moleculer.services/). In this case, your task is **"my_worker.add"**. This will route to any worker class registered with the **name** "my_worker" and method "add". 
3. Because all this traffic is routed via NATS, your arguments must be JSON serializable. Even though we use [typ](https://github.com/vsapronov/typjson) to handle edge cases like `sets`, beware that there are limits to what you can pass in your arguments
4. `report_callback` must be async. It is called with a 'report' of your task. A report card ๐Ÿ˜Š will look like the one below:
5. `args` can be passed as a list or dict. They will be treated as `*args` if list and `**kwargs` if dict.

```
We have a report!
{'task': 'my_worker.add',
 'duration': {'latency': {'request': '4 ms and 356 ยตs',
                          'response': '5 s and 4 ms'},
              'my_worker.add': '3 s and 1 ms'},
 'req_id': '8mYjJjM0kb5',
 'response': 3}
```

## Report Explanation
- **duration:** is a high precision (down to yoctoseconds) report of the time taken.
    - **latency:** shows the time taken to route your "task **request**" to the worker and "task **response**" back to the scheduler. It is important to understand that since both worker and scheduler are disconnected, latency may also include delays of either to access the NATS network and thus does not specifically refer to network latency.
- **req_id:** is a unique id assigned to each job
- **response:** is the actual value returned by the worker


## Try it

We have sample code in [scheduler.py](scheduler.py) and [worker.py](worker.py)
            

Raw data

            {
    "_id": null,
    "home_page": "",
    "name": "classwork",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8,<4.0",
    "maintainer_email": "",
    "keywords": "",
    "author": "Anthony Mugendi",
    "author_email": "ngurumugz@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/ca/c7/775bb2a79ea4c16f3e9b3dc5ed40625c56d5dd72c78ac9db94858619bb74/classwork-0.0.7.tar.gz",
    "platform": null,
    "description": "<!--\n Copyright (c) 2023 Anthony Mugendi\n \n This software is released under the MIT License.\n https://opensource.org/licenses/MIT\n-->\n\n# Classwork\n\nThis is a simple Python module to help distribute you tasks across multiple brokers as microservices.\n\nClasswork uses [NATS](https://nats.io/) to manage communication between job schedulers and workers. Nats does most of the heavy lifting, allowing us to keep the rest of the code simple and concise.\n\n## What is in name?\nSo why ClassWork? \nWell, this is simply because workers are simply **\"python classes\"** (The Class) whose **\"methods\"** become individual workers (The Students). \n\n## Get started\n\nFirst let us create a simple worker\n\n```python\nimport asyncio\nfrom classwork import ClassWork\n\n\nclass WorkerClass:\n    # Your worker...\n    # Notice that we expect workers to be async\n    async def add(self, a, b):\n        # the work!\n        resp = a + b\n        # simulate a long process\n        await asyncio.sleep(3)\n        # return results\n        return resp\n\n\n# this is our nats server url\nnats_url = \"nats://127.0.0.1:4222\"\n# initialize ClassWork\nclass_work = ClassWork(nats_url=nats_url)\n# init worker class\nworker = WorkerClass()\n\n# Ok, let us register things with asyncio\n# notice the method 'class_work.register' is async!\nasyncio.run(class_work.register(name=\"my-worker\", worker_class=worker))\n\n```\n\nThis is all the code we need to set up the worker.\nIt is important to note the following:\n\n1. Worker Class methods should be asynchronous\n2. class_work.register is also async\n3. You will need NATS running with [JetStream](https://docs.nats.io/nats-concepts/jetstream) enabled!\n\n## The Job Scheduler\n\nNow we need to create the job scheduler. Below is the code we need\n\n```python\nimport asyncio\nimport numpy as np\nfrom pprint import pprint\nfrom classwork import ClassWork\n\n# init ClassWork\nnats_url = \"nats://127.0.0.1:4222\"\nclass_work = ClassWork(nats_url=nats_url)\n\n# Our callback function\n# This is where complete work gets reported\nasync def report_callback(report_card):\n    print(\"We have a report!\")\n    pprint(report_card)\n\n\n# We need this function to create our job schedules\nasync def schedules():\n    # Assign a job\n    await class_work.assign(\n        # the task name\n        task=\"my_worker.add\",\n        # arguments\n        args=[1, 2],\n        # the callback to report our results\n        report_callback=report_callback,\n    )\n\n\n# Ok, let us create the schedules now\nasyncio.run(schedules())\n```\n\nThis code will create a *task* into NATS and the job workers (attentive students \ud83d\ude02) already listening will pick the task and run it, then publish their reports which is routed via NATS back to the scheduler (teacher?).\n\nTake note of the following:\n1. `class_work.assign` must be run in async mode. So we have wrapped it in an async method. You can also use `asyncio.run` directly.\n2. Naming your task is very important. This naming convection is unashamedly borrowed from [moleculer](https://moleculer.services/). In this case, your task is **\"my_worker.add\"**. This will route to any worker class registered with the **name** \"my_worker\" and method \"add\". \n3. Because all this traffic is routed via NATS, your arguments must be JSON serializable. Even though we use [typ](https://github.com/vsapronov/typjson) to handle edge cases like `sets`, beware that there are limits to what you can pass in your arguments\n4. `report_callback` must be async. It is called with a 'report' of your task. A report card \ud83d\ude0a will look like the one below:\n5. `args` can be passed as a list or dict. They will be treated as `*args` if list and `**kwargs` if dict.\n\n```\nWe have a report!\n{'task': 'my_worker.add',\n 'duration': {'latency': {'request': '4 ms and 356 \u00b5s',\n                          'response': '5 s and 4 ms'},\n              'my_worker.add': '3 s and 1 ms'},\n 'req_id': '8mYjJjM0kb5',\n 'response': 3}\n```\n\n## Report Explanation\n- **duration:** is a high precision (down to yoctoseconds) report of the time taken.\n    - **latency:** shows the time taken to route your \"task **request**\" to the worker and \"task **response**\" back to the scheduler. It is important to understand that since both worker and scheduler are disconnected, latency may also include delays of either to access the NATS network and thus does not specifically refer to network latency.\n- **req_id:** is a unique id assigned to each job\n- **response:** is the actual value returned by the worker\n\n\n## Try it\n\nWe have sample code in [scheduler.py](scheduler.py) and [worker.py](worker.py)",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "A simple Python module to help distribute you tasks across multiple brokers as microservices",
    "version": "0.0.7",
    "project_urls": null,
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "0c41cce65d9bcbba49130b062b82d74fe188b787b67e8adb92df78b62677e1d3",
                "md5": "88c373ab93a1e5bf1d5c43653747c648",
                "sha256": "d177153da451a6501c5ccd9ae0096fc3c478fc61b317e88886ef6a2de92e01b2"
            },
            "downloads": -1,
            "filename": "classwork-0.0.7-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "88c373ab93a1e5bf1d5c43653747c648",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8,<4.0",
            "size": 9205,
            "upload_time": "2023-06-05T02:28:48",
            "upload_time_iso_8601": "2023-06-05T02:28:48.412644Z",
            "url": "https://files.pythonhosted.org/packages/0c/41/cce65d9bcbba49130b062b82d74fe188b787b67e8adb92df78b62677e1d3/classwork-0.0.7-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "cac7775bb2a79ea4c16f3e9b3dc5ed40625c56d5dd72c78ac9db94858619bb74",
                "md5": "402225a8aae415f8b55d992237addef8",
                "sha256": "25f74c8a41830125e630a26b2313fbc5746e924dc533d7dee5e11bff62972d16"
            },
            "downloads": -1,
            "filename": "classwork-0.0.7.tar.gz",
            "has_sig": false,
            "md5_digest": "402225a8aae415f8b55d992237addef8",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8,<4.0",
            "size": 7934,
            "upload_time": "2023-06-05T02:28:50",
            "upload_time_iso_8601": "2023-06-05T02:28:50.421427Z",
            "url": "https://files.pythonhosted.org/packages/ca/c7/775bb2a79ea4c16f3e9b3dc5ed40625c56d5dd72c78ac9db94858619bb74/classwork-0.0.7.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-06-05 02:28:50",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "classwork"
}
        
Elapsed time: 0.07266s