# learning-pipeline-plugin
Plugin for Actcast application.
This plugin provides a base Pipe class for selecting and collecting data.
## Usage
To collect data, create a pipe that inherits from `learning_pipeline_plugin.collect_pipe.CollectPipeBase`
and define `interpret_inputs()`.
Example:
```python
from typing import Optional
from learning_pipeline_plugin.collect_pipe import CollectPipeBase, DataDict
from learning_pipeline_plugin import sender_task
class CollectPipe(CollectPipeBase):
def interpret_inputs(self, inputs) -> Optional[DataDict]:
img, probs, feature = inputs
return {
"image": img,
"feature_vector": feature,
"other_data": {
"probabilities": probs
}
}
```
`interpret_inputs()` gets the previous pipe output and must return `DataDict` or `None`.
`DataDict` is TypedDict for type hint, and must have following properties:
- `image`: PIL.Image
- `feature_vector`: vector with shape (N,)
- `other_data`: any data used for calculating uncertainty
Then, create a `SenderTask` instance and pass it the pipeline_id parameter corresponding to your pipeline.
```python
def main():
[...]
sender = sender_task.SenderTask(pipeline_id)
```
Finally, instantiate your `CollectPipe` and connect to other pipes:
```python
def main():
[...]
collect_pipe = CollectPipe(...)
prev_pipe.connect(collect_pipe)
collect_pipe.connect(next_pipe)
```
## Notifier
By default, the information output by this plugin is logged as an actlog through the Notifier instance.
Users can decide what information is output (and in what format), using a custom notifier.
To customize it, define a custom notifier class inheriting from AbstractNotifier,
and define `notify()` which gets a message as str.
Then, instantiate and pass it to the CollectPipe constructor.
Example of introducing a message length limit:
```python
from datetime import datetime, timezone
import actfw_core
from learning_pipeline_plugin.notifier import AbstractNotfier
class CustomNotifier(AbstractNotfier):
def notify(self, message: str):
if len(message) > 128:
message = message[:128] + " <truncated>"
actfw_core.notify(
[
{
"info": message,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
]
)
def main():
[...]
collect_pipe = CollectPipe(
...,
notifier=CustomNotifier()
)
```
Raw data
{
"_id": null,
"home_page": "https://github.com/Idein/learning-pipeline-plugin",
"name": "learning-pipeline-plugin",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.7.0,<3.11",
"maintainer_email": "",
"keywords": "",
"author": "Idein Inc.",
"author_email": "",
"download_url": "https://files.pythonhosted.org/packages/0d/a9/2ecea673674fa3971619514ed8464b18c4d45541e02b8beb8bdf8492b60b/learning_pipeline_plugin-0.6.0.tar.gz",
"platform": null,
"description": "# learning-pipeline-plugin\n\nPlugin for Actcast application.\nThis plugin provides a base Pipe class for selecting and collecting data.\n\n## Usage\n\nTo collect data, create a pipe that inherits from `learning_pipeline_plugin.collect_pipe.CollectPipeBase`\nand define `interpret_inputs()`.\n\nExample:\n\n```python\nfrom typing import Optional\nfrom learning_pipeline_plugin.collect_pipe import CollectPipeBase, DataDict\nfrom learning_pipeline_plugin import sender_task\n\nclass CollectPipe(CollectPipeBase):\n def interpret_inputs(self, inputs) -> Optional[DataDict]:\n img, probs, feature = inputs\n return {\n \"image\": img,\n \"feature_vector\": feature,\n \"other_data\": {\n \"probabilities\": probs\n }\n }\n```\n\n`interpret_inputs()` gets the previous pipe output and must return `DataDict` or `None`.\n\n`DataDict` is TypedDict for type hint, and must have following properties:\n\n- `image`: PIL.Image\n- `feature_vector`: vector with shape (N,)\n- `other_data`: any data used for calculating uncertainty\n\nThen, create a `SenderTask` instance and pass it the pipeline_id parameter corresponding to your pipeline.\n\n```python\ndef main():\n [...]\n\n sender = sender_task.SenderTask(pipeline_id)\n```\n\nFinally, instantiate your `CollectPipe` and connect to other pipes:\n\n```python\ndef main():\n [...]\n\n collect_pipe = CollectPipe(...)\n\n prev_pipe.connect(collect_pipe)\n collect_pipe.connect(next_pipe)\n```\n\n## Notifier\n\nBy default, the information output by this plugin is logged as an actlog through the Notifier instance.\nUsers can decide what information is output (and in what format), using a custom notifier.\n\nTo customize it, define a custom notifier class inheriting from AbstractNotifier,\nand define `notify()` which gets a message as str.\nThen, instantiate and pass it to the CollectPipe constructor.\n\nExample of introducing a message length limit:\n```python\nfrom datetime import datetime, timezone\nimport actfw_core\nfrom learning_pipeline_plugin.notifier import AbstractNotfier\n\nclass CustomNotifier(AbstractNotfier):\n def notify(self, message: str):\n if len(message) > 128:\n message = message[:128] + \" <truncated>\"\n actfw_core.notify(\n [\n {\n \"info\": message,\n \"timestamp\": datetime.now(timezone.utc).isoformat(),\n }\n ]\n )\n\ndef main():\n [...]\n\n collect_pipe = CollectPipe(\n ...,\n notifier=CustomNotifier()\n )\n```\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "",
"version": "0.6.0",
"project_urls": {
"Homepage": "https://github.com/Idein/learning-pipeline-plugin",
"Repository": "https://github.com/Idein/learning-pipeline-plugin"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "f2b225ff1683de8be0f33fcb7d42b4e815540d80b587d9f2968598e764de6913",
"md5": "64d3e551fbcd2a4697a379a1c9d22744",
"sha256": "76ca270f88f28d479ea21a2bc30df7cc8e691df9c4edddf842b8b6b5ff16e1b5"
},
"downloads": -1,
"filename": "learning_pipeline_plugin-0.6.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "64d3e551fbcd2a4697a379a1c9d22744",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.7.0,<3.11",
"size": 18859,
"upload_time": "2023-07-03T02:26:43",
"upload_time_iso_8601": "2023-07-03T02:26:43.209919Z",
"url": "https://files.pythonhosted.org/packages/f2/b2/25ff1683de8be0f33fcb7d42b4e815540d80b587d9f2968598e764de6913/learning_pipeline_plugin-0.6.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "0da92ecea673674fa3971619514ed8464b18c4d45541e02b8beb8bdf8492b60b",
"md5": "0efc2584a430f3a416216d868f3d8b7b",
"sha256": "3ae3fb69760e7669bdfbc779b5e64270a23600d91d91d0812fa4d3ec02b71d19"
},
"downloads": -1,
"filename": "learning_pipeline_plugin-0.6.0.tar.gz",
"has_sig": false,
"md5_digest": "0efc2584a430f3a416216d868f3d8b7b",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.7.0,<3.11",
"size": 14274,
"upload_time": "2023-07-03T02:26:45",
"upload_time_iso_8601": "2023-07-03T02:26:45.107477Z",
"url": "https://files.pythonhosted.org/packages/0d/a9/2ecea673674fa3971619514ed8464b18c4d45541e02b8beb8bdf8492b60b/learning_pipeline_plugin-0.6.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-07-03 02:26:45",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "Idein",
"github_project": "learning-pipeline-plugin",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "learning-pipeline-plugin"
}