ipc-worker


Nameipc-worker JSON
Version 0.1.7 PyPI version JSON
download
home_pagehttps://github.com/ssbuild/ipc_worker
Summaryipc-worker: Inter-Process Communication , muti Process Woker works by share memory or MQ.
upload_time2023-11-08 08:00:52
maintainer
docs_urlNone
authorssbuild
requires_python>=3, <4
licenseApache 2.0
keywords ipc-worker ipc_worker ipc process worker ipc ipc mq fast-ipc process ipc
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            ipc-worker: Inter-Process Communication , muti Process Woker works by share memory or MQ.

```py
support share memory (py>=3.8 and linux) and mq process worker (py >=3.6)

```
```py
# -*- coding: utf-8 -*-
# @Time    : 2021/11/23 9:35

'''
demo share memrory
recommended system linux and python >= 3.8
    recommended linux
    python 3.8

Do not recommended run in windows , it will report an error as follow
    RuntimeError:
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.

'''
import multiprocessing
import os
import signal

from ipc_worker import logger
from ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker

class My_worker(SHM_process_worker):
    def __init__(self,config,*args,**kwargs):
        super(My_worker,self).__init__(*args,**kwargs)
        #config info , use by yourself

        logger.info('Process id {}, group name {} ,shm name {}'.format(self._idx,self._group_name,self._shm_name))
        logger.info(config)
        self.config = config


    #Process begin trigger this func
    def run_begin(self):
        logger.info('worker pid {}...'.format(os.getpid()))
        self.handle = None
        pass

    # Process end trigger this func
    def run_end(self):
        if self.handle is not None:
            pass

    #any data put will trigger this func
    def run_once(self,request_data):
        #process request_data
        if isinstance(request_data,dict):
            request_data['b']  = 200
        if self.handle is not None:
            #do some thing
            pass
        return request_data


if __name__ == '__main__':
    config = {
        "anything" : "anything",
        "aa": 100
    }

    evt_quit = multiprocessing.Manager().Event()

    # group_name 为共享内存组名,需唯一
    # manager is an agent  and act as a load balancing
    # worker is real doing your work
    instance = IPC_shm(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker Process
        manager_num=2,  # number of agent Process
        group_name='serving_shm',  # share memory name
        shm_size=1 * 1024 * 1024,  # share memory size
        queue_size=20,  # recv queue size
        is_log_time=False,  # whether log compute time
        daemon=False,
    )

    instance.start()

    #demo produce and consume message , you can process by http
    for i in range(10):
        print('produce message')
        data = {"a" : 100}
        request_id = instance.put(data)
        data = instance.get(request_id)
        print('get process result',data)


    def signal_handler(signum, frame):
        evt_quit.set()
        instance.terminate()
        raise KeyboardInterrupt
    signal.signal(signal.SIGINT, signal_handler)

    try:
        instance.join()
    except Exception as e:
        evt_quit.set()
        instance.terminate()

    del evt_quit
```
```py
# -*- coding: utf-8 -*-
# @Time    : 2021/11/29 15:06
# @Author  : tk
import multiprocessing
import os
import signal
import torch
from ipc_worker import logger
from ipc_worker.ipc_zmq_loader import IPC_zmq, ZMQ_process_worker




class My_worker(ZMQ_process_worker):
    def __init__(self, config, *args, **kwargs):
        super(My_worker, self).__init__(*args, **kwargs)
        # config info , use by yourself
        logger.info('Process id {}, group name {} , identity {}'.format(self._idx, self._group_name, self._identity))
        logger.info(config)
        self.config = config

    # Process begin trigger this func
    def run_begin(self):
        logger.info('worker pid {}...'.format(os.getpid()))
        self.handle = None
        pass

    # Process end trigger this func
    def run_end(self):
        if self.handle is not None:
            pass

    # any data put will trigger this func
    def run_once(self, request_data):
        # process request_data
        print(torch.cuda.device_count(), torch.cuda.current_device())

        if isinstance(request_data, dict):
            request_data['b'] = 200
        if self.handle is not None:
            # do some thing
            pass
        return request_data

if __name__ == '__main__':


    # torch.multiprocessing.set_start_method('spawn', force=True)
    '''
        demo ZMQ depend zmq
        pip install pyzmq
        test pass >= python3.6
    '''

    tmp_dir = './tmp'
    if not os.path.exists(tmp_dir):
        os.mkdir(tmp_dir)

    os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir


    config = {
        "anything" : "anything",
        "aa": 100
    }

    evt_quit = multiprocessing.Manager().Event()

    # group_name 为共享内存组名,需唯一
    # manager is an agent  and act as a load balancing
    # worker is real doing your work
    instance = IPC_zmq(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker Process
        group_name='serving_zmq',  # share memory name
        evt_quit=evt_quit,
        queue_size=20,  # recv queue size
        is_log_time=True,  # whether log compute time
        daemon=False,
    )
    instance.start()

    #demo produce and consume message , you can process by http
    for i in range(10):
        data = {"a" : 100}
        request_id = instance.put(data)

        data = instance.get(request_id)
        print('get process result',request_id,data)


    def signal_handler(signum, frame):
        evt_quit.set()
        instance.terminate()
        raise KeyboardInterrupt
    signal.signal(signal.SIGINT, signal_handler)

    try:
        instance.join()
    except Exception as e:
        evt_quit.set()
        instance.terminate()
    del evt_quit
```



            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/ssbuild/ipc_worker",
    "name": "ipc-worker",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3, <4",
    "maintainer_email": "",
    "keywords": "ipc-worker,ipc_worker,ipc,process worker,ipc,ipc mq,fast-ipc,process ipc",
    "author": "ssbuild",
    "author_email": "9727464@qq.com",
    "download_url": "",
    "platform": "win32_AMD64",
    "description": "ipc-worker: Inter-Process Communication , muti Process Woker works by share memory or MQ.\n\n```py\nsupport share memory (py>=3.8 and linux) and mq process worker (py >=3.6)\n\n```\n```py\n# -*- coding: utf-8 -*-\n# @Time    : 2021/11/23 9:35\n\n'''\ndemo share memrory\nrecommended system linux and python >= 3.8\n    recommended linux\n    python 3.8\n\nDo not recommended run in windows , it will report an error as follow\n    RuntimeError:\n            An attempt has been made to start a new process before the\n            current process has finished its bootstrapping phase.\n\n'''\nimport multiprocessing\nimport os\nimport signal\n\nfrom ipc_worker import logger\nfrom ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker\n\nclass My_worker(SHM_process_worker):\n    def __init__(self,config,*args,**kwargs):\n        super(My_worker,self).__init__(*args,**kwargs)\n        #config info , use by yourself\n\n        logger.info('Process id {}, group name {} ,shm name {}'.format(self._idx,self._group_name,self._shm_name))\n        logger.info(config)\n        self.config = config\n\n\n    #Process begin trigger this func\n    def run_begin(self):\n        logger.info('worker pid {}...'.format(os.getpid()))\n        self.handle = None\n        pass\n\n    # Process end trigger this func\n    def run_end(self):\n        if self.handle is not None:\n            pass\n\n    #any data put will trigger this func\n    def run_once(self,request_data):\n        #process request_data\n        if isinstance(request_data,dict):\n            request_data['b']  = 200\n        if self.handle is not None:\n            #do some thing\n            pass\n        return request_data\n\n\nif __name__ == '__main__':\n    config = {\n        \"anything\" : \"anything\",\n        \"aa\": 100\n    }\n\n    evt_quit = multiprocessing.Manager().Event()\n\n    # group_name \u4e3a\u5171\u4eab\u5185\u5b58\u7ec4\u540d,\u9700\u552f\u4e00\n    # manager is an agent  and act as a load balancing\n    # worker is real doing your work\n    instance = IPC_shm(\n        CLS_worker=My_worker,\n        worker_args=(config,),  # must be tuple\n        worker_num=10,  # number of worker Process\n        manager_num=2,  # number of agent Process\n        group_name='serving_shm',  # share memory name\n        shm_size=1 * 1024 * 1024,  # share memory size\n        queue_size=20,  # recv queue size\n        is_log_time=False,  # whether log compute time\n        daemon=False,\n    )\n\n    instance.start()\n\n    #demo produce and consume message , you can process by http\n    for i in range(10):\n        print('produce message')\n        data = {\"a\" : 100}\n        request_id = instance.put(data)\n        data = instance.get(request_id)\n        print('get process result',data)\n\n\n    def signal_handler(signum, frame):\n        evt_quit.set()\n        instance.terminate()\n        raise KeyboardInterrupt\n    signal.signal(signal.SIGINT, signal_handler)\n\n    try:\n        instance.join()\n    except Exception as e:\n        evt_quit.set()\n        instance.terminate()\n\n    del evt_quit\n```\n```py\n# -*- coding: utf-8 -*-\n# @Time    : 2021/11/29 15:06\n# @Author  : tk\nimport multiprocessing\nimport os\nimport signal\nimport torch\nfrom ipc_worker import logger\nfrom ipc_worker.ipc_zmq_loader import IPC_zmq, ZMQ_process_worker\n\n\n\n\nclass My_worker(ZMQ_process_worker):\n    def __init__(self, config, *args, **kwargs):\n        super(My_worker, self).__init__(*args, **kwargs)\n        # config info , use by yourself\n        logger.info('Process id {}, group name {} , identity {}'.format(self._idx, self._group_name, self._identity))\n        logger.info(config)\n        self.config = config\n\n    # Process begin trigger this func\n    def run_begin(self):\n        logger.info('worker pid {}...'.format(os.getpid()))\n        self.handle = None\n        pass\n\n    # Process end trigger this func\n    def run_end(self):\n        if self.handle is not None:\n            pass\n\n    # any data put will trigger this func\n    def run_once(self, request_data):\n        # process request_data\n        print(torch.cuda.device_count(), torch.cuda.current_device())\n\n        if isinstance(request_data, dict):\n            request_data['b'] = 200\n        if self.handle is not None:\n            # do some thing\n            pass\n        return request_data\n\nif __name__ == '__main__':\n\n\n    # torch.multiprocessing.set_start_method('spawn', force=True)\n    '''\n        demo ZMQ depend zmq\n        pip install pyzmq\n        test pass >= python3.6\n    '''\n\n    tmp_dir = './tmp'\n    if not os.path.exists(tmp_dir):\n        os.mkdir(tmp_dir)\n\n    os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir\n\n\n    config = {\n        \"anything\" : \"anything\",\n        \"aa\": 100\n    }\n\n    evt_quit = multiprocessing.Manager().Event()\n\n    # group_name \u4e3a\u5171\u4eab\u5185\u5b58\u7ec4\u540d,\u9700\u552f\u4e00\n    # manager is an agent  and act as a load balancing\n    # worker is real doing your work\n    instance = IPC_zmq(\n        CLS_worker=My_worker,\n        worker_args=(config,),  # must be tuple\n        worker_num=10,  # number of worker Process\n        group_name='serving_zmq',  # share memory name\n        evt_quit=evt_quit,\n        queue_size=20,  # recv queue size\n        is_log_time=True,  # whether log compute time\n        daemon=False,\n    )\n    instance.start()\n\n    #demo produce and consume message , you can process by http\n    for i in range(10):\n        data = {\"a\" : 100}\n        request_id = instance.put(data)\n\n        data = instance.get(request_id)\n        print('get process result',request_id,data)\n\n\n    def signal_handler(signum, frame):\n        evt_quit.set()\n        instance.terminate()\n        raise KeyboardInterrupt\n    signal.signal(signal.SIGINT, signal_handler)\n\n    try:\n        instance.join()\n    except Exception as e:\n        evt_quit.set()\n        instance.terminate()\n    del evt_quit\n```\n\n\n",
    "bugtrack_url": null,
    "license": "Apache 2.0",
    "summary": "ipc-worker: Inter-Process Communication , muti Process Woker works by share memory or MQ.",
    "version": "0.1.7",
    "project_urls": {
        "Homepage": "https://github.com/ssbuild/ipc_worker"
    },
    "split_keywords": [
        "ipc-worker",
        "ipc_worker",
        "ipc",
        "process worker",
        "ipc",
        "ipc mq",
        "fast-ipc",
        "process ipc"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "6ddd56bf499d33df269ae1d4be5d7fd163090f59d553ce02387ff933db7787aa",
                "md5": "8d6007f280eb06238354d261755d484e",
                "sha256": "923f950de1be3b63fded3ea96d8af2e85a2eb41dd3259e27abdb7b9432ccedd6"
            },
            "downloads": -1,
            "filename": "ipc_worker-0.1.7-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "8d6007f280eb06238354d261755d484e",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3, <4",
            "size": 19085,
            "upload_time": "2023-11-08T08:00:52",
            "upload_time_iso_8601": "2023-11-08T08:00:52.543160Z",
            "url": "https://files.pythonhosted.org/packages/6d/dd/56bf499d33df269ae1d4be5d7fd163090f59d553ce02387ff933db7787aa/ipc_worker-0.1.7-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-11-08 08:00:52",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "ssbuild",
    "github_project": "ipc_worker",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "ipc-worker"
}
        
Elapsed time: 0.13860s