aioconnectors


Name: aioconnectors  
Version: 1.6.3  
Home page: https://github.com/mori-b/aioconnectors  
Summary: Simple secure asynchronous message queue  
Upload time: 2023-05-18 17:35:34  
Author: Mori Benech  
Requires Python: >=3.6  
Keywords: message queue broker asyncio simple easy  

[![PyPI version](https://badge.fury.io/py/aioconnectors.svg)](https://badge.fury.io/py/aioconnectors) [![Downloads](https://static.pepy.tech/personalized-badge/aioconnectors?period=total&units=international_system&left_color=grey&right_color=blue&left_text=downloads)](https://pepy.tech/project/aioconnectors)

           _                                 __
     ___ _(_)__  _______  ___  ___  ___ ____/ /____  _______
    / _ `/ / _ \/ __/ _ \/ _ \/ _ \/ -_) __/ __/ _ \/ __(_-<
    \_,_/_/\___/\__/\___/_//_/_//_/\__/\__/\__/\___/_/ /___/



# aioconnectors
**Simple secure asynchronous message queue**

*<a href="#features">Features</a>*  
*<a href="#installation">Installation</a>*  
*<a href="#exampleptp">Example Point to point : Server and Client</a>*  
*<a href="#exampleps">Example publish/subscribe : Broker, Subscriber, and Publisher</a>*  
*<a href="#hld">High Level Design</a>*  
*<a href="#usecases">Use Cases</a>*  
*<a href="#usage">Usage</a>*  
*<a href="#enc">1.Encryption</a>*  
*<a href="#run">2.Run a connector</a>*  
*<a href="#sendreceive">3.Send/receive messages</a>*  
*<a href="#classes">4.ConnectorManager and ConnectorAPI</a>*  
*<a href="#send">5.send_message</a>*  
*<a href="#management">6.Programmatic management tools</a>*  
*<a href="#cli">7.Command line interface management tools</a>*  
*<a href="#testing">8.Testing tools</a>*  
*<a href="#chat">9.Embedded chat</a>*  
*<a href="#containers">Containers</a>*  
*<a href="#windows">Windows</a>*  


<a name="features"></a>
## FEATURES

aioconnectors is an easy-to-set-up message queue and broker that works on Unix-like systems. Requirements are : Python >= 3.6, and openssl installed.  
It provides bidirectional transfer of messages and files, optional authentication and encryption, persistence and reconnection in case of connection loss, proxy support, and client filtering.  
It is a point to point broker built on the client/server model, but both peers can push messages. It can also be easily configured as a publish/subscribe broker.  
Based on asyncio, message sending and receiving are asynchronous, with the option to wait asynchronously for a response.  
A connector can be configured with a short json file.  
An embedded command line tool lets you easily run a connector and manage it with shell commands.  
A simple Python API provides functions like starting/stopping a connector, sending a message, receiving messages, and other management capabilities. To support other languages for the API, only the file standalone\_api.py needs to be transpiled.


<a name="installation"></a>
## INSTALLATION

    pip3 install aioconnectors


<a name="exampleptp"></a>
## BASIC EXAMPLE - POINT TO POINT

You can run a connector with a single shell command :

    python3 -m aioconnectors create_connector <config_json_path>

This is covered in <a href="#run">2-</a>, but this example shows the programmatic way to run connectors.  
This is a basic example of a server and a client sending messages to each other. For more interesting examples, please refer to applications.py or aioconnectors\_test.py.  
For both server and client, connector\_manager is running the connector, and connector\_api is sending/receiving messages.  
In this example, connector\_manager and connector\_api are running in the same process for convenience. They can obviously run in different processes, as shown in the other examples.  
In this example we are running server and client on the same machine since server\_sockaddr is set to "127.0.0.1".  
To run server and client on different machines, you should set server\_sockaddr, in both server and client code, to the IP address of the server.  
You can run multiple clients, just set a different client\_name for each client.  

1.No encryption  
You can run the following example code directly, since encryption is disabled.  
In case you want to use this example with encryption, you should read 2. and 3. after the examples.  

### Server example

    import asyncio
    import aioconnectors
    
    loop = asyncio.get_event_loop()
    server_sockaddr = ('127.0.0.1',10673)
    connector_files_dirpath = '/var/tmp/aioconnectors'
    
    #create connector
    connector_manager = aioconnectors.ConnectorManager(is_server=True, server_sockaddr=server_sockaddr, use_ssl=False, use_token=False,
                                                       ssl_allow_all=True, connector_files_dirpath=connector_files_dirpath,
                                                       certificates_directory_path=connector_files_dirpath,
                                                       send_message_types=['any'], recv_message_types=['any'],
                                                       file_recv_config={'any': {'target_directory':connector_files_dirpath}},
                                                       reuse_server_sockaddr=True)
    
    task_manager =  loop.create_task(connector_manager.start_connector())
    loop.run_until_complete(task_manager)
    
    #create api
    connector_api = aioconnectors.ConnectorAPI(is_server=True, server_sockaddr=server_sockaddr,
                                               connector_files_dirpath=connector_files_dirpath,
                                               send_message_types=['any'], recv_message_types=['any'],
                                               default_logger_log_level='INFO')
    
    #start receiving messages
    async def message_received_cb(logger, transport_json , data, binary):
        print('SERVER : message received', transport_json , data.decode())
    loop.create_task(connector_api.start_waiting_for_messages(message_type='any', message_received_cb=message_received_cb))
    
    #start sending messages
    async def send_messages(destination):
        await asyncio.sleep(2)
        index = 0
        while True:
            index += 1
            await connector_api.send_message(data={'application message': f'SERVER MESSAGE {index}'},
                                             message_type='any', destination_id=destination)
            await asyncio.sleep(1)
                                                    
    loop.create_task(send_messages(destination='client1'))
    
    try:
        print(f'Connector is running, check log at {connector_files_dirpath+"/aioconnectors.log"}'
              f', type Ctrl+C to stop')
        loop.run_forever()
    except:
        print('Connector stopped !')
    
    #stop receiving messages
    connector_api.stop_waiting_for_messages(message_type='any')
    
    #stop connector
    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))
    loop.run_until_complete(task_stop)


### Client example

    import asyncio
    import aioconnectors
    
    loop = asyncio.get_event_loop()
    server_sockaddr = ('127.0.0.1',10673)
    connector_files_dirpath = '/var/tmp/aioconnectors'
    client_name = 'client1'
    
    #create connector
    connector_manager = aioconnectors.ConnectorManager(is_server=False, server_sockaddr=server_sockaddr,
                                                       use_ssl=False, ssl_allow_all=True, use_token=False,
                                                       connector_files_dirpath=connector_files_dirpath,
                                                       certificates_directory_path=connector_files_dirpath,
                                                       send_message_types=['any'], recv_message_types=['any'],
                                                       file_recv_config={'any': {'target_directory':connector_files_dirpath}},
                                                       client_name=client_name)
    
    task_manager =  loop.create_task(connector_manager.start_connector())
    loop.run_until_complete(task_manager)
    
    #create api
    connector_api = aioconnectors.ConnectorAPI(is_server=False, server_sockaddr=server_sockaddr,
                                               connector_files_dirpath=connector_files_dirpath, client_name=client_name,
                                               send_message_types=['any'], recv_message_types=['any'],
                                               default_logger_log_level='INFO')
    
    #start receiving messages
    async def message_received_cb(logger, transport_json , data, binary):
        print('CLIENT : message received', transport_json , data.decode())
    loop.create_task(connector_api.start_waiting_for_messages(message_type='any', message_received_cb=message_received_cb))
    
    #start sending messages
    async def send_messages():
        await asyncio.sleep(1)
        index = 0
        while True:
            index += 1
            await connector_api.send_message(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='any')
            await asyncio.sleep(1)
                                           
    loop.create_task(send_messages())
    
    try:
        print(f'Connector is running, check log at {connector_files_dirpath+"/aioconnectors.log"}'
              f', type Ctrl+C to stop')
        loop.run_forever()
    except:
        print('Connector stopped !')
    
    #stop receiving messages
    connector_api.stop_waiting_for_messages(message_type='any')
    
    #stop connector
    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))
    loop.run_until_complete(task_stop)
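
The callbacks in the two examples above only print what they receive. As a minimal sketch, the callback below keeps the same signature but parses the payload. It assumes that `data` arrives as bytes holding JSON text, which is suggested by the examples (they send a dict and call `data.decode()` on reception) but not guaranteed here. The `received` list and the sample arguments are illustrative only.

```python
import asyncio
import json

received = []  # messages collected in memory, for this illustration only

async def message_received_cb(logger, transport_json, data, binary):
    """Callback with the same signature as in the examples above."""
    # Assumption: data is bytes containing JSON text.
    text = data.decode()
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        payload = text  # fall back to the raw text if it is not JSON
    received.append(payload)
    return payload

# Exercise the callback on its own, without a running connector.
# The None and {} arguments are placeholders, not real logger/transport values.
sample = json.dumps({'application message': 'CLIENT MESSAGE 1'}).encode()
result = asyncio.run(message_received_cb(None, {}, sample, None))
print(result)
```

Such a callback would be passed to start\_waiting\_for\_messages exactly like the printing callbacks above.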


<a name="exampleps"></a>
## BASIC EXAMPLE - PUBLISH/SUBSCRIBE

You can run the following code of a broker, a publisher and a subscriber in 3 different shells on the same machine out of the box.  
You should modify some values as explained in the previous example in order to run on different machines, and with encryption.  

### Broker example

Just a server with pubsub\_central\_broker=True

    import asyncio
    import aioconnectors

    loop = asyncio.get_event_loop()
    server_sockaddr = ('127.0.0.1',10673)
    connector_files_dirpath = '/var/tmp/aioconnectors'

    #create connector
    connector_manager = aioconnectors.ConnectorManager(is_server=True, server_sockaddr=server_sockaddr, use_ssl=False, use_token=False,
                                                       ssl_allow_all=True, connector_files_dirpath=connector_files_dirpath,
                                                       certificates_directory_path=connector_files_dirpath,
                                                       send_message_types=['any'], recv_message_types=['any'],
                                                       file_recv_config={'any': {'target_directory':connector_files_dirpath}},
                                                       pubsub_central_broker=True, reuse_server_sockaddr=True)

    task_manager =  loop.create_task(connector_manager.start_connector())
    loop.run_until_complete(task_manager)

    #create api
    connector_api = aioconnectors.ConnectorAPI(is_server=True, server_sockaddr=server_sockaddr,
                                               connector_files_dirpath=connector_files_dirpath,
                                               send_message_types=['any'], recv_message_types=['any'],
                                               default_logger_log_level='INFO')

    #start receiving messages
    async def message_received_cb(logger, transport_json , data, binary):
        print('SERVER : message received', transport_json , data.decode())
    loop.create_task(connector_api.start_waiting_for_messages(message_type='any', message_received_cb=message_received_cb))

    try:
        print(f'Connector is running, check log at {connector_files_dirpath+"/aioconnectors.log"}'
              f', type Ctrl+C to stop')
        loop.run_forever()
    except:
        print('Connector stopped !')

    #stop receiving messages
    connector_api.stop_waiting_for_messages(message_type='any')

    #stop connector
    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))
    loop.run_until_complete(task_stop)


### Subscriber example

Just a client with subscribe\_message\_types = [topic1, topic2, ...]

    import asyncio
    import aioconnectors

    loop = asyncio.get_event_loop()
    server_sockaddr = ('127.0.0.1',10673)
    connector_files_dirpath = '/var/tmp/aioconnectors'
    client_name = 'client2'

    #create connector
    connector_manager = aioconnectors.ConnectorManager(is_server=False, server_sockaddr=server_sockaddr,
                                                       use_ssl=False, ssl_allow_all=True, use_token=False,
                                                       connector_files_dirpath=connector_files_dirpath,
                                                       certificates_directory_path=connector_files_dirpath,
                                                       send_message_types=['any'], recv_message_types=['type1'],
                                                       file_recv_config={'type1': {'target_directory':connector_files_dirpath}},
                                                       client_name=client_name, subscribe_message_types=["type1"])

    task_manager =  loop.create_task(connector_manager.start_connector())
    loop.run_until_complete(task_manager)

    #create api
    connector_api = aioconnectors.ConnectorAPI(is_server=False, server_sockaddr=server_sockaddr,
                                               connector_files_dirpath=connector_files_dirpath, client_name=client_name,
                                               send_message_types=['any'], recv_message_types=['type1'],
                                               default_logger_log_level='INFO')

    #start receiving messages
    async def message_received_cb(logger, transport_json , data, binary):
        print('CLIENT : message received', transport_json , data.decode())
    loop.create_task(connector_api.start_waiting_for_messages(message_type='type1', message_received_cb=message_received_cb))

    '''
    #start sending messages
    async def send_messages():
        await asyncio.sleep(1)
        index = 0
        while True:
            index += 1
            await connector_api.send_message(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='any')
            await asyncio.sleep(1)
                                           
    loop.create_task(send_messages())
    '''

    try:
        print(f'Connector is running, check log at {connector_files_dirpath+"/aioconnectors.log"}'
              f', type Ctrl+C to stop')
        loop.run_forever()
    except:
        print('Connector stopped !')

    #stop receiving messages
    connector_api.stop_waiting_for_messages(message_type='type1')

    #stop connector
    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))
    loop.run_until_complete(task_stop)


### Publisher example

Just a client which uses publish\_message instead of send\_message

    import asyncio
    import aioconnectors

    loop = asyncio.get_event_loop()
    server_sockaddr = ('127.0.0.1',10673)
    connector_files_dirpath = '/var/tmp/aioconnectors'
    client_name = 'client1'

    #create connector
    connector_manager = aioconnectors.ConnectorManager(is_server=False, server_sockaddr=server_sockaddr,
                                                       use_ssl=False, ssl_allow_all=True, use_token=False,
                                                       connector_files_dirpath=connector_files_dirpath,
                                                       certificates_directory_path=connector_files_dirpath,
                                                       send_message_types=['type1','type2'], recv_message_types=['any'],
                                                       file_recv_config={'any': {'target_directory':connector_files_dirpath}},
                                                       client_name=client_name, disk_persistence_send=True)

    task_manager =  loop.create_task(connector_manager.start_connector())
    loop.run_until_complete(task_manager)

    #create api
    connector_api = aioconnectors.ConnectorAPI(is_server=False, server_sockaddr=server_sockaddr,
                                               connector_files_dirpath=connector_files_dirpath, client_name=client_name,
                                               send_message_types=['type1','type2'], recv_message_types=['any'],
                                               default_logger_log_level='INFO')

    #start receiving messages
    #async def message_received_cb(logger, transport_json , data, binary):
    #    print('CLIENT : message received', transport_json , data.decode())
    #loop.create_task(connector_api.start_waiting_for_messages(message_type='any', message_received_cb=message_received_cb))

    #start sending messages
    async def send_messages():
        await asyncio.sleep(1)
        index = 0
        #with_file={'src_path':'file_test','dst_type':'any', 'dst_name':'file_dest', 
        #           'delete':False, 'owner':'nobody:nogroup'}                  
        while True:
            index += 1
            print(f'CLIENT : message {index} published')
            #connector_api.publish_message_sync(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='type1')#,        
            await connector_api.publish_message(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='type1')#,
                                                #with_file=with_file, binary=b'\x01\x02\x03')
            #await connector_api.publish_message(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='type2')#,                                            
            await asyncio.sleep(1)
                                           
    loop.create_task(send_messages())

    try:
        print(f'Connector is running, check log at {connector_files_dirpath+"/aioconnectors.log"}'
              f', type Ctrl+C to stop')
        loop.run_forever()
    except:
        print('Connector stopped !')

    #stop receiving messages
    connector_api.stop_waiting_for_messages(message_type='any')

    #stop connector
    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))
    loop.run_until_complete(task_stop)

2.Encryption without authentication  
In order to use encryption, you should set use\_ssl to True in both server and client ConnectorManager instantiations.  
A directory containing certificates must be created before running the example, which is done by a single command :

    python3 -m aioconnectors create_certificates

If you decide to use server\_ca=true on your connector server, then you need to add "--ca" to this command (<a href="#classes">4-</a>).  
If you run server and client on different machines, this command should be run on both machines.  

3.Encryption with authentication  
In this example, the kwarg ssl\_allow\_all is true (both on server and client), meaning that the communication between server and client, if encrypted, is not authenticated.  
In case you want to run this example with authentication too, you have 2 options :  
3.1. Set use\_ssl to True and ssl\_allow\_all to False in both server and client ConnectorManager instantiations.  
If you run server and client on the same machine, this only requires running the command "python3 -m aioconnectors create\_certificates" beforehand, like in 2.  
In case the server and client run on different machines, you should run the prerequisite command "python3 -m aioconnectors create\_certificates" only once, and copy the generated directory /var/tmp/aioconnectors/certificates/server to your server (preserving symlinks) and /var/tmp/aioconnectors/certificates/client to your client.  
3.2. Set use\_ssl to True, ssl\_allow\_all to True, and use\_token to True, in both server and client ConnectorManager instantiations, to use token authentication. This also requires running "python3 -m aioconnectors create\_certificates" beforehand.  
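
The options 1. to 3.2. above differ only by three kwargs. The sketch below gathers them in one table. The dictionary keys and the two helper functions are illustrative names chosen here, not aioconnectors identifiers; only use\_ssl, ssl\_allow\_all and use\_token come from the text above.

```python
# Keyword arguments for each mode described in 1. to 3.2. above.
# The mode names are hypothetical labels for this sketch.
ENCRYPTION_MODES = {
    'no_encryption':    {'use_ssl': False, 'ssl_allow_all': True,  'use_token': False},
    'encryption_only':  {'use_ssl': True,  'ssl_allow_all': True,  'use_token': False},
    'certificate_auth': {'use_ssl': True,  'ssl_allow_all': False, 'use_token': False},
    'token_auth':       {'use_ssl': True,  'ssl_allow_all': True,  'use_token': True},
}

def needs_certificates(mode):
    """True if create_certificates must be run before using this mode."""
    return ENCRYPTION_MODES[mode]['use_ssl']

def is_authenticated(mode):
    """True if the peers are authenticated, by certificate or by token."""
    kwargs = ENCRYPTION_MODES[mode]
    return kwargs['use_ssl'] and (not kwargs['ssl_allow_all'] or kwargs['use_token'])

for mode in ENCRYPTION_MODES:
    print(mode, needs_certificates(mode), is_authenticated(mode))
```

The selected kwargs would then replace the use\_ssl, ssl\_allow\_all and use\_token values in both the server and the client ConnectorManager instantiations, since both sides must use the same mode.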


<a name="hld"></a>
## HIGH LEVEL DESIGN

The client and server are connected by one single TCP socket.  
When a peer sends a message, it is first sent by Unix socket to the connector, then transferred to the priority queue of its destination (there is a different queue for each remote peer). Messages are read from these priority queues and sent to the remote peer on the client/server socket. After a message reaches its peer, it is sent to a queue, one queue per message type. The api listens on a Unix socket to receive messages of a specific type, which are read from the corresponding queue.  
The optional encryption uses TLS. The server certificate and the default client certificate are automatically generated and pre-shared, so that a server or client without prior knowledge of these certificates cannot communicate. Then, the server generates on the fly a new certificate per client, so that different clients cannot interfere with one another. Alternatively, the server can generate on the fly a new token per client.  
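
The queue layout described above can be pictured with a toy model. This is not the aioconnectors implementation: it is a sketch that replaces the Unix and TCP sockets with plain function calls, and the numeric priorities and the sequence counter are assumptions made for the illustration.

```python
import asyncio
import itertools

async def main():
    send_queues = {}   # sending side : one priority queue per remote peer
    recv_queues = {}   # receiving side : one queue per message type
    sequence = itertools.count()  # tie breaker, keeps insertion order

    async def send(peer, priority, message_type, data):
        # Stand-in for the api handing a message to the connector.
        queue = send_queues.setdefault(peer, asyncio.PriorityQueue())
        await queue.put((priority, next(sequence), message_type, data))

    def transfer(peer):
        # Stand-in for the client/server socket : drain the peer queue
        # and dispatch each message to the queue of its message type.
        queue = send_queues[peer]
        while not queue.empty():
            _, _, message_type, data = queue.get_nowait()
            recv_queues.setdefault(message_type, asyncio.Queue()).put_nowait(data)

    await send('client1', 2, 'type1', 'low priority')
    await send('client1', 1, 'type1', 'high priority')
    await send('client1', 1, 'type2', 'other type')
    transfer('client1')

    # Stand-in for the api reading the messages of each type.
    delivered = {}
    for message_type, queue in recv_queues.items():
        delivered[message_type] = []
        while not queue.empty():
            delivered[message_type].append(queue.get_nowait())
    return delivered

delivered = asyncio.run(main())
print(delivered)
```

In this model a lower number means a higher priority, so the 'high priority' message overtakes the one queued before it.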


<a name="usecases"></a>
## USE CASES

-The standard use case is running server and client on separate stations. Each client station can then initiate a connection to the server station.  
The valid message topics are defined in the server and client configurations (send\_message\_types and recv\_message\_types), and the messages are sent point to point.  
In order to have all clients/server connections authenticated and encrypted, you just have to call

    python3 -m aioconnectors create_certificates <optional_directory_path>

And then share the created directories between server and clients as explained in <a href="#enc">1-</a>.  
You can also use a proxy between your client and server, as explained in <a href="#classes">4-</a>.  

-You might prefer to use a publish/subscribe approach.  
This is also supported by configuring a single server as the broker (you just need to set pubsub\_central\_broker=True).  
The other connectors should be clients. A client can subscribe to specific topics (message\_types) by setting the attribute subscribe\_message\_types in its constructor, or by calling the set\_subscribe\_message\_types command on the fly.  

-You might want both sides to be able to initiate a connection, or even to have multiple nodes being able to initiate connections between one another.  
The following lines describe a possible approach to do that using aioconnectors.  
Each node should be running an aioconnectors server, and also be able to spawn an aioconnectors client each time it initiates a connection to a different remote server. A new application layer handling these connectors could be created, and run on each node.  
Your application might need to know if a peer is already connected before initiating a connection : to do so, you might use the connector_manager.show\_connected\_peers method (explained in <a href="#cli">7-</a>).  
Your application might need to be able to disconnect a specific client on the server : to do so, you might use the connector\_manager.disconnect\_client method.  
A comfortable approach would be to share the certificates directories created in the first step between all the nodes. All nodes would share the same server certificate, and use the same client default certificate to initiate the connection (before receiving their individual certificate). The only differences between clients configurations would be their client_name, and their remote server (the configurations are explained in <a href="#classes">4-</a>).  

-There are multiple tools to let the server filter clients. Your application might need to decide whether to accept a client connection or not.  
The following tools filter clients in this order :  
whitelisted_clients_ip/subnet : in configuration file, or on the fly with add_whitelist_client (it updates the configuration file).  
hook_whitelist_clients(extra_info, source_id) : coroutine that lets you take a decision after having filtered a non whitelisted client (maybe allow it from now on).  
blacklisted_clients_ip/subnet: in configuration file or on the fly with add_blacklist_client.  
whitelisted_clients_id : in configuration file or on the fly with add_whitelist_client (uses regex).  
hook_whitelist_clients(extra_info, source_id) : same.  
blacklisted_clients_id : in configuration file or on the fly with add_blacklist_client (uses regex).  
hook_allow_certificate_creation(source_id) : coroutine that lets you prevent certificate creation based on the source_id.  
hook_server_auth_client(source_id) : coroutine that gives a last opportunity to filter the source_id.  
The hooks must be fed to the ConnectorManager constructor (explained in <a href="#classes">4-</a>).  
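
As a rough sketch, the three hooks could look like the coroutines below. The signatures are the ones listed above. Everything else is an assumption : the hooks are assumed to return a boolean (True to accept), extra\_info is treated as opaque, and the naming rule and the revoked list are hypothetical application data.

```python
import asyncio
import re

ALLOWED_ID = re.compile(r'client\d+')   # hypothetical naming rule
REVOKED_IDS = {'client3'}               # hypothetical application data

async def hook_whitelist_clients(extra_info, source_id):
    # Decide about a client that was filtered as non whitelisted.
    # Assumption : returning True accepts it. extra_info is not inspected.
    return isinstance(source_id, str) and ALLOWED_ID.fullmatch(source_id) is not None

async def hook_allow_certificate_creation(source_id):
    # Assumption : returning False prevents certificate creation.
    return source_id not in REVOKED_IDS

async def hook_server_auth_client(source_id):
    # Last opportunity to filter the source_id.
    return source_id not in REVOKED_IDS

# The hooks can be exercised without a running connector.
print(asyncio.run(hook_whitelist_clients(None, 'client12')))
print(asyncio.run(hook_server_auth_client('client3')))
```

These coroutines would then be given to the ConnectorManager constructor of the server, presumably as kwargs named after each hook (see <a href="#classes">4-</a> for the exact parameters).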


<a name="usage"></a>
## USAGE

aioconnectors provides the ConnectorManager class which runs the connectors, and the ConnectorAPI class which sends and receives messages. It also provides the ConnectorRemoteTool class, which can lightly manage the connector outside of the ConnectorManager.  
The ConnectorManager client and server can run on different machines. However, ConnectorAPI and ConnectorRemoteTool communicate internally with their ConnectorManager, and the three must run on the same machine.  
aioconnectors also provides a command line tool accessible by typing

    python3 -m aioconnectors --help


<a name="enc"></a>
### 1.Encryption

Encryption mode is, as everything else, configurable through the ConnectorManager kwargs or config file, as explained later in <a href="#classes">4-</a>. The relevant parameters are use_ssl and ssl_allow_all.  
The default mode is the most secure : use_ssl is enabled and ssl\_allow\_all is disabled, both on server and client.  
-If you choose to use encryption, you should call

    python3 -m aioconnectors create_certificates [<optional_directory_path>] [--ca] [--help]

A directory called "certificates" will be created under your optional\_directory\_path, or under /var/tmp/aioconnectors if not specified.
Under it, 2 subdirectories will be created : certificates/server and certificates/client.  
You need to copy certificates/server to your server (preserving symlinks), and certificates/client to your client. That's all you have to do.  
This is the recommended approach, since it ensures traffic encryption, client and server authentication, and prevents client impersonation.  
Clients use the default certificate to first connect to server, then an individual certificate is generated by the server for each client. Client automatically uses this individual certificate for further connections. This individual certificate is mapped to the client_name.  
The first client named client_name reaching the server is granted a certificate for this client_name. Different clients further attempting to use the same client_name will be rejected.  
When server\_ca is false on server side (default), the client certificates are checked against the certificates pem kept on the server, otherwise against the server CA.  
When using ssl, the default approach is to have server\_ca false (default), meaning your server will generate and manage self signed client certificates, providing certificates visibility, and tools like delete\_client\_certificate to delete client certificates on the fly.  
Using server\_ca true lets your server become a CA with a self signed CA certificate that will sign your client certificates. If you choose to run your server with server\_ca true, then you need the --ca argument in create\_certificates, otherwise you don't need it (default).  
The server\_ca true mode comes with server\_ca\_certs\_not\_stored enabled by default, meaning the client certificates are deleted from server side. Not having to store the client certificates on the server might be an advantage, but it doesn't let you delete them : if you want to be able to delete them in ca mode, then you might just use server\_ca false. Setting the server\_ca\_certs\_not\_stored option to false requires you to delete the certificates yourself, since deletion is not currently supported when server\_ca is true : this implementation would require something like "openssl ca -gencrl -config certificates/server/server\_ca\_details.conf -out revoked.pem", and also "SSLContext.verify\_flags |= ssl.VERIFY\_CRL\_CHECK\_LEAF" before loading the revoked.pem into SSLContext.load\_verify\_locations.  
-The client also checks the server certificate to prevent MITM.  
Instead of using the generated server certificate, you also have the option to use a hostname for your server and use a CA signed server certificate that the clients will verify. For that you should :  
-On server side, under "certificates" directory, replace /server/server-cert/server.pem and server.key with your signed certificates. You don't need to do that manually, there is a tool that does it :  

    python3 -m aioconnectors replace_server_certificate <custom_server_pem_file_path> [<optional_directory_path>]
    
Note that the custom server pem file should contain the whole chain of .crt including the intermediate certificates.  
In case you want to roll back to the original generated server certificate :  

    python3 -m aioconnectors replace_server_certificate --revert
    
-On client side, configure server\_sockaddr with the server hostname instead of IP address, and set client\_cafile\_verify\_server to be the ca cert path (like /etc/ssl/certs/ca-certificates.crt), to enable CA verification of your server certificate.  
-You can delete a client certificate on the server (and also on client) by calling delete\_client\_certificate in

    python3 -m aioconnectors cli

For this purpose, you can also call programmatically the ConnectorManager.delete_client\_certificate method.  
-You shouldn't need to modify the certificates, however there is a way to tweak the certificates template : run create\_certificates once, then modify certificates/server/csr\_details\_template.conf according to your needs (without setting the Organization field), delete other directories under certificates and run create\_certificates again.  
-On server side, you can store manually additional default certificates with their symlink, under certificates/server/client-certs/symlinks. They must be called defaultN where N is an integer, or be another CA certificate in case server\_ca is true.  
-Other options :  

-ssl\_allow\_all and use\_token enabled : this is a similar approach but instead of generating a certificate per client, the server generates a token per client. This approach is simpler. Note that you can also delete the token on the fly by calling delete\_client\_token.  
You can combine ssl\_allow\_all with token\_verify\_peer\_cert (on client and server) and token\_client\_send\_cert (on client), in order to authenticate the default certificate only. On client side the token\_verify\_peer\_cert can also be the path of ca certificates (like /etc/ssl/certs/ca-certificates.crt) or of a custom server public certificate.  
token\_client\_verify\_server\_hostname can be the server hostname that your client authenticates (through its certificate).  

By setting ssl\_allow\_all on both server and client, you can use encryption without the hassle of sharing certificates. In such a case you can run create\_certificates independently on server and client side, without the need to copy a directory. This disables authentication, so that any client and server can communicate.  

By unsetting use\_ssl, you can disable encryption altogether.
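
The text above notes that revoking client certificates in server\_ca mode is not currently supported, and names the standard library calls it would involve. The sketch below only shows how those calls fit together with Python's ssl module. It is not part of aioconnectors, and the function name and both path parameters are hypothetical.

```python
import ssl

def build_server_context(ca_cert_path=None, crl_path=None):
    """Build a server side context that requires client certificates."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    context.verify_mode = ssl.CERT_REQUIRED
    if ca_cert_path:
        # CA certificate that signed the client certificates.
        context.load_verify_locations(cafile=ca_cert_path)
    if crl_path:
        # Enable the revocation check only when a CRL is really loaded,
        # otherwise every client verification would fail.
        context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
        context.load_verify_locations(cafile=crl_path)  # e.g. revoked.pem
    return context

# Without any path, the context is created but no CRL check is enabled.
context = build_server_context()
print(context.verify_mode, bool(context.verify_flags & ssl.VERIFY_CRL_CHECK_LEAF))
```

The CRL file itself would be produced by a command such as the "openssl ca -gencrl" one quoted above.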


<a name="run"></a>
### 2.You have 2 options to run your connectors, either through the command line tool, or programmatically.

2.1.Command line tool  
-To configure the Connector Manager, create a <config\_json\_path> file based on the Manager template json, and configure it according to your needs (more details in <a href="#classes">4-</a>). Relevant for both server and client.  
A Manager template json can be obtained by calling : 

    python3 -m aioconnectors print_config_templates

-Then create and start your connector (both server and client, each with its own <config_json_path>)

    python3 -m aioconnectors create_connector <config_json_path>

If you are testing your connector server and client on the same machine, you can use the configuration generated by print\_config\_templates almost out of the box.  
The only changes you should make are to set is\_server to False in the client configuration, and use\_ssl to False in both configurations (unless you have already run "python3 -m aioconnectors create_certificates").  
If you want to test messages sending/receiving, you should also set a client\_name value in the client configuration.  
Then you can use the other command line testing facilities mentioned in <a href="#testing">8-</a> : on both server and client you can run "python3 -m aioconnectors test\_receive\_messages <config\_json\_path>" and "python3 -m aioconnectors test\_send\_messages <config\_json\_path>".  
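
The same-machine setup described above can be scripted. The sketch below starts from a handful of keys taken from the Manager template (in practice you would load the full output of print\_config\_templates) and writes one config file per side. The helper name, the client name "client1" and the file names are arbitrary choices for illustration.

```python
import copy
import json
import tempfile
from pathlib import Path

# A few keys from the Manager template; in practice start from the full
# output of "python3 -m aioconnectors print_config_templates".
manager_template = {
    "client_name": None,
    "is_server": True,
    "server_sockaddr": ["127.0.0.1", 10673],
    "use_ssl": True,
}


def make_local_test_configs(template, client_name):
    """Return (server_config, client_config) for a same-machine test."""
    server_config = copy.deepcopy(template)
    server_config["use_ssl"] = False  # no certificates were generated

    client_config = copy.deepcopy(server_config)
    client_config["is_server"] = False
    client_config["client_name"] = client_name  # needed to test send/receive
    return server_config, client_config


server_config, client_config = make_local_test_configs(manager_template, "client1")

# File names are arbitrary; pass each path as <config_json_path>.
config_dir = Path(tempfile.mkdtemp())
server_path = config_dir / "server_config.json"
client_path = config_dir / "client_config.json"
server_path.write_text(json.dumps(server_config, indent=4, sort_keys=True))
client_path.write_text(json.dumps(client_config, indent=4, sort_keys=True))
```

Each resulting path can then be given to create\_connector, test\_receive\_messages and test\_send\_messages.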

2.2.Programmatically, examples are provided in applications.py and in aioconnectors\_test.py.  
To create and start a connector :

    connector_manager = aioconnectors.ConnectorManager(config_file_path=config_file_path)
    await connector_manager.start_connector()

To stop a connector :

    await connector_manager.stop_connector()

To shut down a connector :

    await connector_manager.stop_connector(shutdown=True)

You don't have to use a config file (config\_file\_path), you can also directly initialize your ConnectorManager kwargs, as shown in the previous basic examples, and in aioconnectors\_test.py.
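
As a minimal sketch of the kwargs approach, assuming a local unencrypted setup: the values below (address, port, run duration) and the two helper names are illustrative. Only ConnectorManager, start\_connector and stop\_connector come from the library.

```python
import asyncio


def build_manager_kwargs(is_server, client_name=None):
    """Keyword arguments for a local, unencrypted connector (illustrative values)."""
    kwargs = {
        "is_server": is_server,
        "server_sockaddr": ("127.0.0.1", 10673),  # a tuple when passed as a kwarg
        "use_ssl": False,
    }
    if not is_server:
        kwargs["client_name"] = client_name
    return kwargs


async def run_connector(kwargs, run_seconds=10):
    # Imported here so that the helper above works without the package installed.
    import aioconnectors

    connector_manager = aioconnectors.ConnectorManager(**kwargs)
    await connector_manager.start_connector()
    try:
        await asyncio.sleep(run_seconds)  # your application would run here
    finally:
        await connector_manager.stop_connector(shutdown=True)


# Example: asyncio.run(run_connector(build_manager_kwargs(is_server=True)))
```

The try/finally is a design choice of this sketch, so that the connector is shut down even if the application code raises.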


<a name="sendreceive"></a>
### 3.send/receive messages with the API

3.1.To configure the Connector API, create a <config\_json\_path> file based on the API template json.
Relevant for both server and client. This connector_api config file is a subset of the connector_manager config file. So if you already have a relevant connector_manager config file on your machine, you can reuse it for connector_api, and you don't need to create a different connector_api config file.

    python3 -m aioconnectors print_config_templates
    connector_api = aioconnectors.ConnectorAPI(config_file_path=config_file_path)

3.2.Or you can directly initialize your ConnectorAPI kwargs  

Then you can send and receive messages by calling the following coroutines in your program, as shown in aioconnectors\_test.py, and in applications.py (test\_receive\_messages and test\_send\_messages).  

3.3.To send messages : 

    await connector_api.send_message(data=None, binary=None, **kwargs)

This returns a status (True or False).  
"data" is your message, "binary" is an optional additional binary message in case you want your "data" to be a json for example.
If your "data" is already a binary, then the "binary" field isn't necessary.  
kwargs contain all the transport instructions for this message, as explained in <a href="#send">5-</a>.  
If you set the await\_response kwarg to True, this returns the response, which is a (transport\_json , data, binary) triplet.  
The received transport\_json field contains all the kwargs sent by the peer.  
You can also send messages synchronously, with :

    connector_api.send_message_sync(data=None, binary=None, **kwargs)

Similarly, use the "publish\_message" and "publish\_message\_sync" methods in the publish/subscribe approach.  
More details in <a href="#send">5-</a>.  
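
The request/response flow of 3.3 can be wrapped in a small coroutine. This is a sketch written from the client side (a server would also pass destination\_id). `send_and_wait` is a hypothetical helper, and it assumes that the response data arrives as bytes holding a json document, as it does in the receive callback.

```python
import json


async def send_and_wait(connector_api, payload, request_id, timeout=5):
    """Send a json payload and return the decoded response, or None on failure."""
    response = await connector_api.send_message(
        data=payload,
        message_type="any",
        request_id=request_id,
        await_response=True,
        await_response_timeout=timeout,
    )
    if not response:
        # False is returned when no response arrived (for example on timeout).
        return None
    transport_json, data, binary = response
    # Assumption: the response data is bytes holding a json document.
    return transport_json, json.loads(data), binary
```

Here `connector_api` is expected to be a ConnectorAPI instance created as in 3.1 or 3.2.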

3.4.To register to receive messages of a specific message\_type : 

    await connector_api.start_waiting_for_messages(message_type='', message_received_cb=message_received_cb, reuse_uds_path=False)

-**binary** is an optional binary message (or None).  
-**data** is the message data bytes. It is always bytes, so if it was originally sent as a json or a string, you'll have to convert it back by yourself.  
-**message\_received\_cb** is an async def coroutine that you must provide, receiving and processing the message quadruplet (logger, transport\_json, data, binary).  
-**reuse_uds_path** is false by default, preventing multiple listeners of same message type. In case it raises an exception even with a single listener, you might want to find and delete an old uds\_path\_receive\_from\_connector file specified in the exception.  
-**transport\_json** is a json with keys related to the "transport layer" of our message protocol : these are the kwargs sent in send_message. They are detailed in <a href="#send">5-</a>. The main arguments are source\_id, destination\_id, request\_id, response\_id, etc.  
Your application can read these transport arguments to obtain information about peer (source\_id, request\_id if provided, etc), and in order to create a proper response (with correct destination\_id, and response\_id for example if needed, etc).  
transport\_json will contain a with\_file key if a file has been received, more details in <a href="#send">5-</a>.  
-**Note** : if you send a message using send\_message(await\_response=True), the response value is the expected response message : so in that case the response message is not received by the start\_waiting\_for\_messages task.
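
A callback matching the quadruplet above could look like the following sketch. It decodes a json payload and prepares the transport kwargs for a reply. The `received` list and the `build_reply_kwargs` helper are illustrative names, and the dry run at the end uses a hand-made message rather than a real connector.

```python
import asyncio
import json
import logging

received = []  # filled by the callback, for illustration only


def build_reply_kwargs(transport_json):
    """Transport kwargs for answering the message described by transport_json."""
    reply = {
        "message_type": transport_json.get("message_type", "any"),
        "destination_id": transport_json.get("source_id"),
    }
    if transport_json.get("request_id") is not None:
        # The peer matches the response to its request through response_id.
        reply["response_id"] = transport_json["request_id"]
    return reply


async def message_received_cb(logger, transport_json, data, binary):
    # data is always bytes: convert it back to the original json.
    payload = json.loads(data)
    logger.info("Received %s from %s", payload, transport_json.get("source_id"))
    received.append((payload, build_reply_kwargs(transport_json)))


# Local dry run with a hand-made message, no connector involved.
asyncio.run(message_received_cb(
    logging.getLogger("demo"),
    {"message_type": "any", "source_id": "client1", "request_id": 42},
    b'{"temperature": 21}',
    None,
))
```

In a real application the reply kwargs would be passed to send\_message together with the response data.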


<a name="classes"></a>
### 4.More details about the ConnectorManager and ConnectorAPI arguments.

    logger=None, use_default_logger=True, default_logger_log_level='INFO', default_logger_rotate=True, config_file_path=<path>,default_logger_bk_count=5

config\_file\_path can be the path of a json file like the following, or instead you can load its items as kwargs, as shown in the basic example later on and in aioconnectors\_test.py  
You can use both kwargs and config\_file\_path : if there are shared items, the ones from config_file_path will override the kwargs, unless you specify config\_file\_overrides\_kwargs=False (True by default).  
The main use case for providing a config\_file\_path while having config\_file\_overrides\_kwargs=False is when you prefer to configure your connector only with kwargs but you also want to let the connector update its config file content on the fly (for example blacklisted\_clients\_id, whitelisted\_clients\_id, or ignore\_peer\_traffic).  
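
The precedence rule for shared items can be pictured as a dictionary merge. This is only a model of the documented behaviour, not the library's internal code, and `effective_config` is a hypothetical helper. How items that appear only in the config file are treated when config\_file\_overrides\_kwargs is False is an assumption here, so the example sticks to shared keys.

```python
def effective_config(kwargs, config_file_items, config_file_overrides_kwargs=True):
    """Model of the documented precedence for items present in both sources."""
    if config_file_overrides_kwargs:
        # Default: items from the config file win over kwargs.
        return {**kwargs, **config_file_items}
    # Otherwise kwargs win over the config file.
    return {**config_file_items, **kwargs}


kwargs = {"is_server": True, "send_timeout": 20}
config_file_items = {"is_server": True, "send_timeout": 50}

file_wins = effective_config(kwargs, config_file_items)
kwargs_win = effective_config(kwargs, config_file_items, config_file_overrides_kwargs=False)
```

With these inputs, send\_timeout is 50 in `file_wins` and 20 in `kwargs_win`.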

Here is an example of config\_file\_path, with ConnectorManager class arguments, used to create a connector

    {
        "alternate_client_default_cert": false,
        "blacklisted_clients_id": null,
        "blacklisted_clients_ip": null,
        "blacklisted_clients_subnet": null,
        "certificates_directory_path": "/var/tmp/aioconnectors",
        "client_bind_ip": null,
        "client_cafile_verify_server": null,
        "client_name": null,
        "connect_timeout": 10,
        "connector_files_dirpath": "/var/tmp/aioconnectors",
        "debug_msg_counts": true,
        "default_logger_bk_count":5,
        "default_logger_dirpath": "/var/tmp/aioconnectors",
        "default_logger_log_level": "INFO",
        "default_logger_rotate": true,
        "disk_persistence_recv": false,
        "disk_persistence_send": false,
        "enable_client_try_reconnect": true,
        "everybody_can_send_messages": true,
        "file_recv_config": {},
        "ignore_peer_traffic": false,
        "is_server": true,
        "keep_alive_period": null,
        "keep_alive_timeout": 5,
        "max_certs": 1024,
        "max_number_of_unanswered_keep_alive": 2,
        "max_size_file_upload_recv": 8589930194,
        "max_size_file_upload_send": 8589930194,
        "max_size_persistence_path": 1073741824,
        "proxy": {},
        "pubsub_central_broker": false,
        "recv_message_types": [
            "any"
        ],
        "reuse_server_sockaddr": false,
        "reuse_uds_path_commander_server": false,
        "reuse_uds_path_send_to_connector": false,
        "send_message_types": [
            "any"
        ],
        "send_message_types_priorities": {},
        "send_timeout": 50,
        "server_ca": false,
        "server_ca_certs_not_stored": true,
        "server_secure_tls": true,
        "server_sockaddr": [
            "127.0.0.1",
            10673
        ],
        "silent": true,
        "ssl_allow_all": false,
        "subscribe_message_types": [],
        "token_client_send_cert": true,
        "token_client_verify_server_hostname": null,
        "token_server_allow_authorized_non_default_cert": false,
        "token_verify_peer_cert": true,
        "tokens_directory_path": "/var/tmp/aioconnectors",
        "uds_path_receive_preserve_socket": true,
        "uds_path_send_preserve_socket": true,
        "use_ssl": true,
        "use_token": false,
        "whitelisted_clients_id": null,
        "whitelisted_clients_ip": null,
        "whitelisted_clients_subnet": null
    }


Here is an example of config\_file\_path, with ConnectorAPI class arguments, used to send/receive messages.  
These are a subset of ConnectorManager arguments : which means you can use the ConnectorManager config file also for ConnectorAPI.


    {
        "client_name": null,
        "connector_files_dirpath": "/var/tmp/aioconnectors",
        "default_logger_bk_count":5,        
        "default_logger_dirpath": "/var/tmp/aioconnectors",
        "default_logger_log_level": "INFO",
        "default_logger_rotate": true,
        "is_server": true,
        "max_size_chunk_upload": 209715200,
        "pubsub_central_broker": false,
        "receive_from_any_connector_owner": true,
        "recv_message_types": [
            "any"
        ],
        "send_message_types": [
            "any"
        ],
        "server_sockaddr": [
            "127.0.0.1",
            10673
        ],
        "uds_path_receive_preserve_socket": true,
        "uds_path_send_preserve_socket": true
    }


-**alternate\_client\_default\_cert** is false by default : if true it lets the client try to connect alternatively with the default certificate, in case of failure with the private certificate. This can save the hassle of having to delete manually your client certificate when the certificate was already deleted on server side. This affects also the token authentication : the client will try to connect alternatively by requesting a new token if its token fails.  
-**blacklisted\_clients\_id|ip|subnet** : a list of blacklisted clients (regex for blacklisted\_clients\_id), can be updated on the fly with the api functions add|remove\_blacklist\_client or in the cli.  
-**certificates\_directory\_path** is where your certificates are located, if use\_ssl is True. This is the <optional\_directory\_path> where you generated your certificates by calling "python3 -m aioconnectors create\_certificates <optional\_directory\_path>".  
-**client\_cafile\_verify\_server** : On client side, if server\_sockaddr is configured with the server hostname, you can set client\_cafile\_verify\_server to be the ca cert path (like /etc/ssl/certs/ca-certificates.crt), to enable CA verification of your server certificate.  
-**client\_name** is used on client side. It is the name that will be associated with this client on server side. Auto generated if not supplied in ConnectorManager. Mandatory in ConnectorAPI. It should match the regex \^\[0\-9a\-zA\-Z\-\_\:\]\+$  
-**client_bind_ip** is optional, specifies the interface to bind your client. You can use an interface name or its ip address (string).  
-**connect\_timeout** : On client side, the socket timeout to connect to the server. Default is 10s, you might need to increase it when using a server hostname in server\_sockaddr, since sometimes name resolution with getaddrinfo is slow.  
-**connector\_files\_dirpath** is important, it is the path where all internal files are stored. The default is /var/tmp/aioconnectors. unix sockets files, default log files, and persistent files are stored there.  
-**debug_msg_counts** is a boolean. When enabled, a count of messages is displayed every 2 minutes in the log file, and in stdout if **silent** is disabled.  
-**default\_logger\_rotate** (boolean) can also be an integer telling the maximum size of the log file in bytes.  
-**default\_logger\_bk\_count** is an integer giving the maximum number of gzip compressed logs kept when log rotate is enabled. Default is 5.  
-**disk\_persistence\_recv** : In order to enable persistence between the connector and a message listener (supported on both client and server sides), use disk\_persistence\_recv=True (applies to all message types). disk\_persistence\_recv can also be a list of message types for which to apply persistence. There will be 1 persistence file per message type.  
-**file\_recv\_config** : In order to be able to receive files, you must define the destination path of files according to their associated dst\_type. This is done in file\_recv\_config, as shown in aioconnectors\_test.py. file\_recv\_config = {"target\_directory":"", "owner":"", "override\_existing":False}. **target\_directory** is later formatted using the transport\_json fields : which means you can use a target\_directory value like "/my_destination_files/{message\_type}/{source\_id}". **owner** is optional, it is the owner of the uploaded file. It must be of the form "user:group". **override\_existing** is optional and false by default : when receiving a file with an already existing destination path, it decides whether to override the existing file or not.  
-**enable\_client\_try\_reconnect** is a boolean set to True by default. If enabled, it lets the client try to reconnect automatically to the server every 5 seconds in case of failure.  
-**keep\_alive\_period** is null by default. If an integer then the client periodically sends a ping keep-alive to the server. If **max\_number\_of\_unanswered\_keep\_alive** (default is 2) keep-alive responses are not received by the client, each after **keep\_alive\_timeout** (default is 5s), then the client disconnects and tries to reconnect with the same mechanism used by enable\_client\_try\_reconnect.  
-**everybody\_can\_send\_messages** if True lets anyone send messages through the connector, otherwise the sender must have write permission to the connector. Setting to True requires the connector to run as root.  
-**hook\_allow\_certificate\_creation** : does not appear in the config file (usable as a kwargs only). Only for server. Can be an async def coroutine receiving a client_name and returning a boolean, to let the server accept or block the client_name certificate creation.  
-**hook\_server\_auth\_client** : does not appear in the config file (usable as a kwargs only). Only for server. Can be an async def coroutine receiving a client peername and returning a boolean, to let the server accept or block the client connection. An example exists in the chat implementation in applications.py.  
-**hook\_store\_token** and **hook\_load\_token** : lets you manipulate the token before it is stored on disk, for client only.  
-**hook\_target\_directory** : does not appear in the config file (usable as a kwargs only). A dictionary of the form {dst\_type: custom_function} where custom\_function receives transport\_json as an input and outputs a destination path to be appended to target\_directory. If custom\_function returns None, it has no effect on the target\_directory. If custom\_function returns False, the file is refused. This enables better customization of the target\_directory according to transport\_json. An example exists in the chat implementation in applications.py.  
-**hook\_whitelist\_clients** : does not appear in the config file (usable as a kwargs only). Has 2 arguments : extra_info, peername. Lets you inject some code when blocking non whitelisted client.  
-**hook\_proxy\_authorization** : does not appear in the config file (usable as a kwargs only). Only for client. A function that receives and returns 2 arguments : the proxy username and password. It returns them after an optional transformation (like a decryption for example).  
-**ignore_peer_traffic** to ignore a peer traffic, can be updated on the fly with the api functions ignore\_peer\_traffic\_enable, ignore\_peer\_traffic\_enable\_unique, or ignore\_peer\_traffic\_disable or in the cli.  
-**is\_server** (boolean) is important to differentiate between server and client  
-**max\_certs** (integer) limits the maximum number of clients that can connect to a server using client ssl certificates.  
-**max_size_chunk_upload** (integer) used only by ConnectorAPI to send a file in chunks, default chunk length is 200MB. You can try a max chunk length of up to 1GB in a fast network, and might need to lower it in a slow network.  
-**max\_size\_file\_upload\_send** and **max\_size\_file\_upload\_recv**: Size limit of the files you send and receive, both on server and on client. Default is 8GB. However best performance is achieved up to 1GB. Once you exceed 1GB, the file is divided into 1GB chunks and reassembled after reception, which is time consuming.  
-**disk\_persistence\_send** : In order to enable persistence between client and server (supported on both client and server sides), use disk\_persistence\_send=True (applies to all message types). disk\_persistence\_send can also be a list of message types for which to apply persistence. There will be 1 persistence file per message type. You can limit the persistence files size with **max\_size\_persistence\_path**.  
-**pubsub\_central\_broker** : set to True if you need your server to be the broker. Used in the publish/subscribe approach, not necessary in the point to point approach.  
-**proxy** an optional dictionary like {"enabled":true, "address":"<proxy_url>", "port":<proxy_port>, "authorization":"", "ssl\_server":false}. Relevant only on client side. Lets the client connect to the server through an http(s) proxy with the connect method, if the **enabled** field is true. The authorization field can have a value like {"username":"<username>", "password":"<password>"}. Regardless of the aioconnectors inner encryption, you can set the "ssl\_server" flag in case your proxy listens on ssl : this feature is under development and not tested because such proxy setup is rare.  
-**receive\_from\_any\_connector\_owner** if True lets the api receive messages from a connector being run by any user, otherwise the connector user must have write permission to the api. True by default (requires the api to run as root to be effective).  
-**recv\_message\_types** : the list of message types that can be received by connector. Default is ["any"]. It should include the send\_message\_types using await\_response.  
-**reuse\_server\_sockaddr**, **reuse\_uds\_path\_send\_to\_connector**, **reuse\_uds\_path\_commander\_server** : booleans false by default, that prevent duplicate processes you might create by mistake from using the same sockets. In case your OS is not freeing a closed socket, you still can set the relevant boolean to true.  
-**send\_message\_types** : the list of message types that can be sent from connector. Default is ["any"] if you don't care to differentiate between message types on your application level.  
-**send\_message\_types\_priorities** : None, or a dictionary specifying for each send\_message\_type its priority. The priority is an integer, a smaller integer meaning a higher priority. Usually this is not needed, but with very high throughputs you may want to use it in order to ensure that a specific message type will not get drowned out by other messages. This might starve the lowest priority messages. Usage example : "send\_message\_types\_priorities": {"type\_fast":0, "type\_slow":1}.  
-**send\_timeout** : maximum time for sending a message between peers on the socket. By default 50 seconds. After timeout, the message is lost, the sending peer disconnects, and peers reconnect if enable\_client\_try\_reconnect.  
-**server\_ca** : (server only) If set to false (default), the server authenticates client certificates according to the stored certificates, otherwise according to its CA. You can always add manually defaultN or CA certificates, under certificates/server/client-certs/symlinks.  
-**server\_ca\_certs\_not\_stored** : (server only) True by default. If server\_ca is true, the generated client certificates won't be stored on server side.  
-**server\_secure\_tls** : (server only) If set to true (default), the server allows only clients using TLS version >= v1.2.  
-**server\_sockaddr** can be configured as a tuple when used as a kwarg, or as a list when used in the json, and is mandatory on both server and client sides. You can use an interface name instead of its ip on server side, for example ("eth0", 10673).  
-**subscribe\_message\_types** : In the publish/subscribe approach, specify for your client the message types you want to subscribe to. It is a subset of recv\_message\_types.  
-**tokens\_directory\_path** : The path of your server token json file, or client token file.  
-**token\_verify\_peer\_cert** : True by default. If boolean, True means the server/client verifies its peer certificate according to its default location under certificates_directory_path. On client : can also be a string with full path of a custom server certificate, or even a string with full path of CA certificate to authenticate server hostname (for example "/etc/ssl/certs/ca-certificates.crt", in that case token\_client\_verify\_server\_hostname should be true).  
-**token\_client\_send\_cert** : True by default. Boolean, must be True if server has token\_verify\_peer\_cert enabled : sends the client certificate.  
-**token\_client\_verify\_server\_hostname** : if true, client authenticates the server hostname with token\_verify\_peer\_cert (CA path) during SSL handshake.  
-**token\_server\_allow\_authorized\_non\_default\_cert** : boolean false by default. If true, server using use\_token will allow client with non default authorized certificate, even if this client doesn't use a token.  
-**uds\_path\_receive\_preserve\_socket** should always be True for better performance, your message\_received\_cb coroutine in start\_waiting\_for\_messages is called for each message without socket disconnection between messages (in fact, only 1 disconnection per 100 messages).  
-**uds\_path\_send\_preserve\_socket** should always be True for better performance.  
-**use\_ssl**, **ssl\_allow\_all**, **use\_token** are boolean, must be identical on server and client. use\_ssl enables encryption as explained previously. When ssl\_allow\_all is disabled, certificates validation is enforced. use\_token requires use\_ssl and ssl\_allow\_all both enabled.  
-**whitelisted\_clients\_id|ip|subnet** : a list of whitelisted clients (regex for whitelisted\_clients\_id), can be updated on the fly with the api functions add|remove\_whitelist\_client or in the cli.  
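
Two of the arguments above lend themselves to a quick local check: the client\_name regex, and the way target\_directory is formatted with transport\_json fields. The sketch below uses only the standard library. `is_valid_client_name`, `file_recv_entry` and the sample values are illustrative, and how entries are associated with a dst\_type in file\_recv\_config is shown in aioconnectors\_test.py rather than here.

```python
import re

# The regex given for client_name, with the hyphen escaped for clarity.
CLIENT_NAME_REGEX = re.compile(r"^[0-9a-zA-Z\-_:]+$")


def is_valid_client_name(client_name):
    """True if client_name only uses the characters allowed above."""
    return bool(CLIENT_NAME_REGEX.match(client_name))


# One file reception entry, with the three documented keys.
file_recv_entry = {
    "target_directory": "/my_destination_files/{message_type}/{source_id}",
    "owner": "user:group",
    "override_existing": False,
}

# target_directory is formatted using the transport_json fields.
transport_json = {"message_type": "any", "source_id": "client1"}
target_directory = file_recv_entry["target_directory"].format(**transport_json)
```

With these sample values, `target_directory` evaluates to "/my_destination_files/any/client1".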


<a name="send"></a>
### 5.More details about the send\_message arguments

    send_message(message_type=None, destination_id=None, request_id=None, response_id=None,
    data=None, data_is_json=True, binary=None, await_response=False, with_file=None,
    wait_for_ack=False, await_response_timeout=None) 
    with_file can be like : {'src_path':'','dst_type':'', 'dst_name':'', 'delete':False, 'owner':''}

send_message is an async coroutine.  
These arguments must be filled on the application layer by the user  
-**await\_response** is False by default, set it to True if your coroutine calling send\_message expects a response value.  
In such a case, the remote peer has to answer with response\_id equal to the request\_id of the request. (This is shown in aioconnectors\_test.py).  
-**await_response_timeout** is None by default. If set to a number, and if await\_response is true, the method waits up to this timeout for the peer response, and if timeout is exceeded it returns False.  
-**data** is the payload of your message. By default it expects a json, but it can be a string, and even bytes. However, using together the "data" argument for a json or a string, and the "binary" argument for binary payload, is a nice way to accompany a binary payload with some textual information. Contrary to "data", **binary** must be bytes, and cannot be a string. A message size should not exceed 1GB.  
-**data\_is\_json** is True by default since it assumes "data" is a json, and it dumps it automatically. Set it to False if "data" is not a json.  
-**destination\_id** is mandatory for server : it is the remote client id. Not needed by client.  
-**message\_type** is mandatory, it lets different listeners receive different message types. You can use "any" as a default.  
-**request\_id** and **response\_id** are optional (integer or string) : they are helpful to keep track of asynchronous messages on the application layer. At the application level, the remote peer should answer with response\_id equal to the request\_id of the request. The request sender can then associate the received response with the request sent.  
-The **publish\_message** and **publish\_message\_sync** methods are the same as the send_message ones, but used by a client in the publish/subscribe approach.  
-The **send\_message\_await\_response** method is the same as send_message, but automatically sets await_response to True.  
-The **send\_message\_sync** method is almost the same as send_message, but called synchronously (not an async coroutine). It can also receive a "loop" as a kwarg. If a loop is running in the background, it schedules and returns a task. Otherwise it returns the peer response if called with await\_response.  
-**wait\_for\_ack** is not recommended for high throughputs, since it slows down sending dramatically. Basic testing showed a rate of ten messages per second, instead of a few thousand messages per second in the point to point approach.  
Not a benchmark, but some point-to-point and pubsub trials (VM with 8GB RAM and 4 cores) showed that up to 4000 messages (with data of 100 bytes) per second could be received by a server without delay, and from that point the receive queue started to be non empty. This test gave the same result with 100 clients each sending 40 events per second, and with 1 client sending 4000 events per second.  
-**with\_file** lets you embed a file, with {'src\_path':'','dst\_type':'', 'dst\_name':'', 'delete':False, 'owner':''}. **src\_path** is the source path of the file to be sent, **dst\_type** is the type of the file, which enables the remote peer to evaluate the destination path thanks to its ConnectorManager attribute "file\_recv\_config" dictionary. **dst\_name** is the name the file will be stored under. **delete** is a boolean telling whether to delete the source file after it has been sent. **owner** is the optional user:group of your uploaded file : if used, it overrides the "owner" value optionally set on server side in file\_recv\_config. If an error occurs while opening the file to send, the file will not be sent but with\_file will still be present in transport\_json received by peer, and will contain an additional key **file\_error** telling the error to the peer application.  
-**tag** lets you add a tag string to your message in transport\_json : it has the advantage of being accessible at reception directly in transport\_json without the need to look into the data structure.  
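
As an illustration of the arguments above, the sketch below assembles the kwargs for a message that carries a file. The helper names, the paths and the textual payload are made up for the example. Only the argument names and the with\_file keys come from the signature above.

```python
def build_file_message_kwargs(src_path, dst_name, dst_type="any",
                              destination_id=None, request_id=None):
    """Assemble send_message kwargs for a message that embeds a file."""
    kwargs = {
        "message_type": "any",
        "data": {"description": f"uploading {dst_name}"},  # json by default
        "with_file": {
            "src_path": src_path,
            "dst_type": dst_type,
            "dst_name": dst_name,
            "delete": False,  # keep the source file after sending
            "owner": "",
        },
    }
    if destination_id is not None:
        kwargs["destination_id"] = destination_id  # mandatory for a server
    if request_id is not None:
        kwargs["request_id"] = request_id
    return kwargs


async def send_file(connector_api, **kwargs):
    # connector_api is expected to be a ConnectorAPI instance.
    return await connector_api.send_message(**kwargs)


file_kwargs = build_file_message_kwargs("/tmp/report.txt", "report.txt", request_id=1)
```

A client would then call `await send_file(connector_api, **file_kwargs)`, while a server would also pass destination\_id.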


<a name="management"></a>
### 6.Management programmatic tools

The class ConnectorManager has several methods to manage your connector. These methods are explained in <a href="#cli">7-</a>.  
-**delete\_client\_certificate**, **delete\_client\_token**, **disconnect\_client**, **reload\_tokens**  
-**add\_blacklist_client, remove\_blacklist_client**, **add\_whitelist_client, remove\_whitelist_client**  
-**delete\_previous\_persistence\_remains**  
-**ignore\_peer\_traffic\_show**, **ignore\_peer\_traffic\_enable**, **ignore\_peer\_traffic\_enable\_unique**, **ignore\_peer\_traffic\_disable**  
-**show\_connected\_peers**  
-**show\_log\_level**, **set\_log\_level**  
-**show\_subscribe\_message\_types**, **set\_subscribe\_message\_types**  
-**start\_connector**, **stop\_connector**, **restart\_connector**  
The same methods can be executed remotely, with the ConnectorRemoteTool class. This class is instantiated exactly like ConnectorAPI, with the same arguments (except for receive_from_any_connector_owner)  

    connector_remote_tool = aioconnectors.ConnectorRemoteTool(config_file_path=config_file_path)

An example of ConnectorRemoteTool is available in applications.py in the cli implementation.


<a name="cli"></a>
### 7.Other management command line tools

    python3 -m aioconnectors cli

to run several interesting commands like :   
-**start/stop/restart** your connectors.  
-**show\_connected\_peers** : show currently connected peers.  
-**delete\_client\_certificate** enables your server to delete a specific client certificate. delete\_client\_certificate enables your client to delete its own certificate and fall back to using the default one. In order to delete a certificate of a currently connected client, first delete the certificate on server side, which will disconnect the client instantaneously, and then delete the certificate on client side : the client will then reconnect automatically and obtain a new certificate. The client side deletion is not needed in case alternate\_client\_default\_cert is true.  
-**delete\_client\_token** enables your server to delete a specific client token. It also enables your client to delete its own token and fall back to requesting a new token.  
-**reload\_tokens** reloads tokens after for example modifying them on disk.  
-**disconnect_client** enables your server to disconnect a specific client.  
-**add\_blacklist_client, remove\_blacklist_client** enables your server to blacklist a client by id (regex), ip, or subnet, at runtime. Disconnects the client if blacklisted by id, also deletes its certificate if exists. Kept in the connector config file if exists.  
-**add\_whitelist_client, remove\_whitelist_client** enables your server to whitelist a client by id (regex), ip, or subnet, at runtime. Kept in the connector config file if exists.   
-**peek\_queues** to show the internal queues sizes.  
-**ignore\_peer\_traffic** can be a boolean, or a peer name. When enabled, the connector drops all new messages received from peers, or from the specified peer. It also drops new messages to be sent to all peers, or to the specified peer. This mode can be useful to let the queues evacuate their accumulated messages.  
-**show\_log\_level** to show the current log level.  
-**set\_log\_level** to set the log level on the fly.  
-**show\_subscribe\_message\_types** to show the subscribed message types of a client.  
-**set\_subscribe\_message\_types** to set the list of all subscribed message types of a client.  


<a name="testing"></a>
### 8.Testing command line tools

-To let your connector send pings to a remote connector, and print its replies. 

    python3 -m aioconnectors ping <config_json_path>

-To simulate a simple application waiting for messages, and print all received messages. Your application should not wait for incoming messages when using this testing tool.

    python3 -m aioconnectors test_receive_messages <config_json_path>

-To simulate a simple application sending dummy messages.

    python3 -m aioconnectors test_send_messages <config_json_path>


<a name="chat"></a>
### 9.Funny embedded chat

A simple chat using aioconnectors is embedded. It allows you to exchange messages, files and directories easily between 2 Linux or Mac stations. It can also be configured to execute the commands it receives.  
It is encrypted, and supports authentication by prompting to accept connections.  
It is not a multi user chat, but more of a tool to easily transfer stuff between your computers.

-On the 1st station (server side), type : 

    python3 -m aioconnectors chat

-Then on the 2nd station (client side), type :

    python3 -m aioconnectors chat --target <server_ip> 

You can execute local shell commands by preceding them with a "!".  
You can also upload files during a chat, by typing "!upload \<file or dir path\>".  
Files are uploaded to your current working directory. A directory is transferred as a zip file.  
You can simply unzip a zip file by using "!dezip \<file name\>".  

The cleanest way to exit a chat is by typing "!exit" on both sides.  

-On client side, you can also directly upload a file or directory to the server without opening a chat :

     python3 -m aioconnectors chat --target <server_ip> --upload <file or dir path>

-You can configure either the client or the server (not both simultaneously) to execute the commands it receives, by using the --exec \<shell\_path\> option :
    
    python3 -m aioconnectors chat --exec /bin/sh
    python3 -m aioconnectors chat --target <server_ip>
    or
    python3 -m aioconnectors chat
    python3 -m aioconnectors chat --target <server_ip>  --exec /bin/sh

-On server side, you can accept client connections without prompting by specifying --accept :

    python3 -m aioconnectors chat --accept

-More info :

    python3 -m aioconnectors chat --help

-If you need your server to listen on a specific interface :

    python3 -m aioconnectors chat --bind_server_ip <server_ip>

\<server\_ip\> can be an IP address or an interface name.  

-If you don't want your server to use the default port (10673), use --port on both peers : 

    python3 -m aioconnectors chat --port <port> [--target <server_ip>]
    
-By default the chat has tab completion. You can disable it with --nowrap.
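
If you want to start the chat from a script, the options above can be assembled into an argument list. This is only a sketch : the helper name `chat_command` and its parameters are hypothetical, and it uses nothing but the flags listed in this section.

```python
def chat_command(target=None, upload=None, exec_shell=None, accept=False,
                 bind_server_ip=None, port=None):
    """Build the argument list for "python3 -m aioconnectors chat"."""
    # --upload is shown above only together with --target (client side).
    if upload is not None and target is None:
        raise ValueError("--upload is a client side option and needs --target")
    argv = ["python3", "-m", "aioconnectors", "chat"]
    if target is not None:
        argv += ["--target", target]
    if upload is not None:
        argv += ["--upload", upload]
    if exec_shell is not None:
        argv += ["--exec", exec_shell]
    if accept:
        argv.append("--accept")
    if bind_server_ip is not None:
        argv += ["--bind_server_ip", bind_server_ip]
    if port is not None:
        argv += ["--port", str(port)]
    return argv


if __name__ == "__main__":
    # Server side, accepting clients without a prompt.
    print(chat_command(accept=True))
    # Client side, on a non default port.
    print(chat_command(target="<server_ip>", port=10700))
```

The returned list can be passed to `subprocess.run` on a station where aioconnectors is installed.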


<a name="containers"></a>
## Containers

Connector client and server, as well as connector api, can run in a Docker container. You just need to pip install aioconnectors in a Python image (or any image having Python >= 3.6 and openssl).  
A connector and its connector api must run on the same host, or in the same Kubernetes pod.  
A connector and its connector api can run in the same container, or in different containers. In case you choose to run them in different containers, you must configure their connector_files_dirpath path as a shared volume, in order to let them share their UDS sockets.
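
The reason for the shared volume is that a Unix domain socket is reached through a file path, so both sides must see the same directory. The sketch below illustrates this with the standard library only; it is not aioconnectors code, and the function name `echo_through_shared_dir` and the file name `demo.sock` are made up for the illustration. It runs on Unix like systems.

```python
import os
import socket
import tempfile
import threading


def echo_through_shared_dir(shared_dir, payload):
    """Send payload through a Unix domain socket created in shared_dir, return the echo."""
    sock_path = os.path.join(shared_dir, "demo.sock")
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(sock_path)   # creates the socket file inside the shared directory
    server.listen(1)

    def handle_one_client():
        conn, _ = server.accept()
        with conn:
            conn.sendall(conn.recv(1024))   # echo back what was received

    thread = threading.Thread(target=handle_one_client)
    thread.start()

    # The client only needs the path : this is what the shared volume provides.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
        client.connect(sock_path)
        client.sendall(payload)
        reply = client.recv(1024)

    thread.join()
    server.close()
    os.unlink(sock_path)
    return reply


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as shared_dir:
        print(echo_through_shared_dir(shared_dir, b"ping"))
```

Here both ends live in one process for convenience. With two containers, the directory holding the socket file would be the volume mounted in both.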


<a name="windows"></a>
## Windows ?

To port aioconnectors to Windows, these steps should be taken, and probably more :  
-Replace the usage of unix sockets, maybe with local sockets, or named pipes, or uds sockets if and when Windows supports them.  
Since the implementation relies on unix sockets paths, a possible approach would be to preserve these paths, and manage a mapping between the paths and their corresponding local listening ports.  
-Port the usage of openssl in ssl_helper.py  
-Convert paths  
-Ignore the file uploaded ownership feature  
-Convert the interface-to-ipaddress function, which uses ip (used for sockaddr and client\_bind\_ip)
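
The mapping between unix socket paths and local listening ports mentioned in the first step could look like the following sketch. It is a hypothetical helper, not part of aioconnectors : the class name `SocketPathPortMap` and its methods are assumptions, and it keeps the mapping in memory only.

```python
import socket


class SocketPathPortMap:
    """Map unix socket paths to loopback TCP ports (hypothetical helper)."""

    def __init__(self, host="127.0.0.1"):
        self.host = host
        self._ports = {}
        self._listeners = {}

    def listen(self, path):
        """Open a loopback listener for path and remember the port chosen by the OS."""
        if path in self._ports:
            raise ValueError(f"{path} is already mapped")
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        listener.bind((self.host, 0))   # port 0 lets the OS pick a free port
        listener.listen()
        self._listeners[path] = listener
        self._ports[path] = listener.getsockname()[1]
        return listener

    def address_for(self, path):
        """Return the (host, port) to connect to instead of the unix socket path."""
        return (self.host, self._ports[path])

    def close(self, path):
        """Close the listener of path and forget its mapping."""
        self._listeners.pop(path).close()
        del self._ports[path]


if __name__ == "__main__":
    mapping = SocketPathPortMap()
    mapping.listen("/var/tmp/aioconnectors/demo.sock")
    print(mapping.address_for("/var/tmp/aioconnectors/demo.sock"))
    mapping.close("/var/tmp/aioconnectors/demo.sock")
```

Since a connector and its connector api can run in different processes, a real port would also need to share this mapping between them, for example by writing the port number into a file located at the preserved path.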






            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/mori-b/aioconnectors",
    "name": "aioconnectors",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.6",
    "maintainer_email": "",
    "keywords": "message queue,broker,asyncio,simple,easy",
    "author": "Mori Benech",
    "author_email": "moribirom@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/04/32/466198301edc938dff7103e8751d89b58dd66b15e1e6b9ed8e77a4a21ac3/aioconnectors-1.6.3.tar.gz",
    "platform": null,
    "description": "[![PyPI version](https://badge.fury.io/py/aioconnectors.svg)](https://badge.fury.io/py/aioconnectors) [![Downloads](https://static.pepy.tech/personalized-badge/aioconnectors?period=total&units=international_system&left_color=grey&right_color=blue&left_text=downloads)](https://pepy.tech/project/aioconnectors)\n\n           _                                 __\n     ___ _(_)__  _______  ___  ___  ___ ____/ /____  _______\n    / _ `/ / _ \\/ __/ _ \\/ _ \\/ _ \\/ -_) __/ __/ _ \\/ __(_-<\n    \\_,_/_/\\___/\\__/\\___/_//_/_//_/\\__/\\__/\\__/\\___/_/ /___/\n\n\n\n# aioconnectors\n**Simple secure asynchronous message queue**\n\n*<a href=\"#features\">Features</a>*  \n*<a href=\"#installation\">Installation</a>*  \n*<a href=\"#exampleptp\">Example Point to point : Server and Client</a>*  \n*<a href=\"#exampleps\">Example publish/subscribe : Broker, Subscriber, and Publisher</a>*  \n*<a href=\"#hld\">High Level Design</a>*  \n*<a href=\"#usecases\">Use Cases</a>*  \n*<a href=\"#usage\">Usage</a>*  \n*<a href=\"#enc\">1.Encryption</a>*  \n*<a href=\"#run\">2.Run a connector</a>*  \n*<a href=\"#sendreceive\">3.Send/receive messages</a>*  \n*<a href=\"#classes\">4.ConnectorManager and ConnectorAPI</a>*  \n*<a href=\"#send\">5.send_message</a>*  \n*<a href=\"#management\">6.Programmatic management tools</a>*  \n*<a href=\"#cli\">7.Command line interface management tools</a>*  \n*<a href=\"#testing\">8.Testing tools</a>*  \n*<a href=\"#chat\">9.Embedded chat</a>*  \n*<a href=\"#containers\">Containers</a>*  \n*<a href=\"#windows\">Windows</a>*  \n\n\n<a name=\"features\"></a>\n## FEATURES\n\naioconnectors is an easy to set up message queue and broker that works on Unix like systems. Requirements are : Python >= 3.6, and openssl installed.  \nIt provides bidirectional transfer of messages and files, optional authentication and encryption, persistence and reconnection in case of connection loss, proxy support, client filtering.  
\nIt is a point to point broker built on the client/server model, but both peers can push messages. It can also be easily configured as a publish/subscribe broker.  \nBased on asyncio, message sending and receiving are asynchronous, with the option to wait asynchronously for a response.  \nA connector can be configured with a short json file.  \nAn embedded command line tool enables to easily run a connector and manage it with shell commands.  \nA simple Python API provides functions like starting/stopping a connector, sending a message, receiving messages, and other management capabilities. To support other languages for the API, the file standalone\\_api.py only should be transpiled.\n\n\n<a name=\"installation\"></a>\n## INSTALLATION\n\n    pip3 install aioconnectors\n\n\n<a name=\"exampleptp\"></a>\n## BASIC EXAMPLE - POINT TO POINT\n\nYou can run a connector with a single shell command \n\n    python3 -m aioconnectors create_connector <config_json_path>\n\nThis is covered in <a href=\"#run\">2-</a>, but this example shows the programmatic way to run connectors.  \nThis is a basic example of a server and a client sending messages to each other. For more interesting examples, please refer to applications.py or aioconnectors\\_test.py.  \nFor both server and client, connector\\_manager is running the connector, and connector\\_api is sending/receiving messages.  \nIn this example, connector\\_manager and connector\\_api are running in the same process for convenience. They can obviously run in different processes, as shown in the other examples.  \nIn this example we are running server and client on the same machine since server_sockaddr is set to \"127.0.0.1\".  \nTo run server and client on different machines, you should modify server_sockaddr value in both server and client code, with the ip address of the server.  \nYou can run multiple clients, just set a different client\\_name for each client.  
\n\n1.No encryption  \nYou can run the following example code directly, the encryption is disabled.  \nIn case you want to use this example with encryption, you should read 2. and 3. after the examples.  \n\n### Server example\n\n    import asyncio\n    import aioconnectors\n    \n    loop = asyncio.get_event_loop()\n    server_sockaddr = ('127.0.0.1',10673)\n    connector_files_dirpath = '/var/tmp/aioconnectors'\n    \n    #create connector\n    connector_manager = aioconnectors.ConnectorManager(is_server=True, server_sockaddr=server_sockaddr, use_ssl=False, use_token=False,\n                                                       ssl_allow_all=True, connector_files_dirpath=connector_files_dirpath,\n                                                       certificates_directory_path=connector_files_dirpath,\n                                                       send_message_types=['any'], recv_message_types=['any'],\n                                                       file_recv_config={'any': {'target_directory':connector_files_dirpath}},\n                                                       reuse_server_sockaddr=True)\n    \n    task_manager =  loop.create_task(connector_manager.start_connector())\n    loop.run_until_complete(task_manager)\n    \n    #create api\n    connector_api = aioconnectors.ConnectorAPI(is_server=True, server_sockaddr=server_sockaddr,\n                                               connector_files_dirpath=connector_files_dirpath,\n                                               send_message_types=['any'], recv_message_types=['any'],\n                                               default_logger_log_level='INFO')\n    \n    #start receiving messages\n    async def message_received_cb(logger, transport_json , data, binary):\n        print('SERVER : message received', transport_json , data.decode())\n    loop.create_task(connector_api.start_waiting_for_messages(message_type='any', message_received_cb=message_received_cb))\n    \n    #start 
sending messages\n    async def send_messages(destination):\n        await asyncio.sleep(2)\n        index = 0\n        while True:\n            index += 1\n            await connector_api.send_message(data={'application message': f'SERVER MESSAGE {index}'},\n                                             message_type='any', destination_id=destination)\n            await asyncio.sleep(1)\n                                                    \n    loop.create_task(send_messages(destination='client1'))\n    \n    try:\n        print(f'Connector is running, check log at {connector_files_dirpath+\"/aioconnectors.log\"}'\n              f', type Ctrl+C to stop')\n        loop.run_forever()\n    except:\n        print('Connector stopped !')\n    \n    #stop receiving messages\n    connector_api.stop_waiting_for_messages(message_type='any')\n    \n    #stop connector\n    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))\n    loop.run_until_complete(task_stop)\n\n\n### Client example\n\n    import asyncio\n    import aioconnectors\n    \n    loop = asyncio.get_event_loop()\n    server_sockaddr = ('127.0.0.1',10673)\n    connector_files_dirpath = '/var/tmp/aioconnectors'\n    client_name = 'client1'\n    \n    #create connector\n    connector_manager = aioconnectors.ConnectorManager(is_server=False, server_sockaddr=server_sockaddr,\n                                                       use_ssl=False, ssl_allow_all=True, use_token=False,\n                                                       connector_files_dirpath=connector_files_dirpath,\n                                                       certificates_directory_path=connector_files_dirpath,\n                                                       send_message_types=['any'], recv_message_types=['any'],\n                                                       file_recv_config={'any': {'target_directory':connector_files_dirpath}},\n                                           
            client_name=client_name)\n    \n    task_manager =  loop.create_task(connector_manager.start_connector())\n    loop.run_until_complete(task_manager)\n    \n    #create api\n    connector_api = aioconnectors.ConnectorAPI(is_server=False, server_sockaddr=server_sockaddr,\n                                               connector_files_dirpath=connector_files_dirpath, client_name=client_name,\n                                               send_message_types=['any'], recv_message_types=['any'],\n                                               default_logger_log_level='INFO')\n    \n    #start receiving messages\n    async def message_received_cb(logger, transport_json , data, binary):\n        print('CLIENT : message received', transport_json , data.decode())\n    loop.create_task(connector_api.start_waiting_for_messages(message_type='any', message_received_cb=message_received_cb))\n    \n    #start sending messages\n    async def send_messages():\n        await asyncio.sleep(1)\n        index = 0\n        while True:\n            index += 1\n            await connector_api.send_message(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='any')\n            await asyncio.sleep(1)\n                                           \n    loop.create_task(send_messages())\n    \n    try:\n        print(f'Connector is running, check log at {connector_files_dirpath+\"/aioconnectors.log\"}'\n              f', type Ctrl+C to stop')\n        loop.run_forever()\n    except:\n        print('Connector stopped !')\n    \n    #stop receiving messages\n    connector_api.stop_waiting_for_messages(message_type='any')\n    \n    #stop connector\n    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))\n    loop.run_until_complete(task_stop)\n\n\n<a name=\"exampleps\"></a>\n## BASIC EXAMPLE - PUBLISH/SUBSCRIBE\n\nYou can run the following code of a broker, a publisher and a subscriber in 3 different shells on the 
same machine out of the box.  \nYou should modify some values as explained in the previous example in order to run on different machines, and with encryption.  \n\n### Broker example\n\nJust a server with pubsub\\_central\\_broker=True\n\n    import asyncio\n    import aioconnectors\n\n    loop = asyncio.get_event_loop()\n    server_sockaddr = ('127.0.0.1',10673)\n    connector_files_dirpath = '/var/tmp/aioconnectors'\n\n    #create connector\n    connector_manager = aioconnectors.ConnectorManager(is_server=True, server_sockaddr=server_sockaddr, use_ssl=False, use_token=False,\n                                                       ssl_allow_all=True, connector_files_dirpath=connector_files_dirpath,\n                                                       certificates_directory_path=connector_files_dirpath,\n                                                       send_message_types=['any'], recv_message_types=['any'],\n                                                       file_recv_config={'any': {'target_directory':connector_files_dirpath}},\n                                                       pubsub_central_broker=True, reuse_server_sockaddr=True)\n\n    task_manager =  loop.create_task(connector_manager.start_connector())\n    loop.run_until_complete(task_manager)\n\n    #create api\n    connector_api = aioconnectors.ConnectorAPI(is_server=True, server_sockaddr=server_sockaddr,\n                                               connector_files_dirpath=connector_files_dirpath,\n                                               send_message_types=['any'], recv_message_types=['any'],\n                                               default_logger_log_level='INFO')\n\n    #start receiving messages\n    async def message_received_cb(logger, transport_json , data, binary):\n        print('SERVER : message received', transport_json , data.decode())\n    loop.create_task(connector_api.start_waiting_for_messages(message_type='any', 
message_received_cb=message_received_cb))\n\n    try:\n        print(f'Connector is running, check log at {connector_files_dirpath+\"/aioconnectors.log\"}'\n              f', type Ctrl+C to stop')\n        loop.run_forever()\n    except:\n        print('Connector stopped !')\n\n    #stop receiving messages\n    connector_api.stop_waiting_for_messages(message_type='any')\n\n    #stop connector\n    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))\n    loop.run_until_complete(task_stop)\n\n\n### Subscriber example\n\nJust a client with subscribe\\_message\\_types = [topic1, topic2, ...]\n\n    import asyncio\n    import aioconnectors\n\n    loop = asyncio.get_event_loop()\n    server_sockaddr = ('127.0.0.1',10673)\n    connector_files_dirpath = '/var/tmp/aioconnectors'\n    client_name = 'client2'\n\n    #create connector\n    connector_manager = aioconnectors.ConnectorManager(is_server=False, server_sockaddr=server_sockaddr,\n                                                       use_ssl=False, ssl_allow_all=True, use_token=False,\n                                                       connector_files_dirpath=connector_files_dirpath,\n                                                       certificates_directory_path=connector_files_dirpath,\n                                                       send_message_types=['any'], recv_message_types=['type1'],\n                                                       file_recv_config={'type1': {'target_directory':connector_files_dirpath}},\n                                                       client_name=client_name, subscribe_message_types=[\"type1\"])\n\n    task_manager =  loop.create_task(connector_manager.start_connector())\n    loop.run_until_complete(task_manager)\n\n    #create api\n    connector_api = aioconnectors.ConnectorAPI(is_server=False, server_sockaddr=server_sockaddr,\n                                               
connector_files_dirpath=connector_files_dirpath, client_name=client_name,\n                                               send_message_types=['any'], recv_message_types=['type1'],\n                                               default_logger_log_level='INFO')\n\n    #start receiving messages\n    async def message_received_cb(logger, transport_json , data, binary):\n        print('CLIENT : message received', transport_json , data.decode())\n    loop.create_task(connector_api.start_waiting_for_messages(message_type='type1', message_received_cb=message_received_cb))\n\n    '''\n    #start sending messages\n    async def send_messages():\n        await asyncio.sleep(1)\n        index = 0\n        while True:\n            index += 1\n            await connector_api.send_message(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='any')\n            await asyncio.sleep(1)\n                                           \n    loop.create_task(send_messages())\n    '''\n\n    try:\n        print(f'Connector is running, check log at {connector_files_dirpath+\"/aioconnectors.log\"}'\n              f', type Ctrl+C to stop')\n        loop.run_forever()\n    except:\n        print('Connector stopped !')\n\n    #stop receiving messages\n    connector_api.stop_waiting_for_messages(message_type='type1')\n\n    #stop connector\n    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))\n    loop.run_until_complete(task_stop)\n\n\n### Publisher example\n\nJust a client which uses publish\\_message instead of send\\_message\n\n    import asyncio\n    import aioconnectors\n\n    loop = asyncio.get_event_loop()\n    server_sockaddr = ('127.0.0.1',10673)\n    connector_files_dirpath = '/var/tmp/aioconnectors'\n    client_name = 'client1'\n\n    #create connector\n    connector_manager = aioconnectors.ConnectorManager(is_server=False, server_sockaddr=server_sockaddr,\n                                                       
use_ssl=False, ssl_allow_all=True, use_token=False,\n                                                       connector_files_dirpath=connector_files_dirpath,\n                                                       certificates_directory_path=connector_files_dirpath,\n                                                       send_message_types=['type1','type2'], recv_message_types=['any'],\n                                                       file_recv_config={'any': {'target_directory':connector_files_dirpath}},\n                                                       client_name=client_name, disk_persistence_send=True)\n\n    task_manager =  loop.create_task(connector_manager.start_connector())\n    loop.run_until_complete(task_manager)\n\n    #create api\n    connector_api = aioconnectors.ConnectorAPI(is_server=False, server_sockaddr=server_sockaddr,\n                                               connector_files_dirpath=connector_files_dirpath, client_name=client_name,\n                                               send_message_types=['type1','type2'], recv_message_types=['any'],\n                                               default_logger_log_level='INFO')\n\n    #start receiving messages\n    #async def message_received_cb(logger, transport_json , data, binary):\n    #    print('CLIENT : message received', transport_json , data.decode())\n    #loop.create_task(connector_api.start_waiting_for_messages(message_type='any', message_received_cb=message_received_cb))\n\n    #start sending messages\n    async def send_messages():\n        await asyncio.sleep(1)\n        index = 0\n        #with_file={'src_path':'file_test','dst_type':'any', 'dst_name':'file_dest', \n        #           'delete':False, 'owner':'nobody:nogroup'}                  \n        while True:\n            index += 1\n            print(f'CLIENT : message {index} published')\n            #connector_api.publish_message_sync(data={'application message': f'CLIENT MESSAGE {index}'}, 
message_type='type1')#,        \n            await connector_api.publish_message(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='type1')#,\n                                                #with_file=with_file, binary=b'\\x01\\x02\\x03')\n            #await connector_api.publish_message(data={'application message': f'CLIENT MESSAGE {index}'}, message_type='type2')#,                                            \n            await asyncio.sleep(1)\n                                           \n    loop.create_task(send_messages())\n\n    try:\n        print(f'Connector is running, check log at {connector_files_dirpath+\"/aioconnectors.log\"}'\n              f', type Ctrl+C to stop')\n        loop.run_forever()\n    except:\n        print('Connector stopped !')\n\n    #stop receiving messages\n    connector_api.stop_waiting_for_messages(message_type='any')\n\n    #stop connector\n    task_stop = loop.create_task(connector_manager.stop_connector(delay=None, hard=False, shutdown=True))\n    loop.run_until_complete(task_stop)\n\n2.Encryption without authentication  \nIn order to use encryption, you should set use\\_ssl to True in both server and client ConnectorManager instantiations.  \nA directory containing certificates must be created before running the example, which is done by a single command :\n\n    python3 -m aioconnectors create_certificates\n\nIf you decide to use server_ca=true on your connector server, then you need to add \"--ca\" (<a href=\"#classes\">4-</a>).  \nIf you run server and client on different machines, this command should be run on both machines.  \n\n3.Encryption with authentication  \nIn this example, the kwarg ssl\\_allow\\_all is true (both on server and client), meaning the communication between server and client if encrypted is not authenticated.  \nIn case you want to run this example with authentication too, you have 2 options :  \n3.1. 
Set use\\_ssl to True and ssl\\_allow\\_all to False in both server and client ConnectorManager instantiations.  \nIf you run server and client on the same machine, this only requires to run the command \"python3 -m aioconnectors create\\_certificates\" beforehand like in 2.  \nIn case the server and client run on different machines, you should run the prerequisite command \"python3 -m aioconnectors create_certificates\" only once, and copy the generated directory /var/tmp/aioconnectors/certificates/server to your server (preserving symlinks) and /var/tmp/aioconnectors/certificates/client to your client.  \n3.2. Set use\\_ssl to True, ssl\\_allow\\_all to True, and use\\_token to True, in both server and client ConnectorManager instantiations, to use token authentication. This also requires to run beforehand \"python3 -m aioconnectors create_certificates\".  \n\n\n<a name=\"hld\"></a>\n## HIGH LEVEL DESIGN\n\nThe client and server are connected by one single tcp socket.\nWhen a peer sends a message, it is first sent by unix socket to the connector, then transferred to a different queue for each remote peer. Messages are read from these priority queues and sent to the remote peer on the client/server socket. After a message reaches its peer, it is sent to a queue, one queue per message type. The api listens on a unix socket to receive messages of a specific type, that are read from the corresponding queue.  \nThe optional encryption uses TLS. The server certificate and the default client certificate are automatically generated and pre-shared, so that a server or client without prior knowledge of these certificates cannot communicate. Then, the server generates on the fly a new certificate per client, so that different clients cannot interfere with one another. Alternatively, the server can generate on the fly a new token per client.  \n\n\n<a name=\"usecases\"></a>\n## USE CASES\n\n-The standard use case is running server and client on separate stations. 
Each client station can then initiate a connection to the server station.  \nThe valid message topics are defined in the server and client configurations (send\\_message\\_types and recv\\_message\\_types), and the messages are sent point to point.  \nIn order to have all clients/server connections authenticated and encrypted, you just have to call\n\n    python3 -m aioconnectors create_certificates <optional_directory_path>\n\nAnd then share the created directories between server and clients as explained in <a href=\"#enc\">1-</a>.  \nYou can also use a proxy between your client and server, as explained in <a href=\"#classes\">4-</a>.  \n\n-You might prefer to use a publish/subscribe approach.  \nThis is also supported by configuring a single server as the broker (you just need to set pubsub\\_central\\_broker=True).  \nThe other connectors should be clients. A client can subscribe to specific topics (message\\_types) by setting the attribute subscribe\\_message\\_types in its constructor, or by calling the set\\_subscribe\\_message\\_types command on the fly.  \n\n-You might want both sides to be able to initiate a connection, or even to have multiple nodes being able to initiate connections between one another.  \nThe following lines describe a possible approach to do that using aioconnectors.  \nEach node should be running an aioconnector server, and be able to also spawn a aioconnector client each time it initiates a connection to a different remote server. A new application layer handling these connectors could be created, and run on each node.  \nYour application might need to know if a peer is already connected before initiating a connection : to do so, you might use the connector_manager.show\\_connected\\_peers method (explained in <a href=\"#cli\">7-</a>).  \nYour application might need to be able to disconnect a specific client on the server : to do so, you might use the connector\\_manager.disconnect\\_client method.  
\nA comfortable approach would be to share the certificates directories created in the first step between all the nodes. All nodes would share the same server certificate, and use the same client default certificate to initiate the connection (before receiving their individual certificate). The only differences between clients configurations would be their client_name, and their remote server (the configurations are explained in <a href=\"#classes\">4-</a>).  \n\n-There are multiple tools to let the server filter clients. Your application might need to decide whether to accept a client connection or not.  \nThe following tools filter clients in this order :  \nwhitelisted_clients_ip/subnet : in configuration file, or on the fly with add_whitelist_client (it updates the configuration file).  \nhook_whitelist_clients(extra_info, source_id) : coroutine that lets you take a decision after having filtered a non whitelisted client (maybe allow it from now on).  \nblacklisted_clients_ip/subnet: in configuration file or on the fly with add_blacklist_client.  \nwhitelisted_clients_id : in configuration file or on the fly with add_whitelist_client (uses regex).  \nhook_whitelist_clients(extra_info, source_id) : same.  \nblacklisted_clients_id : in configuration file or on the fly with add_blacklist_client (uses regex).  \nhook_allow_certificate_creation(source_id) : coroutine that lets you prevent certificate creation based on the source_id.  \nhook_server_auth_client(source_id) : coroutine that gives a last opportunity to filter the source_id.  \nThe hooks must be fed to the ConnectorManager constructor (explained in <a href=\"#classes\">4-</a>).  \n\n\n<a name=\"usage\"></a>\n## USAGE\n\naioconnectors provides the ConnectorManager class which runs the connectors, and the ConnectorAPI class which sends and receives messages. It provides as well the ConnectorRemoteTool class which can lightly manage the connector outside of the ConnectorManager.  
The ConnectorManager client and server can run on different machines. However, ConnectorAPI and ConnectorRemoteTool communicate internally with their ConnectorManager, and the three must run on the same machine.  
aioconnectors also provides a command line tool accessible by typing

    python3 -m aioconnectors --help


<a name="enc"></a>
### 1.Encryption

Like everything else, the encryption mode is configurable through the ConnectorManager kwargs or config file, as explained later in <a href="#classes">4-</a>. The relevant parameters are use_ssl and ssl_allow_all.  
The default mode is the most secure : use_ssl is enabled and ssl\_allow\_all is disabled, both on server and client.  
-If you choose to use encryption, you should call

    python3 -m aioconnectors create_certificates [<optional_directory_path>] [--ca] [--help]

A directory called "certificates" will be created under your optional\_directory\_path, or under /var/tmp/aioconnectors if not specified.
Under it, 2 subdirectories will be created : certificates/server and certificates/client.  
You need to copy certificates/server to your server (preserving symlinks), and certificates/client to your client. That's all you have to do.  
This is the recommended approach, since it ensures traffic encryption, client and server authentication, and prevents client impersonation.  
Clients use the default certificate to first connect to the server, then an individual certificate is generated by the server for each client. The client automatically uses this individual certificate for further connections. This individual certificate is mapped to the client_name.  
The first client named client_name reaching the server is granted a certificate for this client_name. Different clients further attempting to use the same client_name will be rejected.  
When server\_ca is false on server side (default), the client certificates are checked against the certificates pem kept on the server, otherwise against the server CA.  
When using ssl, the default approach is to have server\_ca false (default), meaning your server will generate and manage self signed client certificates, providing certificates visibility, and tools like delete\_client\_certificate to delete client certificates on the fly.  
Using server\_ca true lets your server become a CA with a self signed CA certificate that will sign your client certificates. If you choose to run your server with server\_ca true, then you need the --ca argument in create\_certificates, otherwise you don't need it (default).  
The server\_ca true mode comes with server\_ca\_certs\_not\_stored enabled by default, meaning the client certificates are deleted from server side. Not having to store the client certificates on the server might be an advantage but it doesn't enable you to delete them : if you want to be able to delete them in ca mode, then you might just use server\_ca false. Setting the server\_ca\_certs\_not\_stored option to false requires you to delete the certificates yourself, since deletion is not currently supported when server_ca is true : implementing it would require something like "openssl ca -gencrl -config certificates/server/server\_ca\_details.conf -out revoked.pem", and also "SSLContext.verify\_flags |= ssl.VERIFY\_CRL\_CHECK\_LEAF" before loading the revoked.pem into SSLContext.load\_verify\_locations.  
-The client also checks the server certificate to prevent MITM.  
Instead of using the generated server certificate, you also have the option to use a hostname for your server and use a CA signed server certificate that the clients will verify. For that you should :  
-On server side, under the "certificates" directory, replace /server/server-cert/server.pem and server.key with your signed certificates. You don't need to do that manually, there is a tool that does it :  

    python3 -m aioconnectors replace_server_certificate <custom_server_pem_file_path> [<optional_directory_path>]

Note that the custom server pem file should contain the whole chain of .crt including the intermediate certificates.  
In case you want to roll back to the original generated server certificate :  

    python3 -m aioconnectors replace_server_certificate --revert

-On client side, configure server\_sockaddr with the server hostname instead of IP address, and set client\_cafile\_verify\_server to be the ca cert path (like /etc/ssl/certs/ca-certificates.crt), to enable CA verification of your server certificate.  
-You can delete a client certificate on the server (and also on client) by calling delete\_client\_certificate in

    python3 -m aioconnectors cli

For this purpose, you can also programmatically call the ConnectorManager.delete_client\_certificate method.  
-You shouldn't need to modify the certificates, however there is a way to tweak the certificates template : run create\_certificates once, then modify certificates/server/csr\_details\_template.conf according to your needs (without setting the Organization field), delete other directories under certificates and run create\_certificates again.  
-On server side, you can manually store additional default certificates with their symlink, under certificates/server/client-certs/symlinks. They must be called defaultN where N is an integer, or be another CA certificate in case server\_ca is true.  
-Other options :  

-ssl\_allow\_all and use\_token enabled : this is a similar approach but instead of generating a certificate per client, the server generates a token per client. This approach is simpler. Note that you can also delete the token on the fly by calling delete\_client\_token.  
You can combine ssl\_allow\_all with token\_verify\_peer\_cert (on client and server) and token\_client\_send\_cert (on client) : in order to authenticate the default certificate only. On client side the token\_verify\_peer\_cert can also be the path of ca certificates (like /etc/ssl/certs/ca-certificates.crt) or custom server public certificate.  
token\_client\_verify\_server\_hostname can be the server hostname that your client authenticates (through its certificate).  

By setting ssl\_allow\_all on both server and client, you can use encryption without the hassle of sharing certificates. In such a case you can run create_certificates independently on server and client side, without the need to copy a directory. This disables authentication, so that any client and server can communicate.  

By unsetting use_ssl, you can disable encryption altogether.


<a name="run"></a>
### 2.You have 2 options to run your connectors, either through the command line tool, or programmatically.

2.1.Command line tool  
-To configure the Connector Manager, create a <config\_json\_path> file based on the Manager template json, and configure it according to your needs (more details in <a href="#classes">4-</a>). Relevant for both server and client.  
A Manager template json can be obtained by calling : 

    python3 -m aioconnectors print_config_templates

-Then create and start your connector (both server and client, each with its own <config_json_path>)

    python3 -m aioconnectors create_connector <config_json_path>

If you are testing your connector server and client on the same machine, you can use the configuration generated by print\_config\_templates almost out of the box.  
The only changes you need to make are to set is\_server to False in the client configuration, and use\_ssl to False in both configurations (unless you have already run "python3 -m aioconnectors create_certificates").  
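
As a minimal sketch of the same-machine test setup described above, the snippet below derives a server and a client configuration from one base dictionary and writes them as JSON files. The base dictionary is only a hand-picked subset of the Manager template keys (in practice you would start from the output of print_config_templates), and the helper name `write_local_test_configs` and the file names are hypothetical.

```python
import json
import tempfile
from pathlib import Path

# Hand-picked subset of the Manager template keys, for illustration only.
BASE_CONFIG = {
    "is_server": True,
    "use_ssl": True,
    "server_sockaddr": ["127.0.0.1", 10673],
    "client_name": None,
}


def write_local_test_configs(directory):
    """Write server.json and client.json for a same-machine test without certificates."""
    directory = Path(directory)
    # use_ssl is disabled on both sides because create_certificates was not run.
    server = dict(BASE_CONFIG, use_ssl=False)
    # The client only differs by is_server set to False.
    client = dict(BASE_CONFIG, use_ssl=False, is_server=False)
    paths = {}
    for name, config in (("server", server), ("client", client)):
        path = directory / f"{name}.json"
        path.write_text(json.dumps(config, indent=4, sort_keys=True))
        paths[name] = path
    return paths


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for name, path in write_local_test_configs(tmp).items():
            print(name, json.loads(path.read_text()))
```

Each resulting file can then be passed as <config\_json\_path> to create\_connector.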
If you want to test sending/receiving messages, you should also set a client\_name value in the client configuration.  
Then you can use the other command line testing facilities mentioned in <a href="#testing">8-</a> : on both server and client you can run "python3 -m aioconnectors test\_receive\_messages <config\_json\_path>" and "python3 -m aioconnectors test\_send\_messages <config\_json\_path>".  

2.2.Programmatically, examples are provided in applications.py and in aioconnectors\_test.py.  
To create and start a connector :

    connector_manager = aioconnectors.ConnectorManager(config_file_path=config_file_path)
    await connector_manager.start_connector()

To stop a connector :

    await connector_manager.stop_connector()

To shutdown a connector :

    await connector_manager.stop_connector(shutdown=True)

You don't have to use a config file (config\_file\_path), you can also directly initialize your ConnectorManager kwargs, as shown in the previous basic examples, and in aioconnectors\_test.py.


<a name="sendreceive"></a>
### 3.send/receive messages with the API

3.1.To configure the Connector API, create a <config\_json\_path> file based on the API template json.
Relevant for both server and client. This connector_api config file is a subset of the connector_manager config file. So if you already have a relevant connector_manager config file on your machine, you can reuse it for connector_api, and you don't need to create a different connector_api config file.

    python3 -m aioconnectors print_config_templates
    connector_api = aioconnectors.ConnectorAPI(config_file_path=config_file_path)

3.2.Or you can directly initialize your ConnectorAPI kwargs  

Then you can send and receive messages by calling the following coroutines in your program, as shown in aioconnectors\_test.py, and in applications.py (test\_receive\_messages and test\_send\_messages).  

3.3.To send messages : 

    await connector_api.send_message(data=None, binary=None, **kwargs)

This returns a status (True or False).  
"data" is your message, "binary" is an optional additional binary message in case you want your "data" to be a json for example.
If your "data" is already a binary, then the "binary" field isn't necessary.  
kwargs contain all the transport instructions for this message, as explained in <a href="#send">5-</a>.  
If you set the await\_response kwarg to True, this returns the response, which is a (transport\_json , data, binary) triplet.  
The received transport\_json field contains all the kwargs sent by the peer.  
You can also send messages synchronously, with :

    connector_api.send_message_sync(data=None, binary=None, **kwargs)

Similarly, use the "publish\_message" and "publish\_message\_sync" methods in the publish/subscribe approach.  
More details in <a href="#send">5-</a>.  

3.4.To register to receive messages of a specific message\_type : 

    await connector_api.start_waiting_for_messages(message_type='', message_received_cb=message_received_cb, reuse_uds_path=False)

-**binary** is an optional binary message (or None).  
-**data** is the message data bytes. It is always bytes, so if it was originally sent as a json or a string, you'll have to convert it back yourself.  
-**message\_received\_cb** is an async def coroutine that you must provide, receiving and processing the message quadruplet (logger, transport\_json, data, binary).  
-**reuse_uds_path** is false by default, preventing multiple listeners of the same message type. In case it raises an exception even with a single listener, you might want to find and delete an old uds\_path\_receive\_from\_connector file specified in the exception.  
-**transport\_json** is a json with keys related to the "transport layer" of our message protocol : these are the kwargs sent in send_message. They are detailed in <a href="#send">5-</a>. The main arguments are source\_id, destination\_id, request\_id, response\_id, etc.  
Your application can read these transport arguments to obtain information about the peer (source\_id, request\_id if provided, etc), and in order to create a proper response (with correct destination\_id, and response\_id for example if needed, etc).  
transport\_json will contain a with\_file key if a file has been received, more details in <a href="#send">5-</a>.  
-**Note** : if you send a message using send\_message(await\_response=True), the response value is the expected response message : so in that case the response message is not received by the start\_waiting\_for\_messages task.


<a name="classes"></a>
### 4.More details about the ConnectorManager and ConnectorAPI arguments.

    logger=None, use_default_logger=True, default_logger_log_level='INFO', default_logger_rotate=True, config_file_path=<path>,default_logger_bk_count=5

config\_file\_path can be the path of a json file like the following, or instead you can load its items as kwargs, as shown in the basic examples and in aioconnectors\_test.py  
You can use both kwargs and config\_file\_path : if there are shared items, the ones from config_file_path will override the kwargs, unless you specify config\_file\_overrides\_kwargs=False (True by default).  
The main use case for providing a config\_file\_path while having config\_file\_overrides\_kwargs=False is when you prefer to configure your connector only with kwargs but you also want to let the connector update its config file content on the fly (for example blacklisted\_clients\_id, whitelisted\_clients\_id, or ignore\_peer\_traffic).  
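
The precedence rule between kwargs and config\_file\_path can be pictured with the following sketch. It is not the library's code : `merge_config` is a hypothetical helper that only mirrors the documented behaviour for shared items, and the way non shared file items are treated when config\_file\_overrides\_kwargs is False is an assumption made here for illustration.

```python
import json
import os
import tempfile


def merge_config(kwargs, config_file_path=None, config_file_overrides_kwargs=True):
    """Return the effective settings from kwargs and an optional json config file."""
    file_items = {}
    if config_file_path is not None:
        with open(config_file_path) as config_file:
            file_items = json.load(config_file)
    if config_file_overrides_kwargs:
        # File items are applied last, so they win on shared keys (documented default).
        return {**kwargs, **file_items}
    # kwargs are applied last, so they win on shared keys.
    # Assumption: non shared file items are still kept in this mode.
    return {**file_items, **kwargs}


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
        json.dump({"send_timeout": 50, "silent": True}, tmp)
    try:
        kwargs = {"send_timeout": 10, "is_server": True}
        print(merge_config(kwargs, tmp.name)["send_timeout"])  # 50
        print(merge_config(kwargs, tmp.name, config_file_overrides_kwargs=False)["send_timeout"])  # 10
    finally:
        os.remove(tmp.name)
```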

Here is an example of config\_file\_path, with ConnectorManager class arguments, used to create a connector

    {
        "alternate_client_default_cert": false,
        "blacklisted_clients_id": null,
        "blacklisted_clients_ip": null,
        "blacklisted_clients_subnet": null,
        "certificates_directory_path": "/var/tmp/aioconnectors",
        "client_bind_ip": null,
        "client_cafile_verify_server": null,
        "client_name": null,
        "connect_timeout": 10,
        "connector_files_dirpath": "/var/tmp/aioconnectors",
        "debug_msg_counts": true,
        "default_logger_bk_count": 5,
        "default_logger_dirpath": "/var/tmp/aioconnectors",
        "default_logger_log_level": "INFO",
        "default_logger_rotate": true,
        "disk_persistence_recv": false,
        "disk_persistence_send": false,
        "enable_client_try_reconnect": true,
        "everybody_can_send_messages": true,
        "file_recv_config": {},
        "ignore_peer_traffic": false,
        "is_server": true,
        "keep_alive_period": null,
        "keep_alive_timeout": 5,
        "max_certs": 1024,
        "max_number_of_unanswered_keep_alive": 2,
        "max_size_file_upload_recv": 8589930194,
        "max_size_file_upload_send": 8589930194,
        "max_size_persistence_path": 1073741824,
        "proxy": {},
        "pubsub_central_broker": false,
        "recv_message_types": [
            "any"
        ],
        "reuse_server_sockaddr": false,
        "reuse_uds_path_commander_server": false,
        "reuse_uds_path_send_to_connector": false,
        "send_message_types": [
            "any"
        ],
        "send_message_types_priorities": {},
        "send_timeout": 50,
        "server_ca": false,
        "server_ca_certs_not_stored": true,
        "server_secure_tls": true,
        "server_sockaddr": [
            "127.0.0.1",
            10673
        ],
        "silent": true,
        "ssl_allow_all": false,
        "subscribe_message_types": [],
        "token_client_send_cert": true,
        "token_client_verify_server_hostname": null,
        "token_server_allow_authorized_non_default_cert": false,
        "token_verify_peer_cert": true,
        "tokens_directory_path": "/var/tmp/aioconnectors",
        "uds_path_receive_preserve_socket": true,
        "uds_path_send_preserve_socket": true,
        "use_ssl": true,
        "use_token": false,
        "whitelisted_clients_id": null,
        "whitelisted_clients_ip": null,
        "whitelisted_clients_subnet": null
    }


Here is an example of config\_file\_path, with ConnectorAPI class arguments, used to send/receive messages.  
These are a subset of ConnectorManager arguments : which means you can use the ConnectorManager config file also for ConnectorAPI.


    {
        "client_name": null,
        "connector_files_dirpath": "/var/tmp/aioconnectors",
        "default_logger_bk_count": 5,
        "default_logger_dirpath": "/var/tmp/aioconnectors",
        "default_logger_log_level": "INFO",
        "default_logger_rotate": true,
        "is_server": true,
        "max_size_chunk_upload": 209715200,
        "pubsub_central_broker": false,
        "receive_from_any_connector_owner": true,
        "recv_message_types": [
            "any"
        ],
        "send_message_types": [
            "any"
        ],
        "server_sockaddr": [
            "127.0.0.1",
            10673
        ],
        "uds_path_receive_preserve_socket": true,
        "uds_path_send_preserve_socket": true
    }


-**alternate\_client\_default\_cert** is false by default : if true it lets the client try to connect alternatively with the default certificate, in case of failure with the private certificate. This can save the hassle of having to manually delete your client certificate when the certificate was already deleted on server side. This also affects the token authentication : the client will try to connect alternatively by requesting a new token if its token fails.  
-**blacklisted\_clients\_id|ip|subnet** : a list of blacklisted clients (regex for blacklisted\_clients\_id), can be updated on the fly with the api functions add|remove\_blacklist\_client or in the cli.  
-**certificates\_directory\_path** is where your certificates are located, if use\_ssl is True. This is the <optional\_directory\_path> where you generated your certificates by calling "python3 -m aioconnectors create\_certificates <optional\_directory\_path>".  
-**client\_cafile\_verify\_server** : On client side, if server\_sockaddr is configured with the server hostname, you can set client\_cafile\_verify\_server to be the ca cert path (like /etc/ssl/certs/ca-certificates.crt), to enable CA verification of your server certificate.  
-**client\_name** is used on client side. It is the name that will be associated with this client on server side. Auto generated if not supplied in ConnectorManager. Mandatory in ConnectorAPI. It should match the regex \^\[0\-9a\-zA\-Z\-\_\:\]\+$  
-**client_bind_ip** is optional, specifies the interface to bind your client. You can use an interface name or its ip address (string).  
-**connect\_timeout** : On client side, the socket timeout to connect to Tsoc. Default is 10s, you might need to increase it when using a server hostname in server\_sockaddr, since sometimes name resolution with getaddrinfo is slow.  
-**connector\_files\_dirpath** is important, it is the path where all internal files are stored. The default is /var/tmp/aioconnectors. Unix socket files, default log files, and persistent files are stored there.  
-**debug_msg_counts** is a boolean that enables displaying a count of messages every 2 minutes in the log file, and in stdout if **silent** is disabled.  
-**default\_logger\_rotate** (boolean) can also be an integer telling the maximum size of the log file in bytes.  
-**default\_logger\_bk\_count** is an integer telling the maximum number of gzip compressed logs kept when log rotate is enabled. Default is 5.  
-**disk\_persistence\_recv** : In order to enable persistence between the connector and a message listener (supported on both client and server sides), use disk\_persistence\_recv=True (applies to all message types). disk\_persistence\_recv can also be a list of message types for which to apply persistence. There will be 1 persistence file per message type.  
-**file\_recv\_config** : In order to be able to receive files, you must define the destination path of files according to their associated dst\_type. This is done in file\_recv\_config, as shown in aioconnectors\_test.py. file\_recv\_config = {"target\_directory":"", "owner":"", "override\_existing":False}. **target\_directory** is later formatted using the transport\_json fields : which means you can use a target\_directory value like "/my_destination_files/{message\_type}/{source\_id}". **owner** is optional, it is the owner of the uploaded file. It must be of the form "user:group". **override\_existing** is optional and false by default : when receiving a file with an already existing destination path, it decides whether to override the existing file or not.  
-**enable\_client\_try\_reconnect** is a boolean set to True by default. If enabled, it lets the client try to reconnect automatically to the server every 5 seconds in case of failure.  
-**keep\_alive\_period** is null by default. If set to an integer, the client periodically sends a ping keep-alive to the server. If **max\_number\_of\_unanswered\_keep\_alive** (default is 2) keep-alive responses are not received by the client, each after **keep\_alive\_timeout** (default is 5s), then the client disconnects and tries to reconnect with the same mechanism used by enable\_client\_try\_reconnect.  
-**everybody\_can\_send\_messages** if True lets anyone send messages through the connector, otherwise the sender must have write permission to the connector. Setting to True requires the connector to run as root.  
-**hook\_allow\_certificate\_creation** : does not appear in the config file (usable as a kwargs only). Only for server. Can be an async def coroutine receiving a client_name and returning a boolean, to let the server accept or block the client_name certificate creation.  
-**hook\_server\_auth\_client** : does not appear in the config file (usable as a kwargs only). Only for server. Can be an async def coroutine receiving a client peername and returning a boolean, to let the server accept or block the client connection. An example exists in the chat implementation in applications.py.  
-**hook\_store\_token** and **hook\_load\_token** : let you manipulate the token before it is stored on disk, for client only.  
-**hook\_target\_directory** : does not appear in the config file (usable as a kwargs only). A dictionary of the form {dst\_type: custom_function} where custom\_function receives transport\_json as an input and outputs a destination path to be appended to target\_directory. If custom\_function returns None, it has no effect on the target\_directory. If custom\_function returns False, the file is refused. This enables better customization of the target\_directory according to transport\_json. An example exists in the chat implementation in applications.py.  
-**hook\_whitelist\_clients** : does not appear in the config file (usable as a kwargs only). Has 2 arguments : extra_info, peername. Lets you inject some code when blocking a non whitelisted client.  
-**hook\_proxy\_authorization** : does not appear in the config file (usable as a kwargs only). Only for client. A function that receives and returns 2 arguments : the proxy username and password. It returns them after a possible transformation (like a decryption for example).  
-**ignore_peer_traffic** ignores a peer's traffic; it can be updated on the fly with the api functions ignore\_peer\_traffic\_enable, ignore\_peer\_traffic\_enable\_unique, or ignore\_peer\_traffic\_disable or in the cli.  
-**is\_server** (boolean) is important to differentiate between server and client  
-**max\_certs** (integer) limits the maximum number of clients that can connect to a server using client ssl certificates.  
-**max_size_chunk_upload** (integer) used only by ConnectorAPI to send a file in chunks, default chunk length is 200MB. You can try a max chunk length of up to 1GB in a fast network, and might need to lower it in a slow network.  
-**max\_size\_file\_upload\_send** and **max\_size\_file\_upload\_recv**: Size limit of the files you send and receive, both on server and on client. Default is 8GB. However best performance is achieved up to 1GB. Once you exceed 1GB, the file is divided into 1GB chunks and reassembled after reception, which is time consuming.  
-**disk\_persistence\_send** : In order to enable persistence between client and server (supported on both client and server sides), use disk\_persistence\_send=True (applies to all message types). disk\_persistence\_send can also be a list of message types for which to apply persistence. There will be 1 persistence file per message type. You can limit the persistence files size with **max\_size\_persistence\_path**.  
-**pubsub\_central\_broker** : set to True if you need your server to be the broker. Used in the publish/subscribe approach, not necessary in the point to point approach.  
-**proxy** is an optional dictionary like {"enabled":true, "address":"<proxy_url>", "port":<proxy_port>, "authorization":"", "ssl\_server":false}. Relevant only on client side. Lets the client connect to the server through an http(s) proxy with the connect method, if the **enabled** field is true. The authorization field can have a value like {"username":"<username>", "password":"<password>"}. Regardless of the aioconnectors inner encryption, you can set the "ssl\_server" flag in case your proxy listens on ssl : this feature is under development and not tested because such proxy setup is rare.  
-**receive\_from\_any\_connector\_owner** if True lets the api receive messages from a connector being run by any user, otherwise the connector user must have write permission to the api. True by default (requires the api to run as root to be effective).  
-**recv\_message\_types** : the list of message types that can be received by connector. Default is ["any"]. It should include the send\_message\_types using await\_response.  
-**reuse\_server\_sockaddr**, **reuse\_uds\_path\_send\_to\_connector**, **reuse\_uds\_path\_commander\_server** : booleans false by default, that prevent duplicate processes you might create by mistake from using the same sockets. In case your OS is not freeing a closed socket, you can still set the relevant boolean to true.  
-**send\_message\_types** : the list of message types that can be sent from connector. Default is ["any"] if you don't care to differentiate between message types on your application level.  
-**send\_message\_types\_priorities** : None, or a dictionary specifying for each send\_message\_type its priority. The priority is an integer, a smaller integer meaning a higher priority. Usually this is not needed, but with very high throughputs you may want to use it in order to ensure that a specific message type will not get drowned out by other messages. This might starve the lowest priority messages. Usage example : "send\_message\_types\_priorities": {"type\_fast":0, "type\_slow":1}.  
-**send\_timeout** : maximum time for sending a message between peers on the socket. By default 50 seconds. After timeout, the message is lost, the sending peer disconnects, and peers reconnect if enable\_client\_try\_reconnect is enabled.  
-**server\_ca** : (server only) If set to false (default), the server authenticates client certificates according to the stored certificates, otherwise according to its CA. You can always manually add defaultN or CA certificates, under certificates/server/client-certs/symlinks.  
-**server\_ca\_certs\_not\_stored** : (server only) True by default. If server\_ca is true, the generated client certificates won't be stored on server side.  
-**server\_secure\_tls** : (server only) If set to true (default), the server allows only clients using TLS version >= v1.2.  
-**server\_sockaddr** can be configured as a tuple when used as a kwarg, or as a list when used in the json, and is mandatory on both server and client sides. You can use an interface name instead of its ip on server side, for example ("eth0", 10673).  
-**subscribe\_message\_types** : In the publish/subscribe approach, specify for your client the message types you want to subscribe to. It is a subset of recv\_message\_types.  
-**tokens\_directory\_path** : The path of your server token json file, or client token file.  
-**token\_verify\_peer\_cert** : True by default. If boolean, True means the server/client verifies its peer certificate according to its default location under certificates_directory_path. On client : can also be a string with full path of a custom server certificate, or even a string with full path of CA certificate to authenticate server hostname (for example "/etc/ssl/certs/ca-certificates.crt", in that case token\_client\_verify\_server\_hostname should be true).  
-**token\_client\_send\_cert** : True by default. Boolean, must be True if server has token\_verify\_peer\_cert enabled : sends the client certificate.  
-**token\_client\_verify\_server\_hostname** : if true, client authenticates the server hostname with token\_verify\_peer\_cert (CA path) during SSL handshake.  
-**token\_server\_allow\_authorized\_non\_default\_cert** : boolean false by default. If true, server using use\_token will allow client with non default authorized certificate, even if this client doesn't use a token.  
-**uds\_path\_receive\_preserve\_socket** should always be True for better performance, your message\_received\_cb coroutine in start\_waiting\_for\_messages is called for each message without socket disconnection between messages (in fact, only 1 disconnection per 100 messages).  
-**uds\_path\_send\_preserve\_socket** should always be True for better performance.  
-**use\_ssl**, **ssl\_allow\_all**, **use\_token** are boolean, must be identical on server and client. use\_ssl enables encryption as explained previously. When ssl\_allow\_all is disabled, certificates validation is enforced. use\_token requires use\_ssl and ssl\_allow\_all both enabled.  
-**whitelisted\_clients\_id|ip|subnet** : a list of whitelisted clients (regex for whitelisted\_clients\_id), can be updated on the fly with the api functions add|remove\_whitelist\_client or in the cli.  


<a name="send"></a>
### 5.More details about the send\_message arguments

    send_message(message_type=None, destination_id=None, request_id=None, response_id=None,
    data=None, data_is_json=True, binary=None, await_response=False, with_file=None,
    wait_for_ack=False, await_response_timeout=None)
    with_file can be like : {'src_path':'','dst_type':'', 'dst_name':'', 'delete':False, 'owner':''}

send_message is an async coroutine.  
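
To picture how the request\_id, response\_id, await\_response and await\_response\_timeout arguments of the signature above relate to each other, here is a library independent sketch of the usual correlation pattern. It does not call aioconnectors : `PendingRequests`, `fake_peer` and `request` are hypothetical stand-ins, and the internals of the real connector may differ.

```python
import asyncio


class PendingRequests:
    """Match incoming responses to outstanding requests by id."""

    def __init__(self):
        self._futures = {}

    def register(self, request_id):
        future = asyncio.get_running_loop().create_future()
        self._futures[request_id] = future
        return future

    def resolve(self, transport_json, data):
        # The peer is expected to echo the request_id back as response_id.
        future = self._futures.pop(transport_json.get("response_id"), None)
        if future is not None and not future.done():
            future.set_result(data)


async def fake_peer(pending, transport_json, data):
    """Stand-in for the remote application: answers with response_id == request_id."""
    await asyncio.sleep(0)
    pending.resolve({"response_id": transport_json["request_id"]}, data.upper())


async def request(pending, request_id, data, timeout=1.0):
    """Send a request and wait for the matching response, or return False on timeout."""
    future = pending.register(request_id)
    peer_task = asyncio.create_task(fake_peer(pending, {"request_id": request_id}, data))
    try:
        return await asyncio.wait_for(future, timeout)
    except asyncio.TimeoutError:
        # Mirrors the documented behaviour of returning False when the timeout is exceeded.
        return False
    finally:
        await peer_task


async def main():
    pending = PendingRequests()
    # request_id can be an integer or a string.
    return await asyncio.gather(request(pending, 1, "ping"), request(pending, "abc", "hello"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```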
These arguments must be filled in on the application layer by the user  
-**await\_response** is False by default, set it to True if your coroutine calling send\_message expects a response value.  
In such a case, the remote peer has to answer with response\_id equal to the request\_id of the request. (This is shown in aioconnectors\_test.py).  
-**await_response_timeout** is None by default. If set to a number, and if await\_response is true, the method waits up to this timeout for the peer response, and if timeout is exceeded it returns False.  
-**data** is the payload of your message. By default it expects a json, but it can be a string, and even bytes. However, using together the "data" argument for a json or a string, and the "binary" argument for binary payload, is a nice way to accompany a binary payload with some textual information. Contrary to "data", **binary** must be bytes, and cannot be a string. A message size should not exceed 1GB.  
-**data\_is\_json** is True by default since it assumes "data" is a json, and it dumps it automatically. Set it to False if "data" is not a json.  
-**destination\_id** is mandatory for server : it is the remote client id. Not needed by client.  
-**message\_type** is mandatory, it lets you have different listeners that receive different message types. You can use "any" as a default.  
-**request\_id** and **response\_id** are optional (integer or string) : they are helpful to keep track of asynchronous messages on the application layer. At the application level, the remote peer should answer with response\_id equal to the request\_id of the request. The request sender can then associate the received response with the request sent.  
-The **publish\_message** and **publish\_message\_sync** methods are the same as the send_message ones, but used by a client in the publish/subscribe approach.  
-The **send\_message\_await\_response** method is the same as send_message, but automatically sets await_response to True.  
-The **send\_message\_sync** method is almost the same as send_message, but called synchronously (not an async coroutine). It can also receive a "loop" as a kwarg. If a loop is running in the background, it schedules and returns a task. Otherwise it returns the peer response if called with await\_response.  
-**wait\_for\_ack** is not recommended for high throughputs, since it slows things down dramatically. Basic testing showed a rate of ten messages per second, instead of a few thousand messages per second in the point to point approach.  
Not a benchmark, but some point-to-point and pubsub trials (VM with 8GB RAM and 4 cores) showed that up to 4000 messages (with data of 100 bytes) per second could be received by a server without delay, and from that point the receive queue started to be non empty. This test gave the same result with 100 clients sending each 40 events per second, and with 1 client sending 4000 events per second.  
-**with\_file** lets you embed a file, with {'src\_path':'','dst\_type':'', 'dst\_name':'', 'delete':False, 'owner':''}. **src\_path** is the source path of the file to be sent, **dst\_type** is the type of the file, which enables the remote peer to evaluate the destination path thanks to its ConnectorManager attribute "file\_recv\_config" dictionary. **dst\_name** is the name the file will be stored under. **delete** is a boolean telling whether to delete the source file after it has been sent. **owner** is the optional user:group of your uploaded file : if used, it overrides the "owner" value optionally set on server side in file\_recv\_config. If an error occurs while opening the file to send, the file will not be sent but with\_file will still be present in transport\_json received by peer, and will contain an additional key **file\_error** describing the error to the peer application.  
\n-**tag** lets you add a tag string to your message in transport\\_json : it has the advantage of being accessible at reception directly in transport\\_json without the need to look into the data structure.  \n\n\n<a name=\"management\"></a>\n### 6.Management programmatic tools\n\nThe class ConnectorManager has several methods to manage your connector. These methods are explained in <a href=\"#cli\">7-</a>.  \n-**delete\\_client\\_certificate**, **delete\\_client\\_token**, **disconnect\\_client**, **reload\\_tokens**  \n-**add\\_blacklist_client, remove\\_blacklist_client**, **add\\_whitelist_client, remove\\_whitelist_client**  \n-**delete\\_previous\\_persistence\\_remains**  \n-**ignore\\_peer\\_traffic\\_show**, **ignore\\_peer\\_traffic\\_enable**, **ignore\\_peer\\_traffic\\_enable\\_unique**, **ignore\\_peer\\_traffic\\_disable**  \n-**show\\_connected\\_peers**  \n-**show\\_log\\_level**, **set\\_log\\_level**  \n-**show\\_subscribe\\_message\\_types**, **set\\_subscribe\\_message\\_types**  \n-**start\\_connector**, **stop\\_connector**, **restart\\_connector**  \nThe same methods can be executed remotely, with the ConnectorRemoteTool class. This class is instantiated exactly like ConnectorAPI, with the same arguments (except for receive_from_any_connector_owner)  \n\n    connector_remote_tool = aioconnectors.ConnectorRemoteTool(config_file_path=config_file_path)\n\nAn example of ConnectorRemoteTool is available in applications.py in the cli implementation.\n\n\n<a name=\"cli\"></a>\n### 7.Other management command line tools\n\n    python3 -m aioconnectors cli\n\nto run several interesting commands like :   \n-**start/stop/restart** your connectors.  \n-**show\\_connected\\_peers** : show currently connected peers.  \n-**delete\\_client\\_certificate** enables your server to delete a specific client certificate. delete\\_client\\_certificate enables your client to delete its own certificate and fallback using the default one. 
In order to delete a certificate of a currently connected client, first delete the certificate on server side, which will disconnect the client instantaneously, and then delete the certificate on client side : the client will then reconnect automatically and obtain a new certificate. The client side deletion is not needed in case alternate\\_client\\_default\\_cert is true.  \n-**delete\\_client\\_token** enables your server to delete a specific client token. It also enables your client to delete its own token and fall back to requesting a new token.  \n-**reload\\_tokens** reloads tokens, for example after modifying them on disk.  \n-**disconnect_client** enables your server to disconnect a specific client.  \n-**add\\_blacklist_client, remove\\_blacklist_client** enable your server to blacklist a client by id (regex), ip, or subnet, at runtime. Blacklisting by id disconnects the client and also deletes its certificate if one exists. The change is kept in the connector config file if it exists.  \n-**add\\_whitelist_client, remove\\_whitelist_client** enable your server to whitelist a client by id (regex), ip, or subnet, at runtime. The change is kept in the connector config file if it exists.   \n-**peek\\_queues** to show the internal queue sizes.  \n-**ignore\\_peer\\_traffic** can be a boolean, or a peer name. When enabled, the connector drops all new messages received from peers, or from the specified peer. It also drops new messages to be sent to all peers, or to the specified peer. This mode can be useful to let the queues drain their accumulated messages.  \n-**show\\_log\\_level** to show the current log level.  \n-**set\\_log\\_level** to set the log level on the fly.  \n-**show\\_subscribe\\_message\\_types** to show the subscribed message types of a client.  \n-**set\\_subscribe\\_message\\_types** to set the list of all subscribed message types of a client.  
\n\n\n<a name=\"testing\"></a>\n### 8.Testing command line tools\n\n-To let your connector send pings to a remote connector, and print its replies. \n\n    python3 -m aioconnectors ping <config_json_path>\n\n-To simulate a simple application waiting for messages, and print all received messages. Your application should not wait for incoming messages when using this testing tool.\n\n    python3 -m aioconnectors test_receive_messages <config_json_path>\n\n-To simulate a simple application sending dummy messages.\n\n    python3 -m aioconnectors test_send_messages <config_json_path>\n\n\n<a name=\"chat\"></a>\n### 9.Funny embedded chat\n\nA simple chat using aioconnectors is embedded. It allows you to exchange messages, files and directories easily between 2 Linux or Mac stations. It can also be configured to execute the commands it receives.  \nIt is encrypted, and supports authentication by prompting to accept connections.  \nIt is not a multi user chat, but more of a tool to easily transfer stuff between your computers.\n\n-On the 1st station (server side), type : \n\n    python3 -m aioconnectors chat\n\n-Then on the 2nd station (client side), type :\n\n    python3 -m aioconnectors chat --target <server_ip> \n\nYou can execute local shell commands by preceding them with a \\\"\\!\\\".  \nYou can also upload files during a chat, by typing \\\"\\!upload \\<file or dir path\\>\\\".  \nFiles are uploaded to your current working directory. A directory is transferred as a zip file.  \nYou can simply unzip a zip file by using \\\"\\!dezip \\<file name\\>\\\".  \n\nThe cleanest way to exit a chat is by typing \\\"\\!exit\\\" on both sides.  
\n\n-On client side, you can also directly upload a file or directory to the server without opening a chat :\n\n     python3 -m aioconnectors chat --target <server_ip> --upload <file or dir path>\n\n-You can configure client or server (not simultaneously) to execute the commands it receives, by using the --exec <shell_path> option :\n    \n    python3 -m aioconnectors chat --exec /bin/sh\n    python3 -m aioconnectors chat --target <server_ip>\n    or\n    python3 -m aioconnectors chat\n    python3 -m aioconnectors chat --target <server_ip>  --exec /bin/sh\n\n-On server side, you can accept client connections without prompting by specifying --accept :\n\n    python3 -m aioconnectors chat --accept\n\n-More info :\n\n    python3 -m aioconnectors chat --help\n\n-If you need your server to listen on a specific interface :\n\n    python3 -m aioconnectors chat --bind_server_ip <server_ip>\n\n<server\\_ip> can be an ip address, or an interface name  \n\n-If you don't want your server to use the default port (10673), use --port on both peers : \n\n    python3 -m aioconnectors chat --port <port> [--target <server_ip>]\n    \n-By default the chat has tab completion, you can disable it with --nowrap.\n\n\n<a name=\"containers\"></a>\n## Containers\n\nConnector client and server, as well as connector api, can run in a Docker container, you just need to pip install aioconnectors in a Python image (or any image having Python >= 3.6 and openssl).  \nA connector and its connector api must run on the same host, or in the same Kubernetes pod.  \nA connector and its connector api can run in the same container, or in different containers. 
If you choose to run them in different containers, you must configure their connector_files_dirpath path as a shared volume, so that they can share their UDS sockets.\n\n\n<a name=\"windows\"></a>\n## Windows ?\n\nTo port aioconnectors to Windows, these steps should be taken, and probably more :  \n-Replace the usage of unix sockets with, for example, local sockets, named pipes, or uds sockets if and when they are supported.  \nSince the implementation relies on unix socket paths, a possible approach would be to preserve these paths, and manage a mapping between the paths and their corresponding local listening ports.  \n-Port the usage of openssl in ssl_helper.py  \n-Convert paths  \n-Ignore the uploaded file ownership feature  \n-Convert the interface to ipaddress function using ip (used for sockaddr and client\\_bind\\_ip)\n\n\n\n\n\n",
    "bugtrack_url": null,
    "license": "",
    "summary": "Simple secure asynchronous message queue",
    "version": "1.6.3",
    "project_urls": {
        "Homepage": "https://github.com/mori-b/aioconnectors"
    },
    "split_keywords": [
        "message queue",
        "broker",
        "asyncio",
        "simple",
        "easy"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "39fdc0014a4a44bda9643b1318cdeaf4f52275c408916ef2fa47eab8765dc88c",
                "md5": "b705a907940e674a99a726c58eee63c9",
                "sha256": "5f2ac46561627c434035454a3aef7d9f5de9f04d5fd9bb2818b53b58be793156"
            },
            "downloads": -1,
            "filename": "aioconnectors-1.6.3-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "b705a907940e674a99a726c58eee63c9",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.6",
            "size": 91665,
            "upload_time": "2023-05-18T17:35:25",
            "upload_time_iso_8601": "2023-05-18T17:35:25.910078Z",
            "url": "https://files.pythonhosted.org/packages/39/fd/c0014a4a44bda9643b1318cdeaf4f52275c408916ef2fa47eab8765dc88c/aioconnectors-1.6.3-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "0432466198301edc938dff7103e8751d89b58dd66b15e1e6b9ed8e77a4a21ac3",
                "md5": "d0d424673f8e2ae0f768b2eccf3b09f5",
                "sha256": "0d6bf907372dac108f85a056ae634eb7246843ce45bc5cafde5f6c5f5efa60c0"
            },
            "downloads": -1,
            "filename": "aioconnectors-1.6.3.tar.gz",
            "has_sig": false,
            "md5_digest": "d0d424673f8e2ae0f768b2eccf3b09f5",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.6",
            "size": 121586,
            "upload_time": "2023-05-18T17:35:34",
            "upload_time_iso_8601": "2023-05-18T17:35:34.310514Z",
            "url": "https://files.pythonhosted.org/packages/04/32/466198301edc938dff7103e8751d89b58dd66b15e1e6b9ed8e77a4a21ac3/aioconnectors-1.6.3.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-05-18 17:35:34",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "mori-b",
    "github_project": "aioconnectors",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "aioconnectors"
}
        