ate-dispatcher


Nameate-dispatcher JSON
Version 0.1.0 PyPI version JSON
download
home_pagehttps://github.com/Semi-ATE/ate-dispatcher
SummaryThread-based, asynchronous dispatcher used to gather results from distributed agents.
upload_time2022-12-17 00:21:40
maintainer
docs_urlNone
authorSemi-ATE
requires_python
licenseMIT
keywords ate dispatcher async
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # ate-dispatcher

[![Project License - MIT](https://img.shields.io/pypi/l/ate-dispatcher.svg)](./LICENSE.txt)
[![pypi version](https://img.shields.io/pypi/v/ate-dispatcher.svg)](https://pypi.org/project/ate-dispatcher/)
[![conda version](https://img.shields.io/conda/vn/conda-forge/ate-dispatcher.svg)](https://www.anaconda.com/download/)
[![download count](https://img.shields.io/conda/dn/conda-forge/ate-dispatcher.svg)](https://www.anaconda.com/download/)
[![Downloads](https://pepy.tech/badge/ate-dispatcher)](https://pepy.tech/project/ate-dispatcher)
[![PyPI status](https://img.shields.io/pypi/status/ate-dispatcher.svg)](https://github.com/Semi-ATE/ate-dispatcher)
[![Unit tests](https://github.com/Semi-ATE/ate-dispatcher/actions/workflows/test_python.yml/badge.svg)](https://github.com/Semi-ATE/ate-dispatcher/actions/workflows/test_python.yml)
[![codecov](https://codecov.io/gh/Semi-ATE/ate-dispatcher/branch/main/graph/badge.svg?token=5MLBM0PHLL)](https://codecov.io/gh/Semi-ATE/ate-dispatcher)


## Overview
A pure-Python, thread-based, asynchronous dispatcher used to gather results
from distributed agents that react on a given event.

The dispatcher reacts to an event tagged with a given topic, which is relayed
to a set of `Producer` objects (which registered the kind of
events they react to). Each result is then passed to a set of `ResultListener`
objects, which will attend to the response depending if they were registered
to attend the given topic.

The exposed API is thread-safe, asynchronous and lock-free, which makes it
suitable for tasks that are lightweight and quick.

## Dependencies
This package is pure-Python, and therefore it does not depend on any external
library, besides `typing-extensions`, which is used to import the typing
classes that are not available in older Python 3 versions.

## Installation
You can install this library by using conda or pip package managers,
as it follows:

```bash
# Using conda
conda install ate-dispatcher -c conda-forge

# Using pip
pip install ate-dispatcher
```

## Local development
In order to install a local development version for `ate-dispatcher`, it is
possible to invoke pip:

```bash
pip install -e .
```

## Package usage
`ate-dispatcher` exposes two abstract interfaces (`Producer` and `ResultListener`)
as well as the main dispatcher (`ATEDispatcher`). The former classes are designed
to be inherited, while the last one is designed to be instantiated.

### Defining a producer
A `Producer` object is on charge of producing a response to an incoming input
message from the dispatcher that is tagged with a certain `topic` that the
object can handle. It must implement `produce_dispatcher_output`.


```python
import time
from typing import Any
from ate_dispatcher import Producer

# Defining a producer
class SpecificTopicProducer(Producer):
    def __init__(self, _id: int, timeout: int, *topics):
        super().__init__()
        self.id = _id
        self.timeout = timeout / 1000
        self.topics = set(topics)

    def produce_dispatcher_output(
            self, topic: str, *args, **kwargs) -> Any:
        time.sleep(timeout)
        return {
            'id': self.id,
            'some_key': topic,
            'args': args,
            'kwargs': kwargs
        }
```

### Defining a result listener
A `ResultListener` object will receive the responses emitted by the `Producer`
objects that reacted to a given topic that the listener also supports. This
is the endpoint for the dispatcher architecture, where all end messages will
arrive. The `ResultListener` subclasses must implement the
`process_dispatcher_result` method.

```python
from typing import Any
from ate_dispatcher import ResultListener

# Defining a result listener
class ResultListenerExample(ResultListener):
    def __init__(self):
        super().__init__()
        self.responses = {}

    def clear(self):
        self.responses = {}

    def process_dispatcher_result(self, topic: str, response: Any):
        topic_responses = self.responses.get(topic, [])
        topic_responses.append(response)
        self.responses[topic] = topic_responses

```

### Creating, using and destroying the dispatcher
After defining the `Producer` and `ResultListener` subclasses, it is necessary
to instantiate and register the `ATEDispatcher instance`. Since the interfaces
inherit from `threading.Thread`, it is necessary to keep track of their
lifetime via the `start` and `end` methods.

```python
import time

# Import the producer and result listener classes
from specific_topic_producer import SpecificTopicProducer
from result_listener_example import ResultListenerExample

# Import the dispatcher
from ate_dispatcher import ATEDispatcher

# Create the dispatcher
dispatcher = ATEDispatcher()

# Start the dispatcher, the lifetime is delegated to the end developer.
dispatcher.start()

# Define the producers and register them against the dispatcher
producer1 = SpecificTopicProducer(0, 200, 'topic1', 'my_topic')
producer1.start()

producer2 = SpecificTopicProducer(1, 500, 'topic1', 'topic2', 'my_topic')
producer2.start()

for topic in producer1.topics:
    dispatcher.register_result_producer(producer1, topic)

for topic in producer2.topics:
    dispatcher.register_result_producer(producer2, topic)

# Define the result listeners and register them against the dispatcher
listener1 = ResultListenerExample()
listener1.start()

listener2 = ResultListenerExample()
listener2.start()

for topic in ['topic1', 'topic2', 'my_topic']:
    # The first listener will receive all responses tagged with all topics
    dispatcher.register_result_listener(listener1, topic)

for topic in ['topic1', 'my_topic']:
    # This listener will attend to certain topics.
    dispatcher.register_result_listener(listener2, topic)
```

Since the dispatcher architecture is completely asynchronous, a trigger message
may indicate a maximum timeout (in milliseconds) for all registered producers
on a given topic to emit their response. Any response received after the
specified timeout will be discarded. Also, the messages will be delivered to
the result listeners as they arrive.

```python
# Trigger a dispatcher request with a 4000ms timeout on the topic1
dispatcher.send_request('topic1', 3, 4, 5, ttl=4000, keyword='b')

# Wait for responses to arrive
time.sleep(1.0)

# Both listeners should have received the responses from both producers.
expected_responses = {
    'topic1': [
        {
            'id': 0
            'some_key': 'topic1',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        },
        {
            'id': 1
            'some_key': 'topic1',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        },
    ]
}

assert listener1.responses == expected_responses
assert listener2.responses == expected_responses

# Clear the listener responses
listener1.clear()
listener2.clear()

# Trigger a dispatcher request with a 300ms timeout limit on the topic my_topic
dispatcher.send_request('my_topic', 3, 4, 5, ttl=300, keyword='b')

# Wait for responses to arrive
time.sleep(0.5)

# Both listeners should have only received the response from producer1.
expected_responses = {
    'my_topic': [
        {
            'id': 0
            'some_key': 'my_topic',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        }
    ]
}

assert listener1.responses == expected_responses
assert listener2.responses == expected_responses


# Clear the listener1 responses
listener1.clear()

# Trigger a dispatcher request with a 1000ms timeout limit on the topic topic2
dispatcher.send_request('topic2', 3, 4, 5, ttl=1000, keyword='b')

# Wait for responses to arrive
time.sleep(1.0)

# Only the listener1 should have received the message produced by the producer2
expected_responses = {
    'topic2': [
        {
            'id': 1
            'some_key': 'topic2',
            'args': (3, 4, 5),
            'kwargs': {
                'keyword': 'b'
            }
        }
    ]
}

assert listener1.responses == expected_responses
```

Finally, each registered `Producer` and `ResultListener` instance can be
deregistered from an specific topic at any time. However, before stopping either
`Producer` or `ResultListener` instances, each registered topic must be
deregistered.

```python
# Deregister the listener2 and the producer1 from certain topics
dispatcher.deregister_result_listener(listener2, 'my_topic')
dispatcher.deregister_result_producer(producer1, 'topic1')

# Stopping the producer and result listener instances
for topic in producer1.topics:
    dispatcher.deregister_result_producer(producer1, topic)

for topic in producer2.topics:
    dispatcher.deregister_result_producer(producer2, topic)

producer1.stop()
producer2.stop()

for topic in ['topic1', 'topic2', 'my_topic']:
    dispatcher.deregister_result_listener(listener1, topic)

for topic in ['topic1', 'my_topic']:
    dispatcher.deregister_result_listener(listener2, topic)

# Stop the dispatcher
dispatcher.stop()
```

## Running tests
We use pytest to run tests as it follows:

```bash
pytest -x -v ate_dispatcher/tests
```

## Changelog
Visit our [CHANGELOG](CHANGELOG.md) file to learn more about our new features and improvements.

## Contribution guidelines
We follow PEP8 and PEP257. We use MyPy type annotations for all functions and classes declared on this package. Feel free to send a PR or create an issue if you have any problem/question.

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/Semi-ATE/ate-dispatcher",
    "name": "ate-dispatcher",
    "maintainer": "",
    "docs_url": null,
    "requires_python": "",
    "maintainer_email": "",
    "keywords": "ATE,dispatcher,async",
    "author": "Semi-ATE",
    "author_email": "info@Semi-ATE.com",
    "download_url": "https://files.pythonhosted.org/packages/fb/ab/4aa2bc7ea3d0383e3dc83fa43acada41cbd2bcb3daf9b0e3e1fd56ca65fe/ate-dispatcher-0.1.0.tar.gz",
    "platform": null,
    "description": "# ate-dispatcher\n\n[![Project License - MIT](https://img.shields.io/pypi/l/ate-dispatcher.svg)](./LICENSE.txt)\n[![pypi version](https://img.shields.io/pypi/v/ate-dispatcher.svg)](https://pypi.org/project/ate-dispatcher/)\n[![conda version](https://img.shields.io/conda/vn/conda-forge/ate-dispatcher.svg)](https://www.anaconda.com/download/)\n[![download count](https://img.shields.io/conda/dn/conda-forge/ate-dispatcher.svg)](https://www.anaconda.com/download/)\n[![Downloads](https://pepy.tech/badge/ate-dispatcher)](https://pepy.tech/project/ate-dispatcher)\n[![PyPI status](https://img.shields.io/pypi/status/ate-dispatcher.svg)](https://github.com/Semi-ATE/ate-dispatcher)\n[![Unit tests](https://github.com/Semi-ATE/ate-dispatcher/actions/workflows/test_python.yml/badge.svg)](https://github.com/Semi-ATE/ate-dispatcher/actions/workflows/test_python.yml)\n[![codecov](https://codecov.io/gh/Semi-ATE/ate-dispatcher/branch/main/graph/badge.svg?token=5MLBM0PHLL)](https://codecov.io/gh/Semi-ATE/ate-dispatcher)\n\n\n## Overview\nA pure-Python, thread-based, asynchronous dispatcher used to gather results\nfrom distributed agents that react on a given event.\n\nThe dispatcher reacts to an event tagged with a given topic, which is relayed\nto a set of `Producer` objects (which registered the kind of\nevents they react to). Each result is then passed to a set of `ResultListener`\nobjects, which will attend to the response depending if they were registered\nto attend the given topic.\n\nThe exposed API is thread-safe, asynchronous and lock-free, which makes it\nsuitable for tasks that are lightweight and quick.\n\n## Dependencies\nThis package is pure-Python, and therefore it does not depend on any external\nlibrary, besides `typing-extensions`, which is used to import the typing\nclasses that are not available in older Python 3 versions.\n\n## Installation\nYou can install this library by using conda or pip package managers,\nas it follows:\n\n```bash\n# Using conda\nconda install ate-dispatcher -c conda-forge\n\n# Using pip\npip install ate-dispatcher\n```\n\n## Local development\nIn order to install a local development version for `ate-dispatcher`, it is\npossible to invoke pip:\n\n```bash\npip install -e .\n```\n\n## Package usage\n`ate-dispatcher` exposes two abstract interfaces (`Producer` and `ResultListener`)\nas well as the main dispatcher (`ATEDispatcher`). The former classes are designed\nto be inherited, while the last one is designed to be instantiated.\n\n### Defining a producer\nA `Producer` object is on charge of producing a response to an incoming input\nmessage from the dispatcher that is tagged with a certain `topic` that the\nobject can handle. It must implement `produce_dispatcher_output`.\n\n\n```python\nimport time\nfrom typing import Any\nfrom ate_dispatcher import Producer\n\n# Defining a producer\nclass SpecificTopicProducer(Producer):\n    def __init__(self, _id: int, timeout: int, *topics):\n        super().__init__()\n        self.id = _id\n        self.timeout = timeout / 1000\n        self.topics = set(topics)\n\n    def produce_dispatcher_output(\n            self, topic: str, *args, **kwargs) -> Any:\n        time.sleep(timeout)\n        return {\n            'id': self.id,\n            'some_key': topic,\n            'args': args,\n            'kwargs': kwargs\n        }\n```\n\n### Defining a result listener\nA `ResultListener` object will receive the responses emitted by the `Producer`\nobjects that reacted to a given topic that the listener also supports. This\nis the endpoint for the dispatcher architecture, where all end messages will\narrive. The `ResultListener` subclasses must implement the\n`process_dispatcher_result` method.\n\n```python\nfrom typing import Any\nfrom ate_dispatcher import ResultListener\n\n# Defining a result listener\nclass ResultListenerExample(ResultListener):\n    def __init__(self):\n        super().__init__()\n        self.responses = {}\n\n    def clear(self):\n        self.responses = {}\n\n    def process_dispatcher_result(self, topic: str, response: Any):\n        topic_responses = self.responses.get(topic, [])\n        topic_responses.append(response)\n        self.responses[topic] = topic_responses\n\n```\n\n### Creating, using and destroying the dispatcher\nAfter defining the `Producer` and `ResultListener` subclasses, it is necessary\nto instantiate and register the `ATEDispatcher instance`. Since the interfaces\ninherit from `threading.Thread`, it is necessary to keep track of their\nlifetime via the `start` and `end` methods.\n\n```python\nimport time\n\n# Import the producer and result listener classes\nfrom specific_topic_producer import SpecificTopicProducer\nfrom result_listener_example import ResultListenerExample\n\n# Import the dispatcher\nfrom ate_dispatcher import ATEDispatcher\n\n# Create the dispatcher\ndispatcher = ATEDispatcher()\n\n# Start the dispatcher, the lifetime is delegated to the end developer.\ndispatcher.start()\n\n# Define the producers and register them against the dispatcher\nproducer1 = SpecificTopicProducer(0, 200, 'topic1', 'my_topic')\nproducer1.start()\n\nproducer2 = SpecificTopicProducer(1, 500, 'topic1', 'topic2', 'my_topic')\nproducer2.start()\n\nfor topic in producer1.topics:\n    dispatcher.register_result_producer(producer1, topic)\n\nfor topic in producer2.topics:\n    dispatcher.register_result_producer(producer2, topic)\n\n# Define the result listeners and register them against the dispatcher\nlistener1 = ResultListenerExample()\nlistener1.start()\n\nlistener2 = ResultListenerExample()\nlistener2.start()\n\nfor topic in ['topic1', 'topic2', 'my_topic']:\n    # The first listener will receive all responses tagged with all topics\n    dispatcher.register_result_listener(listener1, topic)\n\nfor topic in ['topic1', 'my_topic']:\n    # This listener will attend to certain topics.\n    dispatcher.register_result_listener(listener2, topic)\n```\n\nSince the dispatcher architecture is completely asynchronous, a trigger message\nmay indicate a maximum timeout (in milliseconds) for all registered producers\non a given topic to emit their response. Any response received after the\nspecified timeout will be discarded. Also, the messages will be delivered to\nthe result listeners as they arrive.\n\n```python\n# Trigger a dispatcher request with a 4000ms timeout on the topic1\ndispatcher.send_request('topic1', 3, 4, 5, ttl=4000, keyword='b')\n\n# Wait for responses to arrive\ntime.sleep(1.0)\n\n# Both listeners should have received the responses from both producers.\nexpected_responses = {\n    'topic1': [\n        {\n            'id': 0\n            'some_key': 'topic1',\n            'args': (3, 4, 5),\n            'kwargs': {\n                'keyword': 'b'\n            }\n        },\n        {\n            'id': 1\n            'some_key': 'topic1',\n            'args': (3, 4, 5),\n            'kwargs': {\n                'keyword': 'b'\n            }\n        },\n    ]\n}\n\nassert listener1.responses == expected_responses\nassert listener2.responses == expected_responses\n\n# Clear the listener responses\nlistener1.clear()\nlistener2.clear()\n\n# Trigger a dispatcher request with a 300ms timeout limit on the topic my_topic\ndispatcher.send_request('my_topic', 3, 4, 5, ttl=300, keyword='b')\n\n# Wait for responses to arrive\ntime.sleep(0.5)\n\n# Both listeners should have only received the response from producer1.\nexpected_responses = {\n    'my_topic': [\n        {\n            'id': 0\n            'some_key': 'my_topic',\n            'args': (3, 4, 5),\n            'kwargs': {\n                'keyword': 'b'\n            }\n        }\n    ]\n}\n\nassert listener1.responses == expected_responses\nassert listener2.responses == expected_responses\n\n\n# Clear the listener1 responses\nlistener1.clear()\n\n# Trigger a dispatcher request with a 1000ms timeout limit on the topic topic2\ndispatcher.send_request('topic2', 3, 4, 5, ttl=1000, keyword='b')\n\n# Wait for responses to arrive\ntime.sleep(1.0)\n\n# Only the listener1 should have received the message produced by the producer2\nexpected_responses = {\n    'topic2': [\n        {\n            'id': 1\n            'some_key': 'topic2',\n            'args': (3, 4, 5),\n            'kwargs': {\n                'keyword': 'b'\n            }\n        }\n    ]\n}\n\nassert listener1.responses == expected_responses\n```\n\nFinally, each registered `Producer` and `ResultListener` instance can be\nderegistered from an specific topic at any time. However, before stopping either\n`Producer` or `ResultListener` instances, each registered topic must be\nderegistered.\n\n```python\n# Deregister the listener2 and the producer1 from certain topics\ndispatcher.deregister_result_listener(listener2, 'my_topic')\ndispatcher.deregister_result_producer(producer1, 'topic1')\n\n# Stopping the producer and result listener instances\nfor topic in producer1.topics:\n    dispatcher.deregister_result_producer(producer1, topic)\n\nfor topic in producer2.topics:\n    dispatcher.deregister_result_producer(producer2, topic)\n\nproducer1.stop()\nproducer2.stop()\n\nfor topic in ['topic1', 'topic2', 'my_topic']:\n    dispatcher.deregister_result_listener(listener1, topic)\n\nfor topic in ['topic1', 'my_topic']:\n    dispatcher.deregister_result_listener(listener2, topic)\n\n# Stop the dispatcher\ndispatcher.stop()\n```\n\n## Running tests\nWe use pytest to run tests as it follows:\n\n```bash\npytest -x -v ate_dispatcher/tests\n```\n\n## Changelog\nVisit our [CHANGELOG](CHANGELOG.md) file to learn more about our new features and improvements.\n\n## Contribution guidelines\nWe follow PEP8 and PEP257. We use MyPy type annotations for all functions and classes declared on this package. Feel free to send a PR or create an issue if you have any problem/question.\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Thread-based, asynchronous dispatcher used to gather results from distributed agents.",
    "version": "0.1.0",
    "split_keywords": [
        "ate",
        "dispatcher",
        "async"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "md5": "bd4ccf267e4324fc52ef58b7a7dca3b8",
                "sha256": "1c90000560828f3fb53d08627a95a47bd57ce155aa07810b6f52019b66ef9d8d"
            },
            "downloads": -1,
            "filename": "ate_dispatcher-0.1.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "bd4ccf267e4324fc52ef58b7a7dca3b8",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 8882,
            "upload_time": "2022-12-17T00:21:39",
            "upload_time_iso_8601": "2022-12-17T00:21:39.412209Z",
            "url": "https://files.pythonhosted.org/packages/f0/b3/31e0c5ef8cc60974c214c904e729dea6e493d26b1088c4bed210f0b75c3f/ate_dispatcher-0.1.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "md5": "0eda56a56b3077ca49279fac2e272699",
                "sha256": "c8de28e5cb8169078a5f15dee1caf0edeb44710a5ba598a41fbe9972f0fd2b54"
            },
            "downloads": -1,
            "filename": "ate-dispatcher-0.1.0.tar.gz",
            "has_sig": false,
            "md5_digest": "0eda56a56b3077ca49279fac2e272699",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 10873,
            "upload_time": "2022-12-17T00:21:40",
            "upload_time_iso_8601": "2022-12-17T00:21:40.917295Z",
            "url": "https://files.pythonhosted.org/packages/fb/ab/4aa2bc7ea3d0383e3dc83fa43acada41cbd2bcb3daf9b0e3e1fd56ca65fe/ate-dispatcher-0.1.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2022-12-17 00:21:40",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "github_user": "Semi-ATE",
    "github_project": "ate-dispatcher",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "ate-dispatcher"
}
        
Elapsed time: 0.01918s