pymodbus 3.11.4 で複数のデバイスを 1 つのシミュレータで扱う方法

この記事では、pymodbusで複数のデバイスがある環境を行う方法を記載している。
シミュレータとクライアントの疎通方法は別途以下で記載している。
pymodbusでシミュレータを使いModbus通信する | エクヌツITブログ

1.結論

pymodbus 3.11.4は、デフォルトの機能では複数のデバイスを模擬した試験が行えない。
複数のデバイスを模擬する場合は、下記で説明するコーディングをOSSに対して行う必要がある。
コーディングのみを見たい場合は、4.複数デバイスの実行方法を参照。

2.pymodbusのOSS仕様

pymodbusは、シミュレータ起動時に[setup.json]を読み込んで実行する。
set_up.jsonのデバイスを確認し、デバイスIDが何は以下の箇所で読み込みを行っている。
[フォルダパス]
pymodbus\server\simulator\http_server.py
[クラス名]
ModbusSimulatorServer
[関数名]
__init__(init関数)

        if "device_id" in server:
            # Designated ModBus unit address. Will only serve data if the address matches
            datastore = ModbusServerContext(devices={int(server["device_id"]): self.datastore_context}, single=False)
            del server["device_id"]
        else:
            # Will server any request regardless of addressing
            datastore = ModbusServerContext(devices=self.datastore_context, single=True)

上記のようにデバイスIDが設定されている場合は[single]をFalse、デバイスIDが設定されていない場合は[single]をTrueを設定している。
この[single]がFalseというのを用いて、複数のデバイスを模擬できるのかと思いきや・・・

呼び出し先の[ModbusServerContext]では、[single]がFalseか、Trueかを見ておらず複数のデバイスを模擬できないようになっている。

[フォルダパス]
pymodbus\datastore\context.py
[クラス名]
ModbusServerContext
[関数名]
__init__(init関数)

    def __init__(self, devices=None, single=True):
        """Initialize a new instance of a modbus server context.

        :param devices: A dictionary of client contexts
        :param single: Set to true to treat this as a single context
        """
        self.single = single
        self._devices = devices or {}
        if self.single:
            self._devices = {0: self._devices}

上記のようにTrueの時の処理のみが設定されており、Falseの場合は特に設定されていない。じゃあなんで、singleという引数があるんだと

3.複数デバイスの模擬方法

以下のようにModbusServerContextを設定することで複数のデバイスの模擬が行える。
[フォルダパス]
pymodbus\datastore\context.py
[クラス名]
ModbusServerContext
[関数名]
__init__(init関数)

    def __init__(self, devices=None, single=True):
        """Initialize a new instance of a modbus server context.

        :param devices: A dictionary of client contexts
        :param single: Set to true to treat this as a single context
        """
        self.single = single
        self._devices = devices or {}
        if self.single:
            self._devices = {0: self._devices}
            "追加部分---------------------------"
        else:
            devices_wk = {}
            for key in self._devices:
                for i in range(0, key):
                    devices_wk[i]=self._devices[key]
            self._devices = devices_wk
            "追加部分ここまで---------------------------"

上記のように設定することで、single=False の場合に指定したUnit ID まで同じデバイス設定を複製して実行できるようになる。

4.複数デバイスの実行方法

同フォルダ内に以下のファイルを設定する。
simulator_server.py

from pymodbus.server import ModbusSimulatorServer
import asyncio
import os


async def run():
    simulator = ModbusSimulatorServer(
        modbus_server="server",
        modbus_device="device",
        http_host="localhost",
        http_port=8080)
    await simulator.run_forever()

folder_path = os.path.dirname(__file__)
os.chdir(folder_path)
asyncio.run(run())

setup.json

※今回の例ではデバイスIDを14に設定している。

{
    "server_list": {
        "server": {
            "device_id":14,
            "comm": "tcp",
            "host": "0.0.0.0",
            "port": 5020,
            "ignore_missing_devices": false,
            "framer": "socket",
            "identity": {
                "VendorName": "pymodbus",
                "ProductCode": "PM",
                "VendorUrl": "https://github.com/pymodbus-dev/pymodbus/",
                "ProductName": "pymodbus Server",
                "ModelName": "pymodbus Server",
                "MajorMinorRevision": "3.1.0"
            }
        }
    },
    "device_list": {
        "device": {
            "setup": {
                "co size": 5,
                "di size": 0,
                "hr size": 0,
                "ir size": 0,
                "shared blocks": true,
                "type exception": true,
                "defaults": {
                    "value": {
                        "bits": 0,
                        "uint16": 0,
                        "uint32": 0,
                        "float32": 0.0,
                        "string": "test"
                    },
                    "action": {
                        "bits": "increment",
                        "uint16": "increment",
                        "uint32": "increment",
                        "float32": "increment",
                        "string": "increment"
                    }
                }
            },
            "invalid": [
            ],
            "write": [
            ],
            "bits": [
                {"addr": 0, "value": 100}
            ],
            "uint16": [
            ],
            "uint32": [
            ],
            "float32": [
            ],
            "string": [
            ],
            "repeat": [
            ]
        }
    }
}

client.py
※client.pyは異なるフォルダでも問題ない。

from pymodbus.client import ModbusTcpClient


client = ModbusTcpClient(
            host="127.0.0.1",
            port=5020)       # Create client object0
client.connect()                               # connect to device

result = client.read_coils(0, count=16, device_id=0)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=1)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=2)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=3)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=4)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=5)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=6)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=7)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=8)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=9)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=10)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=11)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=12)  # get information from device
print(result.bits)  
result = client.read_coils(0, count=16, device_id=13)  # get information from device
print(result.bits)  

実行結果:

[True, False, True, False, False, True, True, False, False, False, False, False, False, False, False, False]
[False, True, True, False, False, True, True, False, False, False, False, False, False, False, False, False]
[True, True, True, False, False, True, True, False, False, False, False, False, False, False, False, False]
[False, False, False, True, False, True, True, False, False, False, False, False, False, False, False, False]
[True, False, False, True, False, True, True, False, False, False, False, False, False, False, False, False]
[False, True, False, True, False, True, True, False, False, False, False, False, False, False, False, False]
[True, True, False, True, False, True, True, False, False, False, False, False, False, False, False, False]
[False, False, True, True, False, True, True, False, False, False, False, False, False, False, False, False]
[True, False, True, True, False, True, True, False, False, False, False, False, False, False, False, False]
[False, True, True, True, False, True, True, False, False, False, False, False, False, False, False, False]
[True, True, True, True, False, True, True, False, False, False, False, False, False, False, False, False]
[False, False, False, False, True, True, True, False, False, False, False, False, False, False, False, False]
[True, False, False, False, True, True, True, False, False, False, False, False, False, False, False, False]
[False, True, False, False, True, True, True, False, False, False, False, False, False, False, False, False]

5.補足

コメント

タイトルとURLをコピーしました