AsyncIO Python library for Asterisk AMI
========================================
[![Build Status](https://travis-ci.com/streltsovdenis/pyami_asterisk.svg?branch=main)](https://travis-ci.com/streltsovdenis/pyami_asterisk)
![PyPI](https://img.shields.io/pypi/v/pyami_asterisk)
![PyPI - Downloads](https://img.shields.io/pypi/dm/pyami_asterisk?color=green)
![PyPI - License](https://img.shields.io/pypi/l/pyami-asterisk?color=green)
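pyami_asterisk is a Python library, built on asyncio, for working with the Asterisk Manager Interface (AMI): it lets you register callbacks for AMI events and send AMI actions.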
Install
-------
Install pyami_asterisk
```bash
pip install pyami-asterisk
```
Usage
-----
Asterisk AMI: listen for all events (callbacks may be regular functions or coroutines)
```python
import asyncio
from pyami_asterisk import AMIClient
def all_events(events):
print(events)
async def hangup_call(events):
"""asynchronous callbacks"""
await asyncio.sleep(3)
print(events)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["*"], all_events)
ami.register_event(["Hangup"], hangup_call)
ami.connect()
```
Asterisk AMI: listen for specific events (Registry, ContactStatus, PeerStatus)
```python
import asyncio
from pyami_asterisk import AMIClient
def register_multiple_events(events):
print(events)
async def callback_peer_status(events):
"""
Response event:
{
'Event': 'PeerStatus',
'Privilege': 'system,all',
'ChannelType': 'PJSIP',
'Peer': 'PJSIP/888',
'PeerStatus': 'Unreachable',
}
"""
if events.get('PeerStatus') == 'Unreachable':
await asyncio.sleep(2)
print(events)
async def ping(events):
"""asynchronous callbacks"""
await asyncio.sleep(3)
print(events)
ami.create_action({"Action": "Ping"}, ping)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(patterns=["Registry", "ContactStatus"], callbacks=register_multiple_events)
ami.register_event(["PeerStatus"], callback_peer_status)
ami.connect()
```
Asterisk AMI Actions: CoreSettings, CoreStatus
```python
import asyncio
from pyami_asterisk import AMIClient
def core_settings(events):
print(events)
async def core_status(events):
"""asynchronous callbacks"""
await asyncio.sleep(4)
print(events)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action({"Action": "CoreSettings"}, core_settings)
ami.create_action({"Action": "CoreStatus"}, core_status)
ami.connect()
```
Asterisk AMI Actions: CoreSettings, CoreStatus (CoreStatus repeated every 3 seconds)
```python
from pyami_asterisk import AMIClient
def core_settings(events):
print(events)
def core_status(events):
print(events)
print(events['CoreCurrentCalls'])
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action({"Action": "CoreSettings"}, core_settings)
ami.create_action({"Action": "CoreStatus"}, core_status, repeat=3)
ami.connect()
```
Asterisk AMI Action Originate
```python
from pyami_asterisk import AMIClient
def callback_originate(events):
print(events)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_action(
{
"Action": "Originate",
"Channel": "pjsip/203",
"Timeout": "20000",
"CallerID": "+37529XXXXXXX <203>",
"Exten": "+37529XXXXXXX",
"Context": "from-internal",
"Async": "true",
"Variable": r"PJSIP_HEADER(add,Call-Info)=\;Answer-After=0", # or multiple Variable ['var=1', 'var=2']
"Priority": "1",
},
callback_originate,
)
ami.connect()
```
Asterisk AMI: listen for events and send an action from inside the event callback
```python
from pyami_asterisk import AMIClient
def callback_peer_status(events):
def callback_ping(response_ping):
print("Response Ping", response_ping)
print("PeerStatus", events)
ami.create_action({"Action": "Ping"}, callback_ping)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["PeerStatus"], callback_peer_status)
ami.connect()
```
Create asyncio task
``` python
import asyncio
import random
from pyami_asterisk import AMIClient
async def refresh_tokens(timeout=4):
"""Example: Refresh tokens"""
while True:
print(random.randrange(0, 1000))
await asyncio.sleep(timeout)
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.create_asyncio_task(tasks=[refresh_tokens(timeout=2)])
ami.connect()
```
Connect and disconnect from your own asyncio code (`connect_ami()` / `connection_close()`)
``` python
import asyncio
from pyami_asterisk import AMIClient
async def all_events(event):
print(event)
if event.get('Event') == 'SuccessfulAuth':
# connection close
await ami.connection_close()
async def run_async():
await asyncio.sleep(2)
await ami.connect_ami()
ami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')
ami.register_event(["*"], all_events)
asyncio.run(run_async())
```
Raw data
{
"_id": null,
"home_page": "https://github.com/streltsovdenis/pyami_asterisk.git",
"name": "pyami-asterisk",
"maintainer": "",
"docs_url": null,
"requires_python": "",
"maintainer_email": "",
"keywords": "AMI,Asterisk,asyncio,python",
"author": "Denis Streltsov",
"author_email": "sdv.streltsov@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/0f/f0/db7e88ae43a0d317e69d238daaefcd5014f4c2592258fe62b876bc8ea3e5/pyami_asterisk-1.5.2.tar.gz",
"platform": null,
"description": "AsyncIO python library with Asterisk AMI\n========================================\n\n[![Build Status](https://travis-ci.com/streltsovdenis/pyami_asterisk.svg?branch=main)](https://travis-ci.com/streltsovdenis/pyami_asterisk)\n![PyPI](https://img.shields.io/pypi/v/pyami_asterisk)\n![PyPI - Downloads](https://img.shields.io/pypi/dm/pyami_asterisk?color=green)\n![PyPI - License](https://img.shields.io/pypi/l/pyami-asterisk?color=green)\n\nPyami_asterisk is a library based on python\u2019s AsyncIO with Asterisk AMI\n\nInstall\n-------\n\nInstall pyami_asterisk\n\n```bash\npip install pyami-asterisk\n```\n\nUsage\n-----\n\nAsterisk AMI Listen all Events\n\n```python\nimport asyncio\nfrom pyami_asterisk import AMIClient\n\n\ndef all_events(events):\n print(events)\n\n\nasync def hangup_call(events):\n \"\"\"asynchronous callbacks\"\"\"\n await asyncio.sleep(3)\n print(events)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.register_event([\"*\"], all_events)\nami.register_event([\"Hangup\"], hangup_call)\nami.connect()\n```\n\nAsterisk AMI Listen Events: Registry, ContactStatus, PeerStatus\n\n```python\nfrom pyami_asterisk import AMIClient\n\n\ndef register_multiple_events(events):\n print(events)\n\n\nasync def callback_peer_status(events):\n \"\"\"\n Response event:\n {\n 'Event': 'PeerStatus', \n 'Privilege': 'system,all',\n 'ChannelType': 'PJSIP', \n 'Peer': 'PJSIP/888', \n 'PeerStatus': 'Unreachable',\n }\n \"\"\"\n if events.get('PeerStatus') == 'Unreachable':\n await asyncio.sleep(2)\n print(events)\n\n async def ping(events):\n \"\"\"asynchronous callbacks\"\"\"\n await asyncio.sleep(3)\n print(events)\n\n ami.create_action({\"Action\": \"Ping\"}, ping)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.register_event(patterns=[\"Registry\", \"ContactStatus\"], callbacks=register_multiple_events)\nami.register_event([\"PeerStatus\"], 
callback_peer_status)\nami.connect()\n```\n\nAsterisk AMI Actions: CoreSettings\n\n```python\nimport asyncio\nfrom pyami_asterisk import AMIClient\n\n\ndef core_settings(events):\n print(events)\n\n\nasync def core_status(events):\n \"\"\"asynchronous callbacks\"\"\"\n await asyncio.sleep(4)\n print(events)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.create_action({\"Action\": \"CoreSettings\"}, core_settings)\nami.create_action({\"Action\": \"CoreStatus\"}, core_status)\nami.connect()\n```\n\nAsterisk AMI Actions: CoreSettings, CoreStatus (repeat 3 seconds)\n\n```python\nfrom pyami_asterisk import AMIClient\n\n\ndef core_settings(events):\n print(events)\n\n\ndef core_status(events):\n print(events)\n print(events['CoreCurrentCalls'])\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.create_action({\"Action\": \"CoreSettings\"}, core_settings)\nami.create_action({\"Action\": \"CoreStatus\"}, core_status, repeat=3)\nami.connect()\n```\n\nAsterisk AMI Action Originate\n\n```python\nfrom pyami_asterisk import AMIClient\n\n\ndef callback_originate(events):\n print(events)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.create_action(\n {\n \"Action\": \"Originate\",\n \"Channel\": \"pjsip/203\",\n \"Timeout\": \"20000\",\n \"CallerID\": \"+37529XXXXXXX <203>\",\n \"Exten\": \"+37529XXXXXXX\",\n \"Context\": \"from-internal\",\n \"Async\": \"true\",\n \"Variable\": r\"PJSIP_HEADER(add,Call-Info)=\\;Answer-After=0\", # or multiple Variable ['var=1', 'var=2']\n \"Priority\": \"1\",\n },\n callback_originate,\n)\nami.connect()\n```\n\nAsterisk AMI Listen Events + Action\n\n```python\nfrom pyami_asterisk import AMIClient\n\n\ndef callback_peer_status(events):\n \n def callback_ping(response_ping):\n print(\"Response Ping\", response_ping)\n\n print(\"PeerStatus\", events)\n ami.create_action({\"Action\": \"Ping\"}, 
callback_ping)\n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.register_event([\"PeerStatus\"], callback_peer_status)\nami.connect()\n```\n\nCreate asyncio task\n\n``` python\nimport asyncio\nimport random\nfrom pyami_asterisk import AMIClient\n\n\nasync def refresh_tokens(timeout=4):\n \"\"\"Example: Refresh tokens\"\"\"\n while True:\n print(random.randrange(0, 1000))\n await asyncio.sleep(timeout)\n \n\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.create_asyncio_task(tasks=[refresh_tokens(timeout=2)])\nami.connect()\n```\n\nRun / stop async\n\n``` python\nimport asyncio\nfrom pyami_asterisk import AMIClient\n\n\nasync def all_events(event):\n print(event)\n if event.get('Event') == 'SuccessfulAuth':\n # connection close\n await ami.connection_close()\n\n\nasync def run_async():\n await asyncio.sleep(2)\n await ami.connect_ami()\n\nami = AMIClient(host='127.0.0.1', port=5038, username='username', secret='password')\nami.register_event([\"*\"], all_events)\nasyncio.run(run_async())\n```\n",
"bugtrack_url": null,
"license": "MIT license",
"summary": "pyami_asterisk is a library based on python\u2019s AsyncIO with Asterisk AMI",
"version": "1.5.2",
"split_keywords": [
"ami",
"asterisk",
"asyncio",
"python"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "ab8a4571b5a0a2e1d3dd87a8e944f77aca9583e2f1429c4be43856e77452e8e6",
"md5": "76c5099de295603f0fd9947ee5087235",
"sha256": "b759ad67d36e14c24a155e80b1116c85685f8cba75ed935e5528d165aba4c291"
},
"downloads": -1,
"filename": "pyami_asterisk-1.5.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "76c5099de295603f0fd9947ee5087235",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": null,
"size": 6938,
"upload_time": "2023-01-22T10:45:34",
"upload_time_iso_8601": "2023-01-22T10:45:34.897727Z",
"url": "https://files.pythonhosted.org/packages/ab/8a/4571b5a0a2e1d3dd87a8e944f77aca9583e2f1429c4be43856e77452e8e6/pyami_asterisk-1.5.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "0ff0db7e88ae43a0d317e69d238daaefcd5014f4c2592258fe62b876bc8ea3e5",
"md5": "7e75b388b868ca326e9b6661ff30c60a",
"sha256": "d9e0b1a22084c9a21675f72c3dd39a422f442cbab4f5cf42ec6718626a0eece9"
},
"downloads": -1,
"filename": "pyami_asterisk-1.5.2.tar.gz",
"has_sig": false,
"md5_digest": "7e75b388b868ca326e9b6661ff30c60a",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 6621,
"upload_time": "2023-01-22T10:45:36",
"upload_time_iso_8601": "2023-01-22T10:45:36.623308Z",
"url": "https://files.pythonhosted.org/packages/0f/f0/db7e88ae43a0d317e69d238daaefcd5014f4c2592258fe62b876bc8ea3e5/pyami_asterisk-1.5.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-01-22 10:45:36",
"github": true,
"gitlab": false,
"bitbucket": false,
"github_user": "streltsovdenis",
"github_project": "pyami_asterisk.git",
"lcname": "pyami-asterisk"
}