# Ginny
A simple, convenient task manager, similar to the Luigi framework but less bloated.
It allows easy execution and scheduling of tasks locally and remotely using Argo Workflows.
### Run locally
```python
from ginny import DownloadTask, run
result = run(DownloadTask(
    url="https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de",
    destination='image.jpg')
)
```
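After the call returns, the downloaded image should exist at the given destination. A quick sanity check using only the standard library (no ginny-specific API assumed):

```python
from pathlib import Path

# verify the download landed at the requested destination
image = Path("image.jpg")
print(image.exists(), image.stat().st_size if image.exists() else 0)
```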
### Schedule tasks via command line
```bash
ginny --task ginny.DownloadTask url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg"
# run every 5 minutes
ginny --task ginny.DownloadTask --every 'minute' --count 5 url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg"
# run every day at 00:00
ginny --task ginny.DownloadTask --every 'day' --at "00:00" url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg"
```
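If you prefer scheduling from Python instead of the CLI, the same recurring runs can be sketched with the third-party `schedule` library combined with ginny's `run()`. This is a minimal sketch, not ginny's own scheduler API; the URL and destination are the same as above.

```python
import time

import schedule

from ginny import DownloadTask, run


def job():
    run(DownloadTask(
        url="https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de",
        destination="image.jpg",
    ))


# run every 5 minutes
schedule.every(5).minutes.do(job)
# run every day at 00:00
schedule.every().day.at("00:00").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
```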
#### Build your own tasks
```python
import dataclasses

# LocalTarget is assumed to be importable from ginny alongside Task and DownloadTask
from ginny import DownloadTask, LocalTarget, Task, run

@dataclasses.dataclass(frozen=True)
class MyTask(Task):
    url: str

    def depends(self):
        # return tasks or targets that this task depends on
        # return LocalTarget("/tmp/data.json")
        # return [LocalTarget("/tmp/data.json"), LocalTarget("/tmp/data2.json")]
        return [LocalTarget("/tmp/data.json"), DownloadTask(self.url, "/tmp/data2.json")]

    def run(self, *args, **kwargs):
        target, download_task = self.depends()
        data1 = target.read_json()
        data2 = download_task.target().read_json()
        data1.update(data2)

        with self.target().open("w") as writer:
            writer.write("done")

    def target(self):
        # define a target if the task should not be executed every time / has output data
        return LocalTarget("/tmp/target.json")

task = MyTask(url=...)

# delete the results of a task
task.delete(recursive=False)  # set recursive=True to also delete the results of subtasks

# run the task (the results of all tasks that get executed are returned in results)
results = run(task)
```
### Built-in tasks
```python
from ginny import BashTask, S3DownloadTask, DownloadTask, S3UploadTask, Task, SSHCommandTask, DepTask, TempDownloadTask, run
r = run(BashTask(['ls', '-lha']))
```
### Run a DAG/Task with Argo Workflows (local targets automatically become S3 targets)
Define the Argo config with storage via YAML (preferred) and save it as `argo_config.yaml`, or use `.from_env()` to load it from environment variables.
```yaml
# argo_config.yaml
namespace: "argo" # default
serviceAccountName: "argo-workflows" # default
storage:
key: "argo-workflows" # default
bucket: "ai-datastore" # required
region: "us-east-1" # required
endpoint: "s3.amazonaws.com" # default
accessKeySecret: # default
name: "argo-secret"
key: "ARGO_WORKFLOWS_ACCESS"
secretKeySecret: # default
name: "argo-secret"
key: "ARGO_WORKFLOWS_SECRET2"
```
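Alternatively, the config can be built from environment variables as mentioned above. A minimal sketch, assuming `ArgoConfig` is exported by ginny (the exact environment variable names are defined by ginny and not listed here):

```python
from ginny import ArgoConfig  # assumed export; adjust the import to your installation

# reads the Argo/S3 settings from environment variables instead of a YAML file
config = ArgoConfig.from_env()
```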
Define tasks:
```python
import dataclasses
from typing import List

from ginny import GlobalVar, LocalTarget, Task

@dataclasses.dataclass(frozen=True)
class A(Task):
    pano_id: str
    order_id: str = GlobalVar("order_id")

    def run(self, *args, **kwargs):
        self.target().write_text("hello")

    def target(self):
        return LocalTarget("/tmp/a.txt")

@dataclasses.dataclass(frozen=True)
class B(Task):
    def run(self, *args, **kwargs):
        self.target().write_text("hello")

    def target(self):
        return LocalTarget("/tmp/b.txt")

# define the workflow (allows defining global variables, which are necessary to make the workflow run)
@dataclasses.dataclass(frozen=True)
class Pipeline(Task):
    order_id: str = GlobalVar("order_id")

    def depends(self) -> List[Task]:
        a = A(order_id=self.order_id, pano_id="testing123")
        b = B()
        return [a, b]

    def run(self, *args, **kwargs):
        print("Running pipeline")
        a, b = self.depends()
        data1 = a.target().read_text()
        print("Task A exists: ", a.target().exists())
        print("Task A result: ", data1)
        data2 = b.target().read_text()
        print("Task B exists: ", b.target().exists())
        print("Task B result: ", data2)
        print("Total result: ")
        print(data1 + data2)
```
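Before exporting the DAG to Argo, the pipeline can be executed locally like any other task. A short sketch; the `order_id` value is a placeholder:

```python
from ginny import run

# runs A and B first (the dependencies), then the Pipeline task itself
results = run(Pipeline(order_id="order-123"))  # "order-123" is a hypothetical value
```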
Create the workflow YAML from the task:
```python
# export the task graph as a workflow
from ginny import ArgoConfig, schedule_to_workflow  # assumed exports; adjust to your installation

task = Pipeline()
config = ArgoConfig.from_yaml("argo_config.yaml")

# use a base image that has the requirements (ginny) and your workflow code installed
workflow = schedule_to_workflow(task, "a-b-process-test", config, base_image="baudcode/ginny_test:latest")
workflow.save("test_workflow.yaml")
```
Push `test_workflow.yaml` to Argo Workflows:
```bash
argo submit -n argo --watch test_workflow.yaml
```
### Run dynamic tasks
Limitation: a dynamic task must not depend on another dynamic task.
```python
import dataclasses

# DynamicTask and IterableParameterMap are assumed to be exported by ginny
from ginny import DynamicTask, IterableParameterMap, LocalTarget, Task

# generate some parameters within some task (producer)
@dataclasses.dataclass(frozen=True)
class GenerateLines(Task):
    def run(self, *args, **kwargs):
        self.target()[0].set([
            {"key": "testing123", "dummy": "1"},
            {"key": "testing456", "dummy": "2"},
            {"key": "testing4567", "dummy": "3"},
        ])

    def target(self):
        return [IterableParameterMap(name='data', keys=['key', 'dummy'])]

# consume one item
@dataclasses.dataclass(frozen=True)
class ProcessLine(Task):
    key: str
    dummy: str

    def run(self, *args, **kwargs):
        self.target().write_text(f"processed {self.key} {self.dummy}")

    def target(self):
        return LocalTarget(f"/tmp/processed_{self.key}.txt")

# run all items in parallel
@dataclasses.dataclass(frozen=True)
class ProcessLines(DynamicTask):

    @property
    def taskclass(self):
        return ProcessLine

    @property
    def parameter(self):
        # keys must match the fields of ProcessLine and the keys produced by GenerateLines
        return [IterableParameterMap(name='data', keys=['key', 'dummy'])]

    def depends(self):
        return [GenerateLines()]
```
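Locally, a dynamic task is executed like any other task; `run()` first executes `GenerateLines` and then one `ProcessLine` per generated item (a sketch based on the `run()` usage shown above):

```python
from ginny import run

# expands the output of GenerateLines into one ProcessLine task per item
results = run(ProcessLines())
```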
### Connect tasks to Argo Events
```bash
WIP
```
### Development
```bash
python setup.py clean
pip install .
```
### TODO
- implement Argo Events and Argo Sensors to connect tasks to them, and make it possible to simulate events coming from them
- use logging
- make dynamic tasks work with Argo Workflows