Name | aiomisc-pytest JSON |
Version |
1.1.2
JSON |
| download |
home_page | |
Summary | pytest integration for aiomisc |
upload_time | 2024-03-11 14:24:09 |
maintainer | |
docs_url | None |
author | Dmitry Orlov |
requires_python | >=3.7,<4.0 |
license | MIT |
keywords |
pytest
aiomisc
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
aiomisc pytest plugin
=====================
This package contains a plugin for pytest.
## Basic usage
Simple usage example:
```python
async def test_sample(event_loop):
f = event_loop.crete_future()
event_loop.call_soon(f.set_result, True)
assert await f
```
asynchronous fixture example:
```python
import asyncio
import pytest
@pytest.fixture
async def my_fixture(loop):
await asyncio.sleep(0)
# Requires python 3.6+
yield
```
In case you have to save an instance of an async fixture between tests,
the wrong solution is just changing the fixture scope.
But why it wouldn't work? That's because, in the base scenario, the `loop`
fixture creates a new event loop instance per test which will be closed after
test teardown. When you have to use an async fixture any caller of
`asyncio.get_event_loop()` will get the current event loop instance which
will be closed and the next test will run in another event loop.
So the solution is to redefine the `loop` fixture with the required scope
and custom fixture with the required scope.
```python
import asyncio
import pytest
from aiomisc import entrypoint
@pytest.fixture(scope='module')
def loop():
with entrypoint() as loop:
asyncio.set_event_loop(loop)
yield loop
@pytest.fixture(scope='module')
async def sample_fixture(loop):
yield 1
LOOP_ID = None
async def test_using_fixture(sample_fixture):
global LOOP_ID
LOOP_ID = id(asyncio.get_event_loop())
assert sample_fixture == 1
async def test_not_using_fixture(loop):
assert id(loop) == LOOP_ID
```
## pytest markers
Package contains some useful markers for pytest:
* `catch_loop_exceptions` - uncaught event loop exceptions will failling test.
* `forbid_get_event_loop` - forbids call `asyncio.get_event_loop`
during test case.
```python
import asyncio
import pytest
@pytest.mark.forbid_get_event_loop
async def test_with_get_loop():
def switch_context():
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.call_soon(future.set_result, True)
return future
with pytest.raises(Exception):
await switch_context()
# Test will be failed
@pytest.mark.catch_loop_exceptions
async def test_with_errors(loop):
async def fail():
# switch context
await asyncio.sleep(0)
raise Exception()
loop.create_task(fail())
await asyncio.sleep(0.1)
return
```
## Passing default context
```python
import pytest
@pytest.fixture
def default_context():
return {
'foo': 'bar',
'bar': 'foo',
}
```
## Testing services
Redefine `services` fixture in your test module:
```python
import aiomisc
import pytest
class SimpleServie(aiomisc.Service):
async def start(self) -> None:
pass
@pytest.fixture
def services():
return [SimpleServie()]
```
## Event loop policy overriding
```python
import asyncio
import pytest
import tokio
import uvloop
policy_ids = ('uvloop', 'asyncio', 'tokio')
policies = (uvloop.EventLoopPolicy(),
asyncio.DefaultEventLoopPolicy(),
tokio.EventLoopPolicy())
@pytest.fixture(params=policies, ids=policy_ids)
def event_loop_policy(request):
return request.param
```
## Thread pool overriding
```python
import pytest
from aiomisc.thread_pool import ThreadPoolExecutor
import concurrent.futures
thread_pool_ids = ('aiomisc pool', 'default pool')
thread_pool_implementation = (ThreadPoolExecutor,
concurrent.futures.ThreadPoolExecutor)
@pytest.fixture(params=thread_pool_implementation, ids=thread_pool_ids)
def thread_pool_executor(request):
return request.param
```
## entrypoint arguments
```python
import pytest
@pytest.fixture
def entrypoint_kwargs() -> dict:
return dict(log_config=False)
```
## aiohttp test client
```python
import pytest
from myapp.services.rest import REST
@pytest.fixture
def rest_port(aiomisc_unused_port_factory):
return aiomisc_unused_port_factory()
@pytest.fixture
def rest_service(rest_port):
return REST(port=rest_port)
@pytest.fixture
def services(rest_service):
return [rest_service]
@pytest.fixture
def api_client(api_service):
test_srv = TestServer(
app=rest_service.app,
port=arguments.port,
)
return TestClient(test_srv)
...
```
## TCPProxy
Simple TCP proxy for emulate network problems. Available as fixture `tcp_proxy`
Examples:
```python
import asyncio
import time
import pytest
import aiomisc
class EchoServer(aiomisc.service.TCPServer):
async def handle_client(
self, reader: asyncio.StreamReader,
writer: asyncio.StreamWriter
):
chunk = await reader.read(65534)
while chunk:
writer.write(chunk)
chunk = await reader.read(65534)
writer.close()
await writer.wait_closed()
@pytest.fixture()
def server_port(aiomisc_unused_port_factory) -> int:
return aiomisc_unused_port_factory()
@pytest.fixture()
def services(server_port, localhost):
return [EchoServer(port=server_port, address=localhost)]
@pytest.fixture()
async def proxy(tcp_proxy, localhost, server_port):
async with tcp_proxy(localhost, server_port) as proxy:
yield proxy
async def test_proxy_client_close(proxy):
reader, writer = await proxy.create_client()
payload = b"Hello world"
writer.write(payload)
response = await asyncio.wait_for(reader.read(1024), timeout=1)
assert response == payload
assert not reader.at_eof()
await proxy.disconnect_all()
assert await asyncio.wait_for(reader.read(), timeout=1) == b""
assert reader.at_eof()
async def test_proxy_client_slow(proxy):
read_delay = 0.1
write_delay = 0.2
# Emulation of asymmetric and slow ISP
with proxy.slowdown(read_delay, write_delay):
reader, writer = await proxy.create_client()
payload = b"Hello world"
delta = -time.monotonic()
writer.write(payload)
await asyncio.wait_for(reader.read(1024), timeout=2)
delta += time.monotonic()
assert delta >= read_delay + write_delay
async def test_proxy_client_with_processor(proxy):
processed_request = b"Never say hello"
# Patching protocol functions
proxy.set_content_processors(
# Process data from client to server
lambda _: processed_request,
# Process data from server to client
lambda chunk: chunk[::-1],
)
reader, writer = await proxy.create_client()
writer.write(b'nevermind')
response = await reader.read(16)
assert response == processed_request[::-1]
```
Raw data
{
"_id": null,
"home_page": "",
"name": "aiomisc-pytest",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.7,<4.0",
"maintainer_email": "",
"keywords": "pytest,aiomisc",
"author": "Dmitry Orlov",
"author_email": "me@mosquito.su",
"download_url": "https://files.pythonhosted.org/packages/59/0d/b4fb919be85c580a06e201aab21b03526fc00d04a96ed5b7875147cf52fc/aiomisc_pytest-1.1.2.tar.gz",
"platform": null,
"description": "aiomisc pytest plugin\n=====================\n\nThis package contains a plugin for pytest.\n\n## Basic usage\n\nSimple usage example:\n\n```python\nasync def test_sample(event_loop):\n f = event_loop.crete_future()\n event_loop.call_soon(f.set_result, True)\n assert await f\n```\n\nasynchronous fixture example:\n\n\n```python\n\nimport asyncio\nimport pytest\n\n\n@pytest.fixture\nasync def my_fixture(loop):\n await asyncio.sleep(0)\n\n # Requires python 3.6+\n yield\n```\n\nIn case you have to save an instance of an async fixture between tests,\nthe wrong solution is just changing the fixture scope.\nBut why it wouldn't work? That's because, in the base scenario, the `loop`\nfixture creates a new event loop instance per test which will be closed after\ntest teardown. When you have to use an async fixture any caller of\n`asyncio.get_event_loop()` will get the current event loop instance which\nwill be closed and the next test will run in another event loop.\nSo the solution is to redefine the `loop` fixture with the required scope\nand custom fixture with the required scope.\n\n```python\nimport asyncio\nimport pytest\nfrom aiomisc import entrypoint\n\n\n@pytest.fixture(scope='module')\ndef loop():\n with entrypoint() as loop:\n asyncio.set_event_loop(loop)\n yield loop\n\n\n@pytest.fixture(scope='module')\nasync def sample_fixture(loop):\n yield 1\n\n\nLOOP_ID = None\n\n\nasync def test_using_fixture(sample_fixture):\n global LOOP_ID\n LOOP_ID = id(asyncio.get_event_loop())\n assert sample_fixture == 1\n\n\nasync def test_not_using_fixture(loop):\n assert id(loop) == LOOP_ID\n```\n\n## pytest markers\n\nPackage contains some useful markers for pytest:\n\n* `catch_loop_exceptions` - uncaught event loop exceptions will failling test.\n* `forbid_get_event_loop` - forbids call `asyncio.get_event_loop`\n during test case.\n\n```python\nimport asyncio\nimport pytest\n\n\n@pytest.mark.forbid_get_event_loop\nasync def test_with_get_loop():\n def switch_context():\n loop = asyncio.get_event_loop()\n future = loop.create_future()\n loop.call_soon(future.set_result, True)\n return future\n\n with pytest.raises(Exception):\n await switch_context()\n\n\n# Test will be failed\n@pytest.mark.catch_loop_exceptions\nasync def test_with_errors(loop):\n async def fail():\n # switch context\n await asyncio.sleep(0)\n raise Exception()\n\n loop.create_task(fail())\n await asyncio.sleep(0.1)\n return\n```\n\n## Passing default context\n\n```python\nimport pytest\n\n@pytest.fixture\ndef default_context():\n return {\n 'foo': 'bar',\n 'bar': 'foo',\n }\n```\n\n## Testing services\n\nRedefine `services` fixture in your test module:\n\n```python\nimport aiomisc\nimport pytest\n\n\nclass SimpleServie(aiomisc.Service):\n async def start(self) -> None:\n pass\n\n \n@pytest.fixture\ndef services():\n return [SimpleServie()]\n```\n\n## Event loop policy overriding\n\n```python\nimport asyncio\nimport pytest\nimport tokio\nimport uvloop\n\npolicy_ids = ('uvloop', 'asyncio', 'tokio')\npolicies = (uvloop.EventLoopPolicy(),\n asyncio.DefaultEventLoopPolicy(),\n tokio.EventLoopPolicy())\n\n@pytest.fixture(params=policies, ids=policy_ids)\ndef event_loop_policy(request):\n return request.param\n```\n\n## Thread pool overriding\n\n```python\nimport pytest\nfrom aiomisc.thread_pool import ThreadPoolExecutor\nimport concurrent.futures\n\nthread_pool_ids = ('aiomisc pool', 'default pool')\nthread_pool_implementation = (ThreadPoolExecutor,\n concurrent.futures.ThreadPoolExecutor)\n\n\n@pytest.fixture(params=thread_pool_implementation, ids=thread_pool_ids)\ndef thread_pool_executor(request):\n return request.param\n```\n\n## entrypoint arguments\n\n```python\nimport pytest\n\n@pytest.fixture\ndef entrypoint_kwargs() -> dict:\n return dict(log_config=False)\n```\n\n## aiohttp test client\n\n\n```python\n\nimport pytest\nfrom myapp.services.rest import REST\n\n\n@pytest.fixture\ndef rest_port(aiomisc_unused_port_factory):\n return aiomisc_unused_port_factory()\n\n\n@pytest.fixture\ndef rest_service(rest_port):\n return REST(port=rest_port)\n\n\n@pytest.fixture\ndef services(rest_service):\n return [rest_service]\n\n\n@pytest.fixture\ndef api_client(api_service):\n test_srv = TestServer(\n app=rest_service.app,\n port=arguments.port,\n )\n\n return TestClient(test_srv)\n\n...\n```\n\n\n## TCPProxy\n\nSimple TCP proxy for emulate network problems. Available as fixture `tcp_proxy`\n\n\nExamples:\n\n```python\nimport asyncio\nimport time\n\nimport pytest\n\nimport aiomisc\n\n\nclass EchoServer(aiomisc.service.TCPServer):\n async def handle_client(\n self, reader: asyncio.StreamReader,\n writer: asyncio.StreamWriter\n ):\n chunk = await reader.read(65534)\n while chunk:\n writer.write(chunk)\n chunk = await reader.read(65534)\n\n writer.close()\n await writer.wait_closed()\n\n\n@pytest.fixture()\ndef server_port(aiomisc_unused_port_factory) -> int:\n return aiomisc_unused_port_factory()\n\n\n@pytest.fixture()\ndef services(server_port, localhost):\n return [EchoServer(port=server_port, address=localhost)]\n\n\n@pytest.fixture()\nasync def proxy(tcp_proxy, localhost, server_port):\n async with tcp_proxy(localhost, server_port) as proxy:\n yield proxy\n\n\nasync def test_proxy_client_close(proxy):\n reader, writer = await proxy.create_client()\n payload = b\"Hello world\"\n\n writer.write(payload)\n response = await asyncio.wait_for(reader.read(1024), timeout=1)\n\n assert response == payload\n\n assert not reader.at_eof()\n await proxy.disconnect_all()\n\n assert await asyncio.wait_for(reader.read(), timeout=1) == b\"\"\n assert reader.at_eof()\n\n\nasync def test_proxy_client_slow(proxy):\n read_delay = 0.1\n write_delay = 0.2\n\n # Emulation of asymmetric and slow ISP\n with proxy.slowdown(read_delay, write_delay):\n reader, writer = await proxy.create_client()\n payload = b\"Hello world\"\n\n delta = -time.monotonic()\n\n writer.write(payload)\n await asyncio.wait_for(reader.read(1024), timeout=2)\n\n delta += time.monotonic()\n\n assert delta >= read_delay + write_delay\n\n\nasync def test_proxy_client_with_processor(proxy):\n processed_request = b\"Never say hello\"\n\n # Patching protocol functions\n proxy.set_content_processors(\n # Process data from client to server\n lambda _: processed_request,\n\n # Process data from server to client\n lambda chunk: chunk[::-1],\n )\n\n reader, writer = await proxy.create_client()\n writer.write(b'nevermind')\n\n response = await reader.read(16)\n\n assert response == processed_request[::-1]\n```\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "pytest integration for aiomisc",
"version": "1.1.2",
"project_urls": null,
"split_keywords": [
"pytest",
"aiomisc"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "dee1c8d40f588c4a2dd656edb0412c5ee85f8fb86bc012709c47811a63541dbe",
"md5": "4b1780d570cc6ba6a687d2964be0ae21",
"sha256": "3519cb40a6ce245c26e18f3a77e43979d0d3675d59fa27757f17d09a16b4cc04"
},
"downloads": -1,
"filename": "aiomisc_pytest-1.1.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "4b1780d570cc6ba6a687d2964be0ae21",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.7,<4.0",
"size": 10616,
"upload_time": "2024-03-11T14:24:07",
"upload_time_iso_8601": "2024-03-11T14:24:07.783292Z",
"url": "https://files.pythonhosted.org/packages/de/e1/c8d40f588c4a2dd656edb0412c5ee85f8fb86bc012709c47811a63541dbe/aiomisc_pytest-1.1.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "590db4fb919be85c580a06e201aab21b03526fc00d04a96ed5b7875147cf52fc",
"md5": "5fde3a45968ae058141e1228d33d929e",
"sha256": "6636b470d16b9fa99416564eb7302d049fc69f6eda903e7da97ea9f3ccad0fac"
},
"downloads": -1,
"filename": "aiomisc_pytest-1.1.2.tar.gz",
"has_sig": false,
"md5_digest": "5fde3a45968ae058141e1228d33d929e",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.7,<4.0",
"size": 11622,
"upload_time": "2024-03-11T14:24:09",
"upload_time_iso_8601": "2024-03-11T14:24:09.495468Z",
"url": "https://files.pythonhosted.org/packages/59/0d/b4fb919be85c580a06e201aab21b03526fc00d04a96ed5b7875147cf52fc/aiomisc_pytest-1.1.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-03-11 14:24:09",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "aiomisc-pytest"
}