=======================================
Python Broqer
=======================================
.. image:: https://img.shields.io/pypi/v/broqer.svg
    :target: https://pypi.python.org/pypi/broqer

.. image:: https://readthedocs.org/projects/python-broqer/badge/?version=latest
    :target: https://python-broqer.readthedocs.io/en/latest

.. image:: https://img.shields.io/github/license/semiversus/python-broqer.svg
    :target: https://en.wikipedia.org/wiki/MIT_License

Initially focused on embedded systems, *Broqer* can be used wherever continuous streams of data have to be processed - and they are everywhere. Watch out!
.. image:: https://cdn.rawgit.com/semiversus/python-broqer/7beb7379/docs/logo.svg
.. header
Synopsis
========
- Pure Python implementation without dependencies
- Under MIT license (2018 Günther Jena)
- Source is hosted on GitHub.com_
- Documentation is hosted on ReadTheDocs.com_
- Tested on Python 3.7, 3.8, 3.9, 3.10 and 3.11
- Unit tested with pytest_, coding style checked with Flake8_, static type checked with mypy_, static code checked with Pylint_, documented with Sphinx_
- Operators known from ReactiveX_ and other streaming frameworks (like Map_, CombineLatest_, ...)
+ Centralised object to keep track of publishers and subscribers
+ Starting point to build applications with a microservice architecture
.. _pytest: https://docs.pytest.org/en/latest
.. _Flake8: http://flake8.pycqa.org/en/latest/
.. _mypy: http://mypy-lang.org/
.. _Pylint: https://www.pylint.org/
.. _Sphinx: http://www.sphinx-doc.org
.. _GitHub.com: https://github.com/semiversus/python-broqer
.. _ReadTheDocs.com: http://python-broqer.readthedocs.io
.. _ReactiveX: http://reactivex.io/
Showcase
========
In other frameworks a *Publisher* is sometimes called an *Observable*. A *Subscriber*
is able to observe the changes the publisher is emitting. With these basics you're
able to use the observer pattern - let's see!
Observer pattern
----------------
Subscribing to a publisher is done via the ``.subscribe()`` method.
A simple subscriber is ``Sink``, which calls a function with optional positional
and keyword arguments.

.. code-block:: python3

    >>> from broqer import Publisher, Sink
    >>> a = Publisher(5)  # create a publisher with state `5`
    >>> s = Sink(print, 'Change:')  # create a subscriber
    >>> disposable = a.subscribe(s)  # subscribe subscriber to publisher
    Change: 5

    >>> a.notify(3)  # change the state
    Change: 3

    >>> disposable.dispose()  # unsubscribe

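To make the mechanics concrete, here is a minimal plain-Python sketch of the same flow. It is not how broqer is implemented; ``MiniPublisher`` and its methods are hypothetical names used only for illustration, and the subscriber is simply a callable.

```python
class MiniPublisher:
    """Hypothetical stand-in for a stateful publisher (not broqer's class)."""

    def __init__(self, state):
        self._state = state
        self._subscribers = []

    def subscribe(self, subscriber):
        self._subscribers.append(subscriber)
        subscriber(self._state)  # a new subscriber receives the current state
        # Return a callable that undoes the subscription.
        return lambda: self._subscribers.remove(subscriber)

    def notify(self, value):
        self._state = value
        for subscriber in list(self._subscribers):
            subscriber(value)


received = []
a = MiniPublisher(5)
unsubscribe = a.subscribe(received.append)  # delivers the state 5 immediately
a.notify(3)                                 # delivers 3
unsubscribe()
a.notify(7)                                 # no longer delivered
```

After running the sketch, ``received`` holds ``[5, 3]``: the initial state, then the one change made while subscribed.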
Combine publishers with arithmetic operators
--------------------------------------------
You're able to create publishers on the fly by combining two publishers with
the common operators (like ``+``, ``>``, ``<<``, ...).

.. code-block:: python3

    >>> a = Publisher(1)
    >>> b = Publisher(3)

    >>> c = a * 3 > b  # create a new publisher via operator overloading
    >>> disposable = c.subscribe(Sink(print, 'c:'))
    c: False

    >>> a.notify(2)
    c: True

    >>> b.notify(10)
    c: False

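The idea behind this kind of operator overloading can be sketched in plain Python. The ``Cell`` class below is a hypothetical illustration, not broqer code: each operator returns a new object that recomputes its value whenever one of its operands changes.

```python
import operator


class Cell:
    """Hypothetical observable value; not part of broqer."""

    def __init__(self, value):
        self.value = value
        self._listeners = []

    def notify(self, value):
        self.value = value
        for listener in self._listeners:
            listener()

    def _combine(self, other, func):
        # Wrap plain constants so both operands look alike.
        other = other if isinstance(other, Cell) else Cell(other)
        result = Cell(func(self.value, other.value))

        def update():
            result.notify(func(self.value, other.value))

        self._listeners.append(update)
        other._listeners.append(update)
        return result

    def __mul__(self, other):
        return self._combine(other, operator.mul)

    def __gt__(self, other):
        return self._combine(other, operator.gt)


a, b = Cell(1), Cell(3)
c = a * 3 > b            # builds two derived cells: (a * 3) and ((a * 3) > b)
states = [c.value]
a.notify(2)
states.append(c.value)
b.notify(10)
states.append(c.value)
```

``states`` ends up as ``[False, True, False]``, the same sequence the broqer example prints.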
Fancier things, such as getting an item by index or key, are also possible:

.. code-block:: python3

    >>> i = Publisher('a')
    >>> d = Publisher({'a':100, 'b':200, 'c':300})

    >>> disposable = d[i].subscribe(Sink(print, 'r:'))
    r: 100

    >>> i.notify('c')
    r: 300
    >>> d.notify({'c':123})
    r: 123

Some Python built-in functions can't return publishers (e.g. ``len()`` needs to
return an integer). For these cases special functions are defined in broqer: ``Str``,
``Int``, ``Float``, ``Len`` and ``In`` (for ``x in y``). Other convenience functions
are also available: ``All``, ``Any``, ``BitwiseAnd`` and ``BitwiseOr``.
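The restriction on ``len()`` comes from Python itself and can be checked with a few lines. ``Lazy`` is a hypothetical class used only to show that ``__len__`` may not return anything but an integer.

```python
class Lazy:
    """Hypothetical wrapper whose __len__ tries to return a non-integer."""

    def __len__(self):
        return self  # not an int, so the built-in len() rejects it


try:
    len(Lazy())
    error = None
except TypeError as exc:
    error = exc  # Python refuses the non-integer result
```

Because the interpreter enforces the return type, a reactive length has to be built by a separate function rather than by overloading ``__len__``.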
Attribute access on a publisher builds a new publisher that performs the actual
attribute access on each emitted value. A publisher has to know which type it should
mimic; this is done via ``.inherit_type(type)``.

.. code-block:: python3

    >>> i = Publisher('Attribute access made REACTIVE')
    >>> i.inherit_type(str)
    >>> disposable = i.lower().split(sep=' ').subscribe(Sink(print))
    ['attribute', 'access', 'made', 'reactive']

    >>> i.notify('Reactive and pythonic')
    ['reactive', 'and', 'pythonic']

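One way to picture deferred attribute access is a wrapper built on ``__getattr__``. The sketch below is an assumption-laden illustration, not broqer's mechanism: ``Lifted`` is a hypothetical class, and it is pull-based (values are computed when ``get()`` is called) rather than pushing values to subscribers.

```python
class Lifted:
    """Hypothetical wrapper that defers method calls to the current value."""

    def __init__(self, compute, type_):
        self._compute = compute  # zero-argument callable giving the current value
        self._type = type_       # the type whose attributes are mimicked

    def __getattr__(self, name):
        # Only called for names not found normally, e.g. 'lower' or 'split'.
        if not hasattr(self._type, name):
            raise AttributeError(name)

        def method(*args, **kwargs):
            def compute():
                return getattr(self._compute(), name)(*args, **kwargs)
            # Assumption for this sketch: the mimicked type of the result is
            # taken from the value computed right now.
            return Lifted(compute, type(compute()))
        return method

    def get(self):
        return self._compute()


source = {'text': 'Attribute access made REACTIVE'}
words = Lifted(lambda: source['text'], str).lower().split(sep=' ')
first = words.get()
source['text'] = 'Reactive and pythonic'
second = words.get()
```

The method chain is recorded once and re-applied to whatever the source currently holds, which is why the wrapper needs to know the type it mimics.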
Function decorators
-------------------
Make your own operators on the fly with function decorators. Decorators are
available for ``Accumulate``, ``CombineLatest``, ``Filter``, ``Map``, ``MapAsync``,
``MapThreaded``, ``Reduce`` and ``Sink``.

.. code-block:: python3

    >>> from broqer import op
    >>> @op.build_map
    ... def count_vowels(s):
    ...     return sum([s.count(v) for v in 'aeiou'])

    >>> msg = Publisher('Hello World!')
    >>> disposable = (msg | count_vowels).subscribe(Sink(print, 'Number of vowels:'))
    Number of vowels: 3
    >>> msg.notify('Wahuuu')
    Number of vowels: 4

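How a decorated function can end up on the right-hand side of ``|`` is easiest to see in a small plain-Python sketch. ``MapOperator`` and ``build_map_sketch`` are hypothetical names and say nothing about broqer's internals; the sketch maps over a list instead of a publisher.

```python
class MapOperator:
    """Hypothetical operator object created by the decorator below."""

    def __init__(self, func):
        self._func = func

    def __ror__(self, values):
        # Called for `values | operator` when `values` does not handle `|`.
        return [self._func(value) for value in values]


def build_map_sketch(func):
    """Turn a plain function into an object usable on the right of `|`."""
    return MapOperator(func)


@build_map_sketch
def count_vowels(s):
    return sum(s.count(v) for v in 'aeiou')


counts = ['Hello World!', 'Wahuuu'] | count_vowels
```

``counts`` is ``[3, 4]``, matching the vowel counts shown in the broqer example.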
You can even make configurable ``Map``\ s and ``Filter``\ s:

.. code-block:: python3

    >>> import re

    >>> @op.build_filter_factory
    ... def filter_pattern(pattern, s):
    ...     return re.search(pattern, s) is not None

    >>> msg = Publisher('Cars passed: 135!')
    >>> disposable = (msg | filter_pattern('[0-9]+')).subscribe(Sink(print))
    Cars passed: 135!
    >>> msg.notify('No cars have passed')
    >>> msg.notify('Only 1 car has passed')
    Only 1 car has passed

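The "factory" part means that configuration arguments are bound first and the emitted value is supplied later. A minimal sketch of that idea with ``functools.partial`` follows; ``build_filter_factory_sketch`` is a hypothetical helper, not the broqer decorator, and it filters a plain list.

```python
import functools
import re


def build_filter_factory_sketch(predicate):
    """Hypothetical helper: bind configuration now, receive the value later."""
    @functools.wraps(predicate)
    def factory(*args, **kwargs):
        return functools.partial(predicate, *args, **kwargs)
    return factory


@build_filter_factory_sketch
def filter_pattern(pattern, s):
    return re.search(pattern, s) is not None


has_digits = filter_pattern('[0-9]+')  # configured predicate, pattern is bound
messages = ['Cars passed: 135!', 'No cars have passed', 'Only 1 car has passed']
passed = [m for m in messages if has_digits(m)]
```

Only the two messages containing digits remain in ``passed``, mirroring which messages reach the sink above.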
Install
=======

.. code-block:: bash

    pip install broqer

Credits
=======
Broqer was inspired by:
* RxPY_: Reactive Extension for Python (by Børge Lanes and Dag Brattli)
* aioreactive_: Async/Await reactive tools for Python (by Dag Brattli)
* streamz_: build pipelines to manage continuous streams of data (by Matthew Rocklin)
* MQTT_: M2M connectivity protocol
* `Florian Feurstein <https://github.com/flofeurstein>`_: for spending hours in discussion, coming up with great ideas and helping me understand the concepts!
.. _RxPY: https://github.com/ReactiveX/RxPY
.. _aioreactive: https://github.com/dbrattli/aioreactive
.. _streamz: https://github.com/mrocklin/streamz
.. _MQTT: http://mqtt.org/
.. _Value: https://python-broqer.readthedocs.io/en/latest/subjects.html#value
.. _Publisher: https://python-broqer.readthedocs.io/en/latest/publishers.html#publisher
.. _Subscriber: https://python-broqer.readthedocs.io/en/latest/subscribers.html#subscriber
.. _CombineLatest: https://python-broqer.readthedocs.io/en/latest/operators/combine_latest.py
.. _Filter: https://python-broqer.readthedocs.io/en/latest/operators/filter_.py
.. _Map: https://python-broqer.readthedocs.io/en/latest/operators/map_.py
.. _MapAsync: https://python-broqer.readthedocs.io/en/latest/operators/map_async.py
.. _Sink: https://python-broqer.readthedocs.io/en/latest/operators/subscribers/sink.py
.. _SinkAsync: https://python-broqer.readthedocs.io/en/latest/operators/subscribers/sink_async.py
.. _OnEmitFuture: https://python-broqer.readthedocs.io/en/latest/subscribers.html#trace
.. _Trace: https://python-broqer.readthedocs.io/en/latest/subscribers.html#trace
.. api
API
===
Publishers
----------
A Publisher_ is the source for messages.

+---------------+-----------------+
| Publisher_ () | Basic publisher |
+---------------+-----------------+

Operators
---------

+-------------------------------------+-------------------------------------------------------------------------+
| CombineLatest_ (\*publishers)       | Combine the latest emit of multiple publishers and emit the combination |
+-------------------------------------+-------------------------------------------------------------------------+
| Filter_ (predicate, ...)            | Filters values based on a ``predicate`` function                        |
+-------------------------------------+-------------------------------------------------------------------------+
| Map_ (map_func, \*args, \*\*kwargs) | Apply ``map_func(*args, value, **kwargs)`` to each emitted value        |
+-------------------------------------+-------------------------------------------------------------------------+
| MapAsync_ (coro, mode, ...)         | Apply ``coro(*args, value, **kwargs)`` to each emitted value            |
+-------------------------------------+-------------------------------------------------------------------------+
| Throttle (duration)                 | Limit the number of emits per duration                                  |
+-------------------------------------+-------------------------------------------------------------------------+

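The call shape ``map_func(*args, value, **kwargs)`` means that extra positional arguments are placed *before* the emitted value. The helper below is a hypothetical plain-Python illustration of that argument order only; it is not a broqer function.

```python
def apply_map(map_func, value, *args, **kwargs):
    """Hypothetical helper mirroring the call shape from the table."""
    return map_func(*args, value, **kwargs)


power = apply_map(pow, 3, 2)                        # calls pow(2, 3)
ordered = apply_map(sorted, [3, 1, 2], reverse=True)  # calls sorted([3, 1, 2], reverse=True)
```

So with ``pow`` and the extra argument ``2``, an emitted ``3`` is used as the exponent, not the base.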
Subscribers
-----------
A Subscriber_ is the sink for messages.

+----------------------------------+--------------------------------------------------------------+
| Sink_ (func, \*args, \*\*kwargs) | Apply ``func(*args, value, **kwargs)`` to each emitted value |
+----------------------------------+--------------------------------------------------------------+
| SinkAsync_ (coro, ...)           | Apply ``coro(*args, value, **kwargs)`` to each emitted value |
+----------------------------------+--------------------------------------------------------------+
| OnEmitFuture_ (timeout=None)     | Build a future that can be awaited                           |
+----------------------------------+--------------------------------------------------------------+
| Trace_ (d)                       | Debug output for publishers                                  |
+----------------------------------+--------------------------------------------------------------+

Values
--------

+-----------------+--------------------------+
| Value_ (\*init) | Publisher and Subscriber |
+-----------------+--------------------------+