SmartPipeline
-------------

A framework for rapid development of robust data pipelines following a simple design pattern.

.. figure:: https://imgs.xkcd.com/comics/data_pipeline.png
   :alt: pipeline comic

   from https://xkcd.com

.. image:: https://readthedocs.org/projects/smartpipeline/badge/?version=stable
   :target: https://smartpipeline.readthedocs.io/en/stable/?badge=stable
   :alt: Documentation Status

.. image:: https://github.com/giacbrd/SmartPipeline/actions/workflows/tests.yml/badge.svg?branch=master
   :target: https://github.com/giacbrd/SmartPipeline/actions/workflows/tests.yml
   :alt: Tests

.. image:: https://coveralls.io/repos/github/giacbrd/SmartPipeline/badge.svg?branch=master
   :target: https://coveralls.io/github/giacbrd/SmartPipeline?branch=master
   :alt: Tests Coverage

.. documentation-marker

SmartPipeline gives you the tools to design and formalize simple data pipelines,
in which tasks are sequentially encapsulated in pipeline stages.

Pipelines are straightforward to implement, yet deeply customizable:
stages can run concurrently and scale on heavy tasks,
they can process batches of items at once,
and executions and errors can be monitored easily.

It is a framework for engineering sequences of data operations
and making them concurrent, following an optimized but transparent producer-consumer pattern.
It is an excellent solution for fast and clean data analysis prototypes (small/medium projects and POCs),
but also for production code, as an alternative to plain scripts.
Consider it for problems where big task queues and workflow frameworks are overkill.
No dependencies are required.
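The producer-consumer pattern that the framework formalizes can be illustrated with a plain standard-library sketch. This is not SmartPipeline's API, only the underlying idea: a source thread pushes items onto a queue, a stage thread pulls and processes them, and a sentinel value signals the end of the stream.

```python
import queue
import threading

def producer(out_q):
    # the source: emit items, then a sentinel (None) to stop the consumer
    for item in range(5):
        out_q.put(item)
    out_q.put(None)

def consumer(in_q, results):
    # a stage: pull items and process them until the sentinel arrives
    while True:
        item = in_q.get()
        if item is None:
            break
        results.append(item * 2)

q = queue.Queue()
results = []
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q, results))
t1.start(); t2.start()
t1.join(); t2.join()
print(results)  # [0, 2, 4, 6, 8]
```

SmartPipeline hides this queue-and-thread plumbing behind its stage abstractions.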
Install
~~~~~~~

Install from PyPI; no extra dependencies will be installed:

.. code-block:: bash

    pip install smartpipeline

Writing your pipeline
~~~~~~~~~~~~~~~~~~~~~

SmartPipeline is designed to help developers follow best practices;
its design is based on industrial experience with data products.

SmartPipeline focuses on simplicity and efficiency in handling data locally,
i.e. serialization and copies of the data are minimized.

Main features:

- Define a pipeline object as a sequence of stateful stage objects,
  optionally setting a source on which the pipeline iterates.
- A pipeline can run indefinitely on the source, or it can be used to process single items.
- Concurrency can be set independently for each stage, and single items can be processed asynchronously.
- A stage can be designed to process batches, i.e. sequences of consecutive items, at once.
- Custom error handling can be set at stage level for logging and monitoring.
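Batch stages amortize per-call overhead (e.g. model inference) by grouping consecutive items. The grouping idea itself can be sketched in plain Python; this is an illustration of the concept, not the library's internals:

```python
from itertools import islice

def batched(iterable, size):
    # yield consecutive fixed-size batches; the last one may be shorter
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

In SmartPipeline a ``BatchStage`` receives such groups of consecutive items in its ``process_batch`` method, as shown in the example below.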

Here is an example of a trivial pipeline that retrieves news from a feed
and generates text embeddings of the raw page content.
We define the source of the data and two stages, then build and run the pipeline.

.. code-block:: python

    class FeedReader(Source):
        def __init__(self):
            feed = feedparser.parse("https://hnrss.org/newest")
            self.urls = (entry.link for entry in feed.entries)

        # the pop method generates a new data item when called
        def pop(self):
            # each call of pop consumes a URL to send to the pipeline
            url = next(self.urls, None)
            if url is not None:
                item = Item()
                item.data["url"] = url
                return item
            # when all URLs are consumed we stop the pipeline
            else:
                self.stop()


    class NewsRetrieve(Stage):
        def process(self, item):
            # add the page content to each item;
            # HTTP errors are implicitly handled by the pipeline error manager
            html = requests.get(item.data["url"]).text
            item.data["content"] = re.sub("<.*?>", "", html).strip()
            return item


    class NewsEmbedding(BatchStage):
        def __init__(self, size: int):
            super().__init__(size)
            self.model = SentenceTransformer("all-MiniLM-L6-v2")

        def process_batch(self, items):
            # efficiently compute embeddings by batching page texts,
            # instead of processing one page at a time
            vectors = self.model.encode([item.data["content"] for item in items])
            for vector, item in zip(vectors, items):
                item.data["vector"] = vector
            return items


    pipeline = (
        Pipeline()
        .set_source(FeedReader())
        # multi-thread (default) concurrency speeds up the multiple HTTP calls
        .append("retriever", NewsRetrieve(), concurrency=4)
        # each batch of items to vectorize will be of size 10
        .append("vectorizer", NewsEmbedding(size=10))
        .build()
    )

    for item in pipeline.run():
        print(item)

`Read the documentation <https://smartpipeline.readthedocs.io>`_ for an exhaustive guide.
The ``examples`` folder contains full working sample pipelines.

Future improvements:

- Stages can be memory profiled.
- Processed items can be cached at stage level.