# Pythonic toolbox
[![PyPI version](https://badge.fury.io/py/pythonic-toolbox.svg)](https://badge.fury.io/py/pythonic-toolbox)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Supported Python versions](https://img.shields.io/pypi/pyversions/pythonic-toolbox.svg?style=flat&logo=python&logoColor=yellow&labelColor=5c5c5c)](https://pypi.org/project/pythonic-toolbox)
[![Stability](https://img.shields.io/pypi/status/pythonic-toolbox.svg?style=flat)](https://badge.fury.io/py/pythonic-toolbox)
[![CodeQL Status](https://github.com/albertmenglongli/pythonic-toolbox/actions/workflows/codeql-analysis.yml/badge.svg?branch=master)](https://github.com/albertmenglongli/pythonic-toolbox/actions/workflows/codeql-analysis.yml)
[![Python3.6 Test Status](https://github.com/albertmenglongli/pythonic-toolbox/actions/workflows/tests-python-versions.yml/badge.svg?branch=master)](https://github.com/albertmenglongli/pythonic-toolbox/actions/workflows/tests-python-versions.yml)
[![SNYK Status](https://snyk.io/test/github/albertmenglongli/pythonic-toolbox/badge.svg)](https://snyk.io/test/github/albertmenglongli/pythonic-toolbox)
## Table of Contents
* [Introduction](#Introduction)
* [Installation](#Installation)
* [Usage](#Usage)
* [decorators](#decorators)
* [ignore_unexpected_kwargs](#ignore_unexpected_kwargs)
* [retry](#retry)
* [deque_utils](#deque_utils)
* [deque_pop_any](#deque_pop_any)
* [deque_split](#deque_split)
* [dict_utils](#dict_utils)
* [DictObj](#DictObj)
* [FinalDictObj](#FinalDictObj)
* [RangeKeyDict](#RangeKeyDict)
* [StrKeyIdDict](#StrKeyIdDict)
* [collect_leaves](#collect_leaves)
* [dict_until](#dict_until)
* [select_list_of_dicts](#select_list_of_dicts)
* [unique_list_of_dicts](#unique_list_of_dicts)
* [walk_leaves](#walk_leaves)
* [functional_utils](#functional_utils)
* [filter_multi](#filter_multi)
* [list_utils](#list_utils)
* [filter_allowable](#filter_allowable)
* [sort_with_custom_orders](#sort_with_custom_orders)
* [unpack_list](#unpack_list)
* [until](#until)
* [string_utils](#string_utils)
* [substitute_string_template_dict](#substitute_string_template_dict)
* [context](#context)
* [SkipContext](#SkipContext)
> README.md is auto generated by the script **tests/generate_readme_markdown.py** from testing files,
>
> **DO NOT EDIT DIRECTLY!** ;)
```bash
python3 tests/generate_readme_markdown.py
```
## Introduction
A python3.6+ toolbox with multi useful utils, functions, decorators in pythonic way, and is fully tested from python3.6 to python3.11 .
## Installation
```bash
pip3 install pythonic-toolbox --upgrade
```
## Usage
### decorators
#### ignore_unexpected_kwargs
```python3
import pytest
from pythonic_toolbox.decorators.common import ignore_unexpected_kwargs
# Following functions are named under Metasyntactic Variables, like:
# foobar, foo, bar, baz, qux, quux, quuz, corge,
# grault, garply, waldo, fred, plugh, xyzzy, thud
def foo(a, b=0, c=3):
return a, b, c
dct = {'a': 1, 'b': 2, 'd': 4}
with pytest.raises(TypeError) as __:
assert foo(**dct) == (1, 2, 3)
wrapped_foo = ignore_unexpected_kwargs(foo)
assert wrapped_foo(**dct) == (1, 2, 3)
assert wrapped_foo(0, 0, 0) == (0, 0, 0)
assert wrapped_foo(a=1, b=2, c=3) == (1, 2, 3)
@ignore_unexpected_kwargs
def bar(*args: int):
return sum(args)
# should not change original behavior
assert bar(1, 2, 3) == 6
assert bar(1, 2, 3, unexpected='Gotcha') == 6
nums = [1, 2, 3]
assert bar(*nums, unexpected='Gotcha') == 6
@ignore_unexpected_kwargs
def qux(a, b, **kwargs):
# function with Parameter.VAR_KEYWORD Aka **kwargs
return a, b, kwargs.get('c', 3), kwargs.get('d', 4)
assert qux(**{'a': 1, 'b': 2, 'd': 4, 'e': 5}) == (1, 2, 3, 4)
class Person:
@ignore_unexpected_kwargs
def __init__(self, name, age, sex):
self.name = name
self.age = age
self.sex = sex
@classmethod
@ignore_unexpected_kwargs
def create(cls, name, age, sex):
return cls(name, age, sex)
@staticmethod
@ignore_unexpected_kwargs
def greetings(name):
return f'Hello, I am {name}'
params = {
'name': 'albert',
'age': 34,
'sex': 'male',
'height': '170cm',
}
__ = Person(**params)
__ = Person('albert', 35, 'male', height='170cm')
# test cases for classmethod, staticmethod
__ = Person.create(**params)
assert Person.greetings(**params)
```
#### retry
```python3
import pytest
from pythonic_toolbox.decorators.common import retry
# use decorator without any arguments, using retry default params
@retry
def func_fail_first_time():
"""func_fail_first_time"""
self = func_fail_first_time
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
assert func_fail_first_time() == 'ok'
assert func_fail_first_time.call_times == 2
assert func_fail_first_time.__doc__ == 'func_fail_first_time'
@retry(tries=2, delay=0.1) # use decorator with customized params
def func_fail_twice():
"""func_fail_twice"""
self = func_fail_twice
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times <= 2:
raise Exception('Fail when called first, second time')
return 'ok'
assert func_fail_twice() == 'ok'
assert func_fail_twice.call_times == 3
assert func_fail_twice.__doc__ == 'func_fail_twice'
@retry(tries=2, delay=0.1)
def func_fail_three_times():
"""func_fail_three_times"""
self = func_fail_three_times
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times <= 3: # 1, 2, 3
raise Exception('Fail when called first, second, third time')
return 'ok'
with pytest.raises(Exception) as exec_info:
func_fail_three_times()
assert func_fail_three_times.call_times == 3
assert exec_info.value.args[0] == 'Fail when called first, second, third time'
def raw_func_fail_first_time():
"""func_fail_first_time"""
self = raw_func_fail_first_time
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
assert retry(raw_func_fail_first_time)() == 'ok'
# test cases when function has arguments, kwargs
@retry(tries=1, delay=0.1)
def func_fail_first_time_with_parameters(p1, p2):
"""func_fail_first_time"""
self = func_fail_first_time_with_parameters
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return p1 + p2
assert func_fail_first_time_with_parameters(1, 2) == 3
def func_fail_first_time_with_parameters(p1, p2):
"""func_fail_first_time"""
self = func_fail_first_time_with_parameters
if not hasattr(self, 'call_times'):
# set attribute call_times for function, to count call times
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return p1 + p2
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(1, 2) == 3
assert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(p1=1, p2=2) == 3
import asyncio
@retry
async def async_func_fail_first_time():
"""async_func_fail_first_time"""
self = async_func_fail_first_time
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
@retry(delay=0.1)
async def async_func_fail_first_time2():
"""async_func_fail_first_time2"""
self = async_func_fail_first_time2
if not hasattr(self, 'call_times'):
self.call_times = 0
self.call_times += 1
if self.call_times == 1:
raise Exception('Fail when first called')
return 'ok'
async def async_main():
assert await async_func_fail_first_time() == 'ok'
assert async_func_fail_first_time.__doc__ == 'async_func_fail_first_time'
assert async_func_fail_first_time.call_times == 2
assert await async_func_fail_first_time2() == 'ok'
assert async_func_fail_first_time2.call_times == 2
assert async_func_fail_first_time2.__doc__ == 'async_func_fail_first_time2'
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(async_main())
finally:
loop.close()
import random
fail_count = 0
@retry(delay=0.1)
async def always_fail_func():
nonlocal fail_count
fail_count += 1
await asyncio.sleep(random.random())
raise ValueError()
async def async_main_for_always_fail():
nonlocal fail_count
tasks = [always_fail_func() for i in range(0, 3)]
results = await asyncio.gather(*tasks, return_exceptions=True)
assert all(map(lambda e: isinstance(e, ValueError), results))
assert fail_count == 2 * 3 # each func run twice, three func calls
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(async_main_for_always_fail())
finally:
loop.close()
```
### deque_utils
#### deque_pop_any
```python3
from collections import deque
import pytest
from pythonic_toolbox.utils.deque_utils import deque_pop_any
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=1) == 2
assert queue == deque([1, 3, 4, 5])
# edge case: same as deque.popleft()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=0) == 1
assert queue == deque([2, 3, 4, 5])
# edge case: same as deque.popright()
queue = deque([1, 2, 3, 4, 5])
assert deque_pop_any(queue, idx=len(queue) - 1) == 5
assert queue == deque([1, 2, 3, 4])
queue = deque([1, 2, 3, 4, 5])
with pytest.raises(IndexError) as exec_info:
deque_pop_any(queue, idx=102)
# edge case: pop from empty deque
queue = deque()
with pytest.raises(IndexError) as exec_info:
deque_pop_any(queue, idx=0)
assert exec_info.value.args[0] == 'pop from empty deque'
```
#### deque_split
```python3
import pytest
from collections import deque
from pythonic_toolbox.utils.deque_utils import deque_split
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=3)
assert queue1 == deque([1, 2, 3])
assert queue2 == deque([4, 5])
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=0)
assert queue1 == deque([])
assert queue2 == deque([1, 2, 3, 4, 5])
queue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=100)
assert queue1 == deque([1, 2, 3, 4, 5])
assert queue2 == deque([])
with pytest.raises(ValueError) as exec_info:
deque_split(deque([1, 2, 3, 4, 5]), -1)
assert exec_info.value.args[0] == 'num must be integer: 0 <= num <= sys.maxsize'
```
### dict_utils
#### DictObj
```python3
from copy import deepcopy
import pytest
from pythonic_toolbox.utils.dict_utils import DictObj
naive_dct = {
'key1': 'val1',
'key2': 'val2',
}
obj = DictObj(naive_dct)
# test basic functional methods like dict
assert len(obj) == 2
assert bool(obj) is True
# same behavior like ordinary dict according to the python version (FILO for popitem for 3.6+)
assert obj.popitem() == ('key2', 'val2')
assert obj.popitem() == ('key1', 'val1')
with pytest.raises(KeyError) as __:
obj.popitem()
# a key can be treated like an attribute
# an attribute can be treated like a key
obj.key3 = 'val3'
assert obj.pop('key3') == 'val3'
with pytest.raises(KeyError) as __:
obj.pop('key4')
obj.key5 = 'val5'
del obj.key5
with pytest.raises(KeyError) as __:
obj.pop('key5')
with pytest.raises(AttributeError) as __:
del obj.key5
# test deepcopy
obj = DictObj({'languages': ['Chinese', 'English']})
copied_obj = deepcopy(obj)
assert copied_obj == obj
copied_obj.languages = obj.languages + ['Japanese']
assert obj.languages == ['Chinese', 'English']
assert copied_obj.languages == ['Chinese', 'English', 'Japanese']
assert copied_obj != obj
person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}
person = DictObj(person_dct)
assert DictObj(person_dct) == DictObj(person_dct)
assert person.to_dict() == person_dct
assert set(person.keys()) == {'name', 'age', 'sex', 'languages'}
assert hasattr(person, 'name') is True
assert person.name == 'Albert'
assert person['name'] == 'Albert'
person.languages.append('Japanese')
assert person.languages == ['Chinese', 'English', 'Japanese']
person.height = '170'
assert person['height'] == '170'
assert 'height' in person
assert 'height' in person.keys()
assert hasattr(person, 'height') is True
del person['height']
assert 'height' not in person
assert 'height' not in person.keys()
person['height'] = '170cm'
person.update({'weight': '50'})
weight_val = person.pop('weight')
assert weight_val == '50'
person.update(DictObj({'weight': '50kg'}))
assert person.weight == '50kg'
expected = {
'name': 'Albert', 'age': '34', 'sex': 'Male',
'languages': ['Chinese', 'English', 'Japanese'], # appended new language
'height': '170cm', # new added attribute
'weight': '50kg', # new added attribute
}
assert person.to_dict() == expected
repr_expected: str = ("{'name': 'Albert', 'age': '34', 'sex': 'Male', "
"'languages': ['Chinese', 'English', 'Japanese'],"
" 'height': '170cm', 'weight': '50kg'}")
assert repr(person) == repr_expected
# nested structure will be detected, and changed to DictObj
chessboard_data = {
'position': [
[{'name': 'knight'}, {'name': 'pawn'}],
[{'name': 'pawn'}, {'name': 'queen'}],
]
}
chessboard_obj = DictObj(chessboard_data)
# test comparing instances of DictObj
assert DictObj(chessboard_data) == DictObj(chessboard_data)
assert isinstance(chessboard_obj.position, list)
assert len(chessboard_obj.position) == 2
assert isinstance(chessboard_obj.position[0][0], DictObj)
assert chessboard_obj.position[0][0].name == 'knight'
assert chessboard_obj.position[1][1].name == 'queen'
# edge case empty DictObj
empty_dict_obj = DictObj({})
assert len(empty_dict_obj) == 0
assert bool(empty_dict_obj) is False
obj_dict = DictObj({'data': 'oops'})
assert obj_dict.data == 'oops'
# params validation
invalid_key_dct = {
1: '1',
}
# test when dict's key is not str
with pytest.raises(ValueError) as __:
__ = DictObj(invalid_key_dct)
complicated_key_dct = {
'1abc': 'Gotcha', # '1abc' is not valid identifier for Python, so obj.1abc will cause SyntaxError
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)
assert obj_dict['1abc'] == 'Gotcha'
assert getattr(obj_dict, '1abc') == 'Gotcha'
# you can access '1abc' as attribute by adding prefix '_'
assert obj_dict._1abc == 'Gotcha'
del obj_dict._1abc
assert obj_dict['class'] == 'MyClass'
assert getattr(obj_dict, 'class') == 'MyClass'
# you can access 'class' as attribute by adding prefix '_'
assert obj_dict._class == 'MyClass'
# test re-assign new value for 'class'
obj_dict._class = 'MyClass2'
assert obj_dict._class == 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
assert getattr(obj_dict, 'class') == 'MyClass2'
del obj_dict._class
# if assign new attributes (_2, _try), ObjDict will treat it like what the originally are
# this is fully considered by design, you're not encouraged to mess up keys
obj_dict._2x = 'NewAttr'
assert obj_dict._2x == 'NewAttr'
assert obj_dict['_2x'] == 'NewAttr'
with pytest.raises(KeyError):
__ = obj_dict['2x']
with pytest.raises(AttributeError):
__ = getattr(obj_dict, '2x')
obj_dict._try = 'NewAttr'
assert obj_dict._try == 'NewAttr'
assert obj_dict['_try'] == 'NewAttr'
with pytest.raises(KeyError):
__ = obj_dict['NewAttr']
with pytest.raises(AttributeError):
__ = getattr(obj_dict, 'NewAttr')
# Demo for messing up key 'class'
# delete and re-assign _class
complicated_key_dct = {
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
}
obj_dict = DictObj(complicated_key_dct)
assert obj_dict['class'] == 'MyClass'
obj_dict._class = 'MyClass2'
assert obj_dict['class'] == 'MyClass2'
del obj_dict._class
# obj_dict has no knowledge about 'class' or '_class'
# so '_class' is a brand-new attribute, and will be stored as '_class'
obj_dict._class = 'MyClass3'
with pytest.raises(KeyError):
# Oops!!! by-design
# 'class' cannot be accessed as key anymore,
# because we store '_class' as key as other valid keys behave
assert obj_dict['class'] == 'MyClass3'
assert obj_dict['_class'] == 'MyClass3'
# thread safe testing
import sys
from threading import Thread
from pythonic_toolbox.decorators.decorator_utils import method_synchronized
class MyObjDict(DictObj):
# implement a thread-safe method to increase the value of cnt
@method_synchronized
def increase_cnt_by_n(self, n):
self.cnt += n
def increase_cnt_by_100(dict_obj):
for i in range(100):
dict_obj.increase_cnt_by_n(1)
sw_interval = sys.getswitchinterval()
try:
sys.setswitchinterval(0.0001)
my_dict_obj = MyObjDict({'cnt': 0})
threads = [Thread(target=increase_cnt_by_100, args=(my_dict_obj,)) for _ in range(100)]
[t.start() for t in threads]
[t.join() for t in threads]
assert my_dict_obj.cnt == 10000
finally:
sys.setswitchinterval(sw_interval)
# test copy/deepcopy of DictObj
import copy
person = DictObj({'name': 'albert', 'age': 33})
team = DictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader
```
#### FinalDictObj
```python3
from typing import cast
import pytest
from pythonic_toolbox.utils.dict_utils import FinalDictObj
person_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}
fixed_person = FinalDictObj(person_dct)
assert fixed_person.name == 'Albert'
# FINAL means once initialized, you cannot change the key/attribute anymore
with pytest.raises(RuntimeError) as exec_info:
fixed_person.name = 'Steve'
expected_error_str = 'Cannot modify attribute/item in an already initialized FinalDictObj'
assert exec_info.value.args[0] == expected_error_str
with pytest.raises(RuntimeError) as __:
fixed_person.popitem()
with pytest.raises(RuntimeError) as __:
fixed_person.pop('name')
assert isinstance(fixed_person.languages, tuple)
with pytest.raises(AttributeError) as exec_info:
# list values are changed into tuple to avoid being modified
cast(list, fixed_person.languages).append('Japanese')
expected_error_str = "'tuple' object has no attribute 'append'"
assert exec_info.value.args[0] == expected_error_str
assert fixed_person.to_dict() == person_dct
# nested structure will be detected, and changed to FinalDictObj
chessboard_data = {
'position': [
[{'name': 'knight'}, {'name': 'pawn'}],
[{'name': 'pawn'}, {'name': 'queen'}],
]
}
chessboard_obj = FinalDictObj(chessboard_data)
# test comparing instances of FinalDictObj
assert FinalDictObj(chessboard_data) == FinalDictObj(chessboard_data)
assert isinstance(chessboard_obj.position, tuple)
assert isinstance(chessboard_obj.position[0][0], FinalDictObj)
assert chessboard_obj.position[1][1].name == 'queen'
with pytest.raises(RuntimeError) as __:
chessboard_obj.position[1][1].name = 'knight'
# test for keyword/non-identifier key as attribute
final_obj_dict = FinalDictObj({
'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError
})
assert final_obj_dict['class'] == 'MyClass'
assert getattr(final_obj_dict, 'class') == 'MyClass'
assert final_obj_dict._class == 'MyClass'
# test copy/deepcopy of FileDictObj
import copy
person = FinalDictObj({'name': 'albert', 'age': 33})
team = FinalDictObj({'leader': person})
shallow_copy_of_team = copy.copy(team)
assert team.leader is shallow_copy_of_team.leader
assert team.leader == shallow_copy_of_team.leader
deep_copy_of_team = copy.deepcopy(team)
assert team.leader is not deep_copy_of_team.leader
assert team.leader == deep_copy_of_team.leader
```
#### RangeKeyDict
```python3
import pytest
from pythonic_toolbox.utils.dict_utils import RangeKeyDict
# test normal case
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
(float('-inf'), 0): 'Negative',
(0, 60): 'F', # 0 <= val < 60
(60, 70): 'D', # 60 <= val < 70
(70, 80): 'C', # 70 <= val < 80
(80, 90): 'B', # 80 <= val < 90
(90, 100): 'A', # 90 <= val < 100
100: 'A+', # val == 100
})
# Big O of querying is O(log n), n is the number of ranges, due to using bisect inside
assert range_key_dict[-1] == 'Negative'
assert range_key_dict[0] == 'F'
assert range_key_dict[55] == 'F'
assert range_key_dict[60] == 'D'
assert range_key_dict[75] == 'C'
assert range_key_dict[85] == 'B'
assert range_key_dict[95] == 'A'
assert range_key_dict[100] == 'A+'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict['95'] # when key is not comparable with other integer keys
assert exec_info.value.args[0] == "KeyError: '95' is not comparable with other keys"
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[150]
assert exec_info.value.args[0] == 'KeyError: 150'
assert range_key_dict.get(150, 'N/A') == 'N/A'
# test comparison with other RangeKeyDict
assert RangeKeyDict({(0, 10): '1'}) == RangeKeyDict({(0, 10): '1'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 10): '2'})
assert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 1000): '1'})
with pytest.raises(ValueError):
# [1, 1) is not a valid range
# there's no value x satisfy 1 <= x < 1
RangeKeyDict({(1, 1): '1'})
with pytest.raises(ValueError):
# [1, -1) is not a valid range
RangeKeyDict({(1, -1): '1'})
# validate input keys types and detect range overlaps(segment intersect)
with pytest.raises(ValueError) as exec_info:
RangeKeyDict({
(0, 10): 'val-between-0-and-10',
(0, 5): 'val-between-0-and-5'
})
expected_error_msg = ("Duplicated left boundary key 0 detected: "
"(0, 10): 'val-between-0-and-10', (0, 5): 'val-between-0-and-5'")
assert exec_info.value.args[0] == expected_error_msg
with pytest.raises(ValueError) as exec_info:
RangeKeyDict({
(0, 10): 'val-between-0-and-10',
(5, 15): 'val-between-5-and-15'
})
expected_error_msg = ("Overlap detected: "
"(0, 10): 'val-between-0-and-10', (5, 15): 'val-between-5-and-15'")
assert exec_info.value.args[0] == expected_error_msg
# test RangeKeyDict with no continuous ranges
range_key_dict: RangeKeyDict[float, str] = RangeKeyDict({
(0, 60): 'F', # 0 <= val < 60
(70, 80): 'C', # 70 <= val < 80
})
assert range_key_dict[10] == 'F'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[-100]
assert exec_info.value.args[0] == 'KeyError: -100'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[65]
assert exec_info.value.args[0] == 'KeyError: 65'
with pytest.raises(KeyError) as exec_info:
_ = range_key_dict[100]
assert exec_info.value.args[0] == 'KeyError: 100'
from functools import total_ordering
@total_ordering
class Age:
def __init__(self, val: float):
if not isinstance(val, (int, float)):
raise ValueError('Invalid age value')
self.val = val
def __le__(self, other):
return self.val <= other.val
def __repr__(self):
return f'Age({repr(self.val)})'
def __hash__(self):
return hash(self.val)
age_categories_map: RangeKeyDict[Age, str] = RangeKeyDict({
(Age(0), Age(2)): 'Baby',
(Age(2), Age(15)): 'Children',
(Age(15), Age(25)): 'Youth',
(Age(25), Age(65)): 'Adults',
(Age(65), Age(123)): 'Seniors',
})
assert age_categories_map[Age(0.5)] == 'Baby'
assert age_categories_map[Age(12)] == 'Children'
assert age_categories_map[Age(20)] == 'Youth'
assert age_categories_map[Age(35)] == 'Adults'
assert age_categories_map[Age(70)] == 'Seniors'
```
#### StrKeyIdDict
```python3
import pytest
from pythonic_toolbox.utils.dict_utils import StrKeyIdDict
data = {1: 'a', 2: 'b', '3': 'c'}
my_dict = StrKeyIdDict(data)
# usage: value can be accessed by id (str: int-like/uuid-like/whatever) or id (int)
assert my_dict['1'] == my_dict[1] == 'a'
assert my_dict.keys() == {'1', '2', '3'} # all keys are str type
my_dict['4'] = 'd'
assert my_dict['4'] == 'd'
my_dict[4] = 'd'
assert my_dict['4'] == 'd'
my_dict.update({4: 'd'})
assert my_dict['4'] == 'd'
# test comparing instances of the class
assert StrKeyIdDict(data) == StrKeyIdDict(data)
assert StrKeyIdDict(data) != StrKeyIdDict(dict(data, **{'4': 'd'}))
assert StrKeyIdDict(data) == {'1': 'a', '2': 'b', '3': 'c'}
assert StrKeyIdDict(data) != {'1': 'a', '2': 'b', '3': 'd'}
assert StrKeyIdDict(data) != {1: 'a', 2: 'b', 3: 'c'} # StrKeyIdDict assumes all keys are strings
# test delete key
del my_dict[4]
assert my_dict.keys() == {'1', '2', '3'} # '4' is not in the dict anymore
# assign value to an arbitrary string key that is not in the dict
my_dict.update({'some-uuid': 'something'})
assert my_dict['some-uuid'] == 'something'
with pytest.raises(TypeError):
# key '1', 1 both stands for key '1',
# so we get duplicated keys when initializing instance, oops!
my_dict = StrKeyIdDict({'1': 'a', 1: 'A'})
assert my_dict.get(1) == 'a'
assert my_dict.get('NotExistKey') is None
assert my_dict.get('NotExistKey', 'NotExistValue') == 'NotExistValue'
# test edge cases
assert StrKeyIdDict() == {}
# test shallow copy
my_dict[5] = ['e1', 'e2', 'e3']
copy_dict = my_dict.copy()
copy_dict[1] = 'A'
assert my_dict[1] == 'a'
my_dict['5'].append('e4')
assert copy_dict['5'] == ['e1', 'e2', 'e3', 'e4']
# test deep copy
from copy import deepcopy
copy_dict = deepcopy(my_dict)
my_dict[5].append('e5')
assert my_dict['5'] == ['e1', 'e2', 'e3', 'e4', 'e5']
assert copy_dict[5] == ['e1', 'e2', 'e3', 'e4']
# test constructor
my_dict = StrKeyIdDict(uuid1='a', uuid2='b')
assert my_dict['uuid1'] == 'a'
# test constructor (from keys)
my_dict = StrKeyIdDict.fromkeys([1, 2, 3], None)
assert my_dict == {'1': None, '2': None, '3': None}
# test update and overwrite
my_dict.update(StrKeyIdDict({1: 'a', 2: 'b', 3: 'c', 4: 'd'}))
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
my_dict = StrKeyIdDict([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])
assert my_dict['1'] == my_dict[1] == 'a'
# reassign StrKeyIdDict instance to another StrKeyIdDict instance
my_dict = StrKeyIdDict(my_dict)
assert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
assert dict(my_dict) == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}
# test case when "key" is "data", which is a reserved keyword inside StrKeyIdDict
my_dict = StrKeyIdDict({'data': 'data_value', '1': 'a'})
assert my_dict['data'] == 'data_value'
assert my_dict['1'] == 'a'
# delete key 'data', should not affect other keys
del my_dict['data']
assert my_dict['1'] == 'a'
```
#### collect_leaves
```python3
from pythonic_toolbox.utils.dict_utils import collect_leaves
# a nested dict-like struct
my_dict = {
'node_1': {
'node_1_1': {
'node_1_1_1': 'A',
},
'node_1_2': {
'node_1_2_1': 'B',
'node_1_2_2': 'C',
'node_1_2_3': None,
},
'node_1_3': [ # dict list
{
'node_1_3_1_1': 'D',
'node_1_3_1_2': 'E',
},
{
'node_1_3_2_1': 'FF',
'node_1_3_2_2': 'GG',
}
]
}}
expected = ['A', 'B', 'C', None, 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict) == expected
expected = ['A', 'B', 'C', 'D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf) == expected
assert collect_leaves(my_dict, keypath_pred=lambda kp: len(kp) == 1) == []
expected = ['B', 'C']
assert collect_leaves(my_dict, keypath_pred=lambda kp: kp[-1] in {'node_1_2_1', 'node_1_2_2'}) == expected
expected = ['C']
assert collect_leaves(my_dict, leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
keypath_pred=lambda kp: kp[-1] == 'node_1_2_2',
leaf_pred=lambda lf: lf == 'C') == expected
assert collect_leaves(my_dict,
keypath_pred=lambda kp: kp[-1] == 'node_1_1_1',
leaf_pred=lambda lf: lf == 'C') == []
expected = ['D', 'E', 'FF', 'GG']
assert collect_leaves(my_dict,
keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3') == expected
expected = ['FF', 'GG']
assert collect_leaves(my_dict,
keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3',
leaf_pred=lambda lf: isinstance(lf, str) and len(lf) == 2) == expected
# edge cases
assert collect_leaves([]) == []
assert collect_leaves({}) == []
assert collect_leaves(None) == []
```
#### dict_until
```python3
from pythonic_toolbox.utils.dict_utils import dict_until
data = {'full_name': 'Albert Lee', 'pen_name': None}
assert dict_until(data, keys=['name', 'full_name']) == 'Albert Lee'
assert dict_until(data, keys=['full_name', 'name']) == 'Albert Lee'
assert dict_until(data, keys=['name', 'english_name']) is None
assert dict_until(data, keys=['name', 'english_name'], default='anonymous') == 'anonymous'
# test when pen_name is set None on purpose
assert dict_until(data, keys=['pen_name'], default='anonymous') is None
# test when value with None value is not acceptable
assert dict_until(data, keys=['pen_name'], terminate=lambda x: x is not None, default='anonymous') == 'anonymous'
```
#### select_list_of_dicts
```python3
from pythonic_toolbox.utils.dict_utils import select_list_of_dicts
dict_lst = [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# another Peter Parker from multiverse
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# age unknown for Carol Danvers, no age field
{'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
{'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'},
]
assert select_list_of_dicts(dict_lst, look_like={'name': 'Peter Parker'}) == [
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}) == [
{'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},
{'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}, keys=['name']) == [
{'name': 'Carol Danvers'}, {'name': 'Natasha Romanoff'}]
# unique is supported for return list
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age']) == [
{'name': 'Tony Stark', 'age': 49},
{'name': 'Peter Parker', 'age': 16},
{'name': 'Peter Parker', 'age': 16},
]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age'], unique=True) == [
{'name': 'Tony Stark', 'age': 49},
{'name': 'Peter Parker', 'age': 16}]
# dict keys are ordered as the keys passed-in
assert list(select_list_of_dicts(dict_lst, keys=['name', 'age'], unique=True)[0].keys()) == ['name', 'age']
assert list(select_list_of_dicts(dict_lst, keys=['age', 'name'], unique=True)[0].keys()) == ['age', 'name']
# locate Captain Marvel, with default val for missing key
assert select_list_of_dicts(dict_lst,
look_like={'alias': 'Captain Marvel'},
keys=['name', 'sex', 'age', 'alias'],
val_for_missing_key='Unknown')[0]['age'] == 'Unknown'
# edge cases, get the original dict
assert select_list_of_dicts([]) == []
assert select_list_of_dicts(dict_lst) == dict_lst
# new list of dicts is returned, leaving the original list of dicts untouched
black_widow = select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]
black_widow['age'] += 1
assert black_widow['age'] == 36
# we don't modify the original dict data, Natasha is always 35 years old
assert select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]['age'] == 35
# preds provide more flexibility, filter the ones with age info
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0])) == 4
assert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0], unique=True)) == 3
# combine look_like and preds parameters
expected = [{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'}]
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
preds=[lambda d: 'age' in d, lambda d: d['age'] > 20]) == expected
# empty list is returned if no dict matches the criteria
assert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},
preds=[lambda d: 'sex' in d and d['sex'] == 'female']) == []
```
#### unique_list_of_dicts
```python3
from pythonic_toolbox.utils.dict_utils import unique_list_of_dicts
dict_lst = [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# Peter Parkers from multiverse in same age.
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
# test same dict, but the order of dict is different
{'name': 'Peter Parker', 'sex': 'male', 'alias': 'Spider Man', 'age': 16},
]
# Only one Peter Parker will be kept, for all data are exactly same.
assert unique_list_of_dicts(dict_lst) == [
{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},
{'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},
]
# edge cases
assert unique_list_of_dicts([]) == []
```
#### walk_leaves
```python3
from pythonic_toolbox.utils.dict_utils import walk_leaves
data = {
'k1': {
'k1_1': 1,
'k1_2': 2,
},
'k2': 'N/A', # stands for not available
}
expected = {
'k1': {
'k1_1': 2,
'k1_2': 4,
},
'k2': 'N/A', # stands for not available
}
assert walk_leaves(data) == data # no transform function provided, just a deepcopy
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
# if inplace is set True, will change data inplace, return nothing
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected
data = [{'name': 'lml', 'age': 33}, {'name': 'albert', 'age': 18}]
expected = [{'name': 'lml', 'age': 66}, {'name': 'albert', 'age': 36}]
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected
assert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None
assert data == expected
# edge cases
assert walk_leaves(None) is None
assert walk_leaves([]) == []
assert walk_leaves({}) == {}
assert walk_leaves(None, inplace=True) is None
assert walk_leaves([], inplace=True) is None
assert walk_leaves({}, inplace=True) is None
```
### functional_utils
#### filter_multi
```python3
from pythonic_toolbox.utils.functional_utils import lfilter_multi, filter_multi
from collections.abc import Iterable
def is_even(x):
return x % 2 == 0
def is_divisible_by_5(x):
return x % 5 == 0
# select numbers which are divisible by 2 and 5
assert lfilter_multi([is_even, is_divisible_by_5], range(1, 30)) == [10, 20]
assert lfilter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20]) == [10, 20]
from itertools import count, takewhile
# if you want to pass an iterator, make sure the iterator will end/break,
# Note: a bare count(start=0, step=2) will generate number like 0, 2, 4, 6, .... (never ends)
even_numbers_less_equal_than_50 = takewhile(lambda x: x <= 50, count(start=0, step=2))
expected = [0, 10, 20, 30, 40, 50]
assert lfilter_multi([is_even, is_divisible_by_5], even_numbers_less_equal_than_50) == expected
# testing for filter_multi, not converted to list directly
num_iterator = filter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20])
assert type(num_iterator) is filter
assert isinstance(num_iterator, Iterable)
expected = [10, 20]
for idx, value in enumerate(num_iterator):
assert value == expected[idx]
# when items are infinite, choose filter_multi instead of lfilter_multi
expected = [0, 10, 20, 30, 40, 50]
for idx, value in enumerate(filter_multi([is_even, is_divisible_by_5], count(start=0, step=1))):
if value > 50:
break
else:
assert value == expected[idx]
```
### list_utils
#### filter_allowable
```python3
from pythonic_toolbox.utils.list_utils import filter_allowable
fruits = ['apple', 'banana', 'orange']
vegetables = ['carrot', 'potato', 'tomato']
meats = ['beef', 'chicken', 'fish']
foods = fruits + vegetables + meats
assert list(filter_allowable(foods)) == foods
assert list(filter_allowable(foods, allow_list=[], block_list=[])) == foods
assert list(filter_allowable(foods, allow_list=['apple', 'banana', 'blueberry'])) == ['apple', 'banana']
assert list(filter_allowable(foods, allow_list=[], block_list=foods)) == []
assert list(filter_allowable(foods, block_list=meats)) == fruits + vegetables
assert list(filter_allowable(foods, allow_list=['apple'], block_list=[])) == ['apple']
assert list(filter_allowable(foods, allow_list=['apple'], block_list=['apple'])) == []
assert list(filter_allowable(foods + ['blueberry'], allow_list=[], block_list=foods)) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=[])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=[], block_list=['apple', 'banana'])) == ['blueberry']
assert list(filter_allowable(['blueberry'], allow_list=['orange'], block_list=['apple', 'banana'])) == []
# test cases with parameter key
assert list(filter_allowable(foods, allow_list=['a', 'b'], key=lambda x: x[0])) == ['apple', 'banana', 'beef']
# test some basic cases
assert list(filter_allowable()) == []
assert list(filter_allowable(candidates=None)) == []
assert list(filter_allowable(candidates=[])) == []
assert list(filter_allowable(candidates=[], allow_list=[], block_list=[])) == []
```
#### sort_with_custom_orders
```python3
from operator import itemgetter
from typing import List
import pytest
from pythonic_toolbox.utils.list_utils import sort_with_custom_orders
# basic usage
values = ['branch2', 'branch1', 'branch3', 'master', 'release']
expected = ['master', 'release', 'branch1', 'branch2', 'branch3']
assert sort_with_custom_orders(values, prefix_orders=['master', 'release']) == expected
assert sort_with_custom_orders(values, prefix_orders=['master', 'release'], reverse=True) == expected[::-1]
values = [1, 2, 3, 9, 9]
expected = [9, 9, 1, 2, 3]
assert sort_with_custom_orders(values, prefix_orders=[9, 8, 7]) == expected
values = [1, 2, 3, 9]
expected = [9, 2, 3, 1]
assert sort_with_custom_orders(values, prefix_orders=[9], suffix_orders=[1]) == expected
assert sort_with_custom_orders([]) == []
assert sort_with_custom_orders([], prefix_orders=[], suffix_orders=[]) == []
assert sort_with_custom_orders([], prefix_orders=['master']) == []
# tests for unhashable values
values = [[2, 2], [1, 1], [3, 3], [6, 0]]
assert sort_with_custom_orders(values, prefix_orders=[[3, 3]]) == [[3, 3], [1, 1], [2, 2], [6, 0]]
# if "key" is provided, items are sorted in order of key(item)
# items in prefix_orders/suffix_orders don't need to be one-one correspondence with items to sort
# sum([6]) == sum([3, 3]) == sum([6, 0])
assert sort_with_custom_orders(values, prefix_orders=[[6]], key=sum) == [[3, 3], [6, 0], [1, 1], [2, 2]]
# tests for list of dicts
values = [{2: 2}, {1: 1}, {1: 2}]
assert sort_with_custom_orders(values, prefix_orders=[{2: 2}],
key=lambda data: sum(data.values())) == [{2: 2}, {1: 2}, {1: 1}]
branch_info: List[dict] = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'release', 'commit_id': 'v1.1'}]
# Assume that we prefer choosing branch in order: release > master > others (develop, hotfix etc.)
res = sort_with_custom_orders(branch_info,
prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
key=itemgetter('branch'))
expected = [{'branch': 'release', 'commit_id': 'v1.1'}, {'branch': 'master', 'commit_id': 'v1.2'}]
assert res == expected
branch_info = [{'branch': 'develop', 'commit_id': 'v1.3'}, {'branch': 'master', 'commit_id': 'v1.2'}]
res = sort_with_custom_orders(branch_info,
prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],
key=itemgetter('branch'))
expected = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'develop', 'commit_id': 'v1.3'}]
assert res == expected
# tests for exceptions
with pytest.raises(ValueError) as exec_info:
sort_with_custom_orders([1, 2, 3], prefix_orders=[3], suffix_orders=[3])
assert exec_info.value.args[0] == 'prefix and suffix contains same value'
with pytest.raises(ValueError) as exec_info:
sort_with_custom_orders([1, 2, 3], prefix_orders=[1, 1])
assert exec_info.value.args[0] == 'prefix_orders contains duplicated values'
# tests for class
class Person:
def __init__(self, id, name, age):
self.id = id
self.name = name
self.age = age
def __lt__(self, other: 'Person'):
return self.age < other.age
def __eq__(self, other: 'Person'):
return self.age == other.age
def __hash__(self):
return self.id
def __str__(self):
return f'Person({self.id}, {self.name}, {self.age})'
def __repr__(self):
return str(self)
Albert = Person(1, 'Albert', 28)
Alice = Person(2, 'Alice', 26)
Menglong = Person(3, 'Menglong', 33)
persons = [Albert, Alice, Menglong]
expected = [Alice, Albert, Menglong]
assert sort_with_custom_orders(persons) == expected
expected = [Menglong, Alice, Albert]
assert sort_with_custom_orders(persons, prefix_orders=[Menglong, Person(4, 'Anyone', 40)]) == expected
```
#### unpack_list
```python3
import pytest
from pythonic_toolbox.utils.list_utils import unpack_list
first, second, third = unpack_list(['a', 'b', 'c', 'd'], target_num=3)
assert first == 'a' and second == 'b' and third == 'c'
first, second, third = unpack_list(['a', 'b'], target_num=3, default=None)
assert first == 'a' and second == 'b' and third is None
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None
first, second, third = unpack_list([], target_num=3, default=0)
assert first == second == third == 0
first, second, *rest = unpack_list(['a', 'b', 'c'], target_num=4, default='x')
assert first == 'a' and second == 'b' and rest == ['c', 'x']
# test case for type range
first, second, third = unpack_list(range(1, 3), target_num=3, default=None)
assert first == 1 and second == 2 and third is None
def fib():
a, b = 0, 1
while 1:
yield a
a, b = b, a + b
# test case for type generator
fib_generator = fib() # generates data like [0, 1, 1, 2, 3, 5, 8, 13, 21 ...]
first, second, third, *rest = unpack_list(fib_generator, target_num=6)
assert first == 0 and second == 1 and third == 1
assert rest == [2, 3, 5]
seventh, eighth = unpack_list(fib_generator, target_num=2)
assert seventh == 8 and eighth == 13
# test edge case, nothing to unpack
empty = unpack_list([], target_num=0, default=None)
assert empty == []
res = unpack_list([], target_num=2, default=None)
assert res == [None, None]
empty = unpack_list(['a', 'b'], target_num=0, default=None)
assert empty == []
empty = unpack_list(range(0, 0), target_num=0)
assert empty == []
empty = unpack_list(iter([]), target_num=0, default=None)
assert empty == []
with pytest.raises(ValueError):
# ValueError: not enough values to unpack (expected 3, got 2)
first, second, third = unpack_list([1, 2], target_num=2)
```
#### until
```python3
from itertools import count
from pythonic_toolbox.utils.list_utils import until
# basic usage
counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x > 10) == 11
assert until([1, 2, 3], lambda x: x > 10, default=11) == 11
# test case for when there's no default value and no item in the iterable satisfies the condition
assert until([1, 2, 3], lambda x: x > 10) is None
# edge cases
assert until([], default=3) == 3 # nothing provided, return default
assert until(None, lambda x: x > 10, default=11) == 11
# test case for when there's no item in the counter satisfies the condition
# the following codes will run forever, so comment them out
# counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
# assert until(counter, lambda x: x % 2 == 0) is None
# test case for when max_iter_num is provided, only iterate the counter for max_iter_num times
counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...
assert until(counter, lambda x: x % 2 == 0, default=None, max_iter_num=100) is None
numbers = [1, 2, 3, 4, 5, 6]
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=1) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=4) is None
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=5) == 5
assert until(numbers, lambda x: x >= 5, default=None, max_iter_num=100) == 5
```
### string_utils
#### substitute_string_template_dict
```python3
from unittest.mock import patch, PropertyMock
import pytest
from pythonic_toolbox.utils.string_utils import substitute_string_template_dict, CycleError
# simple usage
# both $variable ${variable} declarations are supported in string template format
str_template_dict = {
'greeting': 'Good Morning, Everyone!',
'first_name': 'Albert',
'last_name': 'Lee',
'full_name': '$first_name $last_name',
'age': 34,
'speech': '$greeting, I am $full_name, a ${age}-year-old programmer, very glad to meet you!'
}
output_dict = substitute_string_template_dict(str_template_dict)
assert output_dict['full_name'] == 'Albert Lee'
expected_speech = 'Good Morning, Everyone!, I am Albert Lee, a 34-year-old programmer, very glad to meet you!'
assert output_dict['speech'] == expected_speech
# complex usage, with dynamic values, and multi value-providing holders
str_template_dict = {
'first_name': 'Daenerys',
'last_name': 'Targaryen',
'nick_name': 'Dany',
'full_name': '$first_name $last_name',
'speech': "$nick_name: I'm $full_name ($title1, $title2, $title3), it's $current_time_str, $greeting!",
}
variables_dict = {'title1': 'Queen of Meereen',
'title2': 'Mother of Dragons'}
class DynamicVariables:
@property
def current_time_str(self):
import datetime
return datetime.datetime.now().strftime("%H:%M:%S")
class DefaultUnknownTitle:
"""
A class will always return UnknownTitle, when try to access attribute like
title1, title2, ..., titleX
"""
def __getattribute__(self, item):
if isinstance(item, str) and item.startswith('title') and item[len(item) - 1:].isdigit():
return 'UnknownTitle'
return super(DefaultUnknownTitle, self).__getattribute__(item)
expected_speech = ("Dany: I'm Daenerys Targaryen (Queen of Meereen, Mother of Dragons, UnknownTitle), "
"it's 08:00:00, good morning everyone!")
# using mock to make DynamicVariables().current_time_str always return 08:00:00
with patch.object(DynamicVariables, 'current_time_str', return_value='08:00:00', new_callable=PropertyMock):
output_dict = substitute_string_template_dict(str_template_dict, variables_dict, DynamicVariables(),
DefaultUnknownTitle(),
greeting='good morning everyone')
assert output_dict['speech'] == expected_speech
# edge cases
assert substitute_string_template_dict({}) == {}
# cycle detection
str_template_dict = {
'variable_a': 'Hello $variable_b', # variable_a depends on variable_b
'variable_b': 'Hello $variable_a', # variable_b depends on variable_a, it's a cycle!
}
with pytest.raises(CycleError) as exec_info:
substitute_string_template_dict(str_template_dict)
```
### context
#### SkipContext
```python3
import itertools
import pytest
from pythonic_toolbox.utils.context_utils import SkipContext
# Usage: define a class that inherits the SkipContext,
# and takes control of the skip or not logic
class MyWorkStation(SkipContext):
def __init__(self, week_day: str):
working_days = {'monday', 'tuesday', 'wednesday', 'thursday', 'friday'}
weekends = {'saturday', 'sunday'}
if week_day.lower() not in working_days.union(weekends):
raise ValueError(f'Invalid weekday {week_day}')
skip = True if week_day.lower() in weekends else False
super(MyWorkStation, self).__init__(skip=skip)
seven_week_days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
logged_opening_days = []
total_working_hours = 0
for cur_week_day in seven_week_days:
# MyWorkStation will skip the code block when encountering weekends
with MyWorkStation(week_day=cur_week_day):
# log this working day
logged_opening_days.append(cur_week_day)
# accumulate working hours, 8 hours on each working day
total_working_hours += 8
# only working days are logged
assert logged_opening_days == ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']
assert total_working_hours == 8 * 5
# test basic SkipContext
count_iterator = itertools.count(start=0, step=1)
flg_skip = True
with SkipContext(skip=flg_skip):
# if skip = True, all codes inside the context will be skipped(not executed)
next(count_iterator) # this will not be executed
assert sum([1, 1]) == 3
raise Exception('Codes will not be executed')
assert next(count_iterator) == 0 # check previous context is skipped
flg_skip = False
with SkipContext(skip=flg_skip):
# codes will be executed as normal, if skip = False
next(count_iterator) # generate value 1
assert sum([1, 1]) == 2
assert next(count_iterator) == 2 # check previous context is executed
with pytest.raises(Exception) as exec_info:
with SkipContext(skip=False):
# if skip = False, this SkipContextManager is transparent,
# internal exception will be detected as normal
raise Exception('MyError')
assert exec_info.value.args[0] == 'MyError'
# another example: ensure there will be only one job, who acquire the lock, run the increase +1
from multiprocessing import Manager, Pool
import time
from pythonic_toolbox.utils.context_utils import SkipContext
def plain_cronjob_increase(ns, lock):
start = time.time()
with lock:
now = time.time()
if now - start >= 0.5:
pass
else:
ns.cnt += 1
time.sleep(1)
return ns.cnt
class PreemptiveLockContext(SkipContext):
def __init__(self, lock):
self.start_time = time.perf_counter()
self.lock = lock
self.acquired = self.lock.acquire(timeout=0.5)
skip = not self.acquired
super(PreemptiveLockContext, self).__init__(skip=skip)
def __exit__(self, type, value, traceback):
if self.acquired:
time.sleep(1)
self.lock.release()
if type is None:
return # No exception
else:
if issubclass(type, self.SkipContentException):
return True # Suppress special SkipWithBlockException
return False
def cronjob_increase(ns, lock):
# for those who cannot acquire the lock within some time
# this context block will be skipped, quite simple
with PreemptiveLockContext(lock):
ns.cnt += 1
return ns.cnt
manager = Manager()
lock = manager.Lock()
ns = manager.Namespace()
pool = Pool(2)
ns.cnt = 0
processes = [pool.apply_async(plain_cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1
# reset global cnt=0
ns.cnt = 0
processes = [pool.apply_async(cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]
result = [p.get() for p in processes]
assert result == [1, 1]
assert ns.cnt == 1
```
Raw data
{
"_id": null,
"home_page": "https://github.com/albertmenglongli/pythonic-toolbox",
"name": "pythonic-toolbox",
"maintainer": "",
"docs_url": null,
"requires_python": "",
"maintainer_email": "",
"keywords": "toolbox",
"author": "menglong.li",
"author_email": "albert.menglongli@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/5f/7e/5d121510194fc42afeaaa426fd30a0b514248f04478e64543c9884bec106/pythonic-toolbox-1.1.39.tar.gz",
"platform": null,
"description": "# Pythonic toolbox\n\n[![PyPI version](https://badge.fury.io/py/pythonic-toolbox.svg)](https://badge.fury.io/py/pythonic-toolbox)\n[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)\n[![Supported Python versions](https://img.shields.io/pypi/pyversions/pythonic-toolbox.svg?style=flat&logo=python&logoColor=yellow&labelColor=5c5c5c)](https://pypi.org/project/pythonic-toolbox)\n[![Stability](https://img.shields.io/pypi/status/pythonic-toolbox.svg?style=flat)](https://badge.fury.io/py/pythonic-toolbox)\n[![CodeQL Status](https://github.com/albertmenglongli/pythonic-toolbox/actions/workflows/codeql-analysis.yml/badge.svg?branch=master)](https://github.com/albertmenglongli/pythonic-toolbox/actions/workflows/codeql-analysis.yml)\n[![Python3.6 Test Status](https://github.com/albertmenglongli/pythonic-toolbox/actions/workflows/tests-python-versions.yml/badge.svg?branch=master)](https://github.com/albertmenglongli/pythonic-toolbox/actions/workflows/tests-python-versions.yml)\n[![SNYK Status](https://snyk.io/test/github/albertmenglongli/pythonic-toolbox/badge.svg)](https://snyk.io/test/github/albertmenglongli/pythonic-toolbox)\n\n\n## Table of Contents\n* [Introduction](#Introduction)\n* [Installation](#Installation)\n* [Usage](#Usage)\n\t* [decorators](#decorators)\n\t\t* [ignore_unexpected_kwargs](#ignore_unexpected_kwargs)\n\t\t* [retry](#retry)\n\t* [deque_utils](#deque_utils)\n\t\t* [deque_pop_any](#deque_pop_any)\n\t\t* [deque_split](#deque_split)\n\t* [dict_utils](#dict_utils)\n\t\t* [DictObj](#DictObj)\n\t\t* [FinalDictObj](#FinalDictObj)\n\t\t* [RangeKeyDict](#RangeKeyDict)\n\t\t* [StrKeyIdDict](#StrKeyIdDict)\n\t\t* [collect_leaves](#collect_leaves)\n\t\t* [dict_until](#dict_until)\n\t\t* [select_list_of_dicts](#select_list_of_dicts)\n\t\t* [unique_list_of_dicts](#unique_list_of_dicts)\n\t\t* [walk_leaves](#walk_leaves)\n\t* [functional_utils](#functional_utils)\n\t\t* [filter_multi](#filter_multi)\n\t* [list_utils](#list_utils)\n\t\t* [filter_allowable](#filter_allowable)\n\t\t* [sort_with_custom_orders](#sort_with_custom_orders)\n\t\t* [unpack_list](#unpack_list)\n\t\t* [until](#until)\n\t* [string_utils](#string_utils)\n\t\t* [substitute_string_template_dict](#substitute_string_template_dict)\n\t* [context](#context)\n\t\t* [SkipContext](#SkipContext)\n\n\n> README.md is auto generated by the script **tests/generate_readme_markdown.py** from testing files,\n>\n> **DO NOT EDIT DIRECTLY!** ;)\n\n```bash\npython3 tests/generate_readme_markdown.py\n```\n\n\n## Introduction\n\nA python3.6+ toolbox with multi useful utils, functions, decorators in pythonic way, and is fully tested from python3.6 to python3.11 .\n\n## Installation\n\n```bash\npip3 install pythonic-toolbox --upgrade\n```\n\n## Usage\n\n### decorators\n\n#### ignore_unexpected_kwargs\n\n```python3\nimport pytest\nfrom pythonic_toolbox.decorators.common import ignore_unexpected_kwargs\n\n# Following functions are named under Metasyntactic Variables, like:\n# foobar, foo, bar, baz, qux, quux, quuz, corge,\n# grault, garply, waldo, fred, plugh, xyzzy, thud\n\ndef foo(a, b=0, c=3):\n return a, b, c\n\ndct = {'a': 1, 'b': 2, 'd': 4}\nwith pytest.raises(TypeError) as __:\n assert foo(**dct) == (1, 2, 3)\n\nwrapped_foo = ignore_unexpected_kwargs(foo)\nassert wrapped_foo(**dct) == (1, 2, 3)\n\nassert wrapped_foo(0, 0, 0) == (0, 0, 0)\nassert wrapped_foo(a=1, b=2, c=3) == (1, 2, 3)\n\n@ignore_unexpected_kwargs\ndef bar(*args: int):\n return sum(args)\n\n# should not change original behavior\nassert bar(1, 2, 3) == 6\nassert bar(1, 2, 3, unexpected='Gotcha') == 6\nnums = [1, 2, 3]\nassert bar(*nums, unexpected='Gotcha') == 6\n\n@ignore_unexpected_kwargs\ndef qux(a, b, **kwargs):\n # function with Parameter.VAR_KEYWORD Aka **kwargs\n return a, b, kwargs.get('c', 3), kwargs.get('d', 4)\n\nassert qux(**{'a': 1, 'b': 2, 'd': 4, 'e': 5}) == (1, 2, 3, 4)\n\nclass Person:\n @ignore_unexpected_kwargs\n def __init__(self, name, age, sex):\n self.name = name\n self.age = age\n self.sex = sex\n\n @classmethod\n @ignore_unexpected_kwargs\n def create(cls, name, age, sex):\n return cls(name, age, sex)\n\n @staticmethod\n @ignore_unexpected_kwargs\n def greetings(name):\n return f'Hello, I am {name}'\n\nparams = {\n 'name': 'albert',\n 'age': 34,\n 'sex': 'male',\n 'height': '170cm',\n}\n__ = Person(**params)\n__ = Person('albert', 35, 'male', height='170cm')\n\n# test cases for classmethod, staticmethod\n__ = Person.create(**params)\nassert Person.greetings(**params)\n\n```\n\n#### retry\n\n```python3\nimport pytest\n\nfrom pythonic_toolbox.decorators.common import retry\n\n# use decorator without any arguments, using retry default params\n@retry\ndef func_fail_first_time():\n \"\"\"func_fail_first_time\"\"\"\n self = func_fail_first_time\n if not hasattr(self, 'call_times'):\n # set attribute call_times for function, to count call times\n self.call_times = 0\n self.call_times += 1\n if self.call_times == 1:\n raise Exception('Fail when first called')\n return 'ok'\n\nassert func_fail_first_time() == 'ok'\nassert func_fail_first_time.call_times == 2\nassert func_fail_first_time.__doc__ == 'func_fail_first_time'\n\n@retry(tries=2, delay=0.1) # use decorator with customized params\ndef func_fail_twice():\n \"\"\"func_fail_twice\"\"\"\n self = func_fail_twice\n if not hasattr(self, 'call_times'):\n self.call_times = 0\n self.call_times += 1\n if self.call_times <= 2:\n raise Exception('Fail when called first, second time')\n return 'ok'\n\nassert func_fail_twice() == 'ok'\nassert func_fail_twice.call_times == 3\nassert func_fail_twice.__doc__ == 'func_fail_twice'\n\n@retry(tries=2, delay=0.1)\ndef func_fail_three_times():\n \"\"\"func_fail_three_times\"\"\"\n self = func_fail_three_times\n if not hasattr(self, 'call_times'):\n self.call_times = 0\n self.call_times += 1\n if self.call_times <= 3: # 1, 2, 3\n raise Exception('Fail when called first, second, third time')\n return 'ok'\n\nwith pytest.raises(Exception) as exec_info:\n func_fail_three_times()\nassert func_fail_three_times.call_times == 3\nassert exec_info.value.args[0] == 'Fail when called first, second, third time'\n\ndef raw_func_fail_first_time():\n \"\"\"func_fail_first_time\"\"\"\n self = raw_func_fail_first_time\n if not hasattr(self, 'call_times'):\n # set attribute call_times for function, to count call times\n self.call_times = 0\n self.call_times += 1\n if self.call_times == 1:\n raise Exception('Fail when first called')\n return 'ok'\n\nassert retry(raw_func_fail_first_time)() == 'ok'\n\n# test cases when function has arguments, kwargs\n@retry(tries=1, delay=0.1)\ndef func_fail_first_time_with_parameters(p1, p2):\n \"\"\"func_fail_first_time\"\"\"\n self = func_fail_first_time_with_parameters\n if not hasattr(self, 'call_times'):\n # set attribute call_times for function, to count call times\n self.call_times = 0\n self.call_times += 1\n if self.call_times == 1:\n raise Exception('Fail when first called')\n return p1 + p2\n\nassert func_fail_first_time_with_parameters(1, 2) == 3\n\ndef func_fail_first_time_with_parameters(p1, p2):\n \"\"\"func_fail_first_time\"\"\"\n self = func_fail_first_time_with_parameters\n if not hasattr(self, 'call_times'):\n # set attribute call_times for function, to count call times\n self.call_times = 0\n self.call_times += 1\n if self.call_times == 1:\n raise Exception('Fail when first called')\n return p1 + p2\n\nassert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(1, 2) == 3\nassert retry(tries=1, delay=0.1)(func_fail_first_time_with_parameters)(p1=1, p2=2) == 3\n\nimport asyncio\n\n@retry\nasync def async_func_fail_first_time():\n \"\"\"async_func_fail_first_time\"\"\"\n self = async_func_fail_first_time\n if not hasattr(self, 'call_times'):\n self.call_times = 0\n self.call_times += 1\n if self.call_times == 1:\n raise Exception('Fail when first called')\n return 'ok'\n\n@retry(delay=0.1)\nasync def async_func_fail_first_time2():\n \"\"\"async_func_fail_first_time2\"\"\"\n self = async_func_fail_first_time2\n if not hasattr(self, 'call_times'):\n self.call_times = 0\n self.call_times += 1\n if self.call_times == 1:\n raise Exception('Fail when first called')\n return 'ok'\n\nasync def async_main():\n assert await async_func_fail_first_time() == 'ok'\n assert async_func_fail_first_time.__doc__ == 'async_func_fail_first_time'\n assert async_func_fail_first_time.call_times == 2\n assert await async_func_fail_first_time2() == 'ok'\n assert async_func_fail_first_time2.call_times == 2\n assert async_func_fail_first_time2.__doc__ == 'async_func_fail_first_time2'\n\nloop = asyncio.get_event_loop()\nif loop.is_closed():\n loop = asyncio.new_event_loop()\ntry:\n loop.run_until_complete(async_main())\nfinally:\n loop.close()\n\nimport random\nfail_count = 0\n\n@retry(delay=0.1)\nasync def always_fail_func():\n nonlocal fail_count\n fail_count += 1\n await asyncio.sleep(random.random())\n raise ValueError()\n\nasync def async_main_for_always_fail():\n nonlocal fail_count\n tasks = [always_fail_func() for i in range(0, 3)]\n results = await asyncio.gather(*tasks, return_exceptions=True)\n assert all(map(lambda e: isinstance(e, ValueError), results))\n assert fail_count == 2 * 3 # each func run twice, three func calls\n\nloop = asyncio.get_event_loop()\nif loop.is_closed():\n loop = asyncio.new_event_loop()\ntry:\n loop.run_until_complete(async_main_for_always_fail())\nfinally:\n loop.close()\n\n```\n\n### deque_utils\n\n#### deque_pop_any\n\n```python3\nfrom collections import deque\n\nimport pytest\nfrom pythonic_toolbox.utils.deque_utils import deque_pop_any\n\nqueue = deque([1, 2, 3, 4, 5])\nassert deque_pop_any(queue, idx=1) == 2\nassert queue == deque([1, 3, 4, 5])\n\n# edge case: same as deque.popleft()\nqueue = deque([1, 2, 3, 4, 5])\nassert deque_pop_any(queue, idx=0) == 1\nassert queue == deque([2, 3, 4, 5])\n\n# edge case: same as deque.popright()\nqueue = deque([1, 2, 3, 4, 5])\nassert deque_pop_any(queue, idx=len(queue) - 1) == 5\nassert queue == deque([1, 2, 3, 4])\n\nqueue = deque([1, 2, 3, 4, 5])\nwith pytest.raises(IndexError) as exec_info:\n deque_pop_any(queue, idx=102)\n\n# edge case: pop from empty deque\nqueue = deque()\nwith pytest.raises(IndexError) as exec_info:\n deque_pop_any(queue, idx=0)\nassert exec_info.value.args[0] == 'pop from empty deque'\n\n```\n\n#### deque_split\n\n```python3\nimport pytest\n\nfrom collections import deque\n\nfrom pythonic_toolbox.utils.deque_utils import deque_split\n\nqueue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=3)\nassert queue1 == deque([1, 2, 3])\nassert queue2 == deque([4, 5])\n\nqueue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=0)\nassert queue1 == deque([])\nassert queue2 == deque([1, 2, 3, 4, 5])\n\nqueue1, queue2 = deque_split(deque([1, 2, 3, 4, 5]), num=100)\nassert queue1 == deque([1, 2, 3, 4, 5])\nassert queue2 == deque([])\n\nwith pytest.raises(ValueError) as exec_info:\n deque_split(deque([1, 2, 3, 4, 5]), -1)\nassert exec_info.value.args[0] == 'num must be integer: 0 <= num <= sys.maxsize'\n\n```\n\n### dict_utils\n\n#### DictObj\n\n```python3\nfrom copy import deepcopy\n\nimport pytest\nfrom pythonic_toolbox.utils.dict_utils import DictObj\n\nnaive_dct = {\n 'key1': 'val1',\n 'key2': 'val2',\n}\n\nobj = DictObj(naive_dct)\n\n# test basic functional methods like dict\nassert len(obj) == 2\nassert bool(obj) is True\n# same behavior like ordinary dict according to the python version (FILO for popitem for 3.6+)\nassert obj.popitem() == ('key2', 'val2')\nassert obj.popitem() == ('key1', 'val1')\nwith pytest.raises(KeyError) as __:\n obj.popitem()\n\n# a key can be treated like an attribute\n# an attribute can be treated like a key\nobj.key3 = 'val3'\nassert obj.pop('key3') == 'val3'\nwith pytest.raises(KeyError) as __:\n obj.pop('key4')\nobj.key5 = 'val5'\ndel obj.key5\nwith pytest.raises(KeyError) as __:\n obj.pop('key5')\nwith pytest.raises(AttributeError) as __:\n del obj.key5\n\n# test deepcopy\nobj = DictObj({'languages': ['Chinese', 'English']})\ncopied_obj = deepcopy(obj)\nassert copied_obj == obj\ncopied_obj.languages = obj.languages + ['Japanese']\nassert obj.languages == ['Chinese', 'English']\nassert copied_obj.languages == ['Chinese', 'English', 'Japanese']\nassert copied_obj != obj\n\nperson_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}\n\nperson = DictObj(person_dct)\nassert DictObj(person_dct) == DictObj(person_dct)\nassert person.to_dict() == person_dct\nassert set(person.keys()) == {'name', 'age', 'sex', 'languages'}\nassert hasattr(person, 'name') is True\nassert person.name == 'Albert'\nassert person['name'] == 'Albert'\nperson.languages.append('Japanese')\nassert person.languages == ['Chinese', 'English', 'Japanese']\n\nperson.height = '170'\nassert person['height'] == '170'\nassert 'height' in person\nassert 'height' in person.keys()\nassert hasattr(person, 'height') is True\ndel person['height']\nassert 'height' not in person\nassert 'height' not in person.keys()\nperson['height'] = '170cm'\n\nperson.update({'weight': '50'})\nweight_val = person.pop('weight')\nassert weight_val == '50'\nperson.update(DictObj({'weight': '50kg'}))\nassert person.weight == '50kg'\n\nexpected = {\n 'name': 'Albert', 'age': '34', 'sex': 'Male',\n 'languages': ['Chinese', 'English', 'Japanese'], # appended new language\n 'height': '170cm', # new added attribute\n 'weight': '50kg', # new added attribute\n}\nassert person.to_dict() == expected\n\nrepr_expected: str = (\"{'name': 'Albert', 'age': '34', 'sex': 'Male', \"\n \"'languages': ['Chinese', 'English', 'Japanese'],\"\n \" 'height': '170cm', 'weight': '50kg'}\")\nassert repr(person) == repr_expected\n\n# nested structure will be detected, and changed to DictObj\nchessboard_data = {\n 'position': [\n [{'name': 'knight'}, {'name': 'pawn'}],\n [{'name': 'pawn'}, {'name': 'queen'}],\n ]\n}\nchessboard_obj = DictObj(chessboard_data)\n# test comparing instances of DictObj\nassert DictObj(chessboard_data) == DictObj(chessboard_data)\nassert isinstance(chessboard_obj.position, list)\nassert len(chessboard_obj.position) == 2\nassert isinstance(chessboard_obj.position[0][0], DictObj)\nassert chessboard_obj.position[0][0].name == 'knight'\nassert chessboard_obj.position[1][1].name == 'queen'\n\n# edge case empty DictObj\nempty_dict_obj = DictObj({})\nassert len(empty_dict_obj) == 0\nassert bool(empty_dict_obj) is False\n\nobj_dict = DictObj({'data': 'oops'})\nassert obj_dict.data == 'oops'\n\n# params validation\ninvalid_key_dct = {\n 1: '1',\n}\n\n# test when dict's key is not str\nwith pytest.raises(ValueError) as __:\n __ = DictObj(invalid_key_dct)\n\ncomplicated_key_dct = {\n '1abc': 'Gotcha', # '1abc' is not valid identifier for Python, so obj.1abc will cause SyntaxError\n 'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError\n}\n\nobj_dict = DictObj(complicated_key_dct)\nassert obj_dict['1abc'] == 'Gotcha'\nassert getattr(obj_dict, '1abc') == 'Gotcha'\n# you can access '1abc' as attribute by adding prefix '_'\nassert obj_dict._1abc == 'Gotcha'\ndel obj_dict._1abc\n\nassert obj_dict['class'] == 'MyClass'\nassert getattr(obj_dict, 'class') == 'MyClass'\n# you can access 'class' as attribute by adding prefix '_'\nassert obj_dict._class == 'MyClass'\n\n# test re-assign new value for 'class'\nobj_dict._class = 'MyClass2'\nassert obj_dict._class == 'MyClass2'\nassert obj_dict['class'] == 'MyClass2'\nassert getattr(obj_dict, 'class') == 'MyClass2'\ndel obj_dict._class\n\n# if assign new attributes (_2, _try), ObjDict will treat it like what the originally are\n# this is fully considered by design, you're not encouraged to mess up keys\nobj_dict._2x = 'NewAttr'\nassert obj_dict._2x == 'NewAttr'\nassert obj_dict['_2x'] == 'NewAttr'\nwith pytest.raises(KeyError):\n __ = obj_dict['2x']\nwith pytest.raises(AttributeError):\n __ = getattr(obj_dict, '2x')\n\nobj_dict._try = 'NewAttr'\nassert obj_dict._try == 'NewAttr'\nassert obj_dict['_try'] == 'NewAttr'\nwith pytest.raises(KeyError):\n __ = obj_dict['NewAttr']\nwith pytest.raises(AttributeError):\n __ = getattr(obj_dict, 'NewAttr')\n\n# Demo for messing up key 'class'\n# delete and re-assign _class\ncomplicated_key_dct = {\n 'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError\n}\nobj_dict = DictObj(complicated_key_dct)\n\nassert obj_dict['class'] == 'MyClass'\nobj_dict._class = 'MyClass2'\nassert obj_dict['class'] == 'MyClass2'\ndel obj_dict._class\n\n# obj_dict has no knowledge about 'class' or '_class'\n# so '_class' is a brand-new attribute, and will be stored as '_class'\nobj_dict._class = 'MyClass3'\nwith pytest.raises(KeyError):\n # Oops!!! by-design\n # 'class' cannot be accessed as key anymore,\n # because we store '_class' as key as other valid keys behave\n assert obj_dict['class'] == 'MyClass3'\nassert obj_dict['_class'] == 'MyClass3'\n\n# thread safe testing\nimport sys\nfrom threading import Thread\nfrom pythonic_toolbox.decorators.decorator_utils import method_synchronized\n\nclass MyObjDict(DictObj):\n # implement a thread-safe method to increase the value of cnt\n @method_synchronized\n def increase_cnt_by_n(self, n):\n self.cnt += n\n\ndef increase_cnt_by_100(dict_obj):\n for i in range(100):\n dict_obj.increase_cnt_by_n(1)\n\nsw_interval = sys.getswitchinterval()\ntry:\n sys.setswitchinterval(0.0001)\n my_dict_obj = MyObjDict({'cnt': 0})\n threads = [Thread(target=increase_cnt_by_100, args=(my_dict_obj,)) for _ in range(100)]\n [t.start() for t in threads]\n [t.join() for t in threads]\n assert my_dict_obj.cnt == 10000\nfinally:\n sys.setswitchinterval(sw_interval)\n\n# test copy/deepcopy of DictObj\nimport copy\n\nperson = DictObj({'name': 'albert', 'age': 33})\nteam = DictObj({'leader': person})\nshallow_copy_of_team = copy.copy(team)\nassert team.leader is shallow_copy_of_team.leader\n\ndeep_copy_of_team = copy.deepcopy(team)\nassert team.leader is not deep_copy_of_team.leader\nassert team.leader == deep_copy_of_team.leader\n\n```\n\n#### FinalDictObj\n\n```python3\nfrom typing import cast\n\nimport pytest\nfrom pythonic_toolbox.utils.dict_utils import FinalDictObj\n\nperson_dct = {'name': 'Albert', 'age': '34', 'sex': 'Male', 'languages': ['Chinese', 'English']}\n\nfixed_person = FinalDictObj(person_dct)\nassert fixed_person.name == 'Albert'\n\n# FINAL means once initialized, you cannot change the key/attribute anymore\nwith pytest.raises(RuntimeError) as exec_info:\n fixed_person.name = 'Steve'\nexpected_error_str = 'Cannot modify attribute/item in an already initialized FinalDictObj'\nassert exec_info.value.args[0] == expected_error_str\n\nwith pytest.raises(RuntimeError) as __:\n fixed_person.popitem()\n\nwith pytest.raises(RuntimeError) as __:\n fixed_person.pop('name')\n\nassert isinstance(fixed_person.languages, tuple)\nwith pytest.raises(AttributeError) as exec_info:\n # list values are changed into tuple to avoid being modified\n cast(list, fixed_person.languages).append('Japanese')\nexpected_error_str = \"'tuple' object has no attribute 'append'\"\nassert exec_info.value.args[0] == expected_error_str\nassert fixed_person.to_dict() == person_dct\n\n# nested structure will be detected, and changed to FinalDictObj\nchessboard_data = {\n 'position': [\n [{'name': 'knight'}, {'name': 'pawn'}],\n [{'name': 'pawn'}, {'name': 'queen'}],\n ]\n}\nchessboard_obj = FinalDictObj(chessboard_data)\n# test comparing instances of FinalDictObj\nassert FinalDictObj(chessboard_data) == FinalDictObj(chessboard_data)\nassert isinstance(chessboard_obj.position, tuple)\nassert isinstance(chessboard_obj.position[0][0], FinalDictObj)\nassert chessboard_obj.position[1][1].name == 'queen'\nwith pytest.raises(RuntimeError) as __:\n chessboard_obj.position[1][1].name = 'knight'\n\n# test for keyword/non-identifier key as attribute\nfinal_obj_dict = FinalDictObj({\n 'class': 'MyClass', # 'class' is keyword in Python, so obj.class will cause SyntaxError\n})\nassert final_obj_dict['class'] == 'MyClass'\nassert getattr(final_obj_dict, 'class') == 'MyClass'\nassert final_obj_dict._class == 'MyClass'\n\n# test copy/deepcopy of FileDictObj\nimport copy\nperson = FinalDictObj({'name': 'albert', 'age': 33})\nteam = FinalDictObj({'leader': person})\nshallow_copy_of_team = copy.copy(team)\nassert team.leader is shallow_copy_of_team.leader\nassert team.leader == shallow_copy_of_team.leader\n\ndeep_copy_of_team = copy.deepcopy(team)\nassert team.leader is not deep_copy_of_team.leader\nassert team.leader == deep_copy_of_team.leader\n\n```\n\n#### RangeKeyDict\n\n```python3\nimport pytest\nfrom pythonic_toolbox.utils.dict_utils import RangeKeyDict\n\n# test normal case\nrange_key_dict: RangeKeyDict[float, str] = RangeKeyDict({\n (float('-inf'), 0): 'Negative',\n (0, 60): 'F', # 0 <= val < 60\n (60, 70): 'D', # 60 <= val < 70\n (70, 80): 'C', # 70 <= val < 80\n (80, 90): 'B', # 80 <= val < 90\n (90, 100): 'A', # 90 <= val < 100\n 100: 'A+', # val == 100\n})\n\n# Big O of querying is O(log n), n is the number of ranges, due to using bisect inside\nassert range_key_dict[-1] == 'Negative'\nassert range_key_dict[0] == 'F'\nassert range_key_dict[55] == 'F'\nassert range_key_dict[60] == 'D'\nassert range_key_dict[75] == 'C'\nassert range_key_dict[85] == 'B'\nassert range_key_dict[95] == 'A'\nassert range_key_dict[100] == 'A+'\n\nwith pytest.raises(KeyError) as exec_info:\n _ = range_key_dict['95'] # when key is not comparable with other integer keys\nassert exec_info.value.args[0] == \"KeyError: '95' is not comparable with other keys\"\n\nwith pytest.raises(KeyError) as exec_info:\n _ = range_key_dict[150]\nassert exec_info.value.args[0] == 'KeyError: 150'\n\nassert range_key_dict.get(150, 'N/A') == 'N/A'\n\n# test comparison with other RangeKeyDict\nassert RangeKeyDict({(0, 10): '1'}) == RangeKeyDict({(0, 10): '1'})\nassert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 10): '2'})\nassert RangeKeyDict({(0, 10): '1'}) != RangeKeyDict({(0, 1000): '1'})\n\nwith pytest.raises(ValueError):\n # [1, 1) is not a valid range\n # there's no value x satisfy 1 <= x < 1\n RangeKeyDict({(1, 1): '1'})\n\nwith pytest.raises(ValueError):\n # [1, -1) is not a valid range\n RangeKeyDict({(1, -1): '1'})\n\n# validate input keys types and detect range overlaps(segment intersect)\nwith pytest.raises(ValueError) as exec_info:\n RangeKeyDict({\n (0, 10): 'val-between-0-and-10',\n (0, 5): 'val-between-0-and-5'\n })\nexpected_error_msg = (\"Duplicated left boundary key 0 detected: \"\n \"(0, 10): 'val-between-0-and-10', (0, 5): 'val-between-0-and-5'\")\nassert exec_info.value.args[0] == expected_error_msg\n\nwith pytest.raises(ValueError) as exec_info:\n RangeKeyDict({\n (0, 10): 'val-between-0-and-10',\n (5, 15): 'val-between-5-and-15'\n })\nexpected_error_msg = (\"Overlap detected: \"\n \"(0, 10): 'val-between-0-and-10', (5, 15): 'val-between-5-and-15'\")\nassert exec_info.value.args[0] == expected_error_msg\n\n# test RangeKeyDict with no continuous ranges\nrange_key_dict: RangeKeyDict[float, str] = RangeKeyDict({\n (0, 60): 'F', # 0 <= val < 60\n (70, 80): 'C', # 70 <= val < 80\n})\n\nassert range_key_dict[10] == 'F'\n\nwith pytest.raises(KeyError) as exec_info:\n _ = range_key_dict[-100]\nassert exec_info.value.args[0] == 'KeyError: -100'\n\nwith pytest.raises(KeyError) as exec_info:\n _ = range_key_dict[65]\nassert exec_info.value.args[0] == 'KeyError: 65'\n\nwith pytest.raises(KeyError) as exec_info:\n _ = range_key_dict[100]\nassert exec_info.value.args[0] == 'KeyError: 100'\n\nfrom functools import total_ordering\n\n@total_ordering\nclass Age:\n def __init__(self, val: float):\n if not isinstance(val, (int, float)):\n raise ValueError('Invalid age value')\n self.val = val\n\n def __le__(self, other):\n return self.val <= other.val\n\n def __repr__(self):\n return f'Age({repr(self.val)})'\n\n def __hash__(self):\n return hash(self.val)\n\nage_categories_map: RangeKeyDict[Age, str] = RangeKeyDict({\n (Age(0), Age(2)): 'Baby',\n (Age(2), Age(15)): 'Children',\n (Age(15), Age(25)): 'Youth',\n (Age(25), Age(65)): 'Adults',\n (Age(65), Age(123)): 'Seniors',\n})\n\nassert age_categories_map[Age(0.5)] == 'Baby'\nassert age_categories_map[Age(12)] == 'Children'\nassert age_categories_map[Age(20)] == 'Youth'\nassert age_categories_map[Age(35)] == 'Adults'\nassert age_categories_map[Age(70)] == 'Seniors'\n\n```\n\n#### StrKeyIdDict\n\n```python3\nimport pytest\nfrom pythonic_toolbox.utils.dict_utils import StrKeyIdDict\n\ndata = {1: 'a', 2: 'b', '3': 'c'}\nmy_dict = StrKeyIdDict(data)\n\n# usage: value can be accessed by id (str: int-like/uuid-like/whatever) or id (int)\nassert my_dict['1'] == my_dict[1] == 'a'\nassert my_dict.keys() == {'1', '2', '3'} # all keys are str type\nmy_dict['4'] = 'd'\nassert my_dict['4'] == 'd'\nmy_dict[4] = 'd'\nassert my_dict['4'] == 'd'\nmy_dict.update({4: 'd'})\nassert my_dict['4'] == 'd'\n\n# test comparing instances of the class\nassert StrKeyIdDict(data) == StrKeyIdDict(data)\nassert StrKeyIdDict(data) != StrKeyIdDict(dict(data, **{'4': 'd'}))\nassert StrKeyIdDict(data) == {'1': 'a', '2': 'b', '3': 'c'}\nassert StrKeyIdDict(data) != {'1': 'a', '2': 'b', '3': 'd'}\nassert StrKeyIdDict(data) != {1: 'a', 2: 'b', 3: 'c'} # StrKeyIdDict assumes all keys are strings\n\n# test delete key\ndel my_dict[4]\nassert my_dict.keys() == {'1', '2', '3'} # '4' is not in the dict anymore\n\n# assign value to an arbitrary string key that is not in the dict\nmy_dict.update({'some-uuid': 'something'})\nassert my_dict['some-uuid'] == 'something'\n\nwith pytest.raises(TypeError):\n # key '1', 1 both stands for key '1',\n # so we get duplicated keys when initializing instance, oops!\n my_dict = StrKeyIdDict({'1': 'a', 1: 'A'})\n\nassert my_dict.get(1) == 'a'\nassert my_dict.get('NotExistKey') is None\nassert my_dict.get('NotExistKey', 'NotExistValue') == 'NotExistValue'\n\n# test edge cases\nassert StrKeyIdDict() == {}\n\n# test shallow copy\nmy_dict[5] = ['e1', 'e2', 'e3']\ncopy_dict = my_dict.copy()\ncopy_dict[1] = 'A'\nassert my_dict[1] == 'a'\nmy_dict['5'].append('e4')\nassert copy_dict['5'] == ['e1', 'e2', 'e3', 'e4']\n\n# test deep copy\nfrom copy import deepcopy\n\ncopy_dict = deepcopy(my_dict)\nmy_dict[5].append('e5')\nassert my_dict['5'] == ['e1', 'e2', 'e3', 'e4', 'e5']\nassert copy_dict[5] == ['e1', 'e2', 'e3', 'e4']\n\n# test constructor\nmy_dict = StrKeyIdDict(uuid1='a', uuid2='b')\nassert my_dict['uuid1'] == 'a'\n\n# test constructor (from keys)\nmy_dict = StrKeyIdDict.fromkeys([1, 2, 3], None)\nassert my_dict == {'1': None, '2': None, '3': None}\n# test update and overwrite\nmy_dict.update(StrKeyIdDict({1: 'a', 2: 'b', 3: 'c', 4: 'd'}))\nassert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}\n\nmy_dict = StrKeyIdDict([(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')])\nassert my_dict['1'] == my_dict[1] == 'a'\n\n# reassign StrKeyIdDict instance to another StrKeyIdDict instance\nmy_dict = StrKeyIdDict(my_dict)\nassert my_dict == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}\nassert dict(my_dict) == {'1': 'a', '2': 'b', '3': 'c', '4': 'd'}\n\n# test case when \"key\" is \"data\", which is a reserved keyword inside StrKeyIdDict\nmy_dict = StrKeyIdDict({'data': 'data_value', '1': 'a'})\nassert my_dict['data'] == 'data_value'\nassert my_dict['1'] == 'a'\n# delete key 'data', should not affect other keys\ndel my_dict['data']\nassert my_dict['1'] == 'a'\n\n```\n\n#### collect_leaves\n\n```python3\nfrom pythonic_toolbox.utils.dict_utils import collect_leaves\n\n# a nested dict-like struct\nmy_dict = {\n 'node_1': {\n 'node_1_1': {\n 'node_1_1_1': 'A',\n },\n 'node_1_2': {\n 'node_1_2_1': 'B',\n 'node_1_2_2': 'C',\n 'node_1_2_3': None,\n },\n 'node_1_3': [ # dict list\n {\n 'node_1_3_1_1': 'D',\n 'node_1_3_1_2': 'E',\n },\n {\n 'node_1_3_2_1': 'FF',\n 'node_1_3_2_2': 'GG',\n }\n ]\n }}\n\nexpected = ['A', 'B', 'C', None, 'D', 'E', 'FF', 'GG']\nassert collect_leaves(my_dict) == expected\n\nexpected = ['A', 'B', 'C', 'D', 'E', 'FF', 'GG']\nassert collect_leaves(my_dict, leaf_pred=lambda lf: lf) == expected\n\nassert collect_leaves(my_dict, keypath_pred=lambda kp: len(kp) == 1) == []\n\nexpected = ['B', 'C']\nassert collect_leaves(my_dict, keypath_pred=lambda kp: kp[-1] in {'node_1_2_1', 'node_1_2_2'}) == expected\n\nexpected = ['C']\nassert collect_leaves(my_dict, leaf_pred=lambda lf: lf == 'C') == expected\nassert collect_leaves(my_dict,\n keypath_pred=lambda kp: kp[-1] == 'node_1_2_2',\n leaf_pred=lambda lf: lf == 'C') == expected\n\nassert collect_leaves(my_dict,\n keypath_pred=lambda kp: kp[-1] == 'node_1_1_1',\n leaf_pred=lambda lf: lf == 'C') == []\n\nexpected = ['D', 'E', 'FF', 'GG']\nassert collect_leaves(my_dict,\n keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3') == expected\n\nexpected = ['FF', 'GG']\nassert collect_leaves(my_dict,\n keypath_pred=lambda kp: len(kp) >= 2 and kp[-2] == 'node_1_3',\n leaf_pred=lambda lf: isinstance(lf, str) and len(lf) == 2) == expected\n\n# edge cases\nassert collect_leaves([]) == []\nassert collect_leaves({}) == []\nassert collect_leaves(None) == []\n\n```\n\n#### dict_until\n\n```python3\nfrom pythonic_toolbox.utils.dict_utils import dict_until\n\ndata = {'full_name': 'Albert Lee', 'pen_name': None}\nassert dict_until(data, keys=['name', 'full_name']) == 'Albert Lee'\nassert dict_until(data, keys=['full_name', 'name']) == 'Albert Lee'\nassert dict_until(data, keys=['name', 'english_name']) is None\nassert dict_until(data, keys=['name', 'english_name'], default='anonymous') == 'anonymous'\n# test when pen_name is set None on purpose\nassert dict_until(data, keys=['pen_name'], default='anonymous') is None\n# test when value with None value is not acceptable\nassert dict_until(data, keys=['pen_name'], terminate=lambda x: x is not None, default='anonymous') == 'anonymous'\n\n```\n\n#### select_list_of_dicts\n\n```python3\nfrom pythonic_toolbox.utils.dict_utils import select_list_of_dicts\n\ndict_lst = [\n {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},\n {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},\n # another Peter Parker from multiverse\n {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},\n # age unknown for Carol Danvers, no age field\n {'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},\n {'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'},\n]\n\nassert select_list_of_dicts(dict_lst, look_like={'name': 'Peter Parker'}) == [\n {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},\n {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'}]\n\nassert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}) == [\n {'name': 'Carol Danvers', 'sex': 'female', 'alias': 'Captain Marvel'},\n {'name': 'Natasha Romanoff', 'sex': 'female', 'age': 35, 'alias': 'Black Widow'}]\n\nassert select_list_of_dicts(dict_lst, look_like={'sex': 'female'}, keys=['name']) == [\n {'name': 'Carol Danvers'}, {'name': 'Natasha Romanoff'}]\n\n# unique is supported for return list\nassert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age']) == [\n {'name': 'Tony Stark', 'age': 49},\n {'name': 'Peter Parker', 'age': 16},\n {'name': 'Peter Parker', 'age': 16},\n]\n\nassert select_list_of_dicts(dict_lst, look_like={'sex': 'male'}, keys=['name', 'age'], unique=True) == [\n {'name': 'Tony Stark', 'age': 49},\n {'name': 'Peter Parker', 'age': 16}]\n\n# dict keys are ordered as the keys passed-in\nassert list(select_list_of_dicts(dict_lst, keys=['name', 'age'], unique=True)[0].keys()) == ['name', 'age']\nassert list(select_list_of_dicts(dict_lst, keys=['age', 'name'], unique=True)[0].keys()) == ['age', 'name']\n\n# locate Captain Marvel, with default val for missing key\nassert select_list_of_dicts(dict_lst,\n look_like={'alias': 'Captain Marvel'},\n keys=['name', 'sex', 'age', 'alias'],\n val_for_missing_key='Unknown')[0]['age'] == 'Unknown'\n\n# edge cases, get the original dict\nassert select_list_of_dicts([]) == []\nassert select_list_of_dicts(dict_lst) == dict_lst\n\n# new list of dicts is returned, leaving the original list of dicts untouched\nblack_widow = select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]\nblack_widow['age'] += 1\nassert black_widow['age'] == 36\n# we don't modify the original dict data, Natasha is always 35 years old\nassert select_list_of_dicts(dict_lst, look_like={'name': 'Natasha Romanoff'})[0]['age'] == 35\n\n# preds provide more flexibility, filter the ones with age info\nassert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0])) == 4\nassert len(select_list_of_dicts(dict_lst, preds=[lambda d: 'age' in d, lambda d: d['age'] >= 0], unique=True)) == 3\n\n# combine look_like and preds parameters\nexpected = [{'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'}]\nassert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},\n preds=[lambda d: 'age' in d, lambda d: d['age'] > 20]) == expected\n\n# empty list is returned if no dict matches the criteria\nassert select_list_of_dicts(dict_lst, look_like={'sex': 'male'},\n preds=[lambda d: 'sex' in d and d['sex'] == 'female']) == []\n\n```\n\n#### unique_list_of_dicts\n\n```python3\nfrom pythonic_toolbox.utils.dict_utils import unique_list_of_dicts\n\ndict_lst = [\n {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},\n {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},\n # Peter Parkers from multiverse in same age.\n {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},\n {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},\n # test same dict, but the order of dict is different\n {'name': 'Peter Parker', 'sex': 'male', 'alias': 'Spider Man', 'age': 16},\n]\n\n# Only one Peter Parker will be kept, for all data are exactly same.\nassert unique_list_of_dicts(dict_lst) == [\n {'name': 'Tony Stark', 'sex': 'male', 'age': 49, 'alias': 'Iron Man'},\n {'name': 'Peter Parker', 'sex': 'male', 'age': 16, 'alias': 'Spider Man'},\n]\n\n# edge cases\nassert unique_list_of_dicts([]) == []\n\n```\n\n#### walk_leaves\n\n```python3\nfrom pythonic_toolbox.utils.dict_utils import walk_leaves\n\ndata = {\n 'k1': {\n 'k1_1': 1,\n 'k1_2': 2,\n },\n 'k2': 'N/A', # stands for not available\n}\n\nexpected = {\n 'k1': {\n 'k1_1': 2,\n 'k1_2': 4,\n },\n 'k2': 'N/A', # stands for not available\n}\nassert walk_leaves(data) == data # no transform function provided, just a deepcopy\nassert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected\n\n# if inplace is set True, will change data inplace, return nothing\nassert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None\nassert data == expected\n\ndata = [{'name': 'lml', 'age': 33}, {'name': 'albert', 'age': 18}]\nexpected = [{'name': 'lml', 'age': 66}, {'name': 'albert', 'age': 36}]\nassert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x) == expected\nassert walk_leaves(data, trans_fun=lambda x: x * 2 if isinstance(x, int) else x, inplace=True) is None\nassert data == expected\n\n# edge cases\nassert walk_leaves(None) is None\nassert walk_leaves([]) == []\nassert walk_leaves({}) == {}\nassert walk_leaves(None, inplace=True) is None\nassert walk_leaves([], inplace=True) is None\nassert walk_leaves({}, inplace=True) is None\n\n```\n\n### functional_utils\n\n#### filter_multi\n\n```python3\nfrom pythonic_toolbox.utils.functional_utils import lfilter_multi, filter_multi\nfrom collections.abc import Iterable\n\ndef is_even(x):\n return x % 2 == 0\n\ndef is_divisible_by_5(x):\n return x % 5 == 0\n\n# select numbers which are divisible by 2 and 5\nassert lfilter_multi([is_even, is_divisible_by_5], range(1, 30)) == [10, 20]\nassert lfilter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20]) == [10, 20]\n\nfrom itertools import count, takewhile\n# if you want to pass an iterator, make sure the iterator will end/break,\n# Note: a bare count(start=0, step=2) will generate number like 0, 2, 4, 6, .... (never ends)\neven_numbers_less_equal_than_50 = takewhile(lambda x: x <= 50, count(start=0, step=2))\nexpected = [0, 10, 20, 30, 40, 50]\nassert lfilter_multi([is_even, is_divisible_by_5], even_numbers_less_equal_than_50) == expected\n\n# testing for filter_multi, not converted to list directly\nnum_iterator = filter_multi([is_even, is_divisible_by_5], [5, 10, 15, 20])\nassert type(num_iterator) is filter\nassert isinstance(num_iterator, Iterable)\nexpected = [10, 20]\nfor idx, value in enumerate(num_iterator):\n assert value == expected[idx]\n\n# when items are infinite, choose filter_multi instead of lfilter_multi\nexpected = [0, 10, 20, 30, 40, 50]\nfor idx, value in enumerate(filter_multi([is_even, is_divisible_by_5], count(start=0, step=1))):\n if value > 50:\n break\n else:\n assert value == expected[idx]\n\n```\n\n### list_utils\n\n#### filter_allowable\n\n```python3\nfrom pythonic_toolbox.utils.list_utils import filter_allowable\n\nfruits = ['apple', 'banana', 'orange']\nvegetables = ['carrot', 'potato', 'tomato']\nmeats = ['beef', 'chicken', 'fish']\n\nfoods = fruits + vegetables + meats\n\nassert list(filter_allowable(foods)) == foods\nassert list(filter_allowable(foods, allow_list=[], block_list=[])) == foods\nassert list(filter_allowable(foods, allow_list=['apple', 'banana', 'blueberry'])) == ['apple', 'banana']\nassert list(filter_allowable(foods, allow_list=[], block_list=foods)) == []\nassert list(filter_allowable(foods, block_list=meats)) == fruits + vegetables\nassert list(filter_allowable(foods, allow_list=['apple'], block_list=[])) == ['apple']\nassert list(filter_allowable(foods, allow_list=['apple'], block_list=['apple'])) == []\nassert list(filter_allowable(foods + ['blueberry'], allow_list=[], block_list=foods)) == ['blueberry']\nassert list(filter_allowable(['blueberry'], allow_list=[], block_list=[])) == ['blueberry']\nassert list(filter_allowable(['blueberry'], allow_list=[], block_list=['apple', 'banana'])) == ['blueberry']\nassert list(filter_allowable(['blueberry'], allow_list=['orange'], block_list=['apple', 'banana'])) == []\n\n# test cases with parameter key\nassert list(filter_allowable(foods, allow_list=['a', 'b'], key=lambda x: x[0])) == ['apple', 'banana', 'beef']\n\n# test some basic cases\nassert list(filter_allowable()) == []\nassert list(filter_allowable(candidates=None)) == []\nassert list(filter_allowable(candidates=[])) == []\nassert list(filter_allowable(candidates=[], allow_list=[], block_list=[])) == []\n\n```\n\n#### sort_with_custom_orders\n\n```python3\nfrom operator import itemgetter\nfrom typing import List\n\nimport pytest\nfrom pythonic_toolbox.utils.list_utils import sort_with_custom_orders\n\n# basic usage\nvalues = ['branch2', 'branch1', 'branch3', 'master', 'release']\nexpected = ['master', 'release', 'branch1', 'branch2', 'branch3']\nassert sort_with_custom_orders(values, prefix_orders=['master', 'release']) == expected\nassert sort_with_custom_orders(values, prefix_orders=['master', 'release'], reverse=True) == expected[::-1]\n\nvalues = [1, 2, 3, 9, 9]\nexpected = [9, 9, 1, 2, 3]\nassert sort_with_custom_orders(values, prefix_orders=[9, 8, 7]) == expected\n\nvalues = [1, 2, 3, 9]\nexpected = [9, 2, 3, 1]\nassert sort_with_custom_orders(values, prefix_orders=[9], suffix_orders=[1]) == expected\n\nassert sort_with_custom_orders([]) == []\nassert sort_with_custom_orders([], prefix_orders=[], suffix_orders=[]) == []\nassert sort_with_custom_orders([], prefix_orders=['master']) == []\n\n# tests for unhashable values\nvalues = [[2, 2], [1, 1], [3, 3], [6, 0]]\nassert sort_with_custom_orders(values, prefix_orders=[[3, 3]]) == [[3, 3], [1, 1], [2, 2], [6, 0]]\n# if \"key\" is provided, items are sorted in order of key(item)\n# items in prefix_orders/suffix_orders don't need to be one-one correspondence with items to sort\n# sum([6]) == sum([3, 3]) == sum([6, 0])\nassert sort_with_custom_orders(values, prefix_orders=[[6]], key=sum) == [[3, 3], [6, 0], [1, 1], [2, 2]]\n\n# tests for list of dicts\nvalues = [{2: 2}, {1: 1}, {1: 2}]\nassert sort_with_custom_orders(values, prefix_orders=[{2: 2}],\n key=lambda data: sum(data.values())) == [{2: 2}, {1: 2}, {1: 1}]\n\nbranch_info: List[dict] = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'release', 'commit_id': 'v1.1'}]\n# Assume that we prefer choosing branch in order: release > master > others (develop, hotfix etc.)\nres = sort_with_custom_orders(branch_info,\n prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],\n key=itemgetter('branch'))\nexpected = [{'branch': 'release', 'commit_id': 'v1.1'}, {'branch': 'master', 'commit_id': 'v1.2'}]\nassert res == expected\n\nbranch_info = [{'branch': 'develop', 'commit_id': 'v1.3'}, {'branch': 'master', 'commit_id': 'v1.2'}]\nres = sort_with_custom_orders(branch_info,\n prefix_orders=[{'branch': 'release'}, {'branch': 'master'}],\n key=itemgetter('branch'))\nexpected = [{'branch': 'master', 'commit_id': 'v1.2'}, {'branch': 'develop', 'commit_id': 'v1.3'}]\nassert res == expected\n\n# tests for exceptions\nwith pytest.raises(ValueError) as exec_info:\n sort_with_custom_orders([1, 2, 3], prefix_orders=[3], suffix_orders=[3])\nassert exec_info.value.args[0] == 'prefix and suffix contains same value'\n\nwith pytest.raises(ValueError) as exec_info:\n sort_with_custom_orders([1, 2, 3], prefix_orders=[1, 1])\nassert exec_info.value.args[0] == 'prefix_orders contains duplicated values'\n\n# tests for class\nclass Person:\n def __init__(self, id, name, age):\n self.id = id\n self.name = name\n self.age = age\n\n def __lt__(self, other: 'Person'):\n return self.age < other.age\n\n def __eq__(self, other: 'Person'):\n return self.age == other.age\n\n def __hash__(self):\n return self.id\n\n def __str__(self):\n return f'Person({self.id}, {self.name}, {self.age})'\n\n def __repr__(self):\n return str(self)\n\nAlbert = Person(1, 'Albert', 28)\nAlice = Person(2, 'Alice', 26)\nMenglong = Person(3, 'Menglong', 33)\n\npersons = [Albert, Alice, Menglong]\nexpected = [Alice, Albert, Menglong]\nassert sort_with_custom_orders(persons) == expected\n\nexpected = [Menglong, Alice, Albert]\nassert sort_with_custom_orders(persons, prefix_orders=[Menglong, Person(4, 'Anyone', 40)]) == expected\n\n```\n\n#### unpack_list\n\n```python3\nimport pytest\nfrom pythonic_toolbox.utils.list_utils import unpack_list\n\nfirst, second, third = unpack_list(['a', 'b', 'c', 'd'], target_num=3)\nassert first == 'a' and second == 'b' and third == 'c'\n\nfirst, second, third = unpack_list(['a', 'b'], target_num=3, default=None)\nassert first == 'a' and second == 'b' and third is None\n\nfirst, second, third = unpack_list(range(1, 3), target_num=3, default=None)\nassert first == 1 and second == 2 and third is None\n\nfirst, second, third = unpack_list([], target_num=3, default=0)\nassert first == second == third == 0\n\nfirst, second, *rest = unpack_list(['a', 'b', 'c'], target_num=4, default='x')\nassert first == 'a' and second == 'b' and rest == ['c', 'x']\n\n# test case for type range\nfirst, second, third = unpack_list(range(1, 3), target_num=3, default=None)\nassert first == 1 and second == 2 and third is None\n\ndef fib():\n a, b = 0, 1\n while 1:\n yield a\n a, b = b, a + b\n\n# test case for type generator\nfib_generator = fib() # generates data like [0, 1, 1, 2, 3, 5, 8, 13, 21 ...]\nfirst, second, third, *rest = unpack_list(fib_generator, target_num=6)\nassert first == 0 and second == 1 and third == 1\nassert rest == [2, 3, 5]\nseventh, eighth = unpack_list(fib_generator, target_num=2)\nassert seventh == 8 and eighth == 13\n\n# test edge case, nothing to unpack\nempty = unpack_list([], target_num=0, default=None)\nassert empty == []\n\nres = unpack_list([], target_num=2, default=None)\nassert res == [None, None]\n\nempty = unpack_list(['a', 'b'], target_num=0, default=None)\nassert empty == []\n\nempty = unpack_list(range(0, 0), target_num=0)\nassert empty == []\n\nempty = unpack_list(iter([]), target_num=0, default=None)\nassert empty == []\n\nwith pytest.raises(ValueError):\n # ValueError: not enough values to unpack (expected 3, got 2)\n first, second, third = unpack_list([1, 2], target_num=2)\n\n```\n\n#### until\n\n```python3\nfrom itertools import count\n\nfrom pythonic_toolbox.utils.list_utils import until\n\n# basic usage\ncounter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...\nassert until(counter, lambda x: x > 10) == 11\n\nassert until([1, 2, 3], lambda x: x > 10, default=11) == 11\n\n# test case for when there's no default value and no item in the iterable satisfies the condition\nassert until([1, 2, 3], lambda x: x > 10) is None\n\n# edge cases\nassert until([], default=3) == 3 # nothing provided, return default\nassert until(None, lambda x: x > 10, default=11) == 11\n\n# test case for when there's no item in the counter satisfies the condition\n# the following codes will run forever, so comment them out\n# counter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...\n# assert until(counter, lambda x: x % 2 == 0) is None\n\n# test case for when max_iter_num is provided, only iterate the counter for max_iter_num times\ncounter = count(1, 2) # generator of odd numbers: 1, 3, 5, 7 ...\nassert until(counter, lambda x: x % 2 == 0, default=None, max_iter_num=100) is None\n\nnumbers = [1, 2, 3, 4, 5, 6]\nassert until(numbers, lambda x: x >= 5, default=None, max_iter_num=1) is None\nassert until(numbers, lambda x: x >= 5, default=None, max_iter_num=4) is None\nassert until(numbers, lambda x: x >= 5, default=None, max_iter_num=5) == 5\nassert until(numbers, lambda x: x >= 5, default=None, max_iter_num=100) == 5\n\n```\n\n### string_utils\n\n#### substitute_string_template_dict\n\n```python3\nfrom unittest.mock import patch, PropertyMock\n\nimport pytest\nfrom pythonic_toolbox.utils.string_utils import substitute_string_template_dict, CycleError\n\n# simple usage\n# both $variable ${variable} declarations are supported in string template format\nstr_template_dict = {\n 'greeting': 'Good Morning, Everyone!',\n 'first_name': 'Albert',\n 'last_name': 'Lee',\n 'full_name': '$first_name $last_name',\n 'age': 34,\n 'speech': '$greeting, I am $full_name, a ${age}-year-old programmer, very glad to meet you!'\n}\noutput_dict = substitute_string_template_dict(str_template_dict)\nassert output_dict['full_name'] == 'Albert Lee'\nexpected_speech = 'Good Morning, Everyone!, I am Albert Lee, a 34-year-old programmer, very glad to meet you!'\nassert output_dict['speech'] == expected_speech\n\n# complex usage, with dynamic values, and multi value-providing holders\nstr_template_dict = {\n 'first_name': 'Daenerys',\n 'last_name': 'Targaryen',\n 'nick_name': 'Dany',\n 'full_name': '$first_name $last_name',\n 'speech': \"$nick_name: I'm $full_name ($title1, $title2, $title3), it's $current_time_str, $greeting!\",\n}\n\nvariables_dict = {'title1': 'Queen of Meereen',\n 'title2': 'Mother of Dragons'}\n\nclass DynamicVariables:\n @property\n def current_time_str(self):\n import datetime\n return datetime.datetime.now().strftime(\"%H:%M:%S\")\n\nclass DefaultUnknownTitle:\n \"\"\"\n A class will always return UnknownTitle, when try to access attribute like\n title1, title2, ..., titleX\n \"\"\"\n\n def __getattribute__(self, item):\n if isinstance(item, str) and item.startswith('title') and item[len(item) - 1:].isdigit():\n return 'UnknownTitle'\n return super(DefaultUnknownTitle, self).__getattribute__(item)\n\nexpected_speech = (\"Dany: I'm Daenerys Targaryen (Queen of Meereen, Mother of Dragons, UnknownTitle), \"\n \"it's 08:00:00, good morning everyone!\")\n\n# using mock to make DynamicVariables().current_time_str always return 08:00:00\nwith patch.object(DynamicVariables, 'current_time_str', return_value='08:00:00', new_callable=PropertyMock):\n output_dict = substitute_string_template_dict(str_template_dict, variables_dict, DynamicVariables(),\n DefaultUnknownTitle(),\n greeting='good morning everyone')\n assert output_dict['speech'] == expected_speech\n\n# edge cases\nassert substitute_string_template_dict({}) == {}\n\n# cycle detection\nstr_template_dict = {\n 'variable_a': 'Hello $variable_b', # variable_a depends on variable_b\n 'variable_b': 'Hello $variable_a', # variable_b depends on variable_a, it's a cycle!\n}\n\nwith pytest.raises(CycleError) as exec_info:\n substitute_string_template_dict(str_template_dict)\n\n```\n\n### context\n\n#### SkipContext\n\n```python3\nimport itertools\n\nimport pytest\nfrom pythonic_toolbox.utils.context_utils import SkipContext\n\n# Usage: define a class that inherits the SkipContext,\n# and takes control of the skip or not logic\nclass MyWorkStation(SkipContext):\n\n def __init__(self, week_day: str):\n working_days = {'monday', 'tuesday', 'wednesday', 'thursday', 'friday'}\n weekends = {'saturday', 'sunday'}\n\n if week_day.lower() not in working_days.union(weekends):\n raise ValueError(f'Invalid weekday {week_day}')\n\n skip = True if week_day.lower() in weekends else False\n super(MyWorkStation, self).__init__(skip=skip)\n\nseven_week_days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']\nlogged_opening_days = []\ntotal_working_hours = 0\n\nfor cur_week_day in seven_week_days:\n # MyWorkStation will skip the code block when encountering weekends\n with MyWorkStation(week_day=cur_week_day):\n # log this working day\n logged_opening_days.append(cur_week_day)\n # accumulate working hours, 8 hours on each working day\n total_working_hours += 8\n\n# only working days are logged\nassert logged_opening_days == ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']\nassert total_working_hours == 8 * 5\n\n# test basic SkipContext\ncount_iterator = itertools.count(start=0, step=1)\n\nflg_skip = True\nwith SkipContext(skip=flg_skip):\n # if skip = True, all codes inside the context will be skipped(not executed)\n next(count_iterator) # this will not be executed\n assert sum([1, 1]) == 3\n raise Exception('Codes will not be executed')\n\nassert next(count_iterator) == 0 # check previous context is skipped\n\nflg_skip = False\nwith SkipContext(skip=flg_skip):\n # codes will be executed as normal, if skip = False\n next(count_iterator) # generate value 1\n assert sum([1, 1]) == 2\n\nassert next(count_iterator) == 2 # check previous context is executed\n\nwith pytest.raises(Exception) as exec_info:\n with SkipContext(skip=False):\n # if skip = False, this SkipContextManager is transparent,\n # internal exception will be detected as normal\n raise Exception('MyError')\nassert exec_info.value.args[0] == 'MyError'\n\n# another example: ensure there will be only one job, who acquire the lock, run the increase +1\n\nfrom multiprocessing import Manager, Pool\nimport time\n\nfrom pythonic_toolbox.utils.context_utils import SkipContext\n\n\ndef plain_cronjob_increase(ns, lock):\n start = time.time()\n with lock:\n now = time.time()\n if now - start >= 0.5:\n pass\n else:\n ns.cnt += 1\n time.sleep(1)\n return ns.cnt\n\n\nclass PreemptiveLockContext(SkipContext):\n def __init__(self, lock):\n self.start_time = time.perf_counter()\n self.lock = lock\n self.acquired = self.lock.acquire(timeout=0.5)\n skip = not self.acquired\n super(PreemptiveLockContext, self).__init__(skip=skip)\n\n def __exit__(self, type, value, traceback):\n if self.acquired:\n time.sleep(1)\n self.lock.release()\n if type is None:\n return # No exception\n else:\n if issubclass(type, self.SkipContentException):\n return True # Suppress special SkipWithBlockException\n return False\n\n\ndef cronjob_increase(ns, lock):\n # for those who cannot acquire the lock within some time\n # this context block will be skipped, quite simple\n with PreemptiveLockContext(lock):\n ns.cnt += 1\n return ns.cnt\n\n\n\nmanager = Manager()\nlock = manager.Lock()\nns = manager.Namespace()\npool = Pool(2)\n\nns.cnt = 0\nprocesses = [pool.apply_async(plain_cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]\nresult = [p.get() for p in processes]\nassert result == [1, 1]\nassert ns.cnt == 1\n\n# reset global cnt=0\nns.cnt = 0\nprocesses = [pool.apply_async(cronjob_increase, args=(ns, lock)) for __ in range(0, 2)]\nresult = [p.get() for p in processes]\nassert result == [1, 1]\nassert ns.cnt == 1\n\n```\n\n",
"bugtrack_url": null,
"license": "Apache2.0",
"summary": "a toolbox with pythonic utils, tools",
"version": "1.1.39",
"project_urls": {
"Homepage": "https://github.com/albertmenglongli/pythonic-toolbox"
},
"split_keywords": [
"toolbox"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "99ea03fdf7e398b8fa88d19531a024e257f6139d59d261f87169e72d90472a53",
"md5": "01730d524f79a32bb5ee582eae99d9d8",
"sha256": "9064b346ecd655d2889475cc17dd6617a74d18f2e90956b95ec6f3904609ce49"
},
"downloads": -1,
"filename": "pythonic_toolbox-1.1.39-py3-none-any.whl",
"has_sig": false,
"md5_digest": "01730d524f79a32bb5ee582eae99d9d8",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": null,
"size": 31765,
"upload_time": "2024-03-18T02:42:48",
"upload_time_iso_8601": "2024-03-18T02:42:48.486550Z",
"url": "https://files.pythonhosted.org/packages/99/ea/03fdf7e398b8fa88d19531a024e257f6139d59d261f87169e72d90472a53/pythonic_toolbox-1.1.39-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "5f7e5d121510194fc42afeaaa426fd30a0b514248f04478e64543c9884bec106",
"md5": "2b7b94f2c2144e1ee4ef58646c1ee76d",
"sha256": "35cf55a0dfb41c799d8b059f1f175ab708e87365d71365432d508f641bd8520a"
},
"downloads": -1,
"filename": "pythonic-toolbox-1.1.39.tar.gz",
"has_sig": false,
"md5_digest": "2b7b94f2c2144e1ee4ef58646c1ee76d",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 53923,
"upload_time": "2024-03-18T02:42:51",
"upload_time_iso_8601": "2024-03-18T02:42:51.348565Z",
"url": "https://files.pythonhosted.org/packages/5f/7e/5d121510194fc42afeaaa426fd30a0b514248f04478e64543c9884bec106/pythonic-toolbox-1.1.39.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-03-18 02:42:51",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "albertmenglongli",
"github_project": "pythonic-toolbox",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"requirements": [],
"lcname": "pythonic-toolbox"
}