## Intercept-it!
The philosophy of the library is the ability to flexibly catch any exception,
execute some logic specified for this exception and continue executing the program.
You can intercept exceptions from coroutines and ordinary functions using the same user's interface
### Features
* Different ways to intercept exceptions
* Easy setup of interceptors objects
* Interceptors can be executed in asynchronous code
* Generic logging system:
* Built-in std logger
* Easy way to create and use custom loggers
* Supports the use of several loggers for the one exception
* Generic handlers:
* Use any callable such as default function, lambda function or complete class with any arguments
* Choose execution order of the handlers
* Maximum customization of any object such as loggers, handlers and interceptors
## Installation guide (pip)
```console
$ pip3 install intercept-it
```
## Installation guide (GitHub)
#### 1. Clone the project repo one of the following ways:
```console
$ git clone https://github.com/pro100broo/intercept-it.git
$ git clone git@github.com/pro100broo/intercept-it.git
```
#### 2. Jump into the project repository
```console
$ cd intercept-it
```
#### 3. If you have no python virtual environment, create and activate it
```console
$ python3 -m venv venv
$ chmod +x venv/bin/activate
$ source venv/bin/acitvate
```
#### 4. Install setuptools
```console
$ pip3 install setuptools
```
#### 4. Install the library one of the following ways:
```console
$ make app-build
$ make app-build-clean
$ python3 setup.py install
```
## Table of contents
### Interceptors overview
#### There are three main classes to intercept exceptions:
1. ``UnitInterceptor`` - Catches specified exception from a function
2. ``GlobalInterceptor`` - Has the ability to catch multiple specified exceptions from a function
3. ``LoopedInterceptor`` - Retry execution of the target function if an exception was caught
4. ``NestedtInterceptor`` - Is a container for few interceptors. Routes any calls to them
Any of them can intercept exceptions in **asynchronous** code too
#### All interceptors have three user interfaces:
* register_handler - Adds any callable handler to interceptor
* intercept - A decorator that catches exceptions
* wrap - A function that can wrap another function to catch exception within it
### Let's see how to configure and use Global Interceptor!
```python
from intercept_it import GlobalInterceptor
from intercept_it.loggers import STDLogger
from intercept_it.utils import cooldown_handler
# Initialize interceptor's class with necessary parameters
interceptor = GlobalInterceptor(
[IndexError, ZeroDivisionError], # Collection of target exceptions
loggers=[STDLogger()] # Use default std logger
)
# Add some handlers to interceptor
interceptor.register_handler(
cooldown_handler, # callable
5, # positional argument
execution_order=1
)
interceptor.register_handler(
lambda x, y: print(f'{x}. {y}'), # another callable :)
'I am additional handler', 'It is so cool!', # a few positional arguments
execution_order=2
)
# Intercept the exception in decorator
@interceptor.intercept
def dangerous_calculation(some_number: int) -> float:
return some_number / 0
def dangerous_list_access(index: int) -> int:
numbers = [1, 2, 3]
return numbers[index]
if __name__ == '__main__':
dangerous_calculation(5)
# Intercept the exception in wrapper
interceptor.wrap(dangerous_list_access, 100)
```
#### Results:
```console
2024-11-10 17:49:33.156556+03:00 | ERROR | File "...\intercept-it\examples\readme_examples\global_example.py", line 45: division by zero
I am additional handler. It is so cool!
2024-11-10 17:49:38.174263+03:00 | ERROR | File "...\intercept-it\examples\readme_examples\global_example.py", line 46: list index out of range
I am additional handler. It is so cool!
```
We used two simple handlers:
* Default cooldown handler (just waits the specified time after intercepting the exception)
* Simple lambda function with some logging message
You can execute more difficult logic such as sending exception details to logs stash or notify clients in messengers
Other Interceptors have quite different setup. You can find additional usage examples [here](https://github.com/pro100broo/intercept-it/tree/main/examples) or in the following
documentation examples
## Usage tips
### Loggers customization
```python
from intercept_it import GlobalInterceptor
from intercept_it.loggers import STDLogger
# Need to be a function, that receives and returns the string
def custom_formatter(message: str) -> str:
return f'I was formatted: {message}'
# Default std logger
default_logger = STDLogger()
# Customized std logger
customized_logger = STDLogger(
logging_level='WARNING',
default_formatter=custom_formatter,
pytz_timezone='Africa/Tunis',
)
interceptor = GlobalInterceptor(
[IndexError, ZeroDivisionError],
loggers=[default_logger, customized_logger],
)
```
#### Results:
```
2024-11-10 15:55:28.415905+01:00 | ERROR | File "...\intercept-it\examples\loggers_customization.py", line 59: division by zero
2024-11-10 15:55:28.415905+01:00 | WARNING | I was formatted: division by zero
2024-11-10 15:55:33.428577+01:00 | ERROR | File "...\intercept-it\examples\loggers_customization.py", line 60: list index out of range
2024-11-10 15:55:33.428577+01:00 | WARNING | I was formatted: list index out of range
```
### Creating new loggers
Each logger must be an instance of the ``BaseLogger`` or ``AsyncBaseLogger`` class and implements ``save_logs`` method
```python
import logging
from intercept_it import GlobalInterceptor
from intercept_it.loggers.base_logger import BaseLogger
# Custom logger
class CustomLogger(BaseLogger):
def __init__(self):
self._logger = logging.getLogger()
def save_logs(self, message: str) -> None:
self._logger.warning(f'Be careful! Im custom logger: {message}')
interceptor = GlobalInterceptor(
[IndexError, ZeroDivisionError],
loggers=[CustomLogger()],
)
```
#### Results:
```
Be careful! Im custom logger: division by zero
I am additional handler. It is so cool!
Be careful! Im custom logger: list index out of range
I am additional handler. It is so cool!
```
### Exceptions management
If you need to send intercepted exception higher up the call stack or implement nested interceptors, you need specify
``raise_exception`` parameter
```python
from intercept_it import GlobalInterceptor, UnitInterceptor
# Setup global interceptor
global_interceptor = GlobalInterceptor(
[IndexError, ZeroDivisionError],
raise_exception=True
)
global_interceptor.register_handler(
lambda message: print(message),
'Got exception in main function',
)
# Setup unit interceptor
unit_interceptor = UnitInterceptor(
raise_exception=True
)
unit_interceptor.register_handler(
lambda message: print(message),
'Got exception in third-party function',
)
@unit_interceptor.intercept(ZeroDivisionError)
def dangerous_calculation(some_number: int) -> float:
return some_number / 0
@global_interceptor.intercept
def main():
dangerous_calculation(100)
if __name__ == '__main__':
try:
main()
except ZeroDivisionError:
print('Got exception in entry point')
```
#### Results:
```
Got exception in third-party function
Got exception in main function
Got exception in entry point
```
### Looping
Let's imagine the situation:
Your script delivers important data from the API to the database every 30 minutes.
Suddenly, with the next request to the API you get 404 error. For example API server down to maintenance.
You can use ``LoopedInterceptor`` with specified timeout and wait until the server reboots.
```python
import random
from intercept_it import LoopedInterceptor
from intercept_it import STDLogger
class RequestsException(Exception):
pass
# Initialize interceptor's object with necessary configuration
interceptor = LoopedInterceptor(
exceptions=[RequestsException],
loggers=[STDLogger(default_formatter=lambda error: f'Error occurred: {error}. Waiting for success connection')],
timeout=5
)
# Simulating the webserver work
@interceptor.intercept
def receive_data_from_api(api_key: str) -> dict[str, str]:
is_server_down = random.randint(0, 10)
if is_server_down >= 4:
raise RequestsException('Integration down to maintenance')
print(f'Successful connection with api key: {api_key}')
return {'user': 'pro100broo', 'password': '12345'}
if __name__ == '__main__':
print(f'Received data from integration: {receive_data_from_api("_API_KEY_")}')
```
#### Results:
```
2024-11-18 01:02:39.596949+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection
2024-11-18 01:02:44.597286+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection
2024-11-18 01:02:44.597286+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection
Successful connection with api key: _API_KEY_
Received data from integration: {'user': 'pro100broo', 'password': '12345'}
```
### Additional processing of wrapped function parameters
Let's imagine another situation :)
You are developing a service where some data needs to be delivered anyway.
For example, it can be a chat messanger.
We take the necessary data from the task pool and try to send messages.
If the message was not delivered due to a broken connection, you must resend it, returning the data to the additional
task pool.
You can use any interceptor to route parameters from wrapped function to handlers.
There are two steps to implement this:
1. Specify ``greed_mode`` parameter for interceptor
2. Specify ``receive_parameters`` parameter for handler
I would recommend to set up and initialize interceptors in separated modules.
This will make the business logic cleaner and simpler :)
#### Some entities are initialized in an additional module:
```python
# entities.py
import asyncio
import logging
from datetime import datetime
from pydantic import BaseModel
from intercept_it import UnitInterceptor
from intercept_it.loggers.base_logger import BaseAsyncLogger
# Custom exception
class RequestsException(Exception):
pass
# Custom async logger
class CustomLogger(BaseAsyncLogger):
def __init__(self):
self._logger = logging.getLogger()
async def save_logs(self, message: str) -> None:
self._logger.error(f"{message} | {datetime.now()}")
# Custom message model
class MessageModel(BaseModel):
message: str
status: str
def __str__(self) -> str:
return f"Text: {self.message}. Status: {self.status}"
# The stash of undelivered messages
resend_requests_queue = asyncio.Queue(maxsize=50)
# Undelivered messages handler
async def parameters_handler(message: MessageModel, send_requests_queue: asyncio.Queue) -> None:
send_requests_queue.task_done()
print(f'Intercepted message: {message}')
message.status = 'Awaiting resend'
await resend_requests_queue.put(message)
# Initialize interceptor's object with necessary configuration
interceptor = UnitInterceptor(
loggers=[CustomLogger()],
greed_mode=True, # Enable routing parameters from the wrapped function to handlers
async_mode=True # Enable async code support
)
interceptor.register_handler(
parameters_handler,
receive_parameters=True # Enable receiving wrapped function parameters from interceptor
)
```
#### The main module:
```python
# parameters_processing.py
import random
import asyncio
from entities import (
MessageModel,
RequestsException,
interceptor,
resend_requests_queue
)
# Attempt to send message
@interceptor.intercept(RequestsException)
async def send_message_to_server(message: MessageModel, tasks_queue: asyncio.Queue) -> None:
is_server_down = random.randint(0, 10)
if is_server_down == 10:
raise RequestsException(f'Connection lost. Failed to send message: {message}')
message.status = 'Delivered'
tasks_queue.task_done()
print(f'Message successfully delivered: {message}')
# Gets message from the queue and tries to send it
async def send_message(send_requests_queue: asyncio.Queue) -> None:
while True:
message_content = await send_requests_queue.get()
await send_message_to_server(message_content, send_requests_queue)
# Simulating the appearance of messages
async def generate_messages(send_requests_queue: asyncio.Queue) -> None:
[
await send_requests_queue.put(
MessageModel(
message=random.choice(['Hi!', 'Hello!', "What's up!"]),
status="Awaiting send"
)
) for _ in range(20)
]
# The entrypoint
async def main():
send_requests_queue = asyncio.Queue(maxsize=50)
await generate_messages(send_requests_queue)
tasks = [asyncio.create_task(send_message(send_requests_queue)) for _ in range(4)]
await send_requests_queue.join()
[task.cancel() for task in tasks]
print(f'Message queue for sending: {send_requests_queue}')
print(f'Message queue for resending: {resend_requests_queue}')
if __name__ == '__main__':
asyncio.run(main())
```
#### Results:
```
Connection lost. Failed to send message: Text: Hi!. Status: Awaiting send | 2024-11-18 03:22:30.645844
Connection lost. Failed to send message: Text: What's up!. Status: Awaiting send | 2024-11-18 03:22:30.647229
Intercepted message: Text: Hi!. Status: Awaiting send
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Intercepted message: Text: What's up!. Status: Awaiting send
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hello!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: What's up!. Status: Delivered
Message successfully delivered: Text: Hi!. Status: Delivered
Message queue for sending: <Queue maxsize=50 _getters[4]>
Message queue for resending: <Queue maxsize=50 _queue=[MessageModel(message='Hi!', status='Awaiting resend'), MessageModel(message="What's up!", status='Awaiting resend')] tasks=2>
```
### Loggers and handlers management in asynchronous code
There are two executing modes for loggers and handlers:
* Fast (default) - coroutines will be wrapped in tasks and executed
* Ordered - coroutines will be executed in specified order
### Fast mode
```python
import asyncio
from datetime import datetime
from intercept_it import UnitInterceptor
async def first_logging_operation() -> None:
print(f'First handler received logs: {datetime.now()}')
await asyncio.sleep(5)
print(f'First handler delivered logs: {datetime.now()}')
async def second_logging_operation() -> None:
print(f'Second handler received logs: {datetime.now()}')
await asyncio.sleep(5)
print(f'Second handler delivered logs: {datetime.now()}')
# Initialize interceptor's object with necessary configuration
interceptor = UnitInterceptor(async_mode=True)
interceptor.register_handler(first_logging_operation)
interceptor.register_handler(second_logging_operation)
@interceptor.intercept(ZeroDivisionError)
async def dangerous_calculation(number: int) -> float:
return number / 0
if __name__ == '__main__':
asyncio.run(dangerous_calculation(100))
```
#### Results:
```
First handler received logs: 2024-12-07 13:43:37.524841
Second handler received logs: 2024-12-07 13:43:37.524841
First handler delivered logs: 2024-12-07 13:43:42.532210
Second handler delivered logs: 2024-12-07 13:43:42.532210
```
As you can see, both handlers work together without delay.
### Ordered mode
```python
# If you want to save execution order in asynchronous code,
# you can disable handlers wrapping in tasks
interceptor = UnitInterceptor(
async_mode=True,
fast_handlers_execution=False
)
```
#### Results:
```
First handler received logs: 2024-12-07 13:54:29.035445
First handler delivered logs: 2024-12-07 13:54:34.047535
Second handler received logs: 2024-12-07 13:54:34.047535
Second handler delivered logs: 2024-12-07 13:54:39.059667
```
In this case we can see the delay between the execution of handlers.
### Nesting interceptors
If you need to use multiple interceptors with different settings, you can package them in a ``NestedInterceptor``.
This is useful when you can configure everything in a separate module and
use any of the specified interceptors in any other module
```python
# interceptor_setup.py
from datetime import datetime
from intercept_it import NestedInterceptor, GlobalInterceptor, UnitInterceptor, LoopedInterceptor
from intercept_it.loggers import STDLogger
from intercept_it.utils import cooldown_handler
global_interceptor = GlobalInterceptor(
exceptions=[ZeroDivisionError, ValueError],
loggers=[
STDLogger(default_formatter=lambda message: f"{message} intercepted in global logger {datetime.now()}"),
],
)
global_interceptor.register_handler(
cooldown_handler,
5
)
unit_interceptor = UnitInterceptor(
loggers=[
STDLogger(default_formatter=lambda message: f"{message} intercepted in unit logger {datetime.now()}")
]
)
unit_interceptor.register_handler(
cooldown_handler,
5
)
looped_interceptor = LoopedInterceptor(
exceptions=[ModuleNotFoundError],
loggers=[
STDLogger(default_formatter=lambda message: f"{message} intercepted in looped logger {datetime.now()}")
],
timeout=2
)
interceptor = NestedInterceptor(
{
'Global': global_interceptor,
8: looped_interceptor,
IndexError: unit_interceptor,
}
)
```
You can use any string or integer to specify ``GlobalInterceptor`` and ``LoopedInterceptor`` identifiers.
To specify ``UnitInterceptor`` you need to use exception objects
```python
# main_module.py
import math
from interceptor_setup import interceptor
@interceptor.intercept('Global')
def dangerous_calculation1(some_number: int) -> float:
return some_number / 0
@interceptor.intercept(IndexError)
def dangerous_list_access(index: int) -> int:
numbers = [1, 2, 3]
return numbers[index]
@interceptor.intercept(8)
def dangerous_import() -> None:
import python
def dangerous_calculation2(some_number: int) -> float:
return math.sqrt(some_number)
if __name__ == '__main__':
dangerous_calculation1(5)
dangerous_list_access(100)
interceptor.wrap(dangerous_calculation2, 'Global', -1)
dangerous_import()
```
Note, that you need to specify interceptor identifier in decorators and wrappers.
This is necessary so that ``NestedInterceptor`` knows which of the interceptors needs to be called
``NestedInterceptor`` can include synchronous and asynchronous interceptors
#### Results:
```
2024-12-07 14:31:47.640265+03:00 | ERROR | division by zero intercepted in global logger 2024-12-07 14:31:47.583100
2024-12-07 14:31:52.644645+03:00 | ERROR | list index out of range intercepted in unit logger 2024-12-07 14:31:52.643645
2024-12-07 14:31:57.645588+03:00 | ERROR | math domain error intercepted in global logger 2024-12-07 14:31:57.645588
2024-12-07 14:32:02.648816+03:00 | ERROR | No module named 'python' intercepted in looped logger 2024-12-07 14:32:02.647814
2024-12-07 14:32:04.651607+03:00 | ERROR | No module named 'python' intercepted in looped logger 2024-12-07 14:32:04.651607
2024-12-07 14:32:06.654012+03:00 | ERROR | No module named 'python' intercepted in looped logger 2024-12-07 14:32:06.654012
2024-12-07 14:32:08.656878+03:00 | ERROR | No module named 'python' intercepted in looped logger 2024-12-07 14:32:08.656878
```
## Future plans
I want to customize exceptions tracing in asynchronous code.
The following points will allow us to obtain a complete tree of exceptions that occur during the execution of coroutines:
* ExceptionGroup supporting: [PEP-654](https://peps.python.org/pep-0654/)
* Exception notes supporting: [PEP-678](https://peps.python.org/pep-0678/)
I also would like to add additional customization for loggers and add new types of interceptors
Raw data
{
"_id": null,
"home_page": "https://github.com/pro100broo/intercept-it",
"name": "intercept-it",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.12",
"maintainer_email": null,
"keywords": null,
"author": "Simon Shalnev",
"author_email": "Simon Shalnev <shalnev.sema@mail.ru>",
"download_url": "https://files.pythonhosted.org/packages/67/9f/037006350a6d447af19fe264e239e27f20d9cfb17bd98e900cdc83120a5d/intercept_it-0.1.0.tar.gz",
"platform": null,
"description": "## Intercept-it!\r\nThe philosophy of the library is the ability to flexibly catch any exception, \r\nexecute some logic specified for this exception and continue executing the program.\r\n\r\nYou can intercept exceptions from coroutines and ordinary functions using the same user's interface \r\n\r\n### Features\r\n* Different ways to intercept exceptions\r\n* Easy setup of interceptors objects\r\n* Interceptors can be executed in asynchronous code \r\n* Generic logging system:\r\n * Built-in std logger\r\n * Easy way to create and use custom loggers\r\n * Supports the use of several loggers for the one exception\r\n* Generic handlers:\r\n * Use any callable such as default function, lambda function or complete class with any arguments\r\n * Choose execution order of the handlers\r\n* Maximum customization of any object such as loggers, handlers and interceptors\r\n\r\n## Installation guide (pip)\r\n\r\n```console\r\n$ pip3 install intercept-it\r\n```\r\n\r\n## Installation guide (GitHub)\r\n\r\n#### 1. Clone the project repo one of the following ways:\r\n```console\r\n$ git clone https://github.com/pro100broo/intercept-it.git\r\n$ git clone git@github.com/pro100broo/intercept-it.git\r\n```\r\n\r\n#### 2. Jump into the project repository\r\n```console\r\n$ cd intercept-it\r\n```\r\n\r\n#### 3. If you have no python virtual environment, create and activate it \r\n```console\r\n$ python3 -m venv venv\r\n$ chmod +x venv/bin/activate\r\n$ source venv/bin/acitvate\r\n```\r\n\r\n#### 4. Install setuptools\r\n```console\r\n$ pip3 install setuptools\r\n```\r\n\r\n#### 4. Install the library one of the following ways:\r\n```console\r\n$ make app-build\r\n$ make app-build-clean\r\n$ python3 setup.py install\r\n```\r\n\r\n## Table of contents\r\n\r\n### Interceptors overview\r\n\r\n#### There are three main classes to intercept exceptions:\r\n\r\n1. ``UnitInterceptor`` - Catches specified exception from a function\r\n2. ``GlobalInterceptor`` - Has the ability to catch multiple specified exceptions from a function\r\n3. ``LoopedInterceptor`` - Retry execution of the target function if an exception was caught\r\n4. ``NestedtInterceptor`` - Is a container for few interceptors. Routes any calls to them\r\n\r\nAny of them can intercept exceptions in **asynchronous** code too\r\n\r\n#### All interceptors have three user interfaces:\r\n\r\n* register_handler - Adds any callable handler to interceptor\r\n* intercept - A decorator that catches exceptions\r\n* wrap - A function that can wrap another function to catch exception within it\r\n\r\n### Let's see how to configure and use Global Interceptor!\r\n```python\r\nfrom intercept_it import GlobalInterceptor\r\nfrom intercept_it.loggers import STDLogger\r\n\r\nfrom intercept_it.utils import cooldown_handler\r\n\r\n\r\n# Initialize interceptor's class with necessary parameters\r\ninterceptor = GlobalInterceptor(\r\n [IndexError, ZeroDivisionError], # Collection of target exceptions\r\n loggers=[STDLogger()] # Use default std logger\r\n)\r\n\r\n# Add some handlers to interceptor\r\ninterceptor.register_handler(\r\n cooldown_handler, # callable\r\n 5, # positional argument\r\n execution_order=1\r\n)\r\n\r\ninterceptor.register_handler(\r\n lambda x, y: print(f'{x}. {y}'), # another callable :)\r\n 'I am additional handler', 'It is so cool!', # a few positional arguments\r\n execution_order=2\r\n)\r\n\r\n\r\n# Intercept the exception in decorator\r\n@interceptor.intercept\r\ndef dangerous_calculation(some_number: int) -> float:\r\n return some_number / 0\r\n\r\n\r\ndef dangerous_list_access(index: int) -> int:\r\n numbers = [1, 2, 3]\r\n return numbers[index]\r\n\r\n\r\nif __name__ == '__main__':\r\n dangerous_calculation(5)\r\n\r\n # Intercept the exception in wrapper\r\n interceptor.wrap(dangerous_list_access, 100)\r\n\r\n```\r\n#### Results:\r\n\r\n```console\r\n2024-11-10 17:49:33.156556+03:00 | ERROR | File \"...\\intercept-it\\examples\\readme_examples\\global_example.py\", line 45: division by zero\r\nI am additional handler. It is so cool!\r\n2024-11-10 17:49:38.174263+03:00 | ERROR | File \"...\\intercept-it\\examples\\readme_examples\\global_example.py\", line 46: list index out of range\r\nI am additional handler. It is so cool!\r\n```\r\nWe used two simple handlers:\r\n\r\n* Default cooldown handler (just waits the specified time after intercepting the exception)\r\n* Simple lambda function with some logging message\r\n\r\nYou can execute more difficult logic such as sending exception details to logs stash or notify clients in messengers\r\n\r\nOther Interceptors have quite different setup. You can find additional usage examples [here](https://github.com/pro100broo/intercept-it/tree/main/examples) or in the following\r\ndocumentation examples\r\n\r\n## Usage tips\r\n\r\n### Loggers customization\r\n\r\n```python\r\nfrom intercept_it import GlobalInterceptor\r\nfrom intercept_it.loggers import STDLogger\r\n\r\n# Need to be a function, that receives and returns the string\r\ndef custom_formatter(message: str) -> str:\r\n return f'I was formatted: {message}'\r\n\r\n\r\n# Default std logger\r\ndefault_logger = STDLogger()\r\n\r\n# Customized std logger\r\ncustomized_logger = STDLogger(\r\n logging_level='WARNING',\r\n default_formatter=custom_formatter,\r\n pytz_timezone='Africa/Tunis',\r\n)\r\n\r\ninterceptor = GlobalInterceptor(\r\n [IndexError, ZeroDivisionError], \r\n loggers=[default_logger, customized_logger], \r\n)\r\n```\r\n#### Results:\r\n```\r\n2024-11-10 15:55:28.415905+01:00 | ERROR | File \"...\\intercept-it\\examples\\loggers_customization.py\", line 59: division by zero\r\n2024-11-10 15:55:28.415905+01:00 | WARNING | I was formatted: division by zero\r\n2024-11-10 15:55:33.428577+01:00 | ERROR | File \"...\\intercept-it\\examples\\loggers_customization.py\", line 60: list index out of range\r\n2024-11-10 15:55:33.428577+01:00 | WARNING | I was formatted: list index out of range\r\n```\r\n\r\n### Creating new loggers\r\n\r\nEach logger must be an instance of the ``BaseLogger`` or ``AsyncBaseLogger`` class and implements ``save_logs`` method\r\n\r\n```python\r\nimport logging\r\n\r\nfrom intercept_it import GlobalInterceptor\r\nfrom intercept_it.loggers.base_logger import BaseLogger\r\n\r\n\r\n# Custom logger\r\nclass CustomLogger(BaseLogger):\r\n def __init__(self):\r\n self._logger = logging.getLogger()\r\n\r\n def save_logs(self, message: str) -> None:\r\n self._logger.warning(f'Be careful! Im custom logger: {message}')\r\n\r\n\r\ninterceptor = GlobalInterceptor(\r\n [IndexError, ZeroDivisionError], \r\n loggers=[CustomLogger()], \r\n)\r\n\r\n```\r\n#### Results:\r\n```\r\nBe careful! Im custom logger: division by zero\r\nI am additional handler. It is so cool!\r\nBe careful! Im custom logger: list index out of range\r\nI am additional handler. It is so cool!\r\n```\r\n\r\n### Exceptions management\r\n\r\nIf you need to send intercepted exception higher up the call stack or implement nested interceptors, you need specify \r\n``raise_exception`` parameter\r\n\r\n```python\r\nfrom intercept_it import GlobalInterceptor, UnitInterceptor\r\n\r\n# Setup global interceptor\r\nglobal_interceptor = GlobalInterceptor(\r\n [IndexError, ZeroDivisionError],\r\n raise_exception=True\r\n)\r\n\r\nglobal_interceptor.register_handler(\r\n lambda message: print(message),\r\n 'Got exception in main function',\r\n)\r\n\r\n# Setup unit interceptor\r\nunit_interceptor = UnitInterceptor(\r\n raise_exception=True\r\n)\r\n\r\nunit_interceptor.register_handler(\r\n lambda message: print(message),\r\n 'Got exception in third-party function',\r\n)\r\n\r\n\r\n@unit_interceptor.intercept(ZeroDivisionError)\r\ndef dangerous_calculation(some_number: int) -> float:\r\n return some_number / 0\r\n\r\n\r\n@global_interceptor.intercept\r\ndef main():\r\n dangerous_calculation(100)\r\n\r\n\r\nif __name__ == '__main__':\r\n try:\r\n main()\r\n except ZeroDivisionError:\r\n print('Got exception in entry point')\r\n \r\n```\r\n#### Results:\r\n```\r\nGot exception in third-party function\r\nGot exception in main function\r\nGot exception in entry point\r\n```\r\n\r\n### Looping\r\n\r\nLet's imagine the situation:\r\nYour script delivers important data from the API to the database every 30 minutes. \r\nSuddenly, with the next request to the API you get 404 error. For example API server down to maintenance. \r\nYou can use ``LoopedInterceptor`` with specified timeout and wait until the server reboots.\r\n\r\n```python\r\nimport random\r\n\r\nfrom intercept_it import LoopedInterceptor\r\nfrom intercept_it import STDLogger\r\n\r\n\r\nclass RequestsException(Exception):\r\n pass\r\n\r\n\r\n# Initialize interceptor's object with necessary configuration\r\ninterceptor = LoopedInterceptor(\r\n exceptions=[RequestsException],\r\n loggers=[STDLogger(default_formatter=lambda error: f'Error occurred: {error}. Waiting for success connection')],\r\n timeout=5\r\n)\r\n\r\n\r\n# Simulating the webserver work\r\n@interceptor.intercept\r\ndef receive_data_from_api(api_key: str) -> dict[str, str]:\r\n is_server_down = random.randint(0, 10)\r\n if is_server_down >= 4:\r\n raise RequestsException('Integration down to maintenance')\r\n\r\n print(f'Successful connection with api key: {api_key}')\r\n return {'user': 'pro100broo', 'password': '12345'}\r\n\r\n\r\nif __name__ == '__main__':\r\n print(f'Received data from integration: {receive_data_from_api(\"_API_KEY_\")}')\r\n```\r\n#### Results:\r\n```\r\n2024-11-18 01:02:39.596949+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection\r\n2024-11-18 01:02:44.597286+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection\r\n2024-11-18 01:02:44.597286+03:00 | ERROR | Error occurred: Integration down to maintenance. Waiting for success connection\r\nSuccessful connection with api key: _API_KEY_\r\nReceived data from integration: {'user': 'pro100broo', 'password': '12345'}\r\n```\r\n\r\n### Additional processing of wrapped function parameters\r\n\r\nLet's imagine another situation :) \r\nYou are developing a service where some data needs to be delivered anyway. \r\nFor example, it can be a chat messanger. \r\nWe take the necessary data from the task pool and try to send messages. \r\nIf the message was not delivered due to a broken connection, you must resend it, returning the data to the additional\r\ntask pool. \r\nYou can use any interceptor to route parameters from wrapped function to handlers. \r\n\r\nThere are two steps to implement this:\r\n\r\n1. Specify ``greed_mode`` parameter for interceptor\r\n2. Specify ``receive_parameters`` parameter for handler\r\n\r\nI would recommend to set up and initialize interceptors in separated modules. \r\nThis will make the business logic cleaner and simpler :)\r\n\r\n#### Some entities are initialized in an additional module:\r\n```python\r\n# entities.py\r\nimport asyncio\r\nimport logging\r\nfrom datetime import datetime\r\nfrom pydantic import BaseModel\r\n\r\nfrom intercept_it import UnitInterceptor\r\nfrom intercept_it.loggers.base_logger import BaseAsyncLogger\r\n\r\n\r\n# Custom exception\r\nclass RequestsException(Exception):\r\n pass\r\n\r\n\r\n# Custom async logger\r\nclass CustomLogger(BaseAsyncLogger):\r\n def __init__(self):\r\n self._logger = logging.getLogger()\r\n\r\n async def save_logs(self, message: str) -> None:\r\n self._logger.error(f\"{message} | {datetime.now()}\")\r\n\r\n\r\n# Custom message model\r\nclass MessageModel(BaseModel):\r\n message: str\r\n status: str\r\n\r\n def __str__(self) -> str:\r\n return f\"Text: {self.message}. Status: {self.status}\"\r\n\r\n\r\n# The stash of undelivered messages\r\nresend_requests_queue = asyncio.Queue(maxsize=50)\r\n\r\n\r\n# Undelivered messages handler\r\nasync def parameters_handler(message: MessageModel, send_requests_queue: asyncio.Queue) -> None:\r\n send_requests_queue.task_done()\r\n print(f'Intercepted message: {message}')\r\n message.status = 'Awaiting resend'\r\n await resend_requests_queue.put(message)\r\n\r\n\r\n# Initialize interceptor's object with necessary configuration\r\ninterceptor = UnitInterceptor(\r\n loggers=[CustomLogger()],\r\n greed_mode=True, # Enable routing parameters from the wrapped function to handlers\r\n async_mode=True # Enable async code support\r\n)\r\n\r\n\r\ninterceptor.register_handler(\r\n parameters_handler,\r\n receive_parameters=True # Enable receiving wrapped function parameters from interceptor\r\n)\r\n```\r\n\r\n#### The main module:\r\n```python\r\n# parameters_processing.py\r\nimport random\r\nimport asyncio\r\n\r\nfrom entities import (\r\n MessageModel,\r\n RequestsException,\r\n interceptor,\r\n resend_requests_queue\r\n)\r\n\r\n\r\n# Attempt to send message\r\n@interceptor.intercept(RequestsException)\r\nasync def send_message_to_server(message: MessageModel, tasks_queue: asyncio.Queue) -> None:\r\n is_server_down = random.randint(0, 10)\r\n if is_server_down == 10:\r\n raise RequestsException(f'Connection lost. Failed to send message: {message}')\r\n\r\n message.status = 'Delivered'\r\n tasks_queue.task_done()\r\n\r\n print(f'Message successfully delivered: {message}')\r\n\r\n\r\n# Gets message from the queue and tries to send it\r\nasync def send_message(send_requests_queue: asyncio.Queue) -> None:\r\n while True:\r\n message_content = await send_requests_queue.get()\r\n await send_message_to_server(message_content, send_requests_queue)\r\n\r\n\r\n# Simulating the appearance of messages\r\nasync def generate_messages(send_requests_queue: asyncio.Queue) -> None:\r\n [\r\n await send_requests_queue.put(\r\n MessageModel(\r\n message=random.choice(['Hi!', 'Hello!', \"What's up!\"]),\r\n status=\"Awaiting send\"\r\n )\r\n ) for _ in range(20)\r\n ]\r\n\r\n\r\n# The entrypoint\r\nasync def main():\r\n send_requests_queue = asyncio.Queue(maxsize=50)\r\n await generate_messages(send_requests_queue)\r\n\r\n tasks = [asyncio.create_task(send_message(send_requests_queue)) for _ in range(4)]\r\n\r\n await send_requests_queue.join()\r\n\r\n [task.cancel() for task in tasks]\r\n\r\n print(f'Message queue for sending: {send_requests_queue}')\r\n print(f'Message queue for resending: {resend_requests_queue}')\r\n\r\n\r\nif __name__ == '__main__':\r\n asyncio.run(main())\r\n```\r\n#### Results:\r\n```\r\nConnection lost. Failed to send message: Text: Hi!. Status: Awaiting send | 2024-11-18 03:22:30.645844\r\nConnection lost. Failed to send message: Text: What's up!. Status: Awaiting send | 2024-11-18 03:22:30.647229\r\nIntercepted message: Text: Hi!. Status: Awaiting send\r\nMessage successfully delivered: Text: Hi!. Status: Delivered\r\nMessage successfully delivered: Text: What's up!. Status: Delivered\r\nMessage successfully delivered: Text: Hi!. Status: Delivered\r\nMessage successfully delivered: Text: Hello!. Status: Delivered\r\nMessage successfully delivered: Text: What's up!. Status: Delivered\r\nMessage successfully delivered: Text: Hi!. Status: Delivered\r\nMessage successfully delivered: Text: Hi!. Status: Delivered\r\nMessage successfully delivered: Text: Hello!. Status: Delivered\r\nMessage successfully delivered: Text: Hello!. Status: Delivered\r\nMessage successfully delivered: Text: Hello!. Status: Delivered\r\nIntercepted message: Text: What's up!. Status: Awaiting send\r\nMessage successfully delivered: Text: What's up!. Status: Delivered\r\nMessage successfully delivered: Text: Hello!. Status: Delivered\r\nMessage successfully delivered: Text: Hi!. Status: Delivered\r\nMessage successfully delivered: Text: What's up!. Status: Delivered\r\nMessage successfully delivered: Text: Hello!. Status: Delivered\r\nMessage successfully delivered: Text: What's up!. Status: Delivered\r\nMessage successfully delivered: Text: What's up!. Status: Delivered\r\nMessage successfully delivered: Text: Hi!. Status: Delivered\r\nMessage queue for sending: <Queue maxsize=50 _getters[4]>\r\nMessage queue for resending: <Queue maxsize=50 _queue=[MessageModel(message='Hi!', status='Awaiting resend'), MessageModel(message=\"What's up!\", status='Awaiting resend')] tasks=2>\r\n```\r\n### Loggers and handlers management in asynchronous code\r\n\r\nThere are two executing modes for loggers and handlers:\r\n\r\n* Fast (default) - coroutines will be wrapped in tasks and executed\r\n* Ordered - coroutines will be executed in specified order\r\n\r\n### Fast mode\r\n```python\r\nimport asyncio\r\nfrom datetime import datetime\r\n\r\nfrom intercept_it import UnitInterceptor\r\n\r\n\r\nasync def first_logging_operation() -> None:\r\n print(f'First handler received logs: {datetime.now()}')\r\n await asyncio.sleep(5)\r\n print(f'First handler delivered logs: {datetime.now()}')\r\n\r\n\r\nasync def second_logging_operation() -> None:\r\n print(f'Second handler received logs: {datetime.now()}')\r\n await asyncio.sleep(5)\r\n print(f'Second handler delivered logs: {datetime.now()}')\r\n\r\n\r\n# Initialize interceptor's object with necessary configuration\r\ninterceptor = UnitInterceptor(async_mode=True)\r\n\r\ninterceptor.register_handler(first_logging_operation)\r\ninterceptor.register_handler(second_logging_operation)\r\n\r\n\r\n@interceptor.intercept(ZeroDivisionError)\r\nasync def dangerous_calculation(number: int) -> float:\r\n return number / 0\r\n\r\n\r\nif __name__ == '__main__':\r\n asyncio.run(dangerous_calculation(100))\r\n```\r\n#### Results:\r\n```\r\nFirst handler received logs: 2024-12-07 13:43:37.524841\r\nSecond handler received logs: 2024-12-07 13:43:37.524841\r\nFirst handler delivered logs: 2024-12-07 13:43:42.532210\r\nSecond handler delivered logs: 2024-12-07 13:43:42.532210\r\n```\r\nAs you can see, both handlers work together without delay. \r\n\r\n### Ordered mode\r\n```python\r\n# If you want to save execution order in asynchronous code, \r\n# you can disable handlers wrapping in tasks\r\ninterceptor = UnitInterceptor(\r\n async_mode=True,\r\n fast_handlers_execution=False \r\n)\r\n\r\n```\r\n#### Results:\r\n```\r\nFirst handler received logs: 2024-12-07 13:54:29.035445\r\nFirst handler delivered logs: 2024-12-07 13:54:34.047535\r\nSecond handler received logs: 2024-12-07 13:54:34.047535\r\nSecond handler delivered logs: 2024-12-07 13:54:39.059667\r\n```\r\nIn this case we can see the delay between the execution of handlers.\r\n\r\n### Nesting interceptors\r\n\r\nIf you need to use multiple interceptors with different settings, you can package them in a ``NestedInterceptor``.\r\n\r\nThis is useful when you can configure everything in a separate module and \r\nuse any of the specified interceptors in any other module\r\n\r\n```python\r\n# interceptor_setup.py\r\nfrom datetime import datetime\r\n\r\nfrom intercept_it import NestedInterceptor, GlobalInterceptor, UnitInterceptor, LoopedInterceptor\r\nfrom intercept_it.loggers import STDLogger\r\n\r\nfrom intercept_it.utils import cooldown_handler\r\n\r\n\r\nglobal_interceptor = GlobalInterceptor(\r\n exceptions=[ZeroDivisionError, ValueError],\r\n loggers=[\r\n STDLogger(default_formatter=lambda message: f\"{message} intercepted in global logger {datetime.now()}\"),\r\n ],\r\n )\r\n\r\nglobal_interceptor.register_handler(\r\n cooldown_handler,\r\n 5\r\n)\r\n\r\nunit_interceptor = UnitInterceptor(\r\n loggers=[\r\n STDLogger(default_formatter=lambda message: f\"{message} intercepted in unit logger {datetime.now()}\")\r\n ]\r\n )\r\n\r\nunit_interceptor.register_handler(\r\n cooldown_handler,\r\n 5\r\n)\r\n\r\nlooped_interceptor = LoopedInterceptor(\r\n exceptions=[ModuleNotFoundError],\r\n loggers=[\r\n STDLogger(default_formatter=lambda message: f\"{message} intercepted in looped logger {datetime.now()}\")\r\n ],\r\n timeout=2\r\n )\r\n\r\ninterceptor = NestedInterceptor(\r\n {\r\n 'Global': global_interceptor,\r\n 8: looped_interceptor,\r\n IndexError: unit_interceptor,\r\n }\r\n)\r\n```\r\nYou can use any string or integer to specify ``GlobalInterceptor`` and ``LoopedInterceptor`` identifiers.\r\nTo specify ``UnitInterceptor`` you need to use exception objects\r\n\r\n```python\r\n# main_module.py\r\nimport math\r\nfrom interceptor_setup import interceptor\r\n\r\n\r\n@interceptor.intercept('Global')\r\ndef dangerous_calculation1(some_number: int) -> float:\r\n return some_number / 0\r\n\r\n\r\n@interceptor.intercept(IndexError)\r\ndef dangerous_list_access(index: int) -> int:\r\n numbers = [1, 2, 3]\r\n return numbers[index]\r\n\r\n\r\n@interceptor.intercept(8)\r\ndef dangerous_import() -> None:\r\n import python\r\n\r\n\r\ndef dangerous_calculation2(some_number: int) -> float:\r\n return math.sqrt(some_number)\r\n\r\n\r\nif __name__ == '__main__':\r\n dangerous_calculation1(5)\r\n dangerous_list_access(100)\r\n\r\n interceptor.wrap(dangerous_calculation2, 'Global', -1)\r\n\r\n dangerous_import()\r\n```\r\nNote, that you need to specify interceptor identifier in decorators and wrappers. \r\nThis is necessary so that ``NestedInterceptor`` knows which of the interceptors needs to be called\r\n\r\n``NestedInterceptor`` can include synchronous and asynchronous interceptors\r\n\r\n#### Results:\r\n```\r\n2024-12-07 14:31:47.640265+03:00 | ERROR | division by zero intercepted in global logger 2024-12-07 14:31:47.583100\r\n2024-12-07 14:31:52.644645+03:00 | ERROR | list index out of range intercepted in unit logger 2024-12-07 14:31:52.643645\r\n2024-12-07 14:31:57.645588+03:00 | ERROR | math domain error intercepted in global logger 2024-12-07 14:31:57.645588\r\n2024-12-07 14:32:02.648816+03:00 | ERROR | No module named 'python' intercepted in looped logger 2024-12-07 14:32:02.647814\r\n2024-12-07 14:32:04.651607+03:00 | ERROR | No module named 'python' intercepted in looped logger 2024-12-07 14:32:04.651607\r\n2024-12-07 14:32:06.654012+03:00 | ERROR | No module named 'python' intercepted in looped logger 2024-12-07 14:32:06.654012\r\n2024-12-07 14:32:08.656878+03:00 | ERROR | No module named 'python' intercepted in looped logger 2024-12-07 14:32:08.656878\r\n```\r\n## Future plans\r\n\r\nI want to customize exceptions tracing in asynchronous code. \r\nThe following points will allow us to obtain a complete tree of exceptions that occur during the execution of coroutines:\r\n\r\n* ExceptionGroup supporting: [PEP-654](https://peps.python.org/pep-0654/)\r\n* Exception notes supporting: [PEP-678](https://peps.python.org/pep-0678/)\r\n\r\nI also would like to add additional customization for loggers and add new types of interceptors\r\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Customized exceptions handling",
"version": "0.1.0",
"project_urls": {
"Examples": "https://github.com/pro100broo/intercept-it/examples",
"Homepage": "https://github.com/pro100broo/intercept-it"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "ba6ece079cfad25246ac53d27166a74a08ba79d99cbc0641b84dab27529c929e",
"md5": "ba0cd998ba73a41a20fc2b5b8fd5f868",
"sha256": "8d67df4e702187263fcaacf06be4f6094578c53d41f52da05c9607666a14884b"
},
"downloads": -1,
"filename": "intercept_it-0.1.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "ba0cd998ba73a41a20fc2b5b8fd5f868",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.12",
"size": 20141,
"upload_time": "2024-12-07T12:13:43",
"upload_time_iso_8601": "2024-12-07T12:13:43.503583Z",
"url": "https://files.pythonhosted.org/packages/ba/6e/ce079cfad25246ac53d27166a74a08ba79d99cbc0641b84dab27529c929e/intercept_it-0.1.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "679f037006350a6d447af19fe264e239e27f20d9cfb17bd98e900cdc83120a5d",
"md5": "5a7f15d92050c28b5ce346cedf4dd1c9",
"sha256": "bb6bcd7606c6df926cabc2aeb990ed8f3cdc8290168ddf733567e0e0585e4929"
},
"downloads": -1,
"filename": "intercept_it-0.1.0.tar.gz",
"has_sig": false,
"md5_digest": "5a7f15d92050c28b5ce346cedf4dd1c9",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.12",
"size": 20160,
"upload_time": "2024-12-07T12:13:45",
"upload_time_iso_8601": "2024-12-07T12:13:45.141351Z",
"url": "https://files.pythonhosted.org/packages/67/9f/037006350a6d447af19fe264e239e27f20d9cfb17bd98e900cdc83120a5d/intercept_it-0.1.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-12-07 12:13:45",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "pro100broo",
"github_project": "intercept-it",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"requirements": [
{
"name": "annotated-types",
"specs": [
[
"==",
"0.7.0"
]
]
},
{
"name": "colorama",
"specs": [
[
"==",
"0.4.6"
]
]
},
{
"name": "loguru",
"specs": [
[
"==",
"0.7.2"
]
]
},
{
"name": "pydantic",
"specs": [
[
"==",
"2.9.2"
]
]
},
{
"name": "pydantic_core",
"specs": [
[
"==",
"2.23.4"
]
]
},
{
"name": "pytz",
"specs": [
[
"==",
"2024.2"
]
]
},
{
"name": "setuptools",
"specs": [
[
"==",
"75.5.0"
]
]
},
{
"name": "typing_extensions",
"specs": [
[
"==",
"4.12.2"
]
]
},
{
"name": "win32-setctime",
"specs": [
[
"==",
"1.1.0"
]
]
}
],
"lcname": "intercept-it"
}