Multiprocessing-gRPC-Load-Balancer

Name: Multiprocessing-gRPC-Load-Balancer
Version: 0.0.2
Summary: A load balancer for multiprocessing gRPC servers.
Home page: https://github.com/Vault-of-Procrastination/Multiprocessing_gRPC_Load_Balancer
Author / Maintainer: Vault-of-Procrastination
Requires Python: >=3.12
License: Apache Software License 2.0
Keywords: grpc, load balancer, multiprocessing, prometheus, monitoring
Uploaded: 2024-07-14 15:26:17
# Multiprocessing_gRPC_Load_Balancer
Load balancer for multiprocessing gRPC servers on **Linux**.

> [!IMPORTANT]
> The server does not work on Windows, because it relies on the *socket.SO_REUSEPORT* option,
> which Linux supports and Windows does not.

The client-side server search works on both operating systems.
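
For context, `SO_REUSEPORT` lets several sockets, including sockets in different processes, bind the same address and port, and the Linux kernel spreads incoming connections across them. That is the mechanism that lets every worker process serve the same port. A minimal sketch of the raw socket behavior, independent of this library:

```python
import socket

def make_listener(port: int) -> socket.socket:
    # SO_REUSEPORT lets multiple sockets bind the same port; the Linux
    # kernel then distributes incoming connections across them.
    # The option does not exist on Windows, which is why the server
    # side of this library is Linux-only.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(('0.0.0.0', port))
    sock.listen()
    return sock

# Two listeners on the same port (works the same across processes):
a = make_listener(50051)
b = make_listener(50051)  # would raise OSError without SO_REUSEPORT
```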

## Installation
`pip install Multiprocessing_gRPC_Load_Balancer`

## How to use
The short examples below show how the library works.
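
Both examples import stubs generated from a `proto_test/Test.proto` that is not shown here but evidently defines the four gRPC streaming patterns. Assuming that layout (the file path is an assumption), the stubs can be regenerated with the `grpcio-tools` package:

```python
from grpc_tools import protoc

# Regenerates proto_test/Test_pb2.py (messages) and
# proto_test/Test_pb2_grpc.py (servicer base class and stub).
protoc.main([
    'grpc_tools.protoc',      # argv[0] placeholder expected by protoc.main
    '-I.',                    # root directory for proto imports
    '--python_out=.',         # where Test_pb2.py is written
    '--grpc_python_out=.',    # where Test_pb2_grpc.py is written
    'proto_test/Test.proto',  # assumed location of the proto file
])
```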

> [!IMPORTANT]
> If you want to share data, instances, or classes between your servers, you have to handle that yourself.
> The gRPC servicer class is instantiated in each process, but you can pass in already-initialized
> arguments with multiprocessing capabilities, as in the sketch below.
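
For example, assuming the extra `*args` given to `server.start(...)` are forwarded to the servicer constructor in every child process (as the server example below suggests), a `multiprocessing.Value` created in the parent can act as a shared counter. This is a sketch; `Counting_Server` is a hypothetical servicer, not part of the library:

```python
import multiprocessing

from proto_test.Test_pb2 import response_msg
from proto_test.Test_pb2_grpc import Test_StreamerServicer

class Counting_Server(Test_StreamerServicer):
    # hypothetical servicer that receives a shared, process-safe counter
    def __init__(self, counter):
        self.counter = counter
    
    def One_to_One(self, request, context) -> response_msg:
        with self.counter.get_lock():  # lock is shared across all child processes
            self.counter.value += 1
            total = self.counter.value
        return response_msg(success = True, msg = f'request #{total}')

# in the parent, before starting the server:
# counter = multiprocessing.Value('i', 0)
# server.start(Counting_Server, add_Test_StreamerServicer_to_server, True, counter)
```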

### Server
```python
import grpc, time
from typing import Iterable

from Multiprocessing_gRPC_Load_Balancer import Multiprocessing_gRPC_Load_Balancer_Server

from proto_test.Test_pb2_grpc import Test_StreamerServicer, add_Test_StreamerServicer_to_server
from proto_test.Test_pb2 import send_msg, response_msg

class Test_Server(Test_StreamerServicer):
    def __init__(self):
        pass
    
    def One_to_One(self, request: send_msg, context: grpc.ServicerContext) -> response_msg:
        print(request)
        time.sleep(1)
        return response_msg(success = True, msg = str(len(request.send)))
    
    def Many_to_One(self, request_iterator: Iterable[send_msg], context: grpc.ServicerContext) -> response_msg:
        ret = []
        for request in request_iterator:
            print(request)
            time.sleep(1)
            ret.append(request.send)
        ret.reverse()
        return response_msg(success = True, msg = ''.join(ret))
    
    def One_to_Many(self, request: send_msg, context: grpc.ServicerContext) -> Iterable[response_msg]:
        print(request)
        for data in request.send:
            time.sleep(1)
            yield response_msg(success = True, msg = str(ord(data)))
    
    def Many_to_Many(self, request_iterator: Iterable[send_msg], context: grpc.ServicerContext) -> Iterable[response_msg]:
        for request in request_iterator:
            print(request)
            time.sleep(1)
            yield response_msg(success = True, msg = str(len(request.send)))

if __name__ == '__main__':
    # this is the port where the server will be reachable; pick any free one
    linux_port: int = 50051
    
    # this is the number of processes that you want to spawn
    num_of_process: int = 3
    
    # this is the number of threads each grpc process will have
    num_of_threads: int = 10
    
    # this is the weight of this server relative to the others; a higher weight makes it more likely to be selected
    num_of_weight: int = 1
    
    server = Multiprocessing_gRPC_Load_Balancer_Server(linux_port, num_of_process, num_of_threads, num_of_weight)
    
    # this is the class that will handle the service methods; pass the class itself, not an instance
    grpc_service_cls = Test_Server
    
    # this is the generated function that registers the servicer class with the server
    add_service_to_server = add_Test_StreamerServicer_to_server
    
    # whether to block until the server is cancelled, or to keep running more code while the server is up; defaults to True
    ## Important: if you set the block arg to False, you must keep the process alive yourself and call server.close() when you are done.
    ## If you don't, the subprocesses will keep consuming resources on your computer or server.
    block_the_code: bool = True
    
    # *args: List[Any] # positional arguments passed to the class once it is running in each child process
    args_for_class = []
    
    # **kwargs: Dict[str, Any] # keyword arguments passed to the class once it is running in each child process
    kwargs_for_the_class = {}
    
    server.start(grpc_service_cls, add_service_to_server, block_the_code, *args_for_class, **kwargs_for_the_class)
    
    ### This is just one way to block; use any approach you like, just remember to close the server.
    if not block_the_code:
        from time import sleep
        try:
            while True:
                sleep(86400) # 86400 seconds == 1 day
        except KeyboardInterrupt:
            server.close() # close the server; this safely stops and joins every thread and subprocess created
```

### Client
```python
from typing import List, Union

from proto_test.Test_pb2 import send_msg
from proto_test.Test_pb2_grpc import Test_StreamerStub

from Multiprocessing_gRPC_Load_Balancer import search_servers

class Test_Client:
    def __init__(self, servers: List[str]) -> None:
        self.servers = servers
    
    def one_to_one(self, data: str) -> List[Union[bool, int]]:
        with search_servers(self.servers) as channel:
            test_stub = Test_StreamerStub(channel)
            response = test_stub.One_to_One(send_msg(send = data))
            ret = [response.success, int(response.msg)]
        return ret
    
    def many_to_one(self, data: List[str]) -> List[Union[bool, str]]:
        with search_servers(self.servers) as channel:
            test_stub = Test_StreamerStub(channel)
            response = test_stub.Many_to_One(iter([send_msg(send = x) for x in data]))
            ret = [response.success, response.msg]
        return ret
    
    def one_to_many(self, data: str) -> List[List[Union[bool, int]]]:
        ret = []
        with search_servers(self.servers) as channel:
            test_stub = Test_StreamerStub(channel)
            for response in test_stub.One_to_Many(send_msg(send = data)):
                ret.append([response.success, int(response.msg)])
        return ret
    
    def many_to_many(self, data: List[str]) -> List[List[Union[bool, int]]]:
        ret = []
        with search_servers(self.servers) as channel:
            test_stub = Test_StreamerStub(channel)
            for response in test_stub.Many_to_Many(iter([send_msg(send = x) for x in data])):
                ret.append([response.success, int(response.msg)])
        return ret

# just give the class a list of host:port addresses for the Linux servers;
# the client automatically picks the server with the least demand and the highest weight
# (see the sketch after this example for the idea)
client = Test_Client(['linux_ip1:linux_port1', 'linux_ip2:linux_port2', 'linux_ip3:linux_port3'])

# one data send, one data receive
client.one_to_one('Test')

# multiple data send, one data receive
client.many_to_one([x for x in 'Test'])

# one data send, multiple data receive
client.one_to_many('Test')

# returns, for each message, whether it completed and the length of its text
client.many_to_many(['x'*x for x in range(10)])
```
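
The selection metric used by `search_servers` is not documented here; purely as an illustration of "less demand, more weight", a hypothetical scoring function could divide each server's active load by its weight and pick the minimum:

```python
from typing import Dict

def pick_server(load: Dict[str, int], weight: Dict[str, int]) -> str:
    # Hypothetical scoring, not the library's actual algorithm:
    # normalize demand by weight, so a weight-2 server tolerates
    # twice the load of a weight-1 server before losing preference.
    return min(load, key = lambda host: load[host] / weight[host])

print(pick_server({'10.0.0.1:50051': 4, '10.0.0.2:50051': 3},
                  {'10.0.0.1:50051': 2, '10.0.0.2:50051': 1}))
# -> '10.0.0.1:50051'  (4/2 = 2.0 beats 3/1 = 3.0)
```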



            
