# Multiprocessing_gRPC_Load_Balancer
Load Balancer for multiprocessing grpc server in **Linux**.
> [!IMPORTANT]
> The server doesn't work in Windows, this is because of a property that Linux have and Windows no.
> The *socket.REUSE_PORT*
The search for the server works in both OS.
## Installation
`pip install Multiprocessing_gRPC_Load_Balancer`
## How to use
This is a short code to show how this library works.
> [!IMPORTANT]
> If you want to share data, instances or classes between your servers, you have to handle your self.
> The gRPC class server is created on each process, but you can give arguments already initialized whit multiprocessing capabilities.
### Server
```python
import grpc, time
from typing import Iterable
from Multiprocessing_gRPC_Load_Balancer import Multiprocessing_gRPC_Load_Balancer_Server
from proto_test.Test_pb2_grpc import Test_StreamerServicer, add_Test_StreamerServicer_to_server
from proto_test.Test_pb2 import send_msg, response_msg
class Test_Server(Test_StreamerServicer):
def __init__(self):
pass
def One_to_One(self, request: send_msg, context: grpc.ServicerContext) -> response_msg:
print(request)
time.sleep(1)
return response_msg(success = True, msg = str(len(request.send)))
def Many_to_One(self, request_iterator: Iterable[send_msg], context: grpc.ServicerContext) -> response_msg:
ret = []
for request in request_iterator:
print(request)
time.sleep(1)
ret.append(request.send)
ret.reverse()
return response_msg(success = True, msg = ''.join(ret))
def One_to_Many(self, request: send_msg, context: grpc.ServicerContext) -> Iterable[response_msg]:
print(request)
for data in request.send:
time.sleep(1)
yield response_msg(success = True, msg = str(ord(data)))
def Many_to_Many(self, request_iterator: Iterable[send_msg], context: grpc.ServicerContext) -> Iterable[response_msg]:
for request in request_iterator:
print(request)
time.sleep(1)
yield response_msg(success = True, msg = str(len(request.send)))
if __name__ == '__main__':
# this is the linux port where it would be accessible, select one that was available
linux_port: int = 'a free port that you want to attach your server'
# this is the number of process that you want to spawn
num_of_process: int = 3
# this is the number of threads each grpc process will have
num_of_threads: int = 10
# this is the weight this server will have over the other ones, more weight indicates that will be preferable to select
num_of_weight: int = 1
server = Multiprocessing_gRPC_Load_Balancer_Server(linux_port, num_of_process, num_of_threads, num_of_weight)
# this is the class where that will handle the service methods, the class should be not initiated
grpc_service_cls = Test_Server
# this is the function to add the class to the server
add_service_to_server = add_Test_StreamerServicer_to_server
# this is if you want to block the code until you cancel it, or to continue running more code while the server is up, is default to True
## Important if you put the block arg to False you need to handle the infinite loop until you want to close it and run server.close()
## If you don't do this, the subprocess will continue consuming resources from your computer or server.
block_the_code: bool = True
# *args: List[Any] # is a list of arguments that you want to pass to the class onces is running on each child process
args_for_class = []
# **kwargs: Dict[str, Any] # is a dict of key value arguments that you want to pass to the class onces is running on each child process
kwargs_for_the_class = {}
server.start(grpc_service_cls, add_service_to_server, block_the_code, *args_for_class, **kwargs_for_the_class)
### This is just a way to block the code, you can use any way you want, just remember to close the server.
if not block_the_code:
from time import sleep
try:
while True:
sleep(86400) # 86400 seconds == 1 day
except:
server.close() # Close the server, this safetly stop and join every thread and subprocess created
```
### Client
```python
from typing import List, Union
from proto_test.Test_pb2 import send_msg
from proto_test.Test_pb2_grpc import Test_StreamerStub
from Multiprocessing_gRPC_Load_Balancer import search_servers
class Test_Client:
def __init__(self, servers: List[str]) -> None:
self.servers = servers
def one_to_one(self, data: str) -> List[Union[str, int]]:
with search_servers(self.servers) as channel:
test_stub = Test_StreamerStub(channel)
response = test_stub.One_to_One(send_msg(send = data))
ret = [response.success, int(response.msg)]
return ret
def many_to_one(self, data: List[str]) -> List[Union[str, int]]:
with search_servers(self.servers) as channel:
test_stub = Test_StreamerStub(channel)
response = test_stub.Many_to_One(iter([send_msg(send = x) for x in data]))
ret = [response.success, response.msg]
return ret
def one_to_many(self, data: str) -> List[List[Union[str, int]]]:
ret = []
with search_servers(self.servers) as channel:
test_stub = Test_StreamerStub(channel)
for response in test_stub.One_to_Many(send_msg(send = data)):
ret.append([response.success, int(response.msg)])
return ret
def many_to_many(self, data: List[str]) -> List[List[Union[str, int]]]:
ret = []
with search_servers(self.servers) as channel:
test_stub = Test_StreamerStub(channel)
for response in test_stub.Many_to_Many(iter([send_msg(send = x) for x in data])):
ret.append([response.success, int(response.msg)])
return ret
# just need to give the class a list of host + port of the linux server
# the code will search automatically what server is with less demand and more weight
client = Test_Client(['linux_ip1:linux_port1', 'linux_ip2:linux_port2', 'linux_ip3:linux_port3'])
# one data send, one data receive
client.one_to_one('Test')
# multiple data send, one data receive
client.many_to_one([x for x in 'Test'])
# multiple data send, one data receive
client.one_to_many('Test')
# return a list of each value if complete and the lenght of the text
client.many_to_many(['x'*x for x in range(10)])
```
Raw data
{
"_id": null,
"home_page": "https://github.com/Vault-of-Procrastination/Multiprocessing_gRPC_Load_Balancer",
"name": "Multiprocessing-gRPC-Load-Balancer",
"maintainer": "Vault-of-Procrastination",
"docs_url": null,
"requires_python": ">=3.12",
"maintainer_email": "vault_of_procrastination@outlook.com",
"keywords": "grpc load balancer multiprocessing prometheus monitoring",
"author": "Vault-of-Procrastination",
"author_email": "vault_of_procrastination@outlook.com",
"download_url": "https://files.pythonhosted.org/packages/0b/26/150cb936d15ec6a41e6331d44f9936494ccbc1f1b25e016221831e353b68/multiprocessing_grpc_load_balancer-0.0.2.tar.gz",
"platform": null,
"description": "# Multiprocessing_gRPC_Load_Balancer\r\nLoad Balancer for multiprocessing grpc server in **Linux**.\r\n\r\n> [!IMPORTANT]\r\n> The server doesn't work in Windows, this is because of a property that Linux have and Windows no.\r\n> The *socket.REUSE_PORT*\r\n\r\nThe search for the server works in both OS.\r\n\r\n## Installation\r\n`pip install Multiprocessing_gRPC_Load_Balancer`\r\n\r\n## How to use\r\nThis is a short code to show how this library works.\r\n\r\n> [!IMPORTANT]\r\n> If you want to share data, instances or classes between your servers, you have to handle your self.\r\n> The gRPC class server is created on each process, but you can give arguments already initialized whit multiprocessing capabilities.\r\n\r\n### Server\r\n```python\r\nimport grpc, time\r\nfrom typing import Iterable\r\n\r\nfrom Multiprocessing_gRPC_Load_Balancer import Multiprocessing_gRPC_Load_Balancer_Server\r\n\r\nfrom proto_test.Test_pb2_grpc import Test_StreamerServicer, add_Test_StreamerServicer_to_server\r\nfrom proto_test.Test_pb2 import send_msg, response_msg\r\n\r\nclass Test_Server(Test_StreamerServicer):\r\n def __init__(self):\r\n pass\r\n \r\n def One_to_One(self, request: send_msg, context: grpc.ServicerContext) -> response_msg:\r\n print(request)\r\n time.sleep(1)\r\n return response_msg(success = True, msg = str(len(request.send)))\r\n \r\n def Many_to_One(self, request_iterator: Iterable[send_msg], context: grpc.ServicerContext) -> response_msg:\r\n ret = []\r\n for request in request_iterator:\r\n print(request)\r\n time.sleep(1)\r\n ret.append(request.send)\r\n ret.reverse()\r\n return response_msg(success = True, msg = ''.join(ret))\r\n \r\n def One_to_Many(self, request: send_msg, context: grpc.ServicerContext) -> Iterable[response_msg]:\r\n print(request)\r\n for data in request.send:\r\n time.sleep(1)\r\n yield response_msg(success = True, msg = str(ord(data)))\r\n \r\n def Many_to_Many(self, request_iterator: Iterable[send_msg], context: grpc.ServicerContext) -> Iterable[response_msg]:\r\n for request in request_iterator:\r\n print(request)\r\n time.sleep(1)\r\n yield response_msg(success = True, msg = str(len(request.send)))\r\n\r\nif __name__ == '__main__':\r\n # this is the linux port where it would be accessible, select one that was available\r\n linux_port: int = 'a free port that you want to attach your server'\r\n \r\n # this is the number of process that you want to spawn\r\n num_of_process: int = 3\r\n \r\n # this is the number of threads each grpc process will have\r\n num_of_threads: int = 10\r\n \r\n # this is the weight this server will have over the other ones, more weight indicates that will be preferable to select\r\n num_of_weight: int = 1\r\n \r\n server = Multiprocessing_gRPC_Load_Balancer_Server(linux_port, num_of_process, num_of_threads, num_of_weight)\r\n \r\n # this is the class where that will handle the service methods, the class should be not initiated\r\n grpc_service_cls = Test_Server\r\n \r\n # this is the function to add the class to the server\r\n add_service_to_server = add_Test_StreamerServicer_to_server\r\n \r\n # this is if you want to block the code until you cancel it, or to continue running more code while the server is up, is default to True\r\n ## Important if you put the block arg to False you need to handle the infinite loop until you want to close it and run server.close()\r\n ## If you don't do this, the subprocess will continue consuming resources from your computer or server.\r\n block_the_code: bool = True\r\n \r\n # *args: List[Any] # is a list of arguments that you want to pass to the class onces is running on each child process\r\n args_for_class = []\r\n \r\n # **kwargs: Dict[str, Any] # is a dict of key value arguments that you want to pass to the class onces is running on each child process\r\n kwargs_for_the_class = {}\r\n \r\n server.start(grpc_service_cls, add_service_to_server, block_the_code, *args_for_class, **kwargs_for_the_class)\r\n \r\n ### This is just a way to block the code, you can use any way you want, just remember to close the server.\r\n if not block_the_code:\r\n from time import sleep\r\n try:\r\n while True:\r\n sleep(86400) # 86400 seconds == 1 day\r\n except:\r\n server.close() # Close the server, this safetly stop and join every thread and subprocess created\r\n```\r\n\r\n### Client\r\n```python\r\n\r\nfrom typing import List, Union\r\n\r\nfrom proto_test.Test_pb2 import send_msg\r\nfrom proto_test.Test_pb2_grpc import Test_StreamerStub\r\n\r\nfrom Multiprocessing_gRPC_Load_Balancer import search_servers\r\n\r\nclass Test_Client:\r\n def __init__(self, servers: List[str]) -> None:\r\n self.servers = servers\r\n \r\n def one_to_one(self, data: str) -> List[Union[str, int]]:\r\n with search_servers(self.servers) as channel:\r\n test_stub = Test_StreamerStub(channel)\r\n response = test_stub.One_to_One(send_msg(send = data))\r\n ret = [response.success, int(response.msg)]\r\n return ret\r\n \r\n def many_to_one(self, data: List[str]) -> List[Union[str, int]]:\r\n with search_servers(self.servers) as channel:\r\n test_stub = Test_StreamerStub(channel)\r\n response = test_stub.Many_to_One(iter([send_msg(send = x) for x in data]))\r\n ret = [response.success, response.msg]\r\n return ret\r\n \r\n def one_to_many(self, data: str) -> List[List[Union[str, int]]]:\r\n ret = []\r\n with search_servers(self.servers) as channel:\r\n test_stub = Test_StreamerStub(channel)\r\n for response in test_stub.One_to_Many(send_msg(send = data)):\r\n ret.append([response.success, int(response.msg)])\r\n return ret\r\n \r\n def many_to_many(self, data: List[str]) -> List[List[Union[str, int]]]:\r\n ret = []\r\n with search_servers(self.servers) as channel:\r\n test_stub = Test_StreamerStub(channel)\r\n for response in test_stub.Many_to_Many(iter([send_msg(send = x) for x in data])):\r\n ret.append([response.success, int(response.msg)])\r\n return ret\r\n\r\n# just need to give the class a list of host + port of the linux server\r\n# the code will search automatically what server is with less demand and more weight\r\nclient = Test_Client(['linux_ip1:linux_port1', 'linux_ip2:linux_port2', 'linux_ip3:linux_port3'])\r\n\r\n# one data send, one data receive\r\nclient.one_to_one('Test')\r\n\r\n# multiple data send, one data receive\r\nclient.many_to_one([x for x in 'Test'])\r\n\r\n# multiple data send, one data receive\r\nclient.one_to_many('Test')\r\n\r\n# return a list of each value if complete and the lenght of the text\r\nclient.many_to_many(['x'*x for x in range(10)])\r\n```\r\n\r\n\r\n",
"bugtrack_url": null,
"license": "Apache Software License 2.0",
"summary": "Its a Load Balancer for multiprocessing gRPC Servers.",
"version": "0.0.2",
"project_urls": {
"Download": "https://github.com/vault-of-procrastination/Multiprocessing_gRPC_Load_Balancer/tarball/0.0.1",
"Homepage": "https://github.com/Vault-of-Procrastination/Multiprocessing_gRPC_Load_Balancer"
},
"split_keywords": [
"grpc",
"load",
"balancer",
"multiprocessing",
"prometheus",
"monitoring"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "bfa8c8edbf1dbc02df61d60b00fdd64689b7050dca02de7c3e0c57e0a9b5b317",
"md5": "f651a37d1da8497e59a300309e711129",
"sha256": "ff5a7a7f71c81335a549afd70a2370580df52642790c2846bf58318e55ca362d"
},
"downloads": -1,
"filename": "Multiprocessing_gRPC_Load_Balancer-0.0.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "f651a37d1da8497e59a300309e711129",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.12",
"size": 14529,
"upload_time": "2024-07-14T15:26:15",
"upload_time_iso_8601": "2024-07-14T15:26:15.239265Z",
"url": "https://files.pythonhosted.org/packages/bf/a8/c8edbf1dbc02df61d60b00fdd64689b7050dca02de7c3e0c57e0a9b5b317/Multiprocessing_gRPC_Load_Balancer-0.0.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "0b26150cb936d15ec6a41e6331d44f9936494ccbc1f1b25e016221831e353b68",
"md5": "3672767b5108e169781fe9b9b39240d8",
"sha256": "e93cf5a112be54988b7e2d8558abf55b9a9fb891b731d7df3600a2df885eb3f7"
},
"downloads": -1,
"filename": "multiprocessing_grpc_load_balancer-0.0.2.tar.gz",
"has_sig": false,
"md5_digest": "3672767b5108e169781fe9b9b39240d8",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.12",
"size": 12249,
"upload_time": "2024-07-14T15:26:17",
"upload_time_iso_8601": "2024-07-14T15:26:17.148183Z",
"url": "https://files.pythonhosted.org/packages/0b/26/150cb936d15ec6a41e6331d44f9936494ccbc1f1b25e016221831e353b68/multiprocessing_grpc_load_balancer-0.0.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-07-14 15:26:17",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "Vault-of-Procrastination",
"github_project": "Multiprocessing_gRPC_Load_Balancer",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"requirements": [],
"lcname": "multiprocessing-grpc-load-balancer"
}