この記事では、pymodbusで複数のデバイスがある環境を行う方法を記載している。
シミュレータとクライアントの疎通方法は別途以下で記載している。
pymodbusでシミュレータを使いModbus通信する | エクヌツITブログ
1.結論
pymodbus 3.11.4は、デフォルトの機能では複数のデバイスを模擬した試験が行えない。
複数のデバイスを模擬する場合は、下記で説明するコーディングをOSSに対して行う必要がある。
コーディングのみを見たい場合は、4.複数デバイスの実行方法を参照。
2.pymodbusのOSS仕様
pymodbusは、シミュレータ起動時に[setup.json]を読み込んで実行する。
set_up.jsonのデバイスを確認し、デバイスIDが何は以下の箇所で読み込みを行っている。
[フォルダパス]
pymodbus\server\simulator\http_server.py
[クラス名]
ModbusSimulatorServer
[関数名]
__init__(init関数)
if "device_id" in server:
# Designated ModBus unit address. Will only serve data if the address matches
datastore = ModbusServerContext(devices={int(server["device_id"]): self.datastore_context}, single=False)
del server["device_id"]
else:
# Will server any request regardless of addressing
datastore = ModbusServerContext(devices=self.datastore_context, single=True)
上記のようにデバイスIDが設定されている場合は[single]をFalse、デバイスIDが設定されていない場合は[single]をTrueを設定している。
この[single]がFalseというのを用いて、複数のデバイスを模擬できるのかと思いきや・・・
呼び出し先の[ModbusServerContext]では、[single]がFalseか、Trueかを見ておらず複数のデバイスを模擬できないようになっている。
[フォルダパス]
pymodbus\datastore\context.py
[クラス名]
ModbusServerContext
[関数名]
__init__(init関数)
def __init__(self, devices=None, single=True):
"""Initialize a new instance of a modbus server context.
:param devices: A dictionary of client contexts
:param single: Set to true to treat this as a single context
"""
self.single = single
self._devices = devices or {}
if self.single:
self._devices = {0: self._devices}
上記のようにTrueの時の処理のみが設定されており、Falseの場合は特に設定されていない。じゃあなんで、singleという引数があるんだと
3.複数デバイスの模擬方法
以下のようにModbusServerContextを設定することで複数のデバイスの模擬が行える。
[フォルダパス]
pymodbus\datastore\context.py
[クラス名]
ModbusServerContext
[関数名]
__init__(init関数)
def __init__(self, devices=None, single=True):
"""Initialize a new instance of a modbus server context.
:param devices: A dictionary of client contexts
:param single: Set to true to treat this as a single context
"""
self.single = single
self._devices = devices or {}
if self.single:
self._devices = {0: self._devices}
"追加部分---------------------------"
else:
devices_wk = {}
for key in self._devices:
for i in range(0, key):
devices_wk[i]=self._devices[key]
self._devices = devices_wk
"追加部分ここまで---------------------------"
上記のように設定することで、single=False の場合に指定したUnit ID まで同じデバイス設定を複製して実行できるようになる。
4.複数デバイスの実行方法
同フォルダ内に以下のファイルを設定する。
simulator_server.py
from pymodbus.server import ModbusSimulatorServer
import asyncio
import os
async def run():
simulator = ModbusSimulatorServer(
modbus_server="server",
modbus_device="device",
http_host="localhost",
http_port=8080)
await simulator.run_forever()
folder_path = os.path.dirname(__file__)
os.chdir(folder_path)
asyncio.run(run())
setup.json
※今回の例ではデバイスIDを14に設定している。
{
"server_list": {
"server": {
"device_id":14,
"comm": "tcp",
"host": "0.0.0.0",
"port": 5020,
"ignore_missing_devices": false,
"framer": "socket",
"identity": {
"VendorName": "pymodbus",
"ProductCode": "PM",
"VendorUrl": "https://github.com/pymodbus-dev/pymodbus/",
"ProductName": "pymodbus Server",
"ModelName": "pymodbus Server",
"MajorMinorRevision": "3.1.0"
}
}
},
"device_list": {
"device": {
"setup": {
"co size": 5,
"di size": 0,
"hr size": 0,
"ir size": 0,
"shared blocks": true,
"type exception": true,
"defaults": {
"value": {
"bits": 0,
"uint16": 0,
"uint32": 0,
"float32": 0.0,
"string": "test"
},
"action": {
"bits": "increment",
"uint16": "increment",
"uint32": "increment",
"float32": "increment",
"string": "increment"
}
}
},
"invalid": [
],
"write": [
],
"bits": [
{"addr": 0, "value": 100}
],
"uint16": [
],
"uint32": [
],
"float32": [
],
"string": [
],
"repeat": [
]
}
}
}
client.py
※client.pyは異なるフォルダでも問題ない。
from pymodbus.client import ModbusTcpClient
client = ModbusTcpClient(
host="127.0.0.1",
port=5020) # Create client object0
client.connect() # connect to device
result = client.read_coils(0, count=16, device_id=0) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=1) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=2) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=3) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=4) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=5) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=6) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=7) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=8) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=9) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=10) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=11) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=12) # get information from device
print(result.bits)
result = client.read_coils(0, count=16, device_id=13) # get information from device
print(result.bits)
実行結果:
[True, False, True, False, False, True, True, False, False, False, False, False, False, False, False, False]
[False, True, True, False, False, True, True, False, False, False, False, False, False, False, False, False]
[True, True, True, False, False, True, True, False, False, False, False, False, False, False, False, False]
[False, False, False, True, False, True, True, False, False, False, False, False, False, False, False, False]
[True, False, False, True, False, True, True, False, False, False, False, False, False, False, False, False]
[False, True, False, True, False, True, True, False, False, False, False, False, False, False, False, False]
[True, True, False, True, False, True, True, False, False, False, False, False, False, False, False, False]
[False, False, True, True, False, True, True, False, False, False, False, False, False, False, False, False]
[True, False, True, True, False, True, True, False, False, False, False, False, False, False, False, False]
[False, True, True, True, False, True, True, False, False, False, False, False, False, False, False, False]
[True, True, True, True, False, True, True, False, False, False, False, False, False, False, False, False]
[False, False, False, False, True, True, True, False, False, False, False, False, False, False, False, False]
[True, False, False, False, True, True, True, False, False, False, False, False, False, False, False, False]
[False, True, False, False, True, True, True, False, False, False, False, False, False, False, False, False]
5.補足
- 各種設定の方法などは別のページで紹介している
pymodbusでシミュレータを使いModbus通信する | エクヌツITブログ
pymodbus で device_idを指定して通信する | エクヌツITブログ - この方法は「同じ設定のデバイスを複数作る」用途に向いている
→ Unit ID ごとに異なるレジスタ構成を持たせたい場合は別途個別設定が必要 - pymodbus のバージョンによって内部実装が変わる可能性があるため、将来のアップデートでは別の方法が必要になる場合がある

コメント