robit


Namerobit JSON
Version 0.4.8 PyPI version JSON
download
home_pagehttps://github.com/stratusadv/robit
SummaryService Worker Framework
upload_time2024-09-11 01:54:41
maintainerNone
docs_urlNone
authorNathan Johnson
requires_python>=3.7
licenseNone
keywords automation bot cron cronjob chronological worker job
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Robit

Chronological Automation Service Framework

A robot for your bits! 

(Pronounced "Row-Bit")

## Features

- Lightweight (only one installed dependencies pytz which is also lightweight)
- Can run with monitoring interface or headless
- Very simple and easy to set up and configure 

### Interface

![Screenshot](./img/robit-screenshot.png)

## Usage

### Worker

- Code below is provided in the examples/worker_example.py file of this project.

```python
import random
from time import sleep

import robit


robit.set_time_zone('America/Edmonton')
robit.set_database_logging(True)


def function_to_alert_me(**kwargs):
    print(f"ALARM !!!! {kwargs['alert_message']}")


wo = robit.Worker(
    name='Robit Example Worker',
    key='Your-Own-Unique-Worker-Key-That-Is-Secure',
    web_server=True,
    # web_server_address='0.0.0.0',
    # web_server_port=8000,
    alert_method=function_to_alert_me,
    alert_health_threshold=99.0,
)


def function_sleep_short():
    sleep(2)
    return 'Slept for 2 seconds'


def function_sleep_for_time(sleep_time: int):
    sleep(sleep_time)
    return f'Slept for {sleep_time} seconds'


wo.add_job(
    'Specific Sleep Period Function',
    function_sleep_for_time,
    method_kwargs={'sleep_time': 5},
    group='Sleeping',
    cron='* * * * *',
)

wo.add_job(
    'Sleep 3 Seconds Function Every 5 Seconds',
    function_sleep_for_time,
    method_kwargs={'sleep_time': 3},
    group='Sleeping',
    cron='*/5 * * * * *',
)

wo.add_job(
    'Sleep for Short Period',
    function_sleep_short,
    group='Sleeping',
    cron='*/2 * * * *',
)


def function_random_fail_often():
    if 3 <= random.randint(1,4):
        division_by_zero = 5 / 0
    sleep(4)
    return 'No Error'


def function_random_fail_rare():
    if 1 == random.randint(1,20):
        division_by_zero = 5 / 0
    sleep(4)
    return 'No Error'


wo.add_job(
    'A Function that Fails',
    function_random_fail_often,
    group='Failing',
    cron='*/10 * * * * *',
    retry_attempts=4,
)

wo.add_job(
    'Might Fail Some Times',
    function_random_fail_rare,
    group='Failing',
    cron='* * * * *',
)


def function_full_speed():
    x = int()
    for i in range(99999999):
        x = i * i
    sleep(1)
    return f'Max multiplication result of {x:,}'


wo.add_job(
    'Lower Delay Function',
    function_full_speed,
    group='Rapid Execution',
    cron='* * * * * *',
)


def function_send_worker_information(worker: robit.Worker):
    return f'Worker Success Count: {worker.success_count}'


wo.add_job('Send Worker Information', function_send_worker_information, group='Webhooks Or Database Update', cron='*/30 * * * * *')


if __name__ == '__main__':
    wo.start()
```

The server will start and host a web portal on default port 8100 locally for you to view what is going on.


## Other Libraries Used

- Boostrap (Responsive UI)
- Alpine (Better UX)
- PyTZ (Awesome Time Zone Management)



            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/stratusadv/robit",
    "name": "robit",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.7",
    "maintainer_email": null,
    "keywords": "automation, bot, cron, cronjob, chronological, worker, job",
    "author": "Nathan Johnson",
    "author_email": "nathanj@stratusadv.com",
    "download_url": "https://files.pythonhosted.org/packages/5a/1d/99a4c967b3df0c302a2e6e77ec8742a4cbd1e9c003694ac09ecb49d713d5/robit-0.4.8.tar.gz",
    "platform": null,
    "description": "# Robit\n\nChronological Automation Service Framework\n\nA robot for your bits! \n\n(Pronounced \"Row-Bit\")\n\n## Features\n\n- Lightweight (only one installed dependencies pytz which is also lightweight)\n- Can run with monitoring interface or headless\n- Very simple and easy to set up and configure \n\n### Interface\n\n![Screenshot](./img/robit-screenshot.png)\n\n## Usage\n\n### Worker\n\n- Code below is provided in the examples/worker_example.py file of this project.\n\n```python\nimport random\nfrom time import sleep\n\nimport robit\n\n\nrobit.set_time_zone('America/Edmonton')\nrobit.set_database_logging(True)\n\n\ndef function_to_alert_me(**kwargs):\n    print(f\"ALARM !!!! {kwargs['alert_message']}\")\n\n\nwo = robit.Worker(\n    name='Robit Example Worker',\n    key='Your-Own-Unique-Worker-Key-That-Is-Secure',\n    web_server=True,\n    # web_server_address='0.0.0.0',\n    # web_server_port=8000,\n    alert_method=function_to_alert_me,\n    alert_health_threshold=99.0,\n)\n\n\ndef function_sleep_short():\n    sleep(2)\n    return 'Slept for 2 seconds'\n\n\ndef function_sleep_for_time(sleep_time: int):\n    sleep(sleep_time)\n    return f'Slept for {sleep_time} seconds'\n\n\nwo.add_job(\n    'Specific Sleep Period Function',\n    function_sleep_for_time,\n    method_kwargs={'sleep_time': 5},\n    group='Sleeping',\n    cron='* * * * *',\n)\n\nwo.add_job(\n    'Sleep 3 Seconds Function Every 5 Seconds',\n    function_sleep_for_time,\n    method_kwargs={'sleep_time': 3},\n    group='Sleeping',\n    cron='*/5 * * * * *',\n)\n\nwo.add_job(\n    'Sleep for Short Period',\n    function_sleep_short,\n    group='Sleeping',\n    cron='*/2 * * * *',\n)\n\n\ndef function_random_fail_often():\n    if 3 <= random.randint(1,4):\n        division_by_zero = 5 / 0\n    sleep(4)\n    return 'No Error'\n\n\ndef function_random_fail_rare():\n    if 1 == random.randint(1,20):\n        division_by_zero = 5 / 0\n    sleep(4)\n    return 'No Error'\n\n\nwo.add_job(\n    'A Function that Fails',\n    function_random_fail_often,\n    group='Failing',\n    cron='*/10 * * * * *',\n    retry_attempts=4,\n)\n\nwo.add_job(\n    'Might Fail Some Times',\n    function_random_fail_rare,\n    group='Failing',\n    cron='* * * * *',\n)\n\n\ndef function_full_speed():\n    x = int()\n    for i in range(99999999):\n        x = i * i\n    sleep(1)\n    return f'Max multiplication result of {x:,}'\n\n\nwo.add_job(\n    'Lower Delay Function',\n    function_full_speed,\n    group='Rapid Execution',\n    cron='* * * * * *',\n)\n\n\ndef function_send_worker_information(worker: robit.Worker):\n    return f'Worker Success Count: {worker.success_count}'\n\n\nwo.add_job('Send Worker Information', function_send_worker_information, group='Webhooks Or Database Update', cron='*/30 * * * * *')\n\n\nif __name__ == '__main__':\n    wo.start()\n```\n\nThe server will start and host a web portal on default port 8100 locally for you to view what is going on.\n\n\n## Other Libraries Used\n\n- Boostrap (Responsive UI)\n- Alpine (Better UX)\n- PyTZ (Awesome Time Zone Management)\n\n\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Service Worker Framework",
    "version": "0.4.8",
    "project_urls": {
        "Homepage": "https://github.com/stratusadv/robit"
    },
    "split_keywords": [
        "automation",
        " bot",
        " cron",
        " cronjob",
        " chronological",
        " worker",
        " job"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "be198d044f6d06433572ce039862e092c6980bc250e7dc0f8f23fdf13547c715",
                "md5": "71968d316ee155ef34177991204f1e0b",
                "sha256": "7372fd8f1b9c8a082e96221493125a3a8178e7707a912fc5c32c08a414a0766b"
            },
            "downloads": -1,
            "filename": "robit-0.4.8-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "71968d316ee155ef34177991204f1e0b",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.7",
            "size": 107853,
            "upload_time": "2024-09-11T01:54:39",
            "upload_time_iso_8601": "2024-09-11T01:54:39.619958Z",
            "url": "https://files.pythonhosted.org/packages/be/19/8d044f6d06433572ce039862e092c6980bc250e7dc0f8f23fdf13547c715/robit-0.4.8-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "5a1d99a4c967b3df0c302a2e6e77ec8742a4cbd1e9c003694ac09ecb49d713d5",
                "md5": "b228e4492fb7b0870415aeca81bead25",
                "sha256": "076ecf50c32e38b3d854f90ee55861d65ff1f82334762bf21d9e4297701c371e"
            },
            "downloads": -1,
            "filename": "robit-0.4.8.tar.gz",
            "has_sig": false,
            "md5_digest": "b228e4492fb7b0870415aeca81bead25",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.7",
            "size": 94821,
            "upload_time": "2024-09-11T01:54:41",
            "upload_time_iso_8601": "2024-09-11T01:54:41.343365Z",
            "url": "https://files.pythonhosted.org/packages/5a/1d/99a4c967b3df0c302a2e6e77ec8742a4cbd1e9c003694ac09ecb49d713d5/robit-0.4.8.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-11 01:54:41",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "stratusadv",
    "github_project": "robit",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [],
    "lcname": "robit"
}
        
Elapsed time: 0.29387s