ginny

- Name: ginny
- Version: 0.1.1
- Summary: run and schedule task pipelines
- Homepage: https://github.com/baudcode/ginny
- Author / Maintainer: Malte Koch
- License: MIT
- Keywords: task, worker, schedule, machine learning
- Requirements: boto3, tqdm, ujson, pillow, numpy, networkx, schedule, pydantic, requests, pyyaml
- Uploaded: 2025-03-01 20:41:36
# Ginny

A simple, convenient task manager, similar to the Luigi framework but less bloated.
It allows easy execution and scheduling of tasks, both locally and remotely via Argo Workflows.

### Run locally

```python
from ginny import DownloadTask, run

result = run(DownloadTask(
    url="https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de", 
    destination='image.jpg')
)
```

### Schedule tasks via command line

```bash
ginny --task ginny.DownloadTask url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg" 

# run every 5 minutes
ginny --task ginny.DownloadTask --every 'minute' --count 5 url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg"

# EVERY DAY at 0:00
ginny --task ginny.DownloadTask --every 'day' --at "00:00" url "https://static.wikia.nocookie.net/harrypotter/images/e/e9/Ginny-HQ-ginevra-ginny-weasley.jpg/revision/latest/scale-to-width-down/250?cb=20150228082608&path-prefix=de" destination "image.jpg" 
```
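The interval semantics of `--every`, `--count`, and `--at` can be illustrated without ginny. The sketch below is a stdlib-only illustration of how a next run time could be derived (the actual CLI is backed by the `schedule` dependency; `next_run` is a hypothetical helper, not part of ginny):

```python
# Stdlib-only sketch of interval scheduling semantics. `next_run` is a
# hypothetical helper for illustration, not ginny's implementation.
from datetime import datetime, timedelta
from typing import Optional

def next_run(now: datetime, every: str, count: int = 1, at: Optional[str] = None) -> datetime:
    """Compute the next run time for a simple interval schedule."""
    if every == "minute":
        return now + timedelta(minutes=count)
    if every == "day":
        hour, minute = map(int, at.split(":")) if at else (now.hour, now.minute)
        candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if candidate <= now:  # today's slot has already passed
            candidate += timedelta(days=count)
        return candidate
    raise ValueError(f"unsupported interval: {every}")

print(next_run(datetime(2025, 3, 1, 12, 30), "minute", count=5))  # 2025-03-01 12:35:00
print(next_run(datetime(2025, 3, 1, 12, 30), "day", at="00:00"))  # 2025-03-02 00:00:00
```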

#### Build your own tasks

```python
from ginny import run, Task, LocalTarget, DownloadTask
import dataclasses

@dataclasses.dataclass(frozen=True)
class MyTask(Task):
    url: str

    def depends(self):
        # return tasks or targets that this task depends on
        # return LocalTarget("/tmp/data.json")
        # return [LocalTarget("/tmp/data.json"), LocalTarget("/tmp/data2.json")]
        return [LocalTarget("/tmp/data.json"), DownloadTask(self.url, "/tmp/data2.json")]
    
    def run(self, *args, **kwargs):
        target, download_task = self.depends()
        data1 = target.read_json()
        data2 = download_task.target().read_json()
        data1.update(data2)

        with self.target().open("w") as writer:
            writer.write("done")
        
    def target(self):
        # define a target if the task should not be executed every time / has output data
        return LocalTarget("/tmp/target.json")

# delete previous results of the task
task = MyTask(url=...)
task.delete(recursive=False)  # set recursive=True to also delete results of subtasks

# run the task; the results of all executed tasks are returned
results = run(task)
```
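Conceptually, `run()` resolves the `depends()` declarations into a dependency graph and executes tasks so that dependencies finish first. A minimal sketch of that idea using the standard library's topological sorter (an illustration only, not ginny's actual implementation):

```python
# Toy dependency graph mirroring the example above: MyTask depends on a
# download step. graphlib is the stdlib topological sorter.
from graphlib import TopologicalSorter

graph = {
    "MyTask": {"DownloadTask"},
    "DownloadTask": set(),
}

# static_order() yields nodes so that each node's dependencies come first
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['DownloadTask', 'MyTask']
```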


### Built-in tasks
```python
from ginny import BashTask, S3DownloadTask, DownloadTask, S3UploadTask, Task, SSHCommandTask, DepTask, TempDownloadTask, run

r = run(BashTask(['ls', '-lha']))
```

### Run a DAG/Task with Argo Workflows (local targets automatically become S3 targets)

Define the Argo config with storage via YAML (preferred) and save it as `argo_config.yaml`, or use `.from_env()` to load it from environment variables:

```yaml
# argo_config.yaml
namespace: "argo" # default
serviceAccountName: "argo-workflows" # default

storage:
    key: "argo-workflows" # default
    bucket: "ai-datastore" # required
    region: "us-east-1" # required
    endpoint: "s3.amazonaws.com" # default

    accessKeySecret: # default
        name: "argo-secret"
        key: "ARGO_WORKFLOWS_ACCESS"

    secretKeySecret: # default
        name: "argo-secret"
        key: "ARGO_WORKFLOWS_SECRET2"
```
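The exact fields read by `.from_env()` are not documented here. As an illustration only, a loader of this shape could map environment variables onto the storage fields above (the `StorageConfig` class and the `ARGO_S3_*` variable names are assumptions, not ginny's API):

```python
# Hypothetical sketch of an environment-based config loader; field names
# and env var names are assumptions for illustration.
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class StorageConfig:
    bucket: str
    region: str
    key: str = "argo-workflows"
    endpoint: str = "s3.amazonaws.com"

    @classmethod
    def from_env(cls) -> "StorageConfig":
        # required fields raise KeyError when unset; the rest fall back
        # to the same defaults as the YAML above
        return cls(
            bucket=os.environ["ARGO_S3_BUCKET"],
            region=os.environ["ARGO_S3_REGION"],
            key=os.environ.get("ARGO_S3_KEY", "argo-workflows"),
            endpoint=os.environ.get("ARGO_S3_ENDPOINT", "s3.amazonaws.com"),
        )

os.environ.setdefault("ARGO_S3_BUCKET", "ai-datastore")
os.environ.setdefault("ARGO_S3_REGION", "us-east-1")
config = StorageConfig.from_env()
print(config.bucket, config.region)
```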
Define tasks:

```python
import dataclasses
from typing import List

from ginny import GlobalVar, LocalTarget, Task, S3StorageConfig

@dataclasses.dataclass(frozen=True)
class A(Task):
    pano_id: str
    order_id: str = GlobalVar("order_id")

    def run(self, *args, **kwargs):
        self.target().write_text("hello")

    def target(self):
        return LocalTarget("/tmp/a.txt")

@dataclasses.dataclass(frozen=True)
class B(Task):
    def run(self, *args, **kwargs):
        self.target().write_text("hello")

    def target(self):
        return LocalTarget("/tmp/b.txt")

# define the workflow; global variables declared here are required to make the workflow run
@dataclasses.dataclass(frozen=True)
class Pipeline(Task):
    order_id: str = GlobalVar("order_id")

    def depends(self) -> List[Task]:
        a = A(order_id=self.order_id, pano_id="testing123")
        b = B()
        return [a, b]

    def run(self, *args, **kwargs):
        print("Running pipeline")
        data1 = self.depends()[0].target().read_text()
        print("Task A exists: ", self.depends()[0].target().exists())
        print("Task A result: ", data1)
        data2 = self.depends()[1].target().read_text()
        print("Task B exists: ", self.depends()[1].target().exists())
        print("Task B result: ", data2)
        print("Total result: ")

        print(data1 + data2)
```

Create the workflow YAML from the task:
```python
# export the task graph as a workflow
from ginny import ArgoConfig, schedule_to_workflow

task = Pipeline()
config = ArgoConfig.from_yaml("argo_config.yaml")

# use a base image that has the requirements (ginny) installed and in which
# your workflow code is defined
workflow = schedule_to_workflow(task, "a-b-process-test", config, base_image="baudcode/ginny_test:latest")
workflow.save("test_workflow.yaml")
```
Push `test_workflow.yaml` to Argo Workflows:
```bash
argo submit -n argo --watch test_workflow.yaml
```

### Run dynamic tasks

Limitation: a dynamic task may not depend on another dynamic task.

```python
# generate some parameters within some task (producer)
@dataclasses.dataclass(frozen=True)
class GenerateLines(Task):
    def run(self, *args, **kwargs):
        self.target()[0].set([
            {"key": "testing123", "dummy": "1"},
            {"key": "testing456", "dummy": "2"},
            {"key": "testing4567", "dummy": "3"},
        ])

    def target(self):
        return [IterableParameterMap(name='data', keys=['key', 'dummy'])]

# consume one item
@dataclasses.dataclass(frozen=True)
class ProcessLine(Task):
    key: str
    dummy: str

    def run(self, *args, **kwargs):
        self.target().write_text(f"processed {self.key} {self.dummy}")
    
    def target(self):
        return LocalTarget(f"/tmp/processed_{self.key}.txt")

# run all in parallel
@dataclasses.dataclass(frozen=True)
class ProcessLines(DynamicTask):

    @property
    def taskclass(self):
        return ProcessLine
    
    @property
    def parameter(self):
        return [IterableParameterMap(name='data', keys=['key', 'dummy'])]

    def depends(self):
        return [GenerateLines()]
```
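The fan-out above can be sketched without ginny's machinery: a producer emits a list of parameter dicts, and one consumer instance is built per dict (a conceptual sketch only; the `Line` dataclass here stands in for `ProcessLine`):

```python
# Conceptual fan-out sketch: one frozen dataclass instance per parameter
# dict, mirroring what DynamicTask does with IterableParameterMap.
from dataclasses import dataclass

@dataclass(frozen=True)
class Line:
    key: str
    dummy: str

# parameters a producer like GenerateLines would emit
params = [
    {"key": "testing123", "dummy": "1"},
    {"key": "testing456", "dummy": "2"},
    {"key": "testing4567", "dummy": "3"},
]

# fan out: one task instance per parameter dict
tasks = [Line(**p) for p in params]
print(len(tasks), tasks[0].key)  # 3 testing123
```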

### Connect tasks to Argo Events

WIP


### Development

```bash
python setup.py clean
pip install .
```

### TODO

- implement Argo Events and Argo Sensors, connect tasks to them, and make it possible to simulate events coming from them
- use logging
- make dynamic tasks work with argo workflows

            
