Data Pipeline
=============
A data pipeline collects, stores, and processes data. This package provides a framework for facilitating this process.

Data is collected from ``DataSources``, stored in ``DataSinks``, and processed using ``Transformers``.

``DataSources`` are entities that *provide* data to the pipeline. Examples include databases, in-memory caches, and REST APIs.

``DataSinks`` are entities that *store* data provided by ``DataSources``. Examples include databases and in-memory caches. Nearly all data sinks will also be data sources because storing data is usually unhelpful if you cannot get that data out. We refer to an entity that is both a data source and data sink as a *data store*.

``Transformers`` are entities that *transform* or process data from one data type to another. For example, a transformer may transform a Word document to a PDF.

The ``DataPipeline`` consists of a list of ``DataSources`` and ``DataSinks`` that communicate via ``Transformers``.

The data sources and sinks are ordered in the data pipeline, and their order determines the order in which they are queried for data. Generally speaking, slower data sources and sinks should go towards the end of the pipeline.

Not every data type needs to be supported by every data source or data sink. If a data source or sink does not support a requested type of data, it is simply skipped in the pipeline.

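
The ordering and skipping rules above can be sketched in plain Python. This is a minimal illustration, not the package's API: ``DictStore``, ``supports``, and ``first_hit`` are hypothetical names, and the explicit type check is an assumed stand-in for however the real pipeline decides whether an element supports a type.

```python
from typing import Any, Dict, List, Optional, Tuple


class DictStore:
    """Hypothetical data store: acts as both a source (get) and a sink (put)."""

    def __init__(self, name: str, supported: List[type]) -> None:
        self.name = name
        self.supported = set(supported)
        self._items: Dict[Tuple[type, str], Any] = {}

    def supports(self, data_type: type) -> bool:
        return data_type in self.supported

    def get(self, data_type: type, key: str) -> Optional[Any]:
        return self._items.get((data_type, key))

    def put(self, data_type: type, key: str, item: Any) -> None:
        self._items[(data_type, key)] = item


def first_hit(stores: List[DictStore], data_type: type, key: str,
              visited: List[str]) -> Optional[Any]:
    """Query stores in order, skipping any that do not support data_type."""
    for store in stores:
        if not store.supports(data_type):
            continue  # unsupported type: this element is skipped entirely
        visited.append(store.name)
        item = store.get(data_type, key)
        if item is not None:
            return item  # later (slower) elements are never queried
    return None
```

Here ``visited`` only records which elements were actually queried, which makes the skipping behavior easy to observe: a cache that supports only ``str`` never appears in it when an ``int`` is requested.
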
Example
-------
For example, suppose your data pipeline consists of an in-memory cache, a database, and a REST API service (in that order). When you perform a query, the ``DataPipeline`` will first look in the in-memory cache, then in the database, then in the REST API. If the data is found in the cache, it will be returned and the database and REST API will not be queried. Similarly, if the data is found in the database, the REST API will not be queried.

After data is found in a data source, the data propagates back through the data pipeline towards the front, along the path the query took. Any data sink encountered along the way will store that data. So, continuing the above example, if your query was answered by the REST API, the returned data would be stored in your database, then stored in the cache. A data sink will only store data that it supports.

Each data sink can define expiration periods for each type of data it supports, but this is up to the specific data sink to implement.

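
The lookup, back-propagation, and expiration behavior described in this section can be sketched as follows. Everything here is an assumption made for illustration: ``ExpiringCache``, ``RemoteApi``, and ``fetch`` are hypothetical names, the sketch handles a single data type, and the time-to-live policy is just one way a sink might choose to implement expiration.

```python
import time
from typing import Any, Callable, Dict, List, Optional, Tuple


class ExpiringCache:
    """Hypothetical data store whose entries expire after `ttl` seconds."""

    def __init__(self, ttl: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl = ttl
        self.clock = clock
        self._items: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self.clock() - stored_at >= self.ttl:
            del self._items[key]  # expired: behave as a miss
            return None
        return value

    def put(self, key: str, value: Any) -> None:
        self._items[key] = (self.clock(), value)


class RemoteApi:
    """Hypothetical source-only element: it provides data but cannot store it."""

    def __init__(self, data: Dict[str, Any]) -> None:
        self.data = dict(data)
        self.calls = 0

    def get(self, key: str) -> Optional[Any]:
        self.calls += 1
        return self.data.get(key)


def fetch(elements: List[Any], key: str) -> Optional[Any]:
    """Query elements in order; on a hit, fill every sink that was passed."""
    for index, element in enumerate(elements):
        value = element.get(key)
        if value is not None:
            # Walk back towards the front of the pipeline, nearest sink first.
            for earlier in reversed(elements[:index]):
                if hasattr(earlier, "put"):
                    earlier.put(key, value)
            return value
    return None
```

With a pipeline of ``[cache, database, api]``, the first ``fetch`` reaches the API and fills the database and then the cache on the way back; a repeated ``fetch`` is answered by the cache until its entry expires.
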
Usage
-----
The simplified code below illustrates converting a Word document, which is requested from an SQL database, to a PDF document.

Note that no PDF documents are stored in the database, but the data pipeline can still return one if it is requested.

.. code-block:: python

    from typing import Any, Dict

    # The four classes below implement a simple DataPipeline. The code would need to be filled in by the user.
    # DataSource, DataSink, Transformer, and the get/put/transform dispatchers are assumed to come from the framework.

    class WordDoc:
        ...

    class PDF:
        ...

    class SQLDatabase(DataSource, DataSink):
        @get.register(WordDoc)  # Tells the DataPipeline that this SQL database can provide a WordDoc
        def get_word_doc(self, query: Dict[str, Any]) -> WordDoc:
            """Returns a WordDoc from an SQL database based on the `filename` in the query."""

        @put.register(WordDoc)  # Tells the DataPipeline that this SQL database can store a WordDoc
        def put_word_doc(self, doc: WordDoc, query: Dict[str, Any]) -> None:
            """Stores the document in the SQL database using the query as an identifier."""

    class DocumentTransformer(Transformer):
        @transform.register(WordDoc, PDF)  # Tells the DataPipeline that we know how to convert a WordDoc to a PDF
        def word_to_pdf(self, doc: WordDoc) -> PDF:
            """Converts a WordDoc to a PDF and returns the PDF."""

    # The line of code below can now be used to request a PDF, given a `pipeline` built from the classes above.
    # The WordDoc with the filename `find_me` will be pulled from the SQL database, then converted to a PDF and returned to the user.
    my_pdf = pipeline.get(PDF, query={"filename": "find_me"})

    # Note also that because SQLDatabase registers a `put` for WordDoc, it will also store WordDocs
    # that pass through it via the pipeline but are not already in the database.

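
To make the "request a PDF, find a WordDoc" step more concrete, here is a self-contained sketch of a transformer fallback. It is not how the package is implemented: the ``TRANSFORMERS`` registry, the ``register`` decorator, and ``get_with_transform`` are hypothetical, the store is a plain dictionary, and only single-step conversions are considered.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class WordDoc:
    def __init__(self, text: str) -> None:
        self.text = text


class PDF:
    def __init__(self, text: str) -> None:
        self.text = text


# Hypothetical registry: (source type, target type) -> conversion function.
TRANSFORMERS: Dict[Tuple[type, type], Callable[[Any], Any]] = {}


def register(source_type: type, target_type: type):
    """Record the decorated function as a converter between the two types."""
    def decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        TRANSFORMERS[(source_type, target_type)] = func
        return func
    return decorator


@register(WordDoc, PDF)
def word_to_pdf(doc: WordDoc) -> PDF:
    return PDF(doc.text)


def get_with_transform(store: Dict[Tuple[type, str], Any],
                       target_type: type, key: str) -> Optional[Any]:
    """Return the item directly if stored, else convert from a stored type."""
    direct = store.get((target_type, key))
    if direct is not None:
        return direct
    for (source_type, to_type), func in TRANSFORMERS.items():
        if to_type is target_type:
            item = store.get((source_type, key))
            if item is not None:
                return func(item)
    return None
```

Given a store that holds only ``WordDoc`` items, requesting a ``PDF`` under the same key returns the converted document, and requesting a key that is not stored returns ``None``.
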
Raw data
{
"_id": null,
"home_page": "https://github.com/meraki-analytics/datapipelines",
"name": "datapipelines",
"maintainer": "",
"docs_url": null,
"requires_python": "",
"maintainer_email": "",
"keywords": "",
"author": "Meraki Analytics Team",
"author_email": "team@merakianalytics.com",
"download_url": "https://files.pythonhosted.org/packages/b4/86/812f428f88d3b9e10be6178b54508580a2355ae67d50797e62ea47fef9c3/datapipelines-1.0.7.tar.gz",
"platform": "",
"bugtrack_url": null,
"license": "MIT",
"summary": "Caching abstraction layer for orchestrating multiple cache tiers",
"version": "1.0.7",
"project_urls": {
"Homepage": "https://github.com/meraki-analytics/datapipelines"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "b486812f428f88d3b9e10be6178b54508580a2355ae67d50797e62ea47fef9c3",
"md5": "537e875824fe2328a5fd4982ce946dfd",
"sha256": "752bc71a1e03a45d723fc4d9b56c641ed0b872ed1df73b08896ee34d93c56e8f"
},
"downloads": -1,
"filename": "datapipelines-1.0.7.tar.gz",
"has_sig": false,
"md5_digest": "537e875824fe2328a5fd4982ce946dfd",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 19731,
"upload_time": "2019-01-18T05:06:39",
"upload_time_iso_8601": "2019-01-18T05:06:39.107511Z",
"url": "https://files.pythonhosted.org/packages/b4/86/812f428f88d3b9e10be6178b54508580a2355ae67d50797e62ea47fef9c3/datapipelines-1.0.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2019-01-18 05:06:39",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "meraki-analytics",
"github_project": "datapipelines",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"circle": true,
"requirements": [
{
"name": "flake8",
"specs": []
},
{
"name": "pytest",
"specs": []
},
{
"name": "merakicommons",
"specs": [
[
">=",
"1.0.6"
]
]
},
{
"name": "networkx",
"specs": []
}
],
"lcname": "datapipelines"
}