About schedula
**************
**schedula** is a dynamic flow-based programming environment for
Python that automatically handles the control flow of the program.
The control flow is generally represented by a Directed Acyclic Graph
(DAG), where nodes are the operations/functions to be executed and
edges are the dependencies between them.
The algorithm of **schedula** dates back to 2014, when a colleague
asked for a method to automatically populate the missing data of a
database. The imputation method chosen to complete the database was a
system of interdependent physical formulas - i.e., the inputs of a
formula are the outputs of other formulas. The current library was
developed in 2015 to support the design of the CO:sub:`2`MPAS
`tool <https://github.com/JRCSTU/CO2MPAS-TA>`_ - a CO:sub:`2` vehicle
`simulator
<https://jrcstu.github.io/co2mpas/model/?url=https://jrcstu.github.io/co2mpas/model/core/CO2MPAS_model/calibrate_with_wltp_h.html>`_.
During the development phase, the physical formulas (more than 700)
were known, whereas the software inputs and outputs were not.
Why schedula?
=============
The design of flow-based programs begins with the definition of the
control flow graph, and implicitly of its inputs and outputs. If the
program accepts multiple combinations of inputs and outputs, you have
to design and code all control flow graphs. With conventional
schedulers, this can be very demanding.

With **schedula**, by contrast, given any set of inputs, it
automatically calculates any of the desired computable outputs,
choosing the most appropriate DAG from the dataflow execution model.
Note: The DAG is determined at runtime and it is extracted using the
shortest path from the provided inputs. The path is calculated
based on a weighted directed graph (dataflow execution model) with
a modified Dijkstra algorithm.
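
The runtime selection described in the note can be illustrated with a small, self-contained sketch. It is not schedula's implementation: the ``plan`` helper, the tuple layout, and the example weights are assumptions made for illustration, and only the Python standard library is used. The idea is a Dijkstra-style search in which a function becomes reachable once all of its inputs are available, so the cheapest producer of each datum is reached first.

```python
import heapq


def plan(functions, inputs):
    """Order the functions reachable from ``inputs`` by cumulative weight.

    ``functions`` maps a name to ``(input names, output names, weight)``.
    Hypothetical helper: a simplified stand-in, not schedula's algorithm.
    """
    # Heap entries are (cost, kind, name); kind 0 is data, kind 1 a function.
    heap = [(0, 0, name) for name in inputs]
    # Functions without inputs can start right away at their own weight.
    heap += [(w, 1, f) for f, (ins, _, w) in functions.items() if not ins]
    heapq.heapify(heap)
    settled, queued, order = {}, set(), []
    while heap:
        cost, kind, name = heapq.heappop(heap)
        if kind == 0:
            if name in settled:
                continue  # already reached through a cheaper path
            settled[name] = cost
            # Queue every function whose inputs are now all available.
            for f, (ins, _, w) in functions.items():
                if ins and f not in queued and all(i in settled for i in ins):
                    queued.add(f)
                    heapq.heappush(heap, (cost + w, 1, f))
        else:
            outs = functions[name][1]
            if outs and all(o in settled for o in outs):
                continue  # every output was already produced more cheaply
            order.append(name)
            for out in outs:
                heapq.heappush(heap, (cost, 0, out))
    return order, settled


functions = {
    'generate_key': ((), ('key',), 2),
    'read_key': (('key_fpath',), ('key',), 1),
    'encrypt_message': (('key', 'decrypted'), ('encrypted',), 1),
}
with_file, _ = plan(functions, {'decrypted', 'key_fpath'})
without_file, costs = plan(functions, {'decrypted'})
print(with_file)     # ['read_key', 'encrypt_message']
print(without_file)  # ['generate_key', 'encrypt_message']
```

With a key file path among the inputs, the lighter ``read_key`` wins over ``generate_key``; without it, the search falls back to generating a key.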
**schedula** makes the code easy to debug, to optimize, and to present
to a non-IT audience through its interactive graphs and charts. It
provides the option to run a model asynchronously or in parallel,
automatically managing the Global Interpreter Lock (GIL), and to
convert a model into a web API service.
.. _start-install-core:
Installation
************
To install it, use (with root privileges):

.. code:: console

    $ pip install schedula

or download the latest git version and use (with root privileges):

.. code:: console

    $ python setup.py install

.. _end-install-core:
Install extras
==============
Some additional functionality is enabled by installing the following
extras:

* ``io``: enables the read/write functions.

* ``plot``: enables plotting of the Dispatcher model and workflow
  (see ``plot()``).

* ``web``: enables building a dispatcher Flask app (see ``web()``).

* ``sphinx``: enables the sphinx extension directives (i.e.,
  autosummary and dispatcher).

* ``parallel``: enables the parallel execution of the Dispatcher model.

To install **schedula** and all extras, run:

.. code:: console

    $ pip install 'schedula[all]'

Note: The ``plot`` extra requires **Graphviz**. Make sure that the directory
   containing the ``dot`` executable is on your system’s path. If you
   do not have it, you can install it from its `download page
   <https://www.graphviz.org/download/>`_.
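
A quick way to check whether ``dot`` is visible on the path is sketched below. It uses only the standard library function ``shutil.which`` and makes no assumption about how **schedula** itself locates Graphviz.

```python
import shutil

# Look up the Graphviz ``dot`` executable on the current PATH.
dot_path = shutil.which('dot')
if dot_path is None:
    print('dot not found: install Graphviz or add it to your PATH')
else:
    print('dot found at', dot_path)
```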
Tutorial
********
Let’s assume that we want to develop a tool to automatically manage
symmetric cryptography. The basic idea is to open a file, read its
content, encrypt or decrypt the data, and then write it out to a new
file. This tutorial shows how to:
1. `define <#model-definition>`_ and `execute <#dispatching>`_ a
dataflow execution model,
2. `extract <#sub-model-extraction>`_ a sub-model, and
3. `deploy <#api-server>`_ a web API service.
Note: You can find more examples of how to use the **schedula** library
   in the `examples
   <https://github.com/vinci1it2000/schedula/tree/master/examples>`_ folder.
Model definition
================
First, we define an empty ``Dispatcher`` named
*symmetric_cryptography* that holds the dataflow execution model:

::

    >>> import schedula as sh
    >>> dsp = sh.Dispatcher(name='symmetric_cryptography')

There are two main ways to get a key: we can either generate a new one
or use one that has previously been generated. Hence, we can define
three functions to simply generate, save, and load the key. To
automatically populate the model, inheriting the argument names, we
can use the decorator ``add_function()`` as follows:

::

    >>> import os.path as osp
    >>> from cryptography.fernet import Fernet
    >>> @sh.add_function(dsp, outputs=['key'], weight=2)
    ... def generate_key():
    ...     return Fernet.generate_key().decode()
    >>> @sh.add_function(dsp)
    ... def write_key(key_fpath, key):
    ...     with open(key_fpath, 'w') as f:
    ...         f.write(key)
    >>> @sh.add_function(dsp, outputs=['key'], input_domain=osp.isfile)
    ... def read_key(key_fpath):
    ...     with open(key_fpath) as f:
    ...         return f.read()

Note: Since the Python standard library does not include file
   encryption/decryption, this tutorial uses a third-party module named
   ``cryptography``. To install it, run ``pip install
   cryptography``.
To encrypt/decrypt a message, you need a key, as previously
defined, and your data, either *encrypted* or *decrypted*. Therefore, we
can define two functions and add them, as before, to the model:

::

    >>> @sh.add_function(dsp, outputs=['encrypted'])
    ... def encrypt_message(key, decrypted):
    ...     return Fernet(key.encode()).encrypt(decrypted.encode()).decode()
    >>> @sh.add_function(dsp, outputs=['decrypted'])
    ... def decrypt_message(key, encrypted):
    ...     return Fernet(key.encode()).decrypt(encrypted.encode()).decode()

Finally, to read and write the encrypted or decrypted message,
according to the functional programming philosophy, we can reuse the
previously defined functions ``read_key`` and ``write_key``, changing
the model mapping (i.e., *function_id*, *inputs*, and *outputs*). To
add them to the model, we can simply use the ``add_function`` method as
follows:

::

    >>> dsp.add_function(
    ...     function_id='read_decrypted',
    ...     function=read_key,
    ...     inputs=['decrypted_fpath'],
    ...     outputs=['decrypted']
    ... )
    'read_decrypted'
    >>> dsp.add_function(
    ...     'read_encrypted', read_key, ['encrypted_fpath'], ['encrypted'],
    ...     input_domain=osp.isfile
    ... )
    'read_encrypted'
    >>> dsp.add_function(
    ...     'write_decrypted', write_key, ['decrypted_fpath', 'decrypted'],
    ...     input_domain=osp.isfile
    ... )
    'write_decrypted'
    >>> dsp.add_function(
    ...     'write_encrypted', write_key, ['encrypted_fpath', 'encrypted']
    ... )
    'write_encrypted'

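
The reuse of one function under several mappings can be pictured with a plain-Python sketch. The ``registry`` layout and the ``run`` helper below are hypothetical and only mimic the idea of binding the same callable to different data names; they are not part of **schedula**.

```python
import os
import tempfile


def read_text(fpath):
    with open(fpath) as f:
        return f.read()


def write_text(fpath, text):
    with open(fpath, 'w') as f:
        f.write(text)


# function_id -> (function, input names, output names); hypothetical layout.
registry = {
    'read_key': (read_text, ['key_fpath'], ['key']),
    'read_encrypted': (read_text, ['encrypted_fpath'], ['encrypted']),
    'write_encrypted': (write_text, ['encrypted_fpath', 'encrypted'], []),
}


def run(function_id, data):
    """Call a registered function, mapping its arguments by data name."""
    func, inputs, outputs = registry[function_id]
    result = func(*(data[name] for name in inputs))
    # This sketch supports at most one output per function.
    if outputs:
        data[outputs[0]] = result
    return data


tempdir = tempfile.mkdtemp()
fpath = os.path.join(tempdir, 'data.secret')
run('write_encrypted', {'encrypted_fpath': fpath, 'encrypted': 'ciphertext'})
data = run('read_encrypted', {'encrypted_fpath': fpath})
print(data['encrypted'])  # ciphertext
```

The same ``read_text`` callable serves both ``read_key`` and ``read_encrypted``; only the names it reads from and writes to differ.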
Note: For more details on how to create a ``Dispatcher`` see:
``add_data()``, ``add_func()``, ``add_function()``,
``add_dispatcher()``, ``SubDispatch``, ``MapDispatch``,
``SubDispatchFunction``, ``SubDispatchPipe``, and ``DispatchPipe``.
To inspect and visualize the dataflow execution model, you can simply
plot the graph as follows:

::

    >>> dsp.plot()

[graph]
Tip: You can explore the diagram by clicking on it.
Dispatching
===========
To see the dataflow execution model in action and its workflow to
generate a key, to encrypt a message, and to write the encrypted data,
you can simply invoke the ``dispatch()`` or ``__call__()`` method of
``dsp``:
>>> import tempfile
>>> tempdir = tempfile.mkdtemp()
>>> message = "secret message"
>>> sol = dsp(inputs=dict(
... decrypted=message,
... encrypted_fpath=osp.join(tempdir, 'data.secret'),
... key_fpath=osp.join(tempdir,'key.key')
... ))
>>> sol.plot(index=True)
[graph]
Note: As you can see from the workflow graph (orange nodes), when a
   function’s inputs do not respect its domain, the Dispatcher
automatically finds an alternative path to estimate all computable
outputs. The same logic applies when there is a function failure.
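
The fallback behaviour in the note can be sketched in plain Python. The ``first_computable`` helper and the placeholder key value are assumptions for illustration and do not reflect how **schedula** is implemented internally; the sketch only shows the idea of skipping a producer whose domain check fails, or that raises, and trying the next one.

```python
import os.path as osp
import tempfile


def read_key(key_fpath):
    with open(key_fpath) as f:
        return f.read()


def generate_key():
    # Placeholder value; the tutorial uses Fernet.generate_key() instead.
    return 'generated-key'


def first_computable(candidates):
    """Return (function name, result) of the first usable candidate.

    Each candidate is ``(function, args, domain)``; ``domain`` may be None.
    """
    for func, args, domain in candidates:
        if domain is not None and not domain(*args):
            continue  # inputs outside the function's domain
        try:
            return func.__name__, func(*args)
        except Exception:
            continue  # function failure: try the next candidate
    raise LookupError('no candidate could compute the output')


missing = osp.join(tempfile.mkdtemp(), 'key.key')  # this file does not exist
used, key = first_computable([
    (read_key, (missing,), osp.isfile),
    (generate_key, (), None),
])
print(used)  # generate_key
```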
Now to decrypt the data and verify the message without saving the
decrypted message, you just need to execute ``dsp`` again, changing
the *inputs* and setting the desired *outputs*. In this way, the
dispatcher automatically selects and executes only a sub-part of the
dataflow execution model.
>>> dsp(
... inputs=sh.selector(('encrypted_fpath', 'key_fpath'), sol),
... outputs=['decrypted']
... )['decrypted'] == message
True
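
In the call above, ``sh.selector`` is used to pick two entries out of the previous solution. Assuming it behaves like a simple key filter over a mapping, which is how it is used here, a plain-Python equivalent might look like the hypothetical ``pick`` helper below (the file paths are made up for the example):

```python
def pick(keys, mapping):
    """Return a new dict holding only ``keys`` from ``mapping``."""
    return {key: mapping[key] for key in keys}


previous = {
    'decrypted': 'secret message',
    'encrypted_fpath': '/tmp/data.secret',
    'key_fpath': '/tmp/key.key',
}
inputs = pick(('encrypted_fpath', 'key_fpath'), previous)
print(inputs)
```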
If you want to visualize the latest workflow of the dispatcher, you
can use the ``plot()`` method with the keyword ``workflow=True``:
>>> dsp.plot(workflow=True, index=True)
[graph]
Raw data
{
"_id": null,
"home_page": "https://github.com/vinci1it2000/schedula",
"name": "schedula",
"maintainer": null,
"docs_url": null,
"requires_python": null,
"maintainer_email": null,
"keywords": "flow-based programming, dataflow, parallel, asynchronous, async, scheduling, dispatch, functional programming, dataflow programming",
"author": "Vincenzo Arcidiacono",
"author_email": "vinci1it2000@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/ef/7b/c7f8b34c85eddaa2d1ab9d08f4a1796ae79a58bc4e1975cb12e210e0dd8a/schedula-1.5.53.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "EUPL 1.1+",
"summary": "Produce a plan that dispatches calls based on a graph of functions, satisfying data dependencies.",
"version": "1.5.53",
"project_urls": {
"Documentation": "https://schedula.readthedocs.io",
"Download": "https://github.com/vinci1it2000/schedula/tarball/v1.5.53",
"Homepage": "https://github.com/vinci1it2000/schedula",
"Issue tracker": "https://github.com/vinci1it2000/schedula/issues"
},
"split_keywords": [
"flow-based programming",
" dataflow",
" parallel",
" asynchronous",
" async",
" scheduling",
" dispatch",
" functional programming",
" dataflow programming"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "6807c585fd087fe7fbed83ccefa68c822ae9c1de869d14f024735e64db4f1ff5",
"md5": "73fa4ef1505465b897f86ee5a529f4b7",
"sha256": "99abe585de585fcbbe63a55e76b167d1b810c0e68e7414de2df1875b99eb35cc"
},
"downloads": -1,
"filename": "schedula-1.5.53-py2.py3-none-any.whl",
"has_sig": false,
"md5_digest": "73fa4ef1505465b897f86ee5a529f4b7",
"packagetype": "bdist_wheel",
"python_version": "py2.py3",
"requires_python": null,
"size": 6159621,
"upload_time": "2025-01-16T15:27:35",
"upload_time_iso_8601": "2025-01-16T15:27:35.269610Z",
"url": "https://files.pythonhosted.org/packages/68/07/c585fd087fe7fbed83ccefa68c822ae9c1de869d14f024735e64db4f1ff5/schedula-1.5.53-py2.py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "ef7bc7f8b34c85eddaa2d1ab9d08f4a1796ae79a58bc4e1975cb12e210e0dd8a",
"md5": "855770e4b84628798bcbdf19dcc84e89",
"sha256": "edc994747f67cb745527b29aa0c4d5c217922488fb2e5c6299dfd192c5fd73f4"
},
"downloads": -1,
"filename": "schedula-1.5.53.tar.gz",
"has_sig": false,
"md5_digest": "855770e4b84628798bcbdf19dcc84e89",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 6003444,
"upload_time": "2025-01-16T15:27:39",
"upload_time_iso_8601": "2025-01-16T15:27:39.261868Z",
"url": "https://files.pythonhosted.org/packages/ef/7b/c7f8b34c85eddaa2d1ab9d08f4a1796ae79a58bc4e1975cb12e210e0dd8a/schedula-1.5.53.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-01-16 15:27:39",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "vinci1it2000",
"github_project": "schedula",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "schedula"
}