# Robit
Chronological Automation Service Framework
A robot for your bits!
(Pronounced "Row-Bit")
## Features
- Lightweight (only one installed dependency, pytz, which is also lightweight)
- Can run with monitoring interface or headless
- Very simple and easy to set up and configure
### Interface
![Screenshot](./img/robit-screenshot.png)
## Usage
### Worker
- The code below is provided in the `examples/worker_example.py` file of this project.
```python
import random
from time import sleep

import robit


robit.set_time_zone('America/Edmonton')
robit.set_database_logging(True)


def function_to_alert_me(**kwargs):
    # The alert text is read from the 'alert_message' keyword argument.
    print(f"ALARM !!!! {kwargs['alert_message']}")


wo = robit.Worker(
    name='Robit Example Worker',
    key='Your-Own-Unique-Worker-Key-That-Is-Secure',
    web_server=True,
    # web_server_address='0.0.0.0',
    # web_server_port=8000,
    alert_method=function_to_alert_me,
    alert_health_threshold=99.0,
)


def function_sleep_short():
    sleep(2)
    return 'Slept for 2 seconds'


def function_sleep_for_time(sleep_time: int):
    sleep(sleep_time)
    return f'Slept for {sleep_time} seconds'


wo.add_job(
    'Specific Sleep Period Function',
    function_sleep_for_time,
    method_kwargs={'sleep_time': 5},
    group='Sleeping',
    cron='* * * * *',
)

wo.add_job(
    'Sleep 3 Seconds Function Every 5 Seconds',
    function_sleep_for_time,
    method_kwargs={'sleep_time': 3},
    group='Sleeping',
    cron='*/5 * * * * *',
)

wo.add_job(
    'Sleep for Short Period',
    function_sleep_short,
    group='Sleeping',
    cron='*/2 * * * *',
)


def function_random_fail_often():
    # Fails on purpose for a roll of 3 or 4 (about half of the runs).
    if random.randint(1, 4) >= 3:
        division_by_zero = 5 / 0
    sleep(4)
    return 'No Error'


def function_random_fail_rare():
    # Fails on purpose for a roll of 1 (about one run in twenty).
    if random.randint(1, 20) == 1:
        division_by_zero = 5 / 0
    sleep(4)
    return 'No Error'


wo.add_job(
    'A Function that Fails',
    function_random_fail_often,
    group='Failing',
    cron='*/10 * * * * *',
    retry_attempts=4,
)

wo.add_job(
    'Might Fail Some Times',
    function_random_fail_rare,
    group='Failing',
    cron='* * * * *',
)


def function_full_speed():
    x = int()
    # Busy loop to keep the CPU occupied before a short sleep.
    for i in range(99999999):
        x = i * i
    sleep(1)
    return f'Max multiplication result of {x:,}'


wo.add_job(
    'Lower Delay Function',
    function_full_speed,
    group='Rapid Execution',
    cron='* * * * * *',
)


def function_send_worker_information(worker: robit.Worker):
    return f'Worker Success Count: {worker.success_count}'


wo.add_job(
    'Send Worker Information',
    function_send_worker_information,
    group='Webhooks Or Database Update',
    cron='*/30 * * * * *',
)


if __name__ == '__main__':
    wo.start()
```
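
The cron strings in the example above come in five-field and six-field forms. Judging only by the job name 'Sleep 3 Seconds Function Every 5 Seconds' paired with `'*/5 * * * * *'`, the six-field form appears to put a seconds field first; that reading is an inference from the example, not something this README states. The sketch below is not Robit's parser. It is a small, hypothetical helper (`expand_field` is an illustrative name) showing how standard cron `*` and `*/N` fields expand into matching values.

```python
from typing import List


def expand_field(field: str, low: int, high: int) -> List[int]:
    """Expand a '*' or '*/N' cron field into the values it matches.

    Illustration of standard cron step semantics only; Robit's own
    parsing is not shown in this README and may differ.
    """
    if field == "*":
        return list(range(low, high + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError(f"step must be positive: {field!r}")
        return list(range(low, high + 1, step))
    raise ValueError(f"unsupported field: {field!r}")


if __name__ == "__main__":
    # Assuming a seconds field with the range 0-59.
    print(expand_field("*/30", 0, 59))  # [0, 30]
    print(expand_field("*/10", 0, 59))  # [0, 10, 20, 30, 40, 50]
```

Under that assumption, `'*/30 * * * * *'` would fire at seconds 0 and 30 of every minute, while the five-field `'*/2 * * * *'` would fire on every even minute.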
When the worker starts, it hosts a web portal locally (port 8100 by default) where you can monitor what is going on.
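
To confirm that the portal is listening, a plain TCP connection attempt is enough. The sketch below uses only the Python standard library and does not call Robit at all; the helper name `portal_is_reachable` and the host `127.0.0.1` are assumptions for illustration, and the port 8100 is the default stated above.

```python
import socket


def portal_is_reachable(host: str = "127.0.0.1", port: int = 8100,
                        timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    try:
        # create_connection raises OSError (refused, timed out, ...) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("portal reachable:", portal_is_reachable())
```

This only shows that a process is accepting connections on that port, not that it is the Robit portal; opening the address in a browser is the real check.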
## Other Libraries Used
- Bootstrap (Responsive UI)
- Alpine (Better UX)
- PyTZ (Awesome Time Zone Management)
Raw data
{
"_id": null,
"home_page": "https://github.com/stratusadv/robit",
"name": "robit",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.7",
"maintainer_email": null,
"keywords": "automation, bot, cron, cronjob, chronological, worker, job",
"author": "Nathan Johnson",
"author_email": "nathanj@stratusadv.com",
"download_url": "https://files.pythonhosted.org/packages/5a/1d/99a4c967b3df0c302a2e6e77ec8742a4cbd1e9c003694ac09ecb49d713d5/robit-0.4.8.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": null,
"summary": "Service Worker Framework",
"version": "0.4.8",
"project_urls": {
"Homepage": "https://github.com/stratusadv/robit"
},
"split_keywords": [
"automation",
" bot",
" cron",
" cronjob",
" chronological",
" worker",
" job"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "be198d044f6d06433572ce039862e092c6980bc250e7dc0f8f23fdf13547c715",
"md5": "71968d316ee155ef34177991204f1e0b",
"sha256": "7372fd8f1b9c8a082e96221493125a3a8178e7707a912fc5c32c08a414a0766b"
},
"downloads": -1,
"filename": "robit-0.4.8-py3-none-any.whl",
"has_sig": false,
"md5_digest": "71968d316ee155ef34177991204f1e0b",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.7",
"size": 107853,
"upload_time": "2024-09-11T01:54:39",
"upload_time_iso_8601": "2024-09-11T01:54:39.619958Z",
"url": "https://files.pythonhosted.org/packages/be/19/8d044f6d06433572ce039862e092c6980bc250e7dc0f8f23fdf13547c715/robit-0.4.8-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "5a1d99a4c967b3df0c302a2e6e77ec8742a4cbd1e9c003694ac09ecb49d713d5",
"md5": "b228e4492fb7b0870415aeca81bead25",
"sha256": "076ecf50c32e38b3d854f90ee55861d65ff1f82334762bf21d9e4297701c371e"
},
"downloads": -1,
"filename": "robit-0.4.8.tar.gz",
"has_sig": false,
"md5_digest": "b228e4492fb7b0870415aeca81bead25",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.7",
"size": 94821,
"upload_time": "2024-09-11T01:54:41",
"upload_time_iso_8601": "2024-09-11T01:54:41.343365Z",
"url": "https://files.pythonhosted.org/packages/5a/1d/99a4c967b3df0c302a2e6e77ec8742a4cbd1e9c003694ac09ecb49d713d5/robit-0.4.8.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-09-11 01:54:41",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "stratusadv",
"github_project": "robit",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"requirements": [],
"lcname": "robit"
}