pyami-asterisk


Namepyami-asterisk JSON
Version 1.5.2 PyPI version JSON
download
home_pagehttps://github.com/streltsovdenis/pyami_asterisk.git
Summarypyami_asterisk is a library based on python’s AsyncIO with Asterisk AMI
upload_time2023-01-22 10:45:36
maintainer
docs_urlNone
authorDenis Streltsov
requires_python
licenseMIT license
keywords ami asterisk asyncio python
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            AsyncIO python library with Asterisk AMI
========================================

[![Build Status](https://travis-ci.com/streltsovdenis/pyami_asterisk.svg?branch=main)](https://travis-ci.com/streltsovdenis/pyami_asterisk)
![PyPI](https://img.shields.io/pypi/v/pyami_asterisk)
![PyPI - Downloads](https://img.shields.io/pypi/dm/pyami_asterisk?color=green)
![PyPI - License](https://img.shields.io/pypi/l/pyami-asterisk?color=green)

Pyami_asterisk is a library based on python’s AsyncIO with Asterisk AMI

Install
-------

Install pyami_asterisk

```bash
pip install pyami-asterisk
```

Usage
-----

Asterisk AMI Listen all Events

```python
import asyncio
from pyami_asterisk import AMIClient


def all_events(events):
    print(events)


async def hangup_call(events):
    """asynchronous callbacks"""
    await asyncio.sleep(3)
    print(events)


ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["*"], all_events)
ami.register_event(["Hangup"], hangup_call)
ami.connect()
```

Asterisk AMI Listen Events: Registry, ContactStatus, PeerStatus

```python
from pyami_asterisk import AMIClient


def register_multiple_events(events):
    print(events)


async def callback_peer_status(events):
    """
        Response event:
        {
            'Event': 'PeerStatus', 
            'Privilege': 'system,all',
            'ChannelType': 'PJSIP', 
            'Peer': 'PJSIP/888', 
            'PeerStatus': 'Unreachable',
        }
    """
    if events.get('PeerStatus') == 'Unreachable':
        await asyncio.sleep(2)
        print(events)

    async def ping(events):
        """asynchronous callbacks"""
        await asyncio.sleep(3)
        print(events)

    ami.create_action({"Action": "Ping"}, ping)


ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(patterns=["Registry", "ContactStatus"], callbacks=register_multiple_events)
ami.register_event(["PeerStatus"], callback_peer_status)
ami.connect()
```

Asterisk AMI Actions: CoreSettings

```python
import asyncio
from pyami_asterisk import AMIClient


def core_settings(events):
    print(events)


async def core_status(events):
    """asynchronous callbacks"""
    await asyncio.sleep(4)
    print(events)


ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action({"Action": "CoreSettings"}, core_settings)
ami.create_action({"Action": "CoreStatus"}, core_status)
ami.connect()
```

Asterisk AMI Actions: CoreSettings, CoreStatus (repeat 3 seconds)

```python
from pyami_asterisk import AMIClient


def core_settings(events):
    print(events)


def core_status(events):
    print(events)
    print(events['CoreCurrentCalls'])


ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action({"Action": "CoreSettings"}, core_settings)
ami.create_action({"Action": "CoreStatus"}, core_status, repeat=3)
ami.connect()
```

Asterisk AMI Action Originate

```python
from pyami_asterisk import AMIClient


def callback_originate(events):
    print(events)


ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action(
    {
        "Action": "Originate",
        "Channel": "pjsip/203",
        "Timeout": "20000",
        "CallerID": "+37529XXXXXXX <203>",
        "Exten": "+37529XXXXXXX",
        "Context": "from-internal",
        "Async": "true",
        "Variable": r"PJSIP_HEADER(add,Call-Info)=\;Answer-After=0",  # or multiple Variable ['var=1', 'var=2']
        "Priority": "1",
    },
    callback_originate,
)
ami.connect()
```

Asterisk AMI Listen Events + Action

```python
from pyami_asterisk import AMIClient


def callback_peer_status(events):
    
    def callback_ping(response_ping):
        print("Response Ping", response_ping)

    print("PeerStatus", events)
    ami.create_action({"Action": "Ping"}, callback_ping)


ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["PeerStatus"], callback_peer_status)
ami.connect()
```

Create asyncio task

``` python
import asyncio
import random
from pyami_asterisk import AMIClient


async def refresh_tokens(timeout=4):
    """Example: Refresh tokens"""
    while True:
        print(random.randrange(0, 1000))
        await asyncio.sleep(timeout)
    


ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_asyncio_task(tasks=[refresh_tokens(timeout=2)])
ami.connect()
```

Run / stop async

``` python
import asyncio
from pyami_asterisk import AMIClient


async def all_events(event):
    print(event)
    if event.get('Event') == 'SuccessfulAuth':
        # connection close
        await ami.connection_close()


async def run_async():
    await asyncio.sleep(2)
    await ami.connect_ami()

ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["*"], all_events)
asyncio.run(run_async())
```

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/streltsovdenis/pyami_asterisk.git",
    "name": "pyami-asterisk",
    "maintainer": "",
    "docs_url": null,
    "requires_python": "",
    "maintainer_email": "",
    "keywords": "AMI,Asterisk,asyncio,python",
    "author": "Denis Streltsov",
    "author_email": "sdv.streltsov@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/0f/f0/db7e88ae43a0d317e69d238daaefcd5014f4c2592258fe62b876bc8ea3e5/pyami_asterisk-1.5.2.tar.gz",
    "platform": null,
    "description": "AsyncIO python library with Asterisk AMI\n========================================\n\n[![Build Status](https://travis-ci.com/streltsovdenis/pyami_asterisk.svg?branch=main)](https://travis-ci.com/streltsovdenis/pyami_asterisk)\n![PyPI](https://img.shields.io/pypi/v/pyami_asterisk)\n![PyPI - Downloads](https://img.shields.io/pypi/dm/pyami_asterisk?color=green)\n![PyPI - License](https://img.shields.io/pypi/l/pyami-asterisk?color=green)\n\nPyami_asterisk is a library based on python\u2019s AsyncIO with Asterisk AMI\n\nInstall\n-------\n\nInstall pyami_asterisk\n\n```bash\npip install pyami-asterisk\n```\n\nUsage\n-----\n\nAsterisk AMI Listen all Events\n\n```python\nimport asyncio\nfrom pyami_asterisk import AMIClient\n\n\ndef all_events(events):\n    print(events)\n\n\nasync def hangup_call(events):\n    \"\"\"asynchronous callbacks\"\"\"\n    await asyncio.sleep(3)\n    print(events)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.register_event([\"*\"], all_events)\nami.register_event([\"Hangup\"], hangup_call)\nami.connect()\n```\n\nAsterisk AMI Listen Events: Registry, ContactStatus, PeerStatus\n\n```python\nfrom pyami_asterisk import AMIClient\n\n\ndef register_multiple_events(events):\n    print(events)\n\n\nasync def callback_peer_status(events):\n    \"\"\"\n        Response event:\n        {\n            'Event': 'PeerStatus', \n            'Privilege': 'system,all',\n            'ChannelType': 'PJSIP', \n            'Peer': 'PJSIP/888', \n            'PeerStatus': 'Unreachable',\n        }\n    \"\"\"\n    if events.get('PeerStatus') == 'Unreachable':\n        await asyncio.sleep(2)\n        print(events)\n\n    async def ping(events):\n        \"\"\"asynchronous callbacks\"\"\"\n        await asyncio.sleep(3)\n        print(events)\n\n    ami.create_action({\"Action\": \"Ping\"}, ping)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.register_event(patterns=[\"Registry\", \"ContactStatus\"], callbacks=register_multiple_events)\nami.register_event([\"PeerStatus\"], callback_peer_status)\nami.connect()\n```\n\nAsterisk AMI Actions: CoreSettings\n\n```python\nimport asyncio\nfrom pyami_asterisk import AMIClient\n\n\ndef core_settings(events):\n    print(events)\n\n\nasync def core_status(events):\n    \"\"\"asynchronous callbacks\"\"\"\n    await asyncio.sleep(4)\n    print(events)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.create_action({\"Action\": \"CoreSettings\"}, core_settings)\nami.create_action({\"Action\": \"CoreStatus\"}, core_status)\nami.connect()\n```\n\nAsterisk AMI Actions: CoreSettings, CoreStatus (repeat 3 seconds)\n\n```python\nfrom pyami_asterisk import AMIClient\n\n\ndef core_settings(events):\n    print(events)\n\n\ndef core_status(events):\n    print(events)\n    print(events['CoreCurrentCalls'])\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.create_action({\"Action\": \"CoreSettings\"}, core_settings)\nami.create_action({\"Action\": \"CoreStatus\"}, core_status, repeat=3)\nami.connect()\n```\n\nAsterisk AMI Action Originate\n\n```python\nfrom pyami_asterisk import AMIClient\n\n\ndef callback_originate(events):\n    print(events)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.create_action(\n    {\n        \"Action\": \"Originate\",\n        \"Channel\": \"pjsip/203\",\n        \"Timeout\": \"20000\",\n        \"CallerID\": \"+37529XXXXXXX <203>\",\n        \"Exten\": \"+37529XXXXXXX\",\n        \"Context\": \"from-internal\",\n        \"Async\": \"true\",\n        \"Variable\": r\"PJSIP_HEADER(add,Call-Info)=\\;Answer-After=0\",  # or multiple Variable ['var=1', 'var=2']\n        \"Priority\": \"1\",\n    },\n    callback_originate,\n)\nami.connect()\n```\n\nAsterisk AMI Listen Events + Action\n\n```python\nfrom pyami_asterisk import AMIClient\n\n\ndef callback_peer_status(events):\n    \n    def callback_ping(response_ping):\n        print(\"Response Ping\", response_ping)\n\n    print(\"PeerStatus\", events)\n    ami.create_action({\"Action\": \"Ping\"}, callback_ping)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.register_event([\"PeerStatus\"], callback_peer_status)\nami.connect()\n```\n\nCreate asyncio task\n\n``` python\nimport asyncio\nimport random\nfrom pyami_asterisk import AMIClient\n\n\nasync def refresh_tokens(timeout=4):\n    \"\"\"Example: Refresh tokens\"\"\"\n    while True:\n        print(random.randrange(0, 1000))\n        await asyncio.sleep(timeout)\n    \n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.create_asyncio_task(tasks=[refresh_tokens(timeout=2)])\nami.connect()\n```\n\nRun / stop async\n\n``` python\nimport asyncio\nfrom pyami_asterisk import AMIClient\n\n\nasync def all_events(event):\n    print(event)\n    if event.get('Event') == 'SuccessfulAuth':\n        # connection close\n        await ami.connection_close()\n\n\nasync def run_async():\n    await asyncio.sleep(2)\n    await ami.connect_ami()\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.register_event([\"*\"], all_events)\nasyncio.run(run_async())\n```\n",
    "bugtrack_url": null,
    "license": "MIT license",
    "summary": "pyami_asterisk is a library based on python\u2019s AsyncIO with Asterisk AMI",
    "version": "1.5.2",
    "split_keywords": [
        "ami",
        "asterisk",
        "asyncio",
        "python"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "ab8a4571b5a0a2e1d3dd87a8e944f77aca9583e2f1429c4be43856e77452e8e6",
                "md5": "76c5099de295603f0fd9947ee5087235",
                "sha256": "b759ad67d36e14c24a155e80b1116c85685f8cba75ed935e5528d165aba4c291"
            },
            "downloads": -1,
            "filename": "pyami_asterisk-1.5.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "76c5099de295603f0fd9947ee5087235",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 6938,
            "upload_time": "2023-01-22T10:45:34",
            "upload_time_iso_8601": "2023-01-22T10:45:34.897727Z",
            "url": "https://files.pythonhosted.org/packages/ab/8a/4571b5a0a2e1d3dd87a8e944f77aca9583e2f1429c4be43856e77452e8e6/pyami_asterisk-1.5.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "0ff0db7e88ae43a0d317e69d238daaefcd5014f4c2592258fe62b876bc8ea3e5",
                "md5": "7e75b388b868ca326e9b6661ff30c60a",
                "sha256": "d9e0b1a22084c9a21675f72c3dd39a422f442cbab4f5cf42ec6718626a0eece9"
            },
            "downloads": -1,
            "filename": "pyami_asterisk-1.5.2.tar.gz",
            "has_sig": false,
            "md5_digest": "7e75b388b868ca326e9b6661ff30c60a",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 6621,
            "upload_time": "2023-01-22T10:45:36",
            "upload_time_iso_8601": "2023-01-22T10:45:36.623308Z",
            "url": "https://files.pythonhosted.org/packages/0f/f0/db7e88ae43a0d317e69d238daaefcd5014f4c2592258fe62b876bc8ea3e5/pyami_asterisk-1.5.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-01-22 10:45:36",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "github_user": "streltsovdenis",
    "github_project": "pyami_asterisk.git",
    "lcname": "pyami-asterisk"
}
        
Elapsed time: 0.04437s