# Timeout decorator for functions! Works on Windows with multiprocessing, threading, subprocess!
### Kill a function after a certain time.
#### Tested on Windows 10 / Python 3.9.13
```python
pip install ofenaus
```
### Here are some examples
##### There might be situations where killing the function doesn't work (asyncio, for example)
```python
import subprocess
import threading
from multiprocessing import Process, Lock
import os
from ofenaus import ofen_aus, sleep # use sleep from this module instead time.sleep()
@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)
def subprocess_ping():
pp = subprocess.Popen("ping -t 8.8.8.8") # Only subprocess.Popen can be killed! subprocess.run, subprocess.call etc. can't be killed.
while True:
sleep(1)
@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)
def thread_lock_aquire():
my_lock = threading.Lock()
my_lock.acquire()
my_lock.acquire()
while True:
print("test aquire")
sleep(1)
@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)
def deamon_thread():
def f_check(x: int) -> None:
for _ in range(x):
print(_)
sleep(1)
f_check(0)
test_1 = threading.Thread(target=f_check, args=(20,))
test_1.name = "xx"
test_1.daemon = True
test_1.start()
test_1.join()
@ofen_aus(
timeout=2,
timeout_value=2000, # the function will return this value if any kind (!) of Exception is raised, not only TimeoutError.
print_debug=False,
show_remaining_time=False,
print_exceptions=False,
timeout_message="Time is over!",
)
def testing2():
sleep(1)
a = 10
os.chdir("c:\\")
pp = subprocess.Popen("ping 8.8.8.8")
print(pp)
sleep(1)
b = 20
sleep(1)
c = a + b
sleep(1)
@ofen_aus(timeout=1, timeout_value="not good", show_remaining_time=False)
def testa(i45):
for k in range(2):
print(k + i45)
sleep(1)
return i45
@ofen_aus(timeout=4)
def fx(name):
while True:
print("hello", name)
sleep(1)
@ofen_aus(timeout=4)
def faax(l, i):
l.acquire()
try:
print("hello world", i)
sleep(5)
finally:
l.release()
if __name__ == "__main__":
p = Process(target=fx, args=("bob",))
p.start()
p.join()
do = subprocess_ping()
print(do)
thread_lock_aquire()
deamon_thread()
lock = Lock()
for num in range(5):
Process(target=faax, args=(lock, num)).start()
testa_value = testa(555)
print(testa_value)
hello bob
hello bob
hello bob
hello bob
hello bob
Time is over!
Pinging 8.8.8.8 with 32 bytes of data:
Reply from 8.8.8.8: bytes=32 time=9ms TTL=119
Reply from 8.8.8.8: bytes=32 time=12ms TTL=119
Reply from 8.8.8.8: bytes=32 time=9ms TTL=119
Reply from 8.8.8.8: bytes=32 time=10ms TTL=119
Time is over!
None
Time is over!
0
1
2
Time is over!
555
hello world 0
556
Time is over!
not good
Time is over!
hello worldhello world 2
1
Time is over!
Time is over!
hello world 4
Time is over!
Time is over!
```
Raw data
{
"_id": null,
"home_page": "https://github.com/hansalemaos/ofenaus",
"name": "ofenaus",
"maintainer": "",
"docs_url": null,
"requires_python": "",
"maintainer_email": "",
"keywords": "windows,decorator,timeout",
"author": "Johannes Fischer",
"author_email": "<aulasparticularesdealemaosp@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/b0/b4/7af7b3272a1332160da4541d6c6a7a7a03a8784f0b57965c48e3e0a8b16e/ofenaus-0.11.tar.gz",
"platform": null,
"description": "\n# Timeout decorator for functions! Works on Windows with multiprocessing, threading, subprocess!\n\n### Kill a function after a certain time.\n\n#### Tested on Windows 10 / Python 3.9.13 \n\n\n\n```python\n\npip install ofenaus\n\n```\n\n\n\n### Here are some examples \n\n##### There might be situations where killing the function doesn't work (asyncio, for example) \n\n\n\n```python\n\nimport subprocess\n\nimport threading\n\nfrom multiprocessing import Process, Lock\n\nimport os\n\n\n\nfrom ofenaus import ofen_aus, sleep # use sleep from this module instead time.sleep() \n\n\n\n\n\n@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)\n\ndef subprocess_ping():\n\n pp = subprocess.Popen(\"ping -t 8.8.8.8\") # Only subprocess.Popen can be killed! subprocess.run, subprocess.call etc. can't be killed.\n\n\t\n\n while True:\n\n sleep(1)\n\n\n\n\n\n@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)\n\ndef thread_lock_aquire():\n\n my_lock = threading.Lock()\n\n my_lock.acquire()\n\n my_lock.acquire()\n\n while True:\n\n print(\"test aquire\")\n\n sleep(1)\n\n\n\n\n\n@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)\n\ndef deamon_thread():\n\n def f_check(x: int) -> None:\n\n for _ in range(x):\n\n print(_)\n\n sleep(1)\n\n\n\n f_check(0)\n\n test_1 = threading.Thread(target=f_check, args=(20,))\n\n test_1.name = \"xx\"\n\n test_1.daemon = True\n\n test_1.start()\n\n test_1.join()\n\n\n\n\n\n@ofen_aus(\n\n timeout=2,\n\n timeout_value=2000, # the function will return this value if any kind (!) of Exception is raised, not only TimeoutError.\n\n print_debug=False,\n\n show_remaining_time=False,\n\n print_exceptions=False,\n\n timeout_message=\"Time is over!\",\n\n)\n\ndef testing2():\n\n sleep(1)\n\n a = 10\n\n os.chdir(\"c:\\\\\")\n\n pp = subprocess.Popen(\"ping 8.8.8.8\")\n\n print(pp)\n\n sleep(1)\n\n\n\n b = 20\n\n sleep(1)\n\n\n\n c = a + b\n\n sleep(1)\n\n\n\n\n\n@ofen_aus(timeout=1, timeout_value=\"not good\", show_remaining_time=False)\n\ndef testa(i45):\n\n for k in range(2):\n\n print(k + i45)\n\n sleep(1)\n\n return i45\n\n\n\n\n\n@ofen_aus(timeout=4)\n\ndef fx(name):\n\n while True:\n\n\n\n print(\"hello\", name)\n\n sleep(1)\n\n\n\n\n\n@ofen_aus(timeout=4)\n\ndef faax(l, i):\n\n l.acquire()\n\n try:\n\n print(\"hello world\", i)\n\n sleep(5)\n\n finally:\n\n l.release()\n\n\n\n\n\nif __name__ == \"__main__\":\n\n p = Process(target=fx, args=(\"bob\",))\n\n p.start()\n\n p.join()\n\n do = subprocess_ping()\n\n print(do)\n\n thread_lock_aquire()\n\n deamon_thread()\n\n lock = Lock()\n\n for num in range(5):\n\n Process(target=faax, args=(lock, num)).start()\n\n testa_value = testa(555)\n\n print(testa_value)\n\n\t\n\n\t\n\n\t\n\nhello bob\n\nhello bob\n\nhello bob\n\nhello bob\n\nhello bob\n\nTime is over!\n\nPinging 8.8.8.8 with 32 bytes of data:\n\nReply from 8.8.8.8: bytes=32 time=9ms TTL=119\n\nReply from 8.8.8.8: bytes=32 time=12ms TTL=119\n\nReply from 8.8.8.8: bytes=32 time=9ms TTL=119\n\nReply from 8.8.8.8: bytes=32 time=10ms TTL=119\n\nTime is over!\n\nNone\n\nTime is over!\n\n0\n\n1\n\n2\n\nTime is over!\n\n555\n\nhello world 0\n\n556\n\nTime is over!\n\nnot good\n\nTime is over!\n\nhello worldhello world 2 \n\n1\n\nTime is over!\n\nTime is over!\n\nhello world 4\n\nTime is over!\n\nTime is over!\t\n\n\n\n\n\n```\n\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Timeout decorator for functions! Works on Windows with multiprocessing, threading, subprocess!",
"version": "0.11",
"split_keywords": [
"windows",
"decorator",
"timeout"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "a4f98fded130f8974923a03ca9e2b4288c6bae78ffc49441e4b00b037e6fa66b",
"md5": "a613b16c65cab90d1785e02ef3268c29",
"sha256": "b3561ad58a34a0bc06a0400b31344d748f26424b30f36d13386226e55e5ffbfa"
},
"downloads": -1,
"filename": "ofenaus-0.11-py3-none-any.whl",
"has_sig": false,
"md5_digest": "a613b16c65cab90d1785e02ef3268c29",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": null,
"size": 7483,
"upload_time": "2023-01-22T09:48:21",
"upload_time_iso_8601": "2023-01-22T09:48:21.761327Z",
"url": "https://files.pythonhosted.org/packages/a4/f9/8fded130f8974923a03ca9e2b4288c6bae78ffc49441e4b00b037e6fa66b/ofenaus-0.11-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "b0b47af7b3272a1332160da4541d6c6a7a7a03a8784f0b57965c48e3e0a8b16e",
"md5": "f0e57045f28a7fb67ba247ab5744d651",
"sha256": "41a07bbd366956e6b8e861f0b0c79f0423205b24efc8b927b85ebc184fb0a04b"
},
"downloads": -1,
"filename": "ofenaus-0.11.tar.gz",
"has_sig": false,
"md5_digest": "f0e57045f28a7fb67ba247ab5744d651",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 6529,
"upload_time": "2023-01-22T09:48:23",
"upload_time_iso_8601": "2023-01-22T09:48:23.394547Z",
"url": "https://files.pythonhosted.org/packages/b0/b4/7af7b3272a1332160da4541d6c6a7a7a03a8784f0b57965c48e3e0a8b16e/ofenaus-0.11.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-01-22 09:48:23",
"github": true,
"gitlab": false,
"bitbucket": false,
"github_user": "hansalemaos",
"github_project": "ofenaus",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"requirements": [],
"lcname": "ofenaus"
}