# CDP-Socket
* Handle [Chrome DevTools Protocol (CDP)](https://chromedevtools.github.io/devtools-protocol/) connections
### Feel free to test my code!
## Getting Started
### Dependencies
* [Python >= 3.7](https://www.python.org/downloads/)
* [Chrome-Browser](https://www.google.de/chrome/) installed
### Installing
* [Windows] Install [Chrome-Browser](https://www.google.de/chrome/)
* ```pip install cdp-socket```
#### Single socket
```python
from cdp_socket.utils.utils import launch_chrome, random_port
from cdp_socket.utils.conn import get_websock_url
from cdp_socket.socket import SingleCDPSocket
import os
import asyncio


async def main():
    # start Chrome with its own profile directory on a randomly chosen port
    data_dir = os.getcwd()+"/data_dir"
    PORT = random_port()
    process = launch_chrome(data_dir,PORT)

    websock_url = await get_websock_url(PORT, timeout=5)
    async with SingleCDPSocket(websock_url, timeout=5) as sock:
        targets = await sock.exec("Target.getTargets")
        print(targets)

    # terminate the Chrome process (15 == SIGTERM)
    os.kill(process.pid, 15)


asyncio.run(main())
```
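
For orientation, the Chrome DevTools Protocol exchanges JSON messages over the WebSocket: a command carries an `id`, a `method` and optional `params`, and the reply echoes the same `id`. The sketch below only illustrates that framing with the standard library. `build_command` and `match_reply` are hypothetical helper names, not part of `cdp_socket`, and how `sock.exec` builds its messages internally is an assumption.

```python
import itertools
import json

# Hypothetical helpers illustrating CDP message framing; not cdp_socket API.
_ids = itertools.count(1)


def build_command(method, params=None):
    """Serialize a CDP command; each command gets a fresh integer id."""
    msg = {"id": next(_ids), "method": method}
    if params:
        msg["params"] = params
    return json.dumps(msg)


def match_reply(raw_reply, sent_id):
    """Return the reply's result if its id matches; raise on a CDP error."""
    reply = json.loads(raw_reply)
    if reply.get("id") != sent_id:
        return None  # an event, or a reply to a different command
    if "error" in reply:
        raise RuntimeError(reply["error"].get("message", "CDP error"))
    return reply.get("result", {})


command = build_command("Target.getTargets")
print(command)
```

Messages without an `id` are events pushed by the browser, which is what the listener examples further down react to.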
#### on_closed callback
```python
from cdp_socket.socket import SingleCDPSocket

def on_closed(code, reason):
    print("Closed with: ", code, reason)

# the `async with` block has to run inside a coroutine, e.g. `main()` above
async with SingleCDPSocket(websock_url, timeout=5) as sock:
    sock.on_closed.append(on_closed)
    # close the browser window to dispatch this event
    targets = await sock.exec("Target.getTargets")
    print(targets)
```
#### add event listener
```python
from cdp_socket.socket import SingleCDPSocket
import asyncio

def on_detached(params):
    print("Detached with: ", params)

async with SingleCDPSocket(websock_url, timeout=5) as sock:
    # close the browser window to dispatch this event
    sock.add_listener('Inspector.detached', on_detached)
    await asyncio.sleep(1000)
```
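
Conceptually, a listener registry maps a CDP event name to the callbacks that should receive the event's `params`. The following is a minimal stand-in written with the standard library to show the idea. `MiniDispatcher` and its methods are hypothetical and do not reflect how `cdp_socket` implements `add_listener`; the sample event payload is made up.

```python
import json
from collections import defaultdict


class MiniDispatcher:
    """Hypothetical stand-in: routes CDP events to registered callbacks."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def add_listener(self, method, callback):
        self._listeners[method].append(callback)

    def dispatch(self, raw_message):
        """Call every listener of an event; return how many were called."""
        message = json.loads(raw_message)
        if "id" in message:
            return 0  # replies to commands carry an id; events do not
        callbacks = self._listeners.get(message.get("method"), [])
        for callback in callbacks:
            callback(message.get("params", {}))
        return len(callbacks)


received = []
dispatcher = MiniDispatcher()
dispatcher.add_listener("Inspector.detached", received.append)
dispatcher.dispatch('{"method": "Inspector.detached", "params": {"reason": "target_closed"}}')
print(received)
```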
#### iterate over event
```python
from cdp_socket.socket import SingleCDPSocket

async with SingleCDPSocket(websock_url, timeout=5) as sock:
    async for i in sock.method_iterator('Inspector.detached'):
        print(i)
        break
```
#### wait for event
```python
from cdp_socket.socket import SingleCDPSocket

async with SingleCDPSocket(websock_url, timeout=5) as sock:
    res = await sock.wait_for('Inspector.detached')
    print(res)
```
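
The awaited event may never arrive (for example, if the window stays open), so it can help to bound the wait. Whether `sock.wait_for` accepts a timeout of its own is not stated here, so this sketch wraps any awaitable with the standard `asyncio.wait_for`. `wait_or_none` is a hypothetical helper, and `fake_event` merely stands in for `sock.wait_for('Inspector.detached')`.

```python
import asyncio


async def wait_or_none(awaitable, seconds):
    """Await `awaitable`, but give up and return None after `seconds`."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError:
        return None


async def demo():
    async def fake_event(delay):
        # stand-in for `sock.wait_for('Inspector.detached')`
        await asyncio.sleep(delay)
        return {"reason": "target_closed"}

    fast = await wait_or_none(fake_event(0.01), seconds=1)
    slow = await wait_or_none(fake_event(1), seconds=0.05)
    return fast, slow


print(asyncio.run(demo()))
```

With a real socket, the call would presumably look like `res = await wait_or_none(sock.wait_for('Inspector.detached'), 10)`.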
#### synchronous
```python
from cdp_socket.utils.utils import launch_chrome, random_port
from cdp_socket.utils.conn import get_websock_url
from cdp_socket.socket import SingleCDPSocket
import os
import shutil
import asyncio
data_dir = os.getcwd()+"/data_dir"
PORT = random_port()
process = launch_chrome(data_dir,PORT)
loop = asyncio.get_event_loop()
websock_url = loop.run_until_complete(get_websock_url(PORT, timeout=5))
conn = loop.run_until_complete(SingleCDPSocket(websock_url, timeout=5, loop=loop))
targets = loop.run_until_complete(conn.exec("Target.getTargets"))
print(targets)
os.kill(process.pid, 15)
shutil.rmtree(data_dir)
```
#### CDPSocket
```python
from cdp_socket.utils.utils import launch_chrome, random_port
from cdp_socket.socket import CDPSocket
import os
import asyncio

async def main():
    data_dir = os.getcwd()+"/data_dir"
    PORT = random_port()
    process = launch_chrome(data_dir,PORT)

    async with CDPSocket(PORT) as base_socket:
        targets = await base_socket.targets
        sock1 = await base_socket.get_socket(targets[0])
        targets = await sock1.exec("Target.getTargets")
        print(targets)
    os.kill(process.pid, 15)


asyncio.run(main())
```
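
In the protocol, the result of `Target.getTargets` is an object with a `targetInfos` list, and each entry has fields such as `targetId`, `type` and `url`. A small filter like the one below can pick out the page targets. This is a sketch that assumes `exec` hands back that result object unchanged; `page_targets` is a hypothetical helper and the sample data is made up.

```python
def page_targets(result):
    """Return the TargetInfo entries whose type is "page"."""
    return [t for t in result.get("targetInfos", []) if t.get("type") == "page"]


# made-up sample in the shape of a Target.getTargets result
sample = {
    "targetInfos": [
        {"targetId": "A1", "type": "page", "title": "New Tab",
         "url": "chrome://newtab/", "attached": True},
        {"targetId": "B2", "type": "service_worker", "title": "sw",
         "url": "https://example.com/sw.js", "attached": False},
    ]
}

print([t["targetId"] for t in page_targets(sample)])
```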
#### Custom exception handling
You can implement custom exception handling as follows:
```python
import cdp_socket
import sys
# print exception without traceback
sys.modules["cdp_socket"].EXC_HANDLER = lambda e: print(f'Exception in event-handler:\n{e.__class__.__module__}.{e.__class__.__name__}: {e}', file=sys.stderr)
```
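
As a variation, the handler can forward exceptions to the standard `logging` module instead of printing them. This is a sketch under the assumption, taken from the example above, that `EXC_HANDLER` is called with the exception as its only argument. The logger name and `log_exception` are hypothetical.

```python
import logging

logger = logging.getLogger("cdp_socket.handlers")  # hypothetical logger name


def log_exception(e):
    """Log the exception, including its traceback, at ERROR level."""
    logger.error("Exception in event-handler: %s", e,
                 exc_info=(type(e), e, e.__traceback__))


# To install it (assumption: same hook as in the example above):
# import cdp_socket
# cdp_socket.EXC_HANDLER = log_exception

try:
    raise ValueError("boom")
except ValueError as exc:
    log_exception(exc)
```

Using `logging` lets the application decide where these messages go (file, stderr, or nowhere) without touching the handler again.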
## Help
Please feel free to open an issue or fork!
## Performance
Measured on a Windows 10 laptop.

Executing `"Browser.getVersion"`:
```
Static benchmark:
passed 3_640.0 mB in 2.74 s, 13_308.958 mB/sec
3_656 (each 364 bytes) exec per sec.
```
Returning a large (static) object via JavaScript:
```
JS benchmark:
passed 22_990.0 mB in 3.16 s, 7_284.5374 mB/sec
1_584 (each 4.598 mB) exec (JS) per s
```
## Authors
[Aurin Aegerter](mailto:aurinliun@gmx.ch)
## Disclaimer
I am not responsible for what you use the code for! The code also comes without any warranty.
## Acknowledgments
Inspiration, code snippets, etc.
- [Chrome-Developer-Protocol](https://chromedevtools.github.io/devtools-protocol/)
## Contributors
- Thanks to [@Redrrx](https://github.com/Redrrx), who gave me some starting points

## Raw data

```json
{
    "_id": null,
    "home_page": "https://github.com/kaliiiiiiiiii/CDP-Socket",
    "name": "cdp-socket",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.7",
    "maintainer_email": null,
    "keywords": "Chromium, socket, webautomation",
    "author": "Aurin Aegerter",
    "author_email": "aurinliun@gmx.ch",
    "download_url": "https://files.pythonhosted.org/packages/7d/28/58812797e54fb8cf22bff61125e5a7d2763de1a86855549ecc417bdd06d5/cdp-socket-1.2.8.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "socket for handling chrome-developer-protocol connections",
    "version": "1.2.8",
    "project_urls": {
        "Bug Reports": "https://github.com/kaliiiiiiiiii/CDP-Socket/issues",
        "Documentation": "https://github.com/kaliiiiiiiiii/CDP-Socket",
        "Homepage": "https://github.com/kaliiiiiiiiii/CDP-Socket",
        "Source Code": "https://github.com/kaliiiiiiiiii/CDP-Socket"
    },
    "split_keywords": [
        "chromium",
        " socket",
        " webautomation"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "7d2858812797e54fb8cf22bff61125e5a7d2763de1a86855549ecc417bdd06d5",
                "md5": "9de836930abd7e25a9fb26f85c62ae14",
                "sha256": "d8a3d55883205c7c45c05292cf5ef5a5c74534873e369e258e61213cce15be1a"
            },
            "downloads": -1,
            "filename": "cdp-socket-1.2.8.tar.gz",
            "has_sig": false,
            "md5_digest": "9de836930abd7e25a9fb26f85c62ae14",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.7",
            "size": 19777,
            "upload_time": "2024-04-28T09:55:37",
            "upload_time_iso_8601": "2024-04-28T09:55:37.728126Z",
            "url": "https://files.pythonhosted.org/packages/7d/28/58812797e54fb8cf22bff61125e5a7d2763de1a86855549ecc417bdd06d5/cdp-socket-1.2.8.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-28 09:55:37",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "kaliiiiiiiiii",
    "github_project": "CDP-Socket",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "requirements": [
        {
            "name": "aiohttp",
            "specs": []
        },
        {
            "name": "websockets",
            "specs": []
        },
        {
            "name": "orjson",
            "specs": []
        },
        {
            "name": "twine",
            "specs": []
        },
        {
            "name": "setuptools",
            "specs": []
        },
        {
            "name": "aiodebug",
            "specs": []
        }
    ],
    "lcname": "cdp-socket"
}
```