sanic_sse
#########
1. 安装
==========
.. code-block:: shell
pip install sanic-sse-py3
2. 示例
==========
- 2.1 代码
.. code-block:: python
import os
import sys
from sanic import Sanic
from sanic.response import html
sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
from sse.api.urls import sse_bgp
from sse import SseApp
# Sanic application instance used by the example; "sse" is the app name.
app = Sanic(name="sse")
# SSE settings, stored on ``app.ctx.sse_config`` by ``init_app``.
# Every value can be overridden through an ``SSE_*`` environment variable.
# Defaults are given as strings throughout because ``os.getenv`` returns
# strings; the integer settings are converted with ``int()`` afterwards.
SSE_CONFIG = {
    "pubsub_options": {
        "redis_host": os.getenv("SSE_REDIS_HOST", "127.0.0.1"),
        "redis_port": int(os.getenv("SSE_REDIS_PORT", "16379")),
        # An empty string is the default when no password is configured.
        "redis_passwd": os.getenv("SSE_REDIS_PASSWD", ""),
    },
    # NOTE(review): units are not shown here (presumably seconds) - confirm
    # against the SseApp implementation.
    "ping_interval": int(os.getenv("SSE_PING_INTERVAL", "10")),
}
def init_app(_app):
    """Attach the SSE configuration, the SSE app and its routes to ``_app``.

    Stores ``SSE_CONFIG`` on ``_app.ctx.sse_config``, constructs ``SseApp``
    around the application, and registers the ``sse_bgp`` blueprint.

    Args:
        _app: The Sanic application to configure.
    """
    # The config is assigned before SseApp is constructed.
    # NOTE(review): SseApp presumably reads ctx.sse_config - confirm before
    # reordering these statements.
    _app.ctx.sse_config = SSE_CONFIG
    SseApp(_app)
    _app.blueprint(sse_bgp)
@app.route("/index", methods=["GET"])
async def index(request):
    """Serve a demo page that subscribes to the SSE listen endpoint.

    The page opens an ``EventSource`` on ``sse/event/listen`` for the event
    named by the ``event`` query parameter (default ``"test"``) and appends
    the data of every received message to the ``#response`` element.

    Args:
        request: The incoming Sanic request.

    Returns:
        An HTML response containing the demo page.
    """
    from urllib.parse import quote

    event = request.args.get("event", "test")
    # The event name comes straight from the query string and ends up inside
    # a JavaScript string literal, so percent-encode it: this keeps the URL
    # well-formed and prevents breaking out of the quoted string.
    encoded_event = quote(event, safe="")
    url = f"sse/event/listen?event={encoded_event}"
    # %-formatting is used because the markup contains literal braces.
    # NOTE(review): e.data is inserted as HTML; confirm that event payloads
    # are trusted, otherwise insert them as text instead.
    page = """
<html>
<body>
<script>
var source = new EventSource("%s");
source.onmessage = function(e) {
console.log("xxxxxxx");
document.getElementById('response').innerHTML += e.data + "<br>";
}
</script>
<h1>Getting server updates</h1>
<div id="response"></div>
</body>
</html>
""" % url
    return html(body=page)
if __name__ == "__main__":
    # Attach the SSE config and routes before the server starts.
    init_app(app)
    # Listen on all interfaces, port 8008, with 10 workers.
    app.run(
        host="0.0.0.0",
        port=8008,
        workers=10,
    )
- 2.2 接口
.. code-block:: shell
GET /sse/event/send?event=test
GET /sse/event/listen?event=test&client_id=
GET /sse/event/terminate?event=test&client_id=
Raw data
{
"_id": null,
"home_page": "https://github.com/daleeg/sanic_sse",
"name": "sanic-sse-py3",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": "",
"keywords": "aio redis pubsub sanic sse",
"author": "",
"author_email": "",
"download_url": "https://files.pythonhosted.org/packages/8e/41/bb2d3e888ca601a6bfe8340d8c389b9cbb905abdb7cc0e25ac1678d405ae/sanic-sse-py3-1.0.7.tar.gz",
"platform": "POSIX",
"description": "sanic_sse\n########\n\n\n1. \u5b89\u88c5\n==========\n\n.. code-block:: shell\n\n pip install sanic-sse-py3\n\n2. \u793a\u4f8b\n==========\n\n- 2.1 \u4ee3\u7801\n\n.. code-block:: python\n\n import os\n import sys\n from sanic import Sanic\n from sanic.response import html\n\n sys.path.insert(0, os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))\n\n from sse.api.urls import sse_bgp\n from sse import SseApp\n\n app = Sanic(name=\"sse\")\n\n SSE_CONFIG = {\n \"pubsub_options\": {\n \"redis_host\": os.getenv(\"SSE_REDIS_HOST\", \"127.0.0.1\"),\n \"redis_port\": int(os.getenv(\"SSE_REDIS_PORT\", \"16379\")),\n \"redis_passwd\": os.getenv(\"SSE_REDIS_PASSWD\", \"\"),\n },\n \"ping_interval\": int(os.getenv(\"SSE_PING_INTERVAL\", 10)),\n\n }\n\n\n def init_app(_app):\n _app.ctx.sse_config = SSE_CONFIG\n SseApp(_app)\n _app.blueprint(sse_bgp)\n\n\n @app.route(\"/index\", methods=[\"GET\"])\n async def index(request):\n event = request.args.get(\"event\", \"test\")\n url = f\"sse/event/listen?event={event}\"\n d = \"\"\"\n <html>\n <body>\n <script>\n var source = new EventSource(\"%s\");\n source.onmessage = function(e) {\n console.log(\"xxxxxxx\");\n document.getElementById('response').innerHTML + e.data + \"<br>\";\n }\n </script>\n <h1>Getting server updates</h1>\n <div id=\"response\"></div>\n </body>\n </html>\n \"\"\" % url\n return html(body=d)\n\n\n if __name__ == \"__main__\":\n init_app(app)\n app.run(host=\"0.0.0.0\", port=8008, workers=10)\n\n\n- 2.2 \u63a5\u53e3\n\n.. code-block:: shell\n\n GET /sse/event/send?event=test\n GET /sse/event/listen?event=test&client_id=\n GET /sse/event/terminate?event=test&client_id=\n\n\n\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "aio sanic sse",
"version": "1.0.7",
"project_urls": {
"Homepage": "https://github.com/daleeg/sanic_sse"
},
"split_keywords": [
"aio",
"redis",
"pubsub",
"sanic",
"sse"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "00f93a0c3097a3e81f3eff2a4211a44657d718bb3824ce18636263c9ef216a45",
"md5": "2854477fa64f7b8961901b891ce2b44b",
"sha256": "d32a4b38b3883bc94e62e904486bb3b7f065279b30d6b00bc183c640f03bbb23"
},
"downloads": -1,
"filename": "sanic_sse_py3-1.0.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "2854477fa64f7b8961901b891ce2b44b",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 12011,
"upload_time": "2023-11-09T09:27:34",
"upload_time_iso_8601": "2023-11-09T09:27:34.070723Z",
"url": "https://files.pythonhosted.org/packages/00/f9/3a0c3097a3e81f3eff2a4211a44657d718bb3824ce18636263c9ef216a45/sanic_sse_py3-1.0.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "8e41bb2d3e888ca601a6bfe8340d8c389b9cbb905abdb7cc0e25ac1678d405ae",
"md5": "2e20ee0b016edfb5837ea22cc0174584",
"sha256": "b16e48d3fb8b575b75507835d830b6256c5b04d3f3f43e4a452c98d55dad0768"
},
"downloads": -1,
"filename": "sanic-sse-py3-1.0.7.tar.gz",
"has_sig": false,
"md5_digest": "2e20ee0b016edfb5837ea22cc0174584",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 9155,
"upload_time": "2023-11-09T09:27:35",
"upload_time_iso_8601": "2023-11-09T09:27:35.840094Z",
"url": "https://files.pythonhosted.org/packages/8e/41/bb2d3e888ca601a6bfe8340d8c389b9cbb905abdb7cc0e25ac1678d405ae/sanic-sse-py3-1.0.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-11-09 09:27:35",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "daleeg",
"github_project": "sanic_sse",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"requirements": [],
"lcname": "sanic-sse-py3"
}