# ate-dispatcher
[![Project License - MIT](https://img.shields.io/pypi/l/ate-dispatcher.svg)](./LICENSE.txt)
[![pypi version](https://img.shields.io/pypi/v/ate-dispatcher.svg)](https://pypi.org/project/ate-dispatcher/)
[![conda version](https://img.shields.io/conda/vn/conda-forge/ate-dispatcher.svg)](https://www.anaconda.com/download/)
[![download count](https://img.shields.io/conda/dn/conda-forge/ate-dispatcher.svg)](https://www.anaconda.com/download/)
[![Downloads](https://pepy.tech/badge/ate-dispatcher)](https://pepy.tech/project/ate-dispatcher)
[![PyPI status](https://img.shields.io/pypi/status/ate-dispatcher.svg)](https://github.com/Semi-ATE/ate-dispatcher)
[![Unit tests](https://github.com/Semi-ATE/ate-dispatcher/actions/workflows/test_python.yml/badge.svg)](https://github.com/Semi-ATE/ate-dispatcher/actions/workflows/test_python.yml)
[![codecov](https://codecov.io/gh/Semi-ATE/ate-dispatcher/branch/main/graph/badge.svg?token=5MLBM0PHLL)](https://codecov.io/gh/Semi-ATE/ate-dispatcher)
## Overview
A pure-Python, thread-based, asynchronous dispatcher used to gather results
from distributed agents that react to a given event.
The dispatcher receives an event tagged with a given topic and relays it
to the set of `Producer` objects registered for that topic. Each result is
then passed to the `ResultListener` objects that were registered to attend
the same topic.
The exposed API is thread-safe, asynchronous and lock-free, which makes it
suitable for tasks that are lightweight and quick.
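
The flow described above can be sketched with the standard library alone. This is an illustration of the topic-routing idea, not how `ate-dispatcher` is implemented: `TinyDispatcher` is a hypothetical name, plain callables stand in for `Producer` and `ResultListener`, and `send_request` blocks until all producers finish, whereas the real dispatcher is asynchronous.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, DefaultDict, List

# Hypothetical stand-ins: plain callables instead of Producer/ResultListener.
ProducerFn = Callable[..., Any]
ListenerFn = Callable[[str, Any], None]


class TinyDispatcher:
    """Illustrative topic router; not ate-dispatcher's implementation."""

    def __init__(self) -> None:
        self.producers: DefaultDict[str, List[ProducerFn]] = defaultdict(list)
        self.listeners: DefaultDict[str, List[ListenerFn]] = defaultdict(list)

    def send_request(self, topic: str, *args: Any, **kwargs: Any) -> None:
        producers = self.producers.get(topic, [])
        if not producers:
            return  # Nobody reacts to this topic.
        # Run every producer registered for the topic on its own thread.
        with ThreadPoolExecutor(max_workers=len(producers)) as pool:
            futures = [
                pool.submit(producer, topic, *args, **kwargs)
                for producer in producers
            ]
            # Hand each result to the listeners registered for the topic.
            for future in futures:
                result = future.result()
                for listener in self.listeners.get(topic, []):
                    listener(topic, result)


tiny = TinyDispatcher()
tiny.producers['topic1'].append(lambda topic, *a, **kw: {'id': 0, 'args': a})
tiny.producers['topic1'].append(lambda topic, *a, **kw: {'id': 1, 'args': a})

received: List[Any] = []
tiny.listeners['topic1'].append(lambda topic, response: received.append(response))

tiny.send_request('topic1', 3, 4)   # Two producers react, one listener collects.
tiny.send_request('topic2', 3, 4)   # No producer registered, nothing happens.
```
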
## Dependencies
This package is pure-Python and has no external dependencies other than
`typing-extensions`, which provides the typing classes that are not
available in older Python 3 versions.
## Installation
You can install this library with the conda or pip package managers,
as follows:
```bash
# Using conda
conda install ate-dispatcher -c conda-forge
# Using pip
pip install ate-dispatcher
```
## Local development
To install a local development version of `ate-dispatcher`, run pip in
editable mode from the project root:
```bash
pip install -e .
```
## Package usage
`ate-dispatcher` exposes two abstract interfaces (`Producer` and `ResultListener`)
as well as the main dispatcher (`ATEDispatcher`). The two interfaces are designed
to be subclassed, while the dispatcher is designed to be instantiated.
### Defining a producer
A `Producer` object is in charge of producing a response to an incoming
message from the dispatcher that is tagged with a `topic` the object can
handle. Subclasses must implement `produce_dispatcher_output`.
```python
import time
from typing import Any

from ate_dispatcher import Producer


# Defining a producer
class SpecificTopicProducer(Producer):
    def __init__(self, _id: int, timeout: int, *topics):
        super().__init__()
        self.id = _id
        # The timeout is given in milliseconds; store it in seconds
        self.timeout = timeout / 1000
        self.topics = set(topics)

    def produce_dispatcher_output(
            self, topic: str, *args, **kwargs) -> Any:
        # Simulate a slow task before answering
        time.sleep(self.timeout)
        return {
            'id': self.id,
            'some_key': topic,
            'args': args,
            'kwargs': kwargs
        }
```
### Defining a result listener
A `ResultListener` object will receive the responses emitted by the `Producer`
objects that reacted to a given topic that the listener also supports. This
is the endpoint for the dispatcher architecture, where all end messages will
arrive. The `ResultListener` subclasses must implement the
`process_dispatcher_result` method.
```python
from typing import Any

from ate_dispatcher import ResultListener


# Defining a result listener
class ResultListenerExample(ResultListener):
    def __init__(self):
        super().__init__()
        self.responses = {}

    def clear(self):
        self.responses = {}

    def process_dispatcher_result(self, topic: str, response: Any):
        # Accumulate the responses per topic
        topic_responses = self.responses.get(topic, [])
        topic_responses.append(response)
        self.responses[topic] = topic_responses
```
### Creating, using and destroying the dispatcher
After defining the `Producer` and `ResultListener` subclasses, it is necessary
to instantiate the `ATEDispatcher` and register the producer and listener
instances against it. Since the interfaces inherit from `threading.Thread`,
their lifetime must be managed explicitly via the `start` and `stop` methods.
```python
import time
# Import the producer and result listener classes
from specific_topic_producer import SpecificTopicProducer
from result_listener_example import ResultListenerExample
# Import the dispatcher
from ate_dispatcher import ATEDispatcher
# Create the dispatcher
dispatcher = ATEDispatcher()
# Start the dispatcher, the lifetime is delegated to the end developer.
dispatcher.start()
# Define the producers and register them against the dispatcher
producer1 = SpecificTopicProducer(0, 200, 'topic1', 'my_topic')
producer1.start()
producer2 = SpecificTopicProducer(1, 500, 'topic1', 'topic2', 'my_topic')
producer2.start()
for topic in producer1.topics:
    dispatcher.register_result_producer(producer1, topic)
for topic in producer2.topics:
    dispatcher.register_result_producer(producer2, topic)
# Define the result listeners and register them against the dispatcher
listener1 = ResultListenerExample()
listener1.start()
listener2 = ResultListenerExample()
listener2.start()
for topic in ['topic1', 'topic2', 'my_topic']:
    # The first listener will receive the responses tagged with any topic
    dispatcher.register_result_listener(listener1, topic)
for topic in ['topic1', 'my_topic']:
    # The second listener will only attend to certain topics
    dispatcher.register_result_listener(listener2, topic)
```
Since the dispatcher architecture is completely asynchronous, a trigger message
may indicate a maximum timeout (in milliseconds) for all registered producers
on a given topic to emit their response. Any response received after the
specified timeout will be discarded. Also, the messages will be delivered to
the result listeners as they arrive.
```python
# Trigger a dispatcher request with a 4000ms timeout on the topic1
dispatcher.send_request('topic1', 3, 4, 5, ttl=4000, keyword='b')
# Wait for responses to arrive
time.sleep(1.0)
# Both listeners should have received the responses from both producers.
expected_responses = {
    'topic1': [
        {
            'id': 0,
            'some_key': 'topic1',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        },
        {
            'id': 1,
            'some_key': 'topic1',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        },
    ]
}
assert listener1.responses == expected_responses
assert listener2.responses == expected_responses
# Clear the listener responses
listener1.clear()
listener2.clear()
# Trigger a dispatcher request with a 300ms timeout limit on the topic my_topic
dispatcher.send_request('my_topic', 3, 4, 5, ttl=300, keyword='b')
# Wait for responses to arrive
time.sleep(0.5)
# Both listeners should have only received the response from producer1.
expected_responses = {
    'my_topic': [
        {
            'id': 0,
            'some_key': 'my_topic',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        }
    ]
}
assert listener1.responses == expected_responses
assert listener2.responses == expected_responses
# Clear the listener1 responses
listener1.clear()
# Trigger a dispatcher request with a 1000ms timeout limit on the topic topic2
dispatcher.send_request('topic2', 3, 4, 5, ttl=1000, keyword='b')
# Wait for responses to arrive
time.sleep(1.0)
# Only the listener1 should have received the message produced by the producer2
expected_responses = {
    'topic2': [
        {
            'id': 1,
            'some_key': 'topic2',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        }
    ]
}
assert listener1.responses == expected_responses
```
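
The deadline behaviour in the `my_topic` request above (a 200 ms producer makes a 300 ms limit, a 500 ms producer does not) can be reproduced in isolation with `concurrent.futures`. This is only a standard-library approximation under stated assumptions: `gather_within_ttl` and `make_producer` are hypothetical helpers, results are collected in one batch instead of being delivered as they arrive, and nothing here reflects how `ate-dispatcher` enforces `ttl` internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any, Callable, List


def gather_within_ttl(
        producers: List[Callable[..., Any]], ttl_ms: int, *args: Any) -> List[Any]:
    """Return the results of the producers that finish within ttl_ms."""
    pool = ThreadPoolExecutor(max_workers=max(1, len(producers)))
    futures = [pool.submit(producer, *args) for producer in producers]
    # wait() returns once every future is done or the timeout elapses.
    done, _late = wait(futures, timeout=ttl_ms / 1000)
    # Do not block on stragglers; their results are never read.
    pool.shutdown(wait=False)
    return [future.result() for future in futures if future in done]


def make_producer(name: str, delay_s: float) -> Callable[..., Any]:
    """Build a hypothetical producer that answers after delay_s seconds."""
    def produce(topic: str) -> dict:
        time.sleep(delay_s)
        return {'id': name, 'some_key': topic}
    return produce


fast = make_producer('fast', 0.01)
slow = make_producer('slow', 0.6)

# With a 200 ms limit, only the fast producer is expected to make the deadline.
results = gather_within_ttl([fast, slow], 200, 'my_topic')
```

The late producer still runs to completion in the background; its answer is simply dropped, which mirrors the "discarded after the timeout" rule described earlier.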
Finally, each registered `Producer` and `ResultListener` instance can be
deregistered from a specific topic at any time. However, before stopping a
`Producer` or `ResultListener` instance, it must be deregistered from every
topic it was registered to.
```python
# Deregister the listener2 and the producer1 from certain topics
dispatcher.deregister_result_listener(listener2, 'my_topic')
dispatcher.deregister_result_producer(producer1, 'topic1')
# Stopping the producer and result listener instances
for topic in producer1.topics:
    dispatcher.deregister_result_producer(producer1, topic)
for topic in producer2.topics:
    dispatcher.deregister_result_producer(producer2, topic)
producer1.stop()
producer2.stop()
for topic in ['topic1', 'topic2', 'my_topic']:
    dispatcher.deregister_result_listener(listener1, topic)
for topic in ['topic1', 'my_topic']:
    dispatcher.deregister_result_listener(listener2, topic)
# Stop the dispatcher
dispatcher.stop()
```
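
Because start, deregistration and stop have to happen in a fixed order, it may help to tie them to a `with` block. The sketch below is one possible way to do that with `contextlib.ExitStack`; `started` and `FakeComponent` are hypothetical names, and the only assumption about the real objects is that they expose the `start()` and `stop()` methods used in the examples above.

```python
from contextlib import ExitStack, contextmanager
from typing import Any, Iterator, List


@contextmanager
def started(*components: Any) -> Iterator[ExitStack]:
    """Start each component and stop them in reverse order on exit."""
    with ExitStack() as stack:
        for component in components:
            component.start()
            stack.callback(component.stop)
        yield stack


class FakeComponent:
    """Hypothetical stand-in that records calls instead of running a thread."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    def start(self) -> None:
        self.log.append(f'start {self.name}')

    def stop(self) -> None:
        self.log.append(f'stop {self.name}')


log: List[str] = []
with started(FakeComponent('dispatcher', log),
             FakeComponent('producer', log)) as stack:
    # Callbacks added later run first, so deregistration precedes stop().
    stack.callback(log.append, 'deregister producer')
```

With the real classes, the callback inside the `with` block would be something like `stack.callback(dispatcher.deregister_result_producer, producer1, topic)`, registered right after the matching `register_result_producer` call, so cleanup also runs if an exception interrupts the block.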
## Running tests
We use pytest to run tests as follows:
```bash
pytest -x -v ate_dispatcher/tests
```
## Changelog
Visit our [CHANGELOG](CHANGELOG.md) file to learn more about our new features and improvements.
## Contribution guidelines
We follow PEP8 and PEP257. We use MyPy type annotations for all functions and classes declared in this package. Feel free to send a PR or create an issue if you have any problems or questions.