ofenaus


Nameofenaus JSON
Version 0.11 PyPI version JSON
download
home_pagehttps://github.com/hansalemaos/ofenaus
SummaryTimeout decorator for functions! Works on Windows with multiprocessing, threading, subprocess!
upload_time2023-01-22 09:48:23
maintainer
docs_urlNone
authorJohannes Fischer
requires_python
licenseMIT
keywords windows decorator timeout
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            
# Timeout decorator for functions! Works on Windows with multiprocessing, threading, subprocess!

### Kill a function after a certain time.

#### Tested on Windows 10 / Python 3.9.13 



```python

pip install ofenaus

```



### Here are some examples 

##### There might be situations where killing the function doesn't work (asyncio, for example) 



```python

import subprocess

import threading

from multiprocessing import Process, Lock

import os



from ofenaus import ofen_aus, sleep # use sleep from this module instead time.sleep() 





@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)

def subprocess_ping():

    pp = subprocess.Popen("ping -t 8.8.8.8") # Only subprocess.Popen can be killed! subprocess.run, subprocess.call etc. can't be killed.

	

    while True:

        sleep(1)





@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)

def thread_lock_aquire():

    my_lock = threading.Lock()

    my_lock.acquire()

    my_lock.acquire()

    while True:

        print("test aquire")

        sleep(1)





@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)

def deamon_thread():

    def f_check(x: int) -> None:

        for _ in range(x):

            print(_)

            sleep(1)



    f_check(0)

    test_1 = threading.Thread(target=f_check, args=(20,))

    test_1.name = "xx"

    test_1.daemon = True

    test_1.start()

    test_1.join()





@ofen_aus(

    timeout=2,

    timeout_value=2000, # the function will return this value if any kind (!) of Exception is raised, not only TimeoutError.

    print_debug=False,

    show_remaining_time=False,

    print_exceptions=False,

    timeout_message="Time is over!",

)

def testing2():

    sleep(1)

    a = 10

    os.chdir("c:\\")

    pp = subprocess.Popen("ping 8.8.8.8")

    print(pp)

    sleep(1)



    b = 20

    sleep(1)



    c = a + b

    sleep(1)





@ofen_aus(timeout=1, timeout_value="not good", show_remaining_time=False)

def testa(i45):

    for k in range(2):

        print(k + i45)

        sleep(1)

    return i45





@ofen_aus(timeout=4)

def fx(name):

    while True:



        print("hello", name)

        sleep(1)





@ofen_aus(timeout=4)

def faax(l, i):

    l.acquire()

    try:

        print("hello world", i)

        sleep(5)

    finally:

        l.release()





if __name__ == "__main__":

    p = Process(target=fx, args=("bob",))

    p.start()

    p.join()

    do = subprocess_ping()

    print(do)

    thread_lock_aquire()

    deamon_thread()

    lock = Lock()

    for num in range(5):

        Process(target=faax, args=(lock, num)).start()

    testa_value = testa(555)

    print(testa_value)

	

	

	

hello bob

hello bob

hello bob

hello bob

hello bob

Time is over!

Pinging 8.8.8.8 with 32 bytes of data:

Reply from 8.8.8.8: bytes=32 time=9ms TTL=119

Reply from 8.8.8.8: bytes=32 time=12ms TTL=119

Reply from 8.8.8.8: bytes=32 time=9ms TTL=119

Reply from 8.8.8.8: bytes=32 time=10ms TTL=119

Time is over!

None

Time is over!

0

1

2

Time is over!

555

hello world 0

556

Time is over!

not good

Time is over!

hello worldhello world 2 

1

Time is over!

Time is over!

hello world 4

Time is over!

Time is over!	





```


            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/hansalemaos/ofenaus",
    "name": "ofenaus",
    "maintainer": "",
    "docs_url": null,
    "requires_python": "",
    "maintainer_email": "",
    "keywords": "windows,decorator,timeout",
    "author": "Johannes Fischer",
    "author_email": "<aulasparticularesdealemaosp@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/b0/b4/7af7b3272a1332160da4541d6c6a7a7a03a8784f0b57965c48e3e0a8b16e/ofenaus-0.11.tar.gz",
    "platform": null,
    "description": "\n# Timeout decorator for functions! Works on Windows with multiprocessing, threading, subprocess!\n\n### Kill a function after a certain time.\n\n#### Tested on Windows 10 / Python 3.9.13 \n\n\n\n```python\n\npip install ofenaus\n\n```\n\n\n\n### Here are some examples \n\n##### There might be situations where killing the function doesn't work (asyncio, for example) \n\n\n\n```python\n\nimport subprocess\n\nimport threading\n\nfrom multiprocessing import Process, Lock\n\nimport os\n\n\n\nfrom ofenaus import ofen_aus, sleep # use sleep from this module instead time.sleep() \n\n\n\n\n\n@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)\n\ndef subprocess_ping():\n\n    pp = subprocess.Popen(\"ping -t 8.8.8.8\") # Only subprocess.Popen can be killed! subprocess.run, subprocess.call etc. can't be killed.\n\n\t\n\n    while True:\n\n        sleep(1)\n\n\n\n\n\n@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)\n\ndef thread_lock_aquire():\n\n    my_lock = threading.Lock()\n\n    my_lock.acquire()\n\n    my_lock.acquire()\n\n    while True:\n\n        print(\"test aquire\")\n\n        sleep(1)\n\n\n\n\n\n@ofen_aus(timeout=3, print_debug=True, print_exceptions=False)\n\ndef deamon_thread():\n\n    def f_check(x: int) -> None:\n\n        for _ in range(x):\n\n            print(_)\n\n            sleep(1)\n\n\n\n    f_check(0)\n\n    test_1 = threading.Thread(target=f_check, args=(20,))\n\n    test_1.name = \"xx\"\n\n    test_1.daemon = True\n\n    test_1.start()\n\n    test_1.join()\n\n\n\n\n\n@ofen_aus(\n\n    timeout=2,\n\n    timeout_value=2000, # the function will return this value if any kind (!) of Exception is raised, not only TimeoutError.\n\n    print_debug=False,\n\n    show_remaining_time=False,\n\n    print_exceptions=False,\n\n    timeout_message=\"Time is over!\",\n\n)\n\ndef testing2():\n\n    sleep(1)\n\n    a = 10\n\n    os.chdir(\"c:\\\\\")\n\n    pp = subprocess.Popen(\"ping 8.8.8.8\")\n\n    print(pp)\n\n    sleep(1)\n\n\n\n    b = 20\n\n    sleep(1)\n\n\n\n    c = a + b\n\n    sleep(1)\n\n\n\n\n\n@ofen_aus(timeout=1, timeout_value=\"not good\", show_remaining_time=False)\n\ndef testa(i45):\n\n    for k in range(2):\n\n        print(k + i45)\n\n        sleep(1)\n\n    return i45\n\n\n\n\n\n@ofen_aus(timeout=4)\n\ndef fx(name):\n\n    while True:\n\n\n\n        print(\"hello\", name)\n\n        sleep(1)\n\n\n\n\n\n@ofen_aus(timeout=4)\n\ndef faax(l, i):\n\n    l.acquire()\n\n    try:\n\n        print(\"hello world\", i)\n\n        sleep(5)\n\n    finally:\n\n        l.release()\n\n\n\n\n\nif __name__ == \"__main__\":\n\n    p = Process(target=fx, args=(\"bob\",))\n\n    p.start()\n\n    p.join()\n\n    do = subprocess_ping()\n\n    print(do)\n\n    thread_lock_aquire()\n\n    deamon_thread()\n\n    lock = Lock()\n\n    for num in range(5):\n\n        Process(target=faax, args=(lock, num)).start()\n\n    testa_value = testa(555)\n\n    print(testa_value)\n\n\t\n\n\t\n\n\t\n\nhello bob\n\nhello bob\n\nhello bob\n\nhello bob\n\nhello bob\n\nTime is over!\n\nPinging 8.8.8.8 with 32 bytes of data:\n\nReply from 8.8.8.8: bytes=32 time=9ms TTL=119\n\nReply from 8.8.8.8: bytes=32 time=12ms TTL=119\n\nReply from 8.8.8.8: bytes=32 time=9ms TTL=119\n\nReply from 8.8.8.8: bytes=32 time=10ms TTL=119\n\nTime is over!\n\nNone\n\nTime is over!\n\n0\n\n1\n\n2\n\nTime is over!\n\n555\n\nhello world 0\n\n556\n\nTime is over!\n\nnot good\n\nTime is over!\n\nhello worldhello world 2 \n\n1\n\nTime is over!\n\nTime is over!\n\nhello world 4\n\nTime is over!\n\nTime is over!\t\n\n\n\n\n\n```\n\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Timeout decorator for functions! Works on Windows with multiprocessing, threading, subprocess!",
    "version": "0.11",
    "split_keywords": [
        "windows",
        "decorator",
        "timeout"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "a4f98fded130f8974923a03ca9e2b4288c6bae78ffc49441e4b00b037e6fa66b",
                "md5": "a613b16c65cab90d1785e02ef3268c29",
                "sha256": "b3561ad58a34a0bc06a0400b31344d748f26424b30f36d13386226e55e5ffbfa"
            },
            "downloads": -1,
            "filename": "ofenaus-0.11-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a613b16c65cab90d1785e02ef3268c29",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 7483,
            "upload_time": "2023-01-22T09:48:21",
            "upload_time_iso_8601": "2023-01-22T09:48:21.761327Z",
            "url": "https://files.pythonhosted.org/packages/a4/f9/8fded130f8974923a03ca9e2b4288c6bae78ffc49441e4b00b037e6fa66b/ofenaus-0.11-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "b0b47af7b3272a1332160da4541d6c6a7a7a03a8784f0b57965c48e3e0a8b16e",
                "md5": "f0e57045f28a7fb67ba247ab5744d651",
                "sha256": "41a07bbd366956e6b8e861f0b0c79f0423205b24efc8b927b85ebc184fb0a04b"
            },
            "downloads": -1,
            "filename": "ofenaus-0.11.tar.gz",
            "has_sig": false,
            "md5_digest": "f0e57045f28a7fb67ba247ab5744d651",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 6529,
            "upload_time": "2023-01-22T09:48:23",
            "upload_time_iso_8601": "2023-01-22T09:48:23.394547Z",
            "url": "https://files.pythonhosted.org/packages/b0/b4/7af7b3272a1332160da4541d6c6a7a7a03a8784f0b57965c48e3e0a8b16e/ofenaus-0.11.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-01-22 09:48:23",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "github_user": "hansalemaos",
    "github_project": "ofenaus",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "requirements": [],
    "lcname": "ofenaus"
}
        
Elapsed time: 0.07395s