subprocplus


Name: subprocplus JSON
Version 0.10 PyPI version JSON
download
home_page: https://github.com/hansalemaos/subprocplus
Summary: subprocess + some useful stuff
upload_time: 2024-03-28 02:03:16
maintainer: None
docs_url: None
author: Johannes Fischer
requires_python: None
license: MIT
keywords: subprocess
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            
# subprocess + some useful stuff 

### Tested against Windows / Python 3.11 / Anaconda

### pip install subprocplus

```py
# Some nice things to do with subprocess
# Includes stuff from:
# https://github.com/zeroSteiner/mayhem
# https://stackoverflow.com/a/29737399/15096247
# https://github.com/hansalemaos/procciao

from subprocplus.subproc import (
    create_new_user,
    disable_internet_for_user,
    remove_user,
    CREATIONINFO,
    Popen,
    PIPE,
    time,
    CREATION_TYPE_LOGON,
    subprocess,
    remove_firewall_rules,
)
import sys


# Walkthrough of subprocplus: create a Windows user, block internet access for
# selected executables under that user, drive an interactive shell as that
# user, then start a suspended process and read its memory maps.

# Credentials of the account that the rest of the example creates and uses.
username = "NO11"
password = "NOINTERNET"
# Remove the user before creating it
# (presumably to clear an account left over from an earlier run — confirm).
remove_user(username)
time.sleep(5)
# Create the new user; the call returns a (username, password) pair.
usern, passw = create_new_user(new_username=username, new_password=password, admin=True)
time.sleep(5)

# Disable internet usage for the listed apps for the newly created user.
# Returns a list of the created rules, which is needed later to remove them.
rules = disable_internet_for_user(
    username=username,
    password=password,
    apps=[r"C:\Windows\System32\curl.exe", r"C:\msys64\usr\bin\wget.exe"],
    new_display_name1=None,
    new_display_name2=None,
)
time.sleep(5)
# Creation info for starting a process as the logged-on user
# (approach taken from https://stackoverflow.com/a/29737399/15096247).
ci = CREATIONINFO(
    CREATION_TYPE_LOGON,  # create the process via a user logon
    lpUsername=username,
    lpPassword=password,
    dwCreationFlags=subprocess.CREATE_NO_WINDOW,  # invisible: no console window
    dwLogonFlags=1,  # NOTE(review): presumably LOGON_WITH_PROFILE — confirm against the Win32 docs
)
cmd = "powershell.exe"
p1 = Popen(
    cmd,
    suspended=False,  # start running immediately
    creationinfo=ci,
    stdout=PIPE,
    stderr=PIPE,
    stdin=PIPE,
    print_stdout=True,
    print_stderr=True,
)  # starting the subprocess as a logged-on user

print("Process Id: %d" % p1.pid)
print("Thread Id: %d" % p1._processinfo.dwThreadId)
p1.sendcommand(
    "ls",
    clean_old=True,  # all results are saved in p1.out_dict and p1.err_dict
    restart_on_fail=True,  # restarts if there is a broken pipe or something else
    max_restarts=3,
    sleep_after_restart=10,
)
# Give the shell time to produce output before collecting it.
time.sleep(5)
print(
    p1.get_last_stdout(clean=True)
)  # converts p1.out_dict to a list and clears p1.out_dict
print(
    p1.get_last_stderr(clean=True)
)  # converts p1.err_dict to a list and clears p1.err_dict

# While the firewall rules are active, both downloads are expected to fail.
p1.sendcommand(
    r"C:\Windows\System32\curl.exe google.com"
)  # connection error, because it is blocked
time.sleep(3)
p1.sendcommand(
    r"C:\msys64\usr\bin\wget.exe google.com"
)  # connection error, because it is blocked
time.sleep(3)
remove_firewall_rules(rules)  # deleting the created firewall rules
time.sleep(5)
# With the rules removed, the same commands are expected to succeed.
p1.sendcommand(r"C:\Windows\System32\curl.exe google.com")  # no more connection error
time.sleep(3)
p1.sendcommand(r"C:\msys64\usr\bin\wget.exe google.com")  # no more connection error
time.sleep(3)
print(p1.get_last_stdout(clean=True))
print(p1.get_last_stderr(clean=True))

# Demonstrate the restart-on-failure behaviour of sendcommand.
p1.stdin.close()  # provoking an error, because stdin is now closed
time.sleep(5)
p1.sendcommand(
    "dir",
    clean_old=True,
    restart_on_fail=True,
    max_restarts=3,
    sleep_after_restart=10,
)  # reconnects after the error
time.sleep(5)
print(p1.get_last_stdout(clean=True))

# Second process: same logon settings, but created in a suspended state.
ci1 = CREATIONINFO(
    CREATION_TYPE_LOGON,
    lpUsername=username,
    lpPassword=password,
    dwCreationFlags=subprocess.CREATE_NO_WINDOW,
    dwLogonFlags=1,
)
p2 = Popen(
    r"C:\Windows\System32\cmd.exe",
    suspended=True,  # created suspended: does not run until p2.start() is called
    creationinfo=ci1,
    stdout=PIPE,
    stderr=PIPE,
    stdin=PIPE,
)
print("Process Id: %d" % p2.pid)
print("Thread Id: %d" % p2._processinfo.dwThreadId)
# The child must not be running yet, since it was created suspended.
assert not p2._child_started
input("Press enter to start")
p2.start()  # there we go
assert p2._child_started

time.sleep(5)
# Memory reading is based on https://github.com/zeroSteiner/mayhem
# Iterate over every entry of the process's memory maps and dump its contents.
for key, item in p2.maps.items():
    try:
        # Reads the memory. If NumPy is installed, it will be much faster,
        # because it uses NumPy's buffer protocol.
        # Be careful when using it without NumPy, it might print forever.
        # NOTE(review): the chained .view("V1").view("S1") presumably
        # reinterprets the result as single bytes — confirm the return type.
        mymem = p2.read_memory(key, item.size).view("V1").view("S1")
        print(mymem)
    except Exception as e:  # some protected areas can't be read
        # Report the failure on stderr and continue with the next entry.
        sys.stderr.write(f"{e}")
        sys.stderr.flush()
```

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/hansalemaos/subprocplus",
    "name": "subprocplus",
    "maintainer": null,
    "docs_url": null,
    "requires_python": null,
    "maintainer_email": null,
    "keywords": "subprocess",
    "author": "Johannes Fischer",
    "author_email": "aulasparticularesdealemaosp@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/6e/6e/64f5b61a9cbdf81d3cb6cfd3a8de12b3dda3692d462ec58cd1c90801a4eb/subprocplus-0.10.tar.gz",
    "platform": null,
    "description": "\r\n# subprocess + some useful stuff \r\n\r\n### Tested against Windows / Python 3.11 / Anaconda\r\n\r\n### pip install subprocplus\r\n\r\n```py\r\n# Some nice things to do with subprocess\r\n# Includes stuff from:\r\n# https://github.com/zeroSteiner/mayhem\r\n# https://stackoverflow.com/a/29737399/15096247\r\n# https://github.com/hansalemaos/procciao\r\n\r\nfrom subprocplus.subproc import (\r\n    create_new_user,\r\n    disable_internet_for_user,\r\n    remove_user,\r\n    CREATIONINFO,\r\n    Popen,\r\n    PIPE,\r\n    time,\r\n    CREATION_TYPE_LOGON,\r\n    subprocess,\r\n    remove_firewall_rules,\r\n)\r\nimport sys\r\n\r\n\r\nusername = \"NO11\"\r\npassword = \"NOINTERNET\"\r\n# removing a user\r\nremove_user(username)\r\ntime.sleep(5)\r\n# creating a new user\r\nusern, passw = create_new_user(new_username=username, new_password=password, admin=True)\r\ntime.sleep(5)\r\n\r\n# disable internet usage for apps for created new user, returns a list of created rules\r\nrules = disable_internet_for_user(\r\n    username=username,\r\n    password=password,\r\n    apps=[r\"C:\\Windows\\System32\\curl.exe\", r\"C:\\msys64\\usr\\bin\\wget.exe\"],\r\n    new_display_name1=None,\r\n    new_display_name2=None,\r\n)\r\ntime.sleep(5)\r\nci = CREATIONINFO(\r\n    CREATION_TYPE_LOGON,  # CREATIONINFO for logged on user (from  https://stackoverflow.com/a/29737399/15096247 )\r\n    lpUsername=username,\r\n    lpPassword=password,\r\n    dwCreationFlags=subprocess.CREATE_NO_WINDOW,  # invisible\r\n    dwLogonFlags=1,\r\n)\r\ncmd = \"powershell.exe\"\r\np1 = Popen(\r\n    cmd,\r\n    suspended=False,\r\n    creationinfo=ci,\r\n    stdout=PIPE,\r\n    stderr=PIPE,\r\n    stdin=PIPE,\r\n    print_stdout=True,\r\n    print_stderr=True,\r\n)  # starting the subprocess as a logged on user\r\n\r\nprint(\"Process Id: %d\" % p1.pid)\r\nprint(\"Thread Id: %d\" % p1._processinfo.dwThreadId)\r\np1.sendcommand(\r\n    \"ls\",\r\n    clean_old=True,  # all results are 
saved in p1.out_dict and p2.err_dict\r\n    restart_on_fail=True,  # restarts if there is a broken pipe or something else\r\n    max_restarts=3,\r\n    sleep_after_restart=10,\r\n)\r\ntime.sleep(5)\r\nprint(\r\n    p1.get_last_stdout(clean=True)\r\n)  # converts p1.out_dict to a list and clears p1.out_dict\r\nprint(\r\n    p1.get_last_stderr(clean=True)\r\n)  # converts p1.err_dict to a list and clears p1.err_dict\r\np1.sendcommand(\r\n    r\"C:\\Windows\\System32\\curl.exe google.com\"\r\n)  # connection error, because it is blocked\r\ntime.sleep(3)\r\np1.sendcommand(\r\n    r\"C:\\msys64\\usr\\bin\\wget.exe google.com\"\r\n)  # connection error, because it is blocked\r\ntime.sleep(3)\r\nremove_firewall_rules(rules)  # deleting the created firewall rules\r\ntime.sleep(5)\r\np1.sendcommand(r\"C:\\Windows\\System32\\curl.exe google.com\")  # no more connection error\r\ntime.sleep(3)\r\np1.sendcommand(r\"C:\\msys64\\usr\\bin\\wget.exe google.com\")  # no more connection error\r\ntime.sleep(3)\r\nprint(p1.get_last_stdout(clean=True))\r\nprint(p1.get_last_stderr(clean=True))\r\np1.stdin.close()  # provoking an error, because it is closed\r\ntime.sleep(5)\r\np1.sendcommand(\r\n    \"dir\",\r\n    clean_old=True,\r\n    restart_on_fail=True,\r\n    max_restarts=3,\r\n    sleep_after_restart=10,\r\n)  # reconnects after error\r\ntime.sleep(5)\r\nprint(p1.get_last_stdout(clean=True))\r\n\r\nci1 = CREATIONINFO(\r\n    CREATION_TYPE_LOGON,\r\n    lpUsername=username,\r\n    lpPassword=password,\r\n    dwCreationFlags=subprocess.CREATE_NO_WINDOW,\r\n    dwLogonFlags=1,\r\n)\r\np2 = Popen(\r\n    r\"C:\\Windows\\System32\\cmd.exe\",\r\n    suspended=True,  # doesn't start\r\n    creationinfo=ci1,\r\n    stdout=PIPE,\r\n    stderr=PIPE,\r\n    stdin=PIPE,\r\n)\r\nprint(\"Process Id: %d\" % p2.pid)\r\nprint(\"Thread Id: %d\" % p2._processinfo.dwThreadId)\r\nassert not p2._child_started\r\ninput(\"Press enter to start\")\r\np2.start()  # there we go\r\nassert 
p2._child_started\r\n\r\ntime.sleep(5)\r\n# https://github.com/zeroSteiner/mayhem\r\nfor key, item in p2.maps.items():\r\n    try:\r\n        # reads the memory. If NumPy is installed, it will be much faster,\r\n        # because it uses NumPy's buffer protocol.\r\n        # Be careful when using it without NumPy, it might print forever.\r\n        mymem = p2.read_memory(key, item.size).view(\"V1\").view(\"S1\")\r\n        print(mymem)\r\n    except Exception as e:  # some protected areas can't be read\r\n        sys.stderr.write(f\"{e}\")\r\n        sys.stderr.flush()\r\n```\r\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "subprocess + some useful stuff",
    "version": "0.10",
    "project_urls": {
        "Homepage": "https://github.com/hansalemaos/subprocplus"
    },
    "split_keywords": [
        "subprocess"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "ad10ad0aec7bd3d9b38559dd34ae6337bd9153982b3b89a27eeeb7bba225d930",
                "md5": "23132457a49a4875947b1646f37d2299",
                "sha256": "0def1f15bf0df966889c9716af9e7566831c077d02fb76c3733e5dbc81ce7fc3"
            },
            "downloads": -1,
            "filename": "subprocplus-0.10-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "23132457a49a4875947b1646f37d2299",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 592291,
            "upload_time": "2024-03-28T02:03:14",
            "upload_time_iso_8601": "2024-03-28T02:03:14.099248Z",
            "url": "https://files.pythonhosted.org/packages/ad/10/ad0aec7bd3d9b38559dd34ae6337bd9153982b3b89a27eeeb7bba225d930/subprocplus-0.10-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "6e6e64f5b61a9cbdf81d3cb6cfd3a8de12b3dda3692d462ec58cd1c90801a4eb",
                "md5": "14730632d2305c0860dd06b0f89266eb",
                "sha256": "a9615bea26e28f23d67d68cb99d129d95910e45b78b3329b69d6a162bdbb0c67"
            },
            "downloads": -1,
            "filename": "subprocplus-0.10.tar.gz",
            "has_sig": false,
            "md5_digest": "14730632d2305c0860dd06b0f89266eb",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 569412,
            "upload_time": "2024-03-28T02:03:16",
            "upload_time_iso_8601": "2024-03-28T02:03:16.897143Z",
            "url": "https://files.pythonhosted.org/packages/6e/6e/64f5b61a9cbdf81d3cb6cfd3a8de12b3dda3692d462ec58cd1c90801a4eb/subprocplus-0.10.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-03-28 02:03:16",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "hansalemaos",
    "github_project": "subprocplus",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "requirements": [],
    "lcname": "subprocplus"
}
        
Elapsed time: 0.26116s