# SDPLC
SDPLC is a python library that is able to simulate a Modbus server, OPC UA server and a HTTPS server at the same time.
The Modbus and OPC UA server is compatable with standard clients and most SCADA systems. You could use SDPLC as a simulated PLC for developping and testing.
The HTTPS server allows RESTful API calls to retrive the available variables/registers, as well as read and write operations.
Addtionally, you can use the "Akatosh" library (pre-bundled) to create logic that mimic a real PLC that has been programmed.
Thanks "asyncua" and "pymodbus" for the implementation of OPC UA and Modbus protocols.
## Update
### 1.0.0
Initial release.
### 1.0.1
1. Implemented configuration file validation
2. Rewrite value sync logic
### 1.0.2
This version will include the capability to connect with down stream ModBus/OPC UA server, so that your logic written with Akatosh will be able to actually control real PLCs!
### 1.0.3
1. Bug fixes
## Examples
The following example shows how to run a simulated tank system. Then, a controller will be implemented to control the tank level.
### Simulated PLC:
```python
from math import inf
import uvicorn
from Akatosh.event import event
from Akatosh.universe import Mundus
from FasterAPI.app import app
from asyncua import ua
from sdplc import logger
from sdplc.sdplc import simPLC
from sdplc.router import sim_plc_router
from FasterAPI.cert import generate_key_and_csr, generate_root_ca, sign_certificate
app.include_router(sim_plc_router)
time = 0
@event(at=0, step=1, till=inf, label="Tank Level Sensor", priority=2)
async def tank_level_sensor():
current_tank_level = await simPLC.read_node("Tank Level")
variable = [
variable for variable in simPLC.nodes if variable.qualified_name == "Tank Level"
][0]
logger.info(f"Tank Level Sensor: {current_tank_level}")
@event(at=0, till=inf, label="Tank Level Simulation", priority=2)
async def sim_tank_level():
global time
eclpsed_time = Mundus.time - time
inlet = await simPLC.read_node("Inlet Valve")
outlet = await simPLC.read_node("Outlet Valve")
current_tank_level = await simPLC.read_node("Tank Level")
if inlet is True:
current_tank_level += 10 * eclpsed_time
if outlet is True:
current_tank_level -= 5 * eclpsed_time
await simPLC.write_node("Tank Level", current_tank_level)
time = Mundus.time
if __name__ == "__main__":
simPLC.init(
config_file="./example_plc.yaml",
)
uvicorn.run("FasterAPI.app:app", host="0.0.0.0", port=8080)
```
```yaml
server: "ModBus"
modbus_server_config:
type: "udp"
address: 0.0.0.0
port: 1502
byte_order: "big"
word_order: "big"
nodes:
- qualified_name: "Inlet Valve"
value: false
modbus:
slave: 0
address: 0
type: "c"
opcua:
namespace: "0"
node_qualified_name: "0"
- qualified_name: "Outlet Valve"
value: false
modbus:
slave: 0
address: 1
type: "c"
opcua:
namespace: "0"
node_qualified_name: "0"
- qualified_name: "Tank Level"
value: 0.0
modbus:
slave: 0
address: 0
type: "i"
register_size: 64
opcua:
namespace: "0"
node_qualified_name: "0"
- qualified_name: "Blender"
value: false
modbus:
slave: 0
address: 2
type: "c"
opcua:
namespace: "0"
node_qualified_name: "0"
```
### Controller:
```python
import socket
from math import inf
import uvicorn
from Akatosh.event import event
from Akatosh.universe import Mundus
from FasterAPI.app import app
from asyncua import ua
from sdplc import logger
from sdplc.sdplc import simPLC
from sdplc.router import sim_plc_router
from FasterAPI.cert import generate_key_and_csr, generate_root_ca, sign_certificate
app.include_router(sim_plc_router)
time = 0
ca, ca_crt = generate_root_ca(
common_name=socket.gethostname(),
subject_alternative_names=[socket.gethostname()],
directory="./",
)
server_key, server_csr = generate_key_and_csr(
common_name=socket.gethostname(),
san_dns_names=[socket.gethostname()],
san_uris=["uri:ulfaric:SDPLC"],
directory="./",
)
sign_certificate(csr=server_csr, issuer_key=ca, issuer_cert=ca_crt, directory="./")
@event(at=0, till=inf, label="Valve Control", priority=2)
async def inlet_control():
current_tank_level = await simPLC.read_node("Tank Level")
if current_tank_level <= 0:
inlet_state = await simPLC.read_node("Inlet Valve")
if inlet_state == False:
await simPLC.write_node("Inlet Valve", True)
await simPLC.write_node("Outlet Valve", False)
logger.info(
"Tank level reached lower threshold, closing outlet valve and opening inlet valve"
)
if current_tank_level >= 50 and current_tank_level < 100:
inlet_state = await simPLC.read_node("Inlet Valve")
outlet_state = await simPLC.read_node("Outlet Valve")
if outlet_state == False:
await simPLC.write_node("Outlet Valve", True)
logger.info("Tank level reached high threshold, opening both valves")
if inlet_state == False:
await simPLC.write_node("Inlet Valve", True)
logger.info("Tank level reached high threshold, opening both valves")
if current_tank_level >= 150:
inlet_state = await simPLC.read_node("Inlet Valve")
if inlet_state == True:
await simPLC.write_node("Inlet Valve", False)
await simPLC.write_node("Outlet Valve", True)
logger.info(
"Tank level reached critical threshold, closing inlet valve and opening outlet valve"
)
@event(at=0, step=1, till=inf, label="Tank Level Sensor", priority=2)
async def tank_level_sensor():
current_tank_level = await simPLC.read_node("Tank Level")
variable = [
variable for variable in simPLC.nodes if variable.qualified_name == "Tank Level"
][0]
logger.info(f"Tank Level Sensor: {current_tank_level}")
@event(at=0, till=inf, label="Blender", priority=2)
async def blender():
current_tank_level = await simPLC.read_node("Tank Level")
if current_tank_level >= 100:
await simPLC.write_node("Blender", True)
else:
await simPLC.write_node("Blender", False)
if __name__ == "__main__":
simPLC.init(
config_file="./example_controller.yaml",
)
uvicorn.run("FasterAPI.app:app", host="0.0.0.0", port=8088)
```
```yaml
client: "ModBus"
modbus_client_config:
type: "udp"
address: 127.0.0.1
port: 1502
byte_order: "big"
word_order: "big"
nodes:
- qualified_name: "Inlet Valve"
value: false
modbus:
slave: 0
address: 0
type: "c"
opcua:
namespace: "0"
node_qualified_name: "0"
- qualified_name: "Outlet Valve"
value: true
modbus:
slave: 0
address: 1
type: "c"
opcua:
namespace: "0"
node_qualified_name: "0"
- qualified_name: "Tank Level"
value: 0.0
modbus:
slave: 0
address: 0
type: "i"
register_size: 64
opcua:
namespace: "0"
node_qualified_name: "0"
- qualified_name: "Blender"
value: false
modbus:
slave: 0
address: 2
type: "c"
opcua:
namespace: "0"
node_qualified_name: "0"
```
### Modbus
You can also just simulate a Modbus server. In this mode, you will have to add registers manually.
```python
from sdplc.modbus.server import modbusServer
modbusServer.create_slave(0)
for i in range(0, 10):
modbusServer.create_coil(0, i, False)
for i in range(0, 10):
modbusServer.create_discrete_input(0, i, False)
modbusServer.create_holding_register(0, 0, 0, 64)
modbusServer.create_input_register(0, 0, 1000, 64)
modbusServer.start()
```
### OPC UA
You can also just simulate a OPC UA server. You will also need to manually create namespace, node and variables.
```python
from sdplc.opcua.server import opcuaServer
opcuaServer.init()
namespace = "http://example.org"
opcuaServer.register_namespace(namespace)
node = opcuaServer.register_node("Tank", namespace)
opcuaServer.register_variable("Tank Level", node=node, writeable=True, value=0)
opcuaServer.register_variable(
"Tank Temperature", node_qualified_name="Tank", writeable=True, value=0
)
opcuaServer.start()
```
Raw data
{
"_id": null,
"home_page": "https://github.com/ulfaric/SDPLC",
"name": "sdplc",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.10",
"maintainer_email": null,
"keywords": null,
"author": "ulfaric",
"author_email": "ryf0510@live.com",
"download_url": "https://files.pythonhosted.org/packages/16/25/43b9ae3c5fcb1b710860f2273127d5d6936133e8572829208fbe7fc8d2c7/sdplc-1.0.7.tar.gz",
"platform": null,
"description": "# SDPLC\n\nSDPLC is a python library that is able to simulate a Modbus server, OPC UA server and a HTTPS server at the same time.\n\nThe Modbus and OPC UA server is compatable with standard clients and most SCADA systems. You could use SDPLC as a simulated PLC for developping and testing.\n\nThe HTTPS server allows RESTful API calls to retrive the available variables/registers, as well as read and write operations.\n\nAddtionally, you can use the \"Akatosh\" library (pre-bundled) to create logic that mimic a real PLC that has been programmed.\n\nThanks \"asyncua\" and \"pymodbus\" for the implementation of OPC UA and Modbus protocols.\n\n## Update\n\n### 1.0.0\n\nInitial release.\n\n### 1.0.1\n\n1. Implemented configuration file validation\n2. Rewrite value sync logic\n\n### 1.0.2\n\nThis version will include the capability to connect with down stream ModBus/OPC UA server, so that your logic written with Akatosh will be able to actually control real PLCs!\n\n### 1.0.3\n\n1. Bug fixes\n\n## Examples\n\nThe following example shows how to run a simulated tank system. Then, a controller will be implemented to control the tank level.\n\n### Simulated PLC:\n\n```python\nfrom math import inf\n\nimport uvicorn\nfrom Akatosh.event import event\nfrom Akatosh.universe import Mundus\nfrom FasterAPI.app import app\nfrom asyncua import ua\nfrom sdplc import logger\nfrom sdplc.sdplc import simPLC\nfrom sdplc.router import sim_plc_router\nfrom FasterAPI.cert import generate_key_and_csr, generate_root_ca, sign_certificate\n\napp.include_router(sim_plc_router)\n\ntime = 0\n@event(at=0, step=1, till=inf, label=\"Tank Level Sensor\", priority=2)\nasync def tank_level_sensor():\n current_tank_level = await simPLC.read_node(\"Tank Level\")\n variable = [\n variable for variable in simPLC.nodes if variable.qualified_name == \"Tank Level\"\n ][0]\n logger.info(f\"Tank Level Sensor: {current_tank_level}\")\n\n\n@event(at=0, till=inf, label=\"Tank Level Simulation\", priority=2)\nasync def sim_tank_level():\n global time\n eclpsed_time = Mundus.time - time\n inlet = await simPLC.read_node(\"Inlet Valve\")\n outlet = await simPLC.read_node(\"Outlet Valve\")\n current_tank_level = await simPLC.read_node(\"Tank Level\")\n if inlet is True:\n current_tank_level += 10 * eclpsed_time\n if outlet is True:\n current_tank_level -= 5 * eclpsed_time\n await simPLC.write_node(\"Tank Level\", current_tank_level)\n time = Mundus.time\n\nif __name__ == \"__main__\":\n simPLC.init(\n config_file=\"./example_plc.yaml\",\n )\n uvicorn.run(\"FasterAPI.app:app\", host=\"0.0.0.0\", port=8080)\n\n```\n\n```yaml\nserver: \"ModBus\"\n \nmodbus_server_config:\n type: \"udp\"\n address: 0.0.0.0\n port: 1502\n byte_order: \"big\"\n word_order: \"big\"\n\nnodes:\n - qualified_name: \"Inlet Valve\"\n value: false\n modbus:\n slave: 0\n address: 0\n type: \"c\"\n opcua:\n namespace: \"0\"\n node_qualified_name: \"0\"\n\n - qualified_name: \"Outlet Valve\"\n value: false\n modbus:\n slave: 0\n address: 1\n type: \"c\"\n opcua:\n namespace: \"0\"\n node_qualified_name: \"0\"\n\n - qualified_name: \"Tank Level\"\n value: 0.0\n modbus:\n slave: 0\n address: 0\n type: \"i\"\n register_size: 64\n opcua:\n namespace: \"0\"\n node_qualified_name: \"0\"\n\n - qualified_name: \"Blender\"\n value: false\n modbus:\n slave: 0\n address: 2\n type: \"c\"\n opcua:\n namespace: \"0\"\n node_qualified_name: \"0\"\n```\n\n### Controller:\n```python\nimport socket\nfrom math import inf\n\nimport uvicorn\nfrom Akatosh.event import event\nfrom Akatosh.universe import Mundus\nfrom FasterAPI.app import app\nfrom asyncua import ua\nfrom sdplc import logger\nfrom sdplc.sdplc import simPLC\nfrom sdplc.router import sim_plc_router\nfrom FasterAPI.cert import generate_key_and_csr, generate_root_ca, sign_certificate\n\napp.include_router(sim_plc_router)\n\ntime = 0\n\n\nca, ca_crt = generate_root_ca(\n common_name=socket.gethostname(),\n subject_alternative_names=[socket.gethostname()],\n directory=\"./\",\n)\nserver_key, server_csr = generate_key_and_csr(\n common_name=socket.gethostname(),\n san_dns_names=[socket.gethostname()],\n san_uris=[\"uri:ulfaric:SDPLC\"],\n directory=\"./\",\n)\nsign_certificate(csr=server_csr, issuer_key=ca, issuer_cert=ca_crt, directory=\"./\")\n\n\n@event(at=0, till=inf, label=\"Valve Control\", priority=2)\nasync def inlet_control():\n current_tank_level = await simPLC.read_node(\"Tank Level\")\n if current_tank_level <= 0:\n inlet_state = await simPLC.read_node(\"Inlet Valve\")\n if inlet_state == False:\n await simPLC.write_node(\"Inlet Valve\", True)\n await simPLC.write_node(\"Outlet Valve\", False)\n logger.info(\n \"Tank level reached lower threshold, closing outlet valve and opening inlet valve\"\n )\n\n if current_tank_level >= 50 and current_tank_level < 100:\n inlet_state = await simPLC.read_node(\"Inlet Valve\")\n outlet_state = await simPLC.read_node(\"Outlet Valve\")\n if outlet_state == False:\n await simPLC.write_node(\"Outlet Valve\", True)\n logger.info(\"Tank level reached high threshold, opening both valves\")\n if inlet_state == False:\n await simPLC.write_node(\"Inlet Valve\", True)\n logger.info(\"Tank level reached high threshold, opening both valves\")\n\n if current_tank_level >= 150:\n inlet_state = await simPLC.read_node(\"Inlet Valve\")\n if inlet_state == True:\n await simPLC.write_node(\"Inlet Valve\", False)\n await simPLC.write_node(\"Outlet Valve\", True)\n logger.info(\n \"Tank level reached critical threshold, closing inlet valve and opening outlet valve\"\n )\n\n\n@event(at=0, step=1, till=inf, label=\"Tank Level Sensor\", priority=2)\nasync def tank_level_sensor():\n current_tank_level = await simPLC.read_node(\"Tank Level\")\n variable = [\n variable for variable in simPLC.nodes if variable.qualified_name == \"Tank Level\"\n ][0]\n logger.info(f\"Tank Level Sensor: {current_tank_level}\")\n\n\n@event(at=0, till=inf, label=\"Blender\", priority=2)\nasync def blender():\n current_tank_level = await simPLC.read_node(\"Tank Level\")\n if current_tank_level >= 100:\n await simPLC.write_node(\"Blender\", True)\n else:\n await simPLC.write_node(\"Blender\", False)\n\n\nif __name__ == \"__main__\":\n simPLC.init(\n config_file=\"./example_controller.yaml\",\n )\n uvicorn.run(\"FasterAPI.app:app\", host=\"0.0.0.0\", port=8088)\n\n```\n```yaml\nclient: \"ModBus\"\n \nmodbus_client_config:\n type: \"udp\"\n address: 127.0.0.1\n port: 1502\n byte_order: \"big\"\n word_order: \"big\"\n\n\nnodes:\n - qualified_name: \"Inlet Valve\"\n value: false\n modbus:\n slave: 0\n address: 0\n type: \"c\"\n opcua:\n namespace: \"0\"\n node_qualified_name: \"0\"\n\n - qualified_name: \"Outlet Valve\"\n value: true\n modbus:\n slave: 0\n address: 1\n type: \"c\"\n opcua:\n namespace: \"0\"\n node_qualified_name: \"0\"\n\n - qualified_name: \"Tank Level\"\n value: 0.0\n modbus:\n slave: 0\n address: 0\n type: \"i\"\n register_size: 64\n opcua:\n namespace: \"0\"\n node_qualified_name: \"0\"\n\n - qualified_name: \"Blender\"\n value: false\n modbus:\n slave: 0\n address: 2\n type: \"c\"\n opcua:\n namespace: \"0\"\n node_qualified_name: \"0\"\n```\n\n\n\n### Modbus\n\nYou can also just simulate a Modbus server. In this mode, you will have to add registers manually.\n\n```python\nfrom sdplc.modbus.server import modbusServer\n\nmodbusServer.create_slave(0)\n\nfor i in range(0, 10):\n modbusServer.create_coil(0, i, False)\n\nfor i in range(0, 10):\n modbusServer.create_discrete_input(0, i, False)\n\nmodbusServer.create_holding_register(0, 0, 0, 64)\nmodbusServer.create_input_register(0, 0, 1000, 64)\n\nmodbusServer.start()\n```\n\n### OPC UA\n\nYou can also just simulate a OPC UA server. You will also need to manually create namespace, node and variables.\n\n```python\nfrom sdplc.opcua.server import opcuaServer\n\nopcuaServer.init()\n\nnamespace = \"http://example.org\"\nopcuaServer.register_namespace(namespace)\nnode = opcuaServer.register_node(\"Tank\", namespace)\nopcuaServer.register_variable(\"Tank Level\", node=node, writeable=True, value=0)\nopcuaServer.register_variable(\n \"Tank Temperature\", node_qualified_name=\"Tank\", writeable=True, value=0\n)\n\nopcuaServer.start()\n```\n",
"bugtrack_url": null,
"license": "CC-BY-NC-ND-4.0",
"summary": "PLC Simulation and Remote Control",
"version": "1.0.7",
"project_urls": {
"Homepage": "https://github.com/ulfaric/SDPLC",
"Repository": "https://github.com/ulfaric/SDPLC"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "5b38c5dc51910717e7d4f6060a96064454313db1296baa2670f671be98b84cf0",
"md5": "c477479739113575eb8607732faac005",
"sha256": "ad9d07568806af5f92cb7b7141aef3bd0f2c3ebed39cc96285e7188f1f37e323"
},
"downloads": -1,
"filename": "sdplc-1.0.7-py3-none-any.whl",
"has_sig": false,
"md5_digest": "c477479739113575eb8607732faac005",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.10",
"size": 233364,
"upload_time": "2024-12-07T12:08:57",
"upload_time_iso_8601": "2024-12-07T12:08:57.231022Z",
"url": "https://files.pythonhosted.org/packages/5b/38/c5dc51910717e7d4f6060a96064454313db1296baa2670f671be98b84cf0/sdplc-1.0.7-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "162543b9ae3c5fcb1b710860f2273127d5d6936133e8572829208fbe7fc8d2c7",
"md5": "dd1dae0ed45288bfa995fd1edf82cccf",
"sha256": "da546fd24d53f3cf8291155551afa6c052dbc634434235c7f7f9501d529d7f94"
},
"downloads": -1,
"filename": "sdplc-1.0.7.tar.gz",
"has_sig": false,
"md5_digest": "dd1dae0ed45288bfa995fd1edf82cccf",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.10",
"size": 222627,
"upload_time": "2024-12-07T12:08:59",
"upload_time_iso_8601": "2024-12-07T12:08:59.375726Z",
"url": "https://files.pythonhosted.org/packages/16/25/43b9ae3c5fcb1b710860f2273127d5d6936133e8572829208fbe7fc8d2c7/sdplc-1.0.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-12-07 12:08:59",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "ulfaric",
"github_project": "SDPLC",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "sdplc"
}