
=============
senor-octopus
=============

:Summary: A streaming hub. Sort of.
:Version: 0.2.0
:Author: Beto Dealmeida
:License: MIT
:Requires Python: >=3.8
:Uploaded: 2023-04-16 19:23:16


They say there are only 2 kinds of work: you either move information from one place to another, or you move mass from one place to another.

**Señor Octopus is an application that moves data around**. It reads a YAML configuration file that describes how to connect **nodes**. For example, you might want to measure your internet speed every hour and store it in a database:

.. code-block:: yaml

    speedtest:
      plugin: source.speedtest
      flow: -> db
      schedule: "@hourly"

    db:
      plugin: sink.db.postgresql
      flow: speedtest ->
      user: alice
      password: XXX
      host: localhost
      port: 5432
      dbname: default

Nodes are connected by the ``flow`` attribute. The ``speedtest`` node is connected to the ``db`` node because it points to it:

.. code-block:: yaml

    speedtest:
      flow: -> db

The ``db`` node, on the other hand, listens to events from the ``speedtest`` node:

.. code-block:: yaml

    db:
      flow: speedtest ->

We can also specify a list of nodes, or use ``*`` as a wildcard to connect a node to all other nodes:

.. code-block:: yaml

    speedtest:
      flow: -> db, log

    db:
      flow: "* ->"

Note that in YAML, values that start with an asterisk must be quoted, because an unquoted ``*`` introduces an alias.
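
To make the notation concrete, here is a minimal sketch of how a ``flow`` value could be split into the nodes a node listens to (left of ``->``) and the nodes it sends events to (right of ``->``). The ``parse_flow`` helper is hypothetical and only illustrates the syntax; it is not Señor Octopus's actual parser, and ``*`` is simply kept as a marker for "all other nodes".

.. code-block:: python

    from typing import List, Tuple


    def parse_flow(flow: str) -> Tuple[List[str], List[str]]:
        """Split a flow string into (upstream, downstream) node names.

        Hypothetical helper: names left of "->" are the nodes this node
        listens to; names right of it are the nodes it sends events to.
        """
        if "->" not in flow:
            raise ValueError(f"invalid flow, missing '->': {flow!r}")
        left, right = flow.split("->", 1)

        def names(side: str) -> List[str]:
            # Comma-separated names; "*" is kept as-is to mean "all other nodes".
            return [name.strip() for name in side.split(",") if name.strip()]

        return names(left), names(right)

For example, ``parse_flow("-> db, log")`` returns ``([], ['db', 'log'])`` and ``parse_flow("* ->")`` returns ``(['*'], [])``.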

Running Señor Octopus
=====================

You can save the configuration above to a file called ``speedtest.yaml`` and run:

.. code-block:: bash

    $ pip install senor-octopus
    $ srocto speedtest.yaml

Every hour the ``speedtest`` **source** node will run, and the results will be sent to the ``db`` **sink** node, which writes them to a Postgres database.

What do these results look like?

Events
======

Señor Octopus uses a very simple but flexible data model to move data around. We have nodes called **sources** that create a stream of events, each one like this:

.. code-block:: python

    from datetime import datetime
    from typing import Any, TypedDict

    class Event(TypedDict):
        timestamp: datetime
        name: str
        value: Any

An event has a **timestamp** associated with it, a **name**, and a **value**. Note that the value can be anything!

A **source** will produce a stream of events. In the example above, once per hour the ``speedtest`` source will produce events like these:

.. code-block:: python

    [
        {
            'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812083, tzinfo=datetime.timezone.utc),
            'name': '',
            'value': 16568200.018792046,
        },
        {
            'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 812966, tzinfo=datetime.timezone.utc),
            'name': 'hub.speedtest.upload',
            'value': 5449607.159468643,
        },
        {
            'timestamp': datetime.datetime(2021, 5, 11, 22, 16, 26, 820369, tzinfo=datetime.timezone.utc),
            'name': 'hub.speedtest.client',
            'value': {
                'ip': '',
                'lat': '37.751',
                'lon': '-97.822',
                'isp': 'Colocation America Corporation',
                'isprating': '3.7',
                'rating': '0',
                'ispdlavg': '0',
                'ispulavg': '0',
                'loggedin': '0',
                'country': 'US',
            },
        },
        ...
    ]

The events are sent to **sinks**, which consume the stream. In this example, the ``db`` sink will receive the events and store them in a Postgres database.

Event-driven sources
====================

In the previous example we configured the ``speedtest`` source to run hourly. Not all sources need to be scheduled, though. For example, we can have a source that subscribes to one or more MQTT topics:

.. code-block:: yaml

    mqtt:
      plugin: source.mqtt
      flow: -> db
      topics:
        - "srocto/feeds/#"
      host: localhost
      port: 1883
      username: bob
      password: XXX
      message_is_json: true

The source above will immediately send an event to the ``db`` node every time a new message shows up in the topic wildcard ``srocto/feeds/#``, so it can be written to the database — a super easy way of persisting a message queue to disk!
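
The difference from a scheduled source is that an event-driven source yields as soon as data arrives instead of on a timer. Below is a minimal sketch, assuming incoming messages are handed to an ``asyncio.Queue`` by some client callback. The ``queue_source`` name, the ``hub.mqtt`` event name, and the ``None`` stop sentinel are illustrative assumptions, not part of Señor Octopus or of any MQTT library.

.. code-block:: python

    import asyncio
    from datetime import datetime, timezone
    from typing import Any, AsyncIterator, Dict, List, Optional

    Event = Dict[str, Any]


    async def queue_source(
        queue: "asyncio.Queue[Optional[Any]]", name: str = "hub.mqtt"
    ) -> AsyncIterator[Event]:
        """Yield one event per message, as soon as it is put on the queue.

        Hypothetical sketch: a real client callback would call
        queue.put_nowait() for each received message; None ends the stream.
        """
        while True:
            message = await queue.get()  # waits here until a message arrives
            if message is None:
                return
            yield {
                "timestamp": datetime.now(timezone.utc),
                "name": name,
                "value": message,
            }


    async def demo() -> List[Any]:
        queue: "asyncio.Queue[Optional[Any]]" = asyncio.Queue()
        for message in ({"temp": 21.5}, {"temp": 22.0}, None):
            queue.put_nowait(message)
        return [event["value"] async for event in queue_source(queue)]


    if __name__ == "__main__":
        print(asyncio.run(demo()))

Running ``asyncio.run(demo())`` returns the two message payloads in arrival order.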

Batching events
===============

The example above is not super efficient, since it writes to the database every time an event arrives. Instead, we can easily **batch** the events so that they're accumulated in a queue and processed every, say, 5 minutes:

.. code-block:: yaml

    db:
      plugin: sink.db.postgresql
      flow: speedtest, mqtt ->
      batch: 5 minutes
      user: alice
      password: XXX
      host: localhost
      port: 5432
      dbname: default

With the ``batch`` parameter, incoming events are stored in a queue for the configured time and then processed by the sink together. Any events still pending in the queue will be processed if ``srocto`` terminates gracefully (e.g., with ``Ctrl+C``).
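
One way to picture ``batch: 5 minutes`` is a wrapper that collects events from the stream and hands them to the sink as a list once per time window, flushing whatever is left when the stream ends. The sketch below is an illustration under those assumptions (``run_batched`` is a hypothetical name and the window is given in seconds); it is not Señor Octopus's implementation.

.. code-block:: python

    import asyncio
    from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List

    Event = Dict[str, Any]


    async def run_batched(
        stream: AsyncIterator[Event],
        sink: Callable[[List[Event]], Awaitable[None]],
        window: float,
    ) -> None:
        """Pass buffered events to ``sink`` as a list every ``window`` seconds.

        A background task drains the stream into a buffer while this loop
        flushes it periodically; events still buffered when the stream ends
        are flushed before returning.
        """
        buffer: List[Event] = []

        async def collect() -> None:
            async for event in stream:
                buffer.append(event)

        collector = asyncio.create_task(collect())
        try:
            while True:
                # Sleep for one window, waking up early if the stream is exhausted.
                await asyncio.wait({collector}, timeout=window)
                finished = collector.done()  # checked before flushing, so nothing is lost
                if buffer:
                    batch = buffer[:]
                    buffer.clear()
                    await sink(batch)
                if finished:
                    break
            collector.result()  # re-raise any error raised by the stream
        finally:
            collector.cancel()


    async def demo(window: float) -> List[List[int]]:
        batches: List[List[int]] = []

        async def numbers() -> AsyncIterator[Event]:
            for i in range(5):
                yield {"name": "hub.demo", "value": i}
                await asyncio.sleep(0.01)

        async def sink(batch: List[Event]) -> None:
            batches.append([event["value"] for event in batch])

        await run_batched(numbers(), sink, window)
        return batches


    if __name__ == "__main__":
        print(asyncio.run(demo(1.0)))

Here ``asyncio.run(demo(1.0))`` returns ``[[0, 1, 2, 3, 4]]``, because the whole stream finishes well within one window; a shorter window splits the same events across several batches.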

Filtering events
================

Much of the flexibility of Señor Octopus comes from a third type of node, the **filter**. Filters can be used not only to filter data but also to format it. For example, let's say we want to turn on some lights at sunset. The ``sun`` source sends an event with a value of "sunset" or "sunrise" every time one occurs:

.. code-block:: python

    {
        'timestamp': ...,
        'name': 'hub.sun',
        'value': 'sunset',
    }

The ``tuya`` sink can be used to control a smart switch, but in order to turn it on it expects an event that looks like this:

.. code-block:: python

    {
        'timestamp': ...,
        'name': ...,
        'value': 'on',
    }

We can use the ``jinja`` filter to ignore "sunrise" events, and to convert the "sunset" value into "on":

.. code-block:: yaml

    sun:
      plugin: source.sun
      flow: -> sunset
      latitude: 38.3
      longitude: -123.0

    sunset:
      plugin: filter.jinja
      flow: sun -> lights
      template: >
        {% if event['value'] == 'sunset' %}
          on
        {% endif %}

    lights:
      plugin: sink.tuya
      flow: sunset ->
      device: "Porch lights"
      email:
      password: XXX
      country: "1"
      application: smart_life

With this configuration, the ``sunset`` filter drops any event whose value is not "sunset". For the events that remain, the value is replaced by the string "on", which activates the lights through the ``lights`` node.
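
For comparison, the same logic written directly in Python is just an async generator that skips non-matching events and rewrites the value of the rest. The ``sunset_to_on`` filter below is hypothetical and only clarifies what the template does; with Señor Octopus you would use the ``jinja`` filter as configured above.

.. code-block:: python

    import asyncio
    from datetime import datetime, timezone
    from typing import Any, AsyncIterator, Dict, List

    Event = Dict[str, Any]


    async def sunset_to_on(stream: AsyncIterator[Event]) -> AsyncIterator[Event]:
        """Keep only "sunset" events and replace their value with "on"."""
        async for event in stream:
            if event["value"] != "sunset":
                continue  # "sunrise" (or anything else) is dropped
            # Keep timestamp and name, but swap the value.
            yield {**event, "value": "on"}


    async def demo() -> List[Event]:
        async def sun() -> AsyncIterator[Event]:
            for value in ("sunrise", "sunset"):
                yield {
                    "timestamp": datetime(2021, 5, 11, tzinfo=timezone.utc),
                    "name": "hub.sun",
                    "value": value,
                }

        return [event async for event in sunset_to_on(sun())]


    if __name__ == "__main__":
        print(asyncio.run(demo()))

Only the "sunset" event survives, now carrying the value "on".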

Throttling events
=================

Sometimes we want to limit the number of events being consumed by a sink. For example, imagine that we want to use Señor Octopus to monitor air quality with an Awair Element, sending us an SMS when the score is below a given threshold. We would like the SMS to be sent at most once every 30 minutes, and only between 8am and 10pm.

Here's how we can do that:

.. code-block:: yaml

    awair:
      plugin: source.awair
      flow: -> bad_air
      schedule: 0/10 * * * *
      access_token: XXX
      device_type: awair-element
      device_id: 12345

    bad_air:
      plugin: filter.jinja
      flow: awair -> sms
      template: >
        {% if
           event['timestamp'].astimezone().hour >= 8 and
           event['timestamp'].astimezone().hour <= 21 and
           event['name'] == 'hub.awair.score' and
           event['value'] < 80
        %}
          Air quality score is low: {{ event['value'] }}
        {% endif %}

    sms:
      plugin: sink.sms
      flow: bad_air ->
      throttle: 30 minutes
      account_sid: XXX
      auth_token: XXX
      from: "+18002738255"
      to: "+15558675309"

In the example above, the ``awair`` source fetches air quality data every 10 minutes and sends it to ``bad_air``. The filter checks the hour, so that no SMS is sent between 10pm and 8am, and checks the air quality score: if it is lower than 80, the filter reformats the value of the event into a friendly message, e.g.:

    "Air quality score is low: 70"

This is then sent to the ``sms`` sink, which has a ``throttle`` of 30 minutes. The throttle configuration will prevent the sink from running more than once every 30 minutes, to avoid spamming us with messages in case the score remains low.
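
A throttle can be sketched as a wrapper that remembers when the sink last ran and drops events that arrive before the interval has passed. The ``throttled`` helper and its injectable ``clock`` argument (which makes the behaviour easy to test) are illustrative assumptions; Señor Octopus's own throttling may differ in details.

.. code-block:: python

    import asyncio
    import time
    from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional

    Event = Dict[str, Any]


    async def throttled(
        stream: AsyncIterator[Event],
        sink: Callable[[Event], Awaitable[None]],
        interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        """Call ``sink`` at most once every ``interval`` seconds.

        Events arriving while the throttle is active are dropped.
        """
        last_run: Optional[float] = None
        async for event in stream:
            now = clock()
            if last_run is not None and now - last_run < interval:
                continue  # still inside the quiet period: skip this event
            last_run = now
            await sink(event)


    async def demo() -> List[int]:
        times = iter([0.0, 600.0, 1200.0, 1800.0, 2400.0])  # one reading every 10 min
        sent: List[int] = []

        async def readings() -> AsyncIterator[Event]:
            for score in (70, 72, 68, 65, 71):
                yield {"name": "hub.awair.score", "value": score}

        async def sms(event: Event) -> None:
            sent.append(event["value"])

        await throttled(readings(), sms, interval=30 * 60, clock=lambda: next(times))
        return sent


    if __name__ == "__main__":
        print(asyncio.run(demo()))

With readings every 10 minutes and a 30-minute throttle, ``asyncio.run(demo())`` returns ``[70, 65]``: only the readings at minute 0 and minute 30 reach the sink.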

Plugins
=======

Señor Octopus supports a growing list of plugins, and it's straightforward to add new ones. Each plugin is simply a function that produces, processes, or consumes a stream.

Here's the ``rand`` source, which produces random numbers:

.. code-block:: python

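    import random
    from datetime import datetime, timezone
    from typing import Any, AsyncIterator, Dict

    Stream = AsyncIterator[Dict[str, Any]]  # stand-in for the project's Stream type

    async def rand(events: int = 10, prefix: str = "hub.random") -> Stream:
        for _ in range(events):
            yield {
                "timestamp": datetime.now(timezone.utc),
                "name": prefix,
                "value": random.random(),
            }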

This is the full source code for the ``jinja`` filter:

.. code-block:: python

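    import logging
    from typing import Any, AsyncIterator, Dict

    from jinja2 import Template

    _logger = logging.getLogger(__name__)
    Stream = AsyncIterator[Dict[str, Any]]  # stand-in for the project's Stream type


    async def jinja(stream: Stream, template: str) -> Stream:
        _logger.debug("Applying template to events")
        tmpl = Template(template)
        async for event in stream:
            value = tmpl.render(event=event)
            # An empty render (e.g. a false {% if %}) drops the event
            if value:
                yield {
                    "timestamp": event["timestamp"],
                    "name": event["name"],
                    "value": value,
                }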

And this is the ``sms`` sink:

.. code-block:: python

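    import logging
    from typing import Any, AsyncIterator, Dict

    from twilio.rest import Client

    _logger = logging.getLogger(__name__)
    Stream = AsyncIterator[Dict[str, Any]]  # stand-in for the project's Stream type


    async def sms(
        stream: Stream, account_sid: str, auth_token: str, to: str, **kwargs: str
    ) -> None:
        # "from" is a reserved word in Python, so it arrives through **kwargs
        from_ = kwargs["from"]
        client = Client(account_sid, auth_token)
        async for event in stream:
            _logger.debug(event)
            _logger.info("Sending SMS")
            client.messages.create(body=str(event["value"]).strip(), from_=from_, to=to)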

As you can see, a source is an async generator that yields events. A filter receives a stream plus its configuration parameters and returns a new stream. A sink receives a stream plus its parameters and returns nothing.
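
Because all three kinds of plugins share the same stream interface, chaining them is just function composition. The sketch below wires a copy of the ``rand`` source through a hypothetical ``above`` filter into a hypothetical ``collect`` sink using plain ``asyncio``. It only illustrates the interfaces described above; it is not how ``srocto`` builds its graph from the YAML file.

.. code-block:: python

    import asyncio
    import random
    from datetime import datetime, timezone
    from typing import Any, AsyncIterator, Dict, List

    Event = Dict[str, Any]
    Stream = AsyncIterator[Event]


    async def rand(events: int = 10, prefix: str = "hub.random") -> Stream:
        """Source: an async generator that yields events."""
        for _ in range(events):
            yield {
                "timestamp": datetime.now(timezone.utc),
                "name": prefix,
                "value": random.random(),
            }


    async def above(stream: Stream, threshold: float) -> Stream:
        """Filter: takes a stream plus parameters and returns a new stream."""
        async for event in stream:
            if event["value"] > threshold:
                yield event


    async def collect(stream: Stream, results: List[float]) -> None:
        """Sink: consumes the stream and returns nothing."""
        async for event in stream:
            results.append(event["value"])


    async def main() -> List[float]:
        results: List[float] = []
        # source -> filter -> sink, composed as nested calls
        await collect(above(rand(events=100), threshold=0.5), results)
        return results


    if __name__ == "__main__":
        print(asyncio.run(main()))

Every value that reaches the sink is greater than 0.5; roughly half of the 100 generated numbers survive the filter.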

Sources
~~~~~~~

The current plugins for sources are:

- ``source.awair``: Fetch air quality data from an Awair Element monitor.
- ``source.crypto``: Fetch price of cryptocurrencies from
- ``source.mqtt``: Subscribe to messages on one or more MQTT topics.
- ``source.rand``: Generate random numbers between 0 and 1.
- ``source.speed``: Measure internet speed.
- ``source.sqla``: Read data from a database.
- ``source.static``: Generate static events.
- ``source.stock``: Fetch stock prices from Yahoo! Finance.
- ``source.sun``: Send events on sunrise and sunset.
- ``source.udp``: Listen to UDP messages on a given port.
- ``source.weatherapi``: Fetch weather forecast data from
- ``source.whistle``: Fetch device information and location for a Whistle pet tracker.

Filters
~~~~~~~

The existing filters are very similar; the main difference is how you configure them:

- ``filter.format``: Format an event stream using Python string formatting.
- ``filter.jinja``: Apply a Jinja2 template to events.
- ``filter.jsonpath``: Filter an event stream based on a JSON path.

Sinks
~~~~~

These are the current sinks:

- ``sink.log``: Send events to a logger.
- ``sink.mqtt``: Send events as messages to an MQTT topic.
- ``sink.pushover``: Send events to the Pushover mobile app.
- ``sink.slack``: Send messages to a Slack channel.
- ``sink.sms``: Send SMS via Twilio.
- ``sink.tuya``: Send commands to a Tuya/Smart Life device.

