ipc-worker: Inter-Process Communication. Multi-process workers communicate through shared memory or a message queue (MQ).
```py
Supports shared-memory process workers (Python >= 3.8 on Linux) and MQ process workers (Python >= 3.6).
```
```py
# -*- coding: utf-8 -*-
# @Time : 2021/11/23 9:35
'''
Shared-memory demo.
Recommended environment: Linux with Python >= 3.8.
    recommended OS: Linux
    recommended Python: 3.8
Running on Windows is not recommended; it reports the following error:
    RuntimeError:
    An attempt has been made to start a new process before the
    current process has finished its bootstrapping phase.
'''
import multiprocessing
import os
import signal
from ipc_worker import logger
from ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker
class My_worker(SHM_process_worker):
    """Demo worker process for the shared-memory IPC loader.

    The first positional argument is the user-supplied ``config``; every
    other argument is forwarded unchanged to ``SHM_process_worker``.
    """

    def __init__(self, config, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # These identifiers are read after the base initializer has run
        # (presumably it is the one that sets them).
        banner = 'Process id {}, group name {} ,shm name {}'
        logger.info(banner.format(self._idx, self._group_name, self._shm_name))
        logger.info(config)
        # Free-form config; use it however the worker needs.
        self.config = config

    def run_begin(self):
        """Hook triggered when the worker process begins."""
        logger.info('worker pid {}...'.format(os.getpid()))
        # Placeholder for a per-process resource; the demo has none.
        self.handle = None

    def run_end(self):
        """Hook triggered when the worker process ends."""
        if self.handle is None:
            return
        # Release self.handle here once the demo owns a real resource.

    def run_once(self, request_data):
        """Process one request; triggered whenever data is put.

        A dict request gets its ``'b'`` key set to 200 in place. The
        request object itself is returned as the result.
        """
        if isinstance(request_data, dict):
            request_data['b'] = 200
        if self.handle is not None:
            # do some thing
            pass
        return request_data
if __name__ == '__main__':
    # Free-form settings handed to every worker through worker_args.
    config = dict(anything="anything", aa=100)

    evt_quit = multiprocessing.Manager().Event()

    # group_name is the shared-memory group name and must be unique
    # manager is an agent and acts as a load balancer
    # worker is what really does your work
    instance = IPC_shm(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker Process
        manager_num=2,  # number of agent Process
        group_name='serving_shm',  # share memory name
        shm_size=1024 * 1024,  # share memory size
        queue_size=20,  # recv queue size
        is_log_time=False,  # whether log compute time
        daemon=False,
    )
    instance.start()

    # Demo: produce and consume messages; an HTTP front end could do the same.
    for _ in range(10):
        print('produce message')
        request_id = instance.put({"a": 100})
        data = instance.get(request_id)
        print('get process result', data)

    def _on_sigint(signum, frame):
        """On Ctrl-C: flag quit, terminate the instance, then interrupt."""
        evt_quit.set()
        instance.terminate()
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, _on_sigint)

    try:
        instance.join()
    except Exception:
        # Any ordinary failure while joining: flag quit and terminate.
        evt_quit.set()
        instance.terminate()

    del evt_quit
```
```py
# -*- coding: utf-8 -*-
# @Time : 2021/11/29 15:06
# @Author : tk
import multiprocessing
import os
import signal
import torch
from ipc_worker import logger
from ipc_worker.ipc_zmq_loader import IPC_zmq, ZMQ_process_worker
class My_worker(ZMQ_process_worker):
    """Demo worker process for the ZeroMQ IPC loader.

    The first positional argument is the user-supplied ``config``; every
    other argument is forwarded unchanged to ``ZMQ_process_worker``.
    """

    def __init__(self, config, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # config info, use it however you need
        logger.info('Process id {}, group name {} , identity {}'.format(
            self._idx, self._group_name, self._identity))
        logger.info(config)
        self.config = config

    def run_begin(self):
        """Hook triggered when the worker process begins."""
        logger.info('worker pid {}...'.format(os.getpid()))
        # Placeholder for a per-process resource; the demo has none.
        self.handle = None

    def run_end(self):
        """Hook triggered when the worker process ends."""
        if self.handle is not None:
            # Release the resource here once the demo owns one.
            pass

    def run_once(self, request_data):
        """Process one request; triggered whenever data is put.

        Prints the CUDA device count and the current device index (``None``
        when CUDA is unavailable). A dict request gets its ``'b'`` key set
        to 200 in place. The request object itself is returned.
        """
        # torch.cuda.current_device() raises when no usable CUDA runtime is
        # present, so only query it when CUDA is actually available.
        if torch.cuda.is_available():
            print(torch.cuda.device_count(), torch.cuda.current_device())
        else:
            print(torch.cuda.device_count(), None)

        if isinstance(request_data, dict):
            request_data['b'] = 200
        if self.handle is not None:
            # do some thing
            pass
        return request_data
if __name__ == '__main__':
    # torch.multiprocessing.set_start_method('spawn', force=True)

    # ZMQ demo; depends on pyzmq:
    #     pip install pyzmq
    # Tested on Python >= 3.6.

    # Create the directory in one step; exist_ok avoids the check-then-create
    # race of os.path.exists() followed by os.mkdir().
    tmp_dir = './tmp'
    os.makedirs(tmp_dir, exist_ok=True)

    # NOTE(review): presumably ipc_worker reads this variable to decide where
    # its ZeroMQ socket files go; confirm against the library.
    os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir

    config = {
        "anything": "anything",
        "aa": 100,
    }

    evt_quit = multiprocessing.Manager().Event()

    # group_name is the group name and must be unique
    # manager is an agent and acts as a load balancer
    # worker is what really does your work
    instance = IPC_zmq(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker Process
        group_name='serving_zmq',  # group name, must be unique
        evt_quit=evt_quit,
        queue_size=20,  # recv queue size
        is_log_time=True,  # whether log compute time
        daemon=False,
    )
    instance.start()

    # Demo: produce and consume messages; an HTTP front end could do the same.
    for i in range(10):
        data = {"a": 100}
        request_id = instance.put(data)
        data = instance.get(request_id)
        print('get process result', request_id, data)

    def signal_handler(signum, frame):
        """On Ctrl-C: flag quit, terminate the instance, then interrupt."""
        evt_quit.set()
        instance.terminate()
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, signal_handler)

    try:
        instance.join()
    except Exception:
        # Any ordinary failure while joining: flag quit and terminate.
        evt_quit.set()
        instance.terminate()

    del evt_quit
```
Raw data
{
"_id": null,
"home_page": "https://github.com/ssbuild/ipc_worker",
"name": "ipc-worker",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3, <4",
"maintainer_email": "",
"keywords": "ipc-worker,ipc_worker,ipc,process worker,ipc,ipc mq,fast-ipc,process ipc",
"author": "ssbuild",
"author_email": "9727464@qq.com",
"download_url": "",
"platform": "win32_AMD64",
"description": "ipc-worker: Inter-Process Communication , muti Process Woker works by share memory or MQ.\n\n```py\nsupport share memory (py>=3.8 and linux) and mq process worker (py >=3.6)\n\n```\n```py\n# -*- coding: utf-8 -*-\n# @Time : 2021/11/23 9:35\n\n'''\ndemo share memrory\nrecommended system linux and python >= 3.8\n recommended linux\n python 3.8\n\nDo not recommended run in windows , it will report an error as follow\n RuntimeError:\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n'''\nimport multiprocessing\nimport os\nimport signal\n\nfrom ipc_worker import logger\nfrom ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker\n\nclass My_worker(SHM_process_worker):\n def __init__(self,config,*args,**kwargs):\n super(My_worker,self).__init__(*args,**kwargs)\n #config info , use by yourself\n\n logger.info('Process id {}, group name {} ,shm name {}'.format(self._idx,self._group_name,self._shm_name))\n logger.info(config)\n self.config = config\n\n\n #Process begin trigger this func\n def run_begin(self):\n logger.info('worker pid {}...'.format(os.getpid()))\n self.handle = None\n pass\n\n # Process end trigger this func\n def run_end(self):\n if self.handle is not None:\n pass\n\n #any data put will trigger this func\n def run_once(self,request_data):\n #process request_data\n if isinstance(request_data,dict):\n request_data['b'] = 200\n if self.handle is not None:\n #do some thing\n pass\n return request_data\n\n\nif __name__ == '__main__':\n config = {\n \"anything\" : \"anything\",\n \"aa\": 100\n }\n\n evt_quit = multiprocessing.Manager().Event()\n\n # group_name \u4e3a\u5171\u4eab\u5185\u5b58\u7ec4\u540d,\u9700\u552f\u4e00\n # manager is an agent and act as a load balancing\n # worker is real doing your work\n instance = IPC_shm(\n CLS_worker=My_worker,\n worker_args=(config,), # must be tuple\n worker_num=10, # number of worker Process\n manager_num=2, # number of agent 
Process\n group_name='serving_shm', # share memory name\n shm_size=1 * 1024 * 1024, # share memory size\n queue_size=20, # recv queue size\n is_log_time=False, # whether log compute time\n daemon=False,\n )\n\n instance.start()\n\n #demo produce and consume message , you can process by http\n for i in range(10):\n print('produce message')\n data = {\"a\" : 100}\n request_id = instance.put(data)\n data = instance.get(request_id)\n print('get process result',data)\n\n\n def signal_handler(signum, frame):\n evt_quit.set()\n instance.terminate()\n raise KeyboardInterrupt\n signal.signal(signal.SIGINT, signal_handler)\n\n try:\n instance.join()\n except Exception as e:\n evt_quit.set()\n instance.terminate()\n\n del evt_quit\n```\n```py\n# -*- coding: utf-8 -*-\n# @Time : 2021/11/29 15:06\n# @Author : tk\nimport multiprocessing\nimport os\nimport signal\nimport torch\nfrom ipc_worker import logger\nfrom ipc_worker.ipc_zmq_loader import IPC_zmq, ZMQ_process_worker\n\n\n\n\nclass My_worker(ZMQ_process_worker):\n def __init__(self, config, *args, **kwargs):\n super(My_worker, self).__init__(*args, **kwargs)\n # config info , use by yourself\n logger.info('Process id {}, group name {} , identity {}'.format(self._idx, self._group_name, self._identity))\n logger.info(config)\n self.config = config\n\n # Process begin trigger this func\n def run_begin(self):\n logger.info('worker pid {}...'.format(os.getpid()))\n self.handle = None\n pass\n\n # Process end trigger this func\n def run_end(self):\n if self.handle is not None:\n pass\n\n # any data put will trigger this func\n def run_once(self, request_data):\n # process request_data\n print(torch.cuda.device_count(), torch.cuda.current_device())\n\n if isinstance(request_data, dict):\n request_data['b'] = 200\n if self.handle is not None:\n # do some thing\n pass\n return request_data\n\nif __name__ == '__main__':\n\n\n # torch.multiprocessing.set_start_method('spawn', force=True)\n '''\n demo ZMQ depend zmq\n pip install 
pyzmq\n test pass >= python3.6\n '''\n\n tmp_dir = './tmp'\n if not os.path.exists(tmp_dir):\n os.mkdir(tmp_dir)\n\n os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir\n\n\n config = {\n \"anything\" : \"anything\",\n \"aa\": 100\n }\n\n evt_quit = multiprocessing.Manager().Event()\n\n # group_name \u4e3a\u5171\u4eab\u5185\u5b58\u7ec4\u540d,\u9700\u552f\u4e00\n # manager is an agent and act as a load balancing\n # worker is real doing your work\n instance = IPC_zmq(\n CLS_worker=My_worker,\n worker_args=(config,), # must be tuple\n worker_num=10, # number of worker Process\n group_name='serving_zmq', # share memory name\n evt_quit=evt_quit,\n queue_size=20, # recv queue size\n is_log_time=True, # whether log compute time\n daemon=False,\n )\n instance.start()\n\n #demo produce and consume message , you can process by http\n for i in range(10):\n data = {\"a\" : 100}\n request_id = instance.put(data)\n\n data = instance.get(request_id)\n print('get process result',request_id,data)\n\n\n def signal_handler(signum, frame):\n evt_quit.set()\n instance.terminate()\n raise KeyboardInterrupt\n signal.signal(signal.SIGINT, signal_handler)\n\n try:\n instance.join()\n except Exception as e:\n evt_quit.set()\n instance.terminate()\n del evt_quit\n```\n\n\n",
"bugtrack_url": null,
"license": "Apache 2.0",
"summary": "ipc-worker: Inter-Process Communication , muti Process Woker works by share memory or MQ.",
"version": "0.1.7",
"project_urls": {
"Homepage": "https://github.com/ssbuild/ipc_worker"
},
"split_keywords": [
"ipc-worker",
"ipc_worker",
"ipc",
"process worker",
"ipc",
"ipc mq",
"fast-ipc",
"process ipc"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "6ddd56bf499d33df269ae1d4be5d7fd163090f59d553ce02387ff933db7787aa",
"md5": "8d6007f280eb06238354d261755d484e",
"sha256": "923f950de1be3b63fded3ea96d8af2e85a2eb41dd3259e27abdb7b9432ccedd6"
},
"downloads": -1,
"filename": "ipc_worker-0.1.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "8d6007f280eb06238354d261755d484e",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3, <4",
"size": 19085,
"upload_time": "2023-11-08T08:00:52",
"upload_time_iso_8601": "2023-11-08T08:00:52.543160Z",
"url": "https://files.pythonhosted.org/packages/6d/dd/56bf499d33df269ae1d4be5d7fd163090f59d553ce02387ff933db7787aa/ipc_worker-0.1.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-11-08 08:00:52",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "ssbuild",
"github_project": "ipc_worker",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "ipc-worker"
}