#!/usr/bin/env python3 """Command line Modbus RTU terminal for the AD Keil project.""" from __future__ import annotations import argparse import cmd import csv import dataclasses import re import shlex import sys import time from pathlib import Path from typing import Iterable DEFAULT_BAUD = 512000 DEFAULT_SLAVE = 1 DEFAULT_TIMEOUT = 0.5 DEFAULT_POLARITY = 1 DEFAULT_POLE_PAIRS = 1 DEFAULT_PHASE_SHUNT_OHM = 0.1 DEVICE_ID = 0xAD01 class TerminalError(Exception): """User-facing terminal error.""" class ModbusError(TerminalError): """Modbus transport or protocol error.""" class ModbusExceptionError(ModbusError): def __init__(self, function: int, code: int) -> None: super().__init__( f"Modbus exception: function=0x{function:02X}, code={code} " f"({MODBUS_EXCEPTIONS.get(code, 'unknown')})" ) self.function = function self.code = code @dataclasses.dataclass(frozen=True) class Register: addr: int name: str access: str factor: float | None = None unit: str = "" signed: bool = False enum: dict[int, str] | None = None flags: dict[int, str] | None = None note: str = "" MODBUS_EXCEPTIONS = { 1: "illegal function", 2: "illegal data address", 3: "illegal data value", 4: "server device failure", } MODES = { 0: "IDLE", 1: "STATOR_RESISTANCE", 2: "NO_LOAD_MAGNETIZING", 3: "LOCKED_ROTOR_LEAKAGE", 4: "INERTIA_FRICTION", 5: "DATA_LOGGING", 6: "PWM_TEST_UH", 7: "PWM_TEST_UL", 8: "PWM_TEST_VH", 9: "PWM_TEST_VL", 10: "PWM_TEST_WH", 11: "PWM_TEST_WL", 12: "PWM_TEST_ALL", 13: "AUTO_IDENTIFICATION", 14: "ROTATION_3HZ", } MODE_ALIASES = { "idle": 0, "stop": 0, "rs": 1, "stator": 1, "stator_resistance": 1, "no_load": 2, "magnetizing": 2, "locked": 3, "locked_rotor": 3, "inertia": 4, "friction": 4, "log": 5, "logging": 5, "data": 5, "data_logging": 5, "uh": 6, "ul": 7, "vh": 8, "vl": 9, "wh": 10, "wl": 11, "all": 12, "pwm_all": 12, "auto": 13, "auto_id": 13, "auto_identification": 13, "rotation": 14, "rot": 14, "rotation_3hz": 14, } STATUS_FLAGS = { 0: "ACTIVE", 1: "POWER_TEST_BLOCKED", 2: "POWER_STAGE_ARMED", 3: "FAULT_LATCHED", 4: "TIMEOUT", 5: "LOCKED_ROTOR_BLOCKED", 6: "SAFETY_LIMITS_UNKNOWN", 7: "DATA_VALID", 8: "COMPLETE", } FAULT_FLAGS = { 0: "OVERCURRENT", 1: "OVERVOLTAGE", 2: "UNDERVOLTAGE", 3: "OVERTEMPERATURE", 4: "DRIVER", 5: "EMERGENCY_STOP", 6: "TIMEOUT", 7: "NULL_INPUT", } MEAS_STATUS_FLAGS = { 0: "OVERCURRENT", 1: "OVERVOLTAGE", 2: "UNDERVOLTAGE", 3: "OVERTEMPERATURE", 4: "DRIVER_FAULT", 5: "EMERGENCY_STOP", } VALID_FLAGS = { 0: "RS", 1: "RR", 2: "LS", 3: "LR", 4: "LM", 5: "LL", 6: "J", 7: "B", 8: "NOMINALS", 9: "POLE_PAIRS", } PWM_OUTPUTS = { 0: "NONE", 1: "UH", 2: "UL", 3: "VH", 4: "VL", 5: "WH", 6: "WL", 7: "ALL", } ID_STAGES = { 0: "IDLE", 1: "SETTLE", 2: "MEASURE", 3: "PULSE", 4: "ACCEL", 5: "COAST", } TIMING_MODES = {0: "UP", 1: "CENTER"} MOTOR_TYPES = {0: "AD_SINE", 1: "BLDC_6STEP"} POLARITY_FLAGS = {0: "MAIN_INVERTED", 1: "COMP_INVERTED"} REGISTERS = [ Register(0x0000, "device_id", "R", enum={DEVICE_ID: "AD device"}), Register(0x0001, "protocol_version", "R"), Register(0x0002, "control_enable", "R/W", enum={0: "disabled", 1: "enabled"}), Register(0x0003, "control_mode", "R/W", enum=MODES), Register(0x0004, "control_reset_faults", "W", note="write 1 to reset faults"), Register(0x0005, "control_stop", "W", note="write 1 to stop"), Register(0x0006, "control_locked_rotor_allowed", "R/W", enum={0: "no", 1: "yes"}), Register(0x0007, "control_pwm_duty", "R/W", factor=0.0001, unit="duty"), Register(0x0008, "limit_current", "R/W", factor=0.01, unit="A"), Register(0x0009, "limit_overvoltage", "R/W", factor=0.1, unit="V"), Register(0x000A, "limit_undervoltage", "R/W", factor=0.1, unit="V"), Register(0x000B, "limit_speed", "R/W", unit="rpm"), Register(0x000C, "limit_temperature", "R/W", factor=0.1, unit="C", signed=True), Register(0x000D, "control_rotation_freq", "R/W", factor=0.1, unit="Hz"), Register(0x000E, "control_rotation_mod", "R/W", factor=0.0001, unit="mod"), Register(0x000F, "control_pwm_polarity_flags", "R/W", flags=POLARITY_FLAGS), Register(0x0010, "status_mode", "R", enum=MODES), Register(0x0011, "status_flags_lo", "R", flags=STATUS_FLAGS), Register(0x0012, "status_flags_hi", "R"), Register(0x0013, "fault_flags_lo", "R", flags=FAULT_FLAGS), Register(0x0014, "fault_flags_hi", "R"), Register(0x0015, "power_stage_allowed", "R", enum={0: "no", 1: "yes"}), Register(0x0016, "test_running", "R", enum={0: "no", 1: "yes"}), Register(0x0017, "inverter_pwm_running", "R", enum={0: "no", 1: "yes"}), Register(0x0018, "inverter_service_output", "R", enum=PWM_OUTPUTS), Register(0x0019, "inverter_id_stage", "R", enum=ID_STAGES), Register(0x001A, "control_pwm_timing_mode", "R/W", enum=TIMING_MODES), Register(0x001B, "control_motor_control_type", "R/W", enum=MOTOR_TYPES), Register(0x001E, "control_rotation_ramp_ms", "R/W", unit="ms"), Register(0x001F, "control_reset_current_peaks", "W", note="write 1 to reset phase current peaks"), Register(0x0020, "meas_ia", "R", factor=0.001, unit="A", signed=True), Register(0x0021, "meas_ib", "R", factor=0.001, unit="A", signed=True), Register(0x0022, "meas_ic", "R", factor=0.001, unit="A", signed=True), Register(0x0023, "meas_vdc", "R", factor=0.1, unit="V"), Register(0x0024, "meas_temp", "R", factor=0.1, unit="C", signed=True), Register(0x0025, "meas_status_lo", "R", flags=MEAS_STATUS_FLAGS), Register(0x0026, "meas_status_hi", "R"), Register(0x0027, "meas_speed", "R", unit="rpm", signed=True), Register(0x0028, "meas_ia_rms", "R", factor=0.001, unit="A"), Register(0x0029, "meas_ib_rms", "R", factor=0.001, unit="A"), Register(0x002A, "meas_ic_rms", "R", factor=0.001, unit="A"), Register(0x002B, "meas_torque", "R", factor=0.001, unit="Nm", signed=True), Register(0x002C, "meas_ia_peak", "R", factor=0.001, unit="A"), Register(0x002D, "meas_ib_peak", "R", factor=0.001, unit="A"), Register(0x002E, "meas_ic_peak", "R", factor=0.001, unit="A"), Register(0x002F, "meas_slip", "R", factor=0.01, unit="%", signed=True), Register(0x0030, "param_valid_mask_lo", "R", flags=VALID_FLAGS), Register(0x0031, "param_valid_mask_hi", "R"), Register(0x0032, "param_rs", "R", factor=0.001, unit="ohm"), Register(0x0033, "param_ls_lo", "R", unit="uH"), Register(0x0034, "param_ls_hi", "R", unit="uH"), Register(0x0035, "param_ll_lo", "R", unit="uH"), Register(0x0036, "param_ll_hi", "R", unit="uH"), Register(0x0037, "param_rr", "R", factor=0.001, unit="ohm"), Register(0x0038, "param_lr_lo", "R", unit="uH"), Register(0x0039, "param_lr_hi", "R", unit="uH"), Register(0x003A, "param_lm_lo", "R", unit="uH"), Register(0x003B, "param_lm_hi", "R", unit="uH"), Register(0x003C, "param_j_lo", "R", unit="nkg*m2"), Register(0x003D, "param_j_hi", "R", unit="nkg*m2"), Register(0x003E, "param_b_lo", "R", unit="nNm*s"), Register(0x003F, "param_b_hi", "R", unit="nNm*s"), Register(0x0040, "param_pole_pairs", "R/W", unit="pairs"), Register(0x0041, "param_phase_shunt", "R/W", factor=0.001, unit="ohm"), ] REGISTER_BY_ADDR = {reg.addr: reg for reg in REGISTERS} REGISTER_BY_NAME = {reg.name: reg.addr for reg in REGISTERS} REGISTER_ALIASES = { "enable": 0x0002, "mode": 0x0003, "reset": 0x0004, "reset_faults": 0x0004, "stop": 0x0005, "locked": 0x0006, "locked_rotor": 0x0006, "duty": 0x0007, "current": 0x0008, "overvoltage": 0x0009, "undervoltage": 0x000A, "speed_limit": 0x000B, "temp_limit": 0x000C, "freq": 0x000D, "rotation_freq": 0x000D, "rotation_mod": 0x000E, "ramp": 0x001E, "ramp_ms": 0x001E, "rotation_ramp": 0x001E, "rotation_ramp_ms": 0x001E, "reset_peaks": 0x001F, "reset_current_peaks": 0x001F, "polarity": 0x000F, "status": 0x0011, "faults": 0x0013, "ia": 0x0020, "ib": 0x0021, "ic": 0x0022, "vdc": 0x0023, "temp": 0x0024, "rpm": 0x0027, "torque": 0x002B, "ia_peak": 0x002C, "ib_peak": 0x002D, "ic_peak": 0x002E, "slip": 0x002F, "slip_percent": 0x002F, "valid": 0x0030, "rs": 0x0032, "rr": 0x0037, "pole_pairs": 0x0040, "poles": 0x0040, "pairs": 0x0040, "phase_shunt": 0x0041, "phase_shunt_ohm": 0x0041, "phase_shunt_mohm": 0x0041, "param_phase_shunt_mohm": 0x0041, "rshunt": 0x0041, "shunt": 0x0041, } def require_serial_module(): try: import serial # type: ignore[import-not-found] except ModuleNotFoundError as exc: raise TerminalError( "pyserial is not installed. Install it with: " "python -m pip install -r AD_Keil_Project\\tools\\requirements-ad-terminal.txt" ) from exc return serial def crc16_modbus(data: bytes | bytearray) -> int: crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 crc &= 0xFFFF return crc def add_crc(frame: bytes | bytearray) -> bytes: crc = crc16_modbus(frame) return bytes(frame) + bytes((crc & 0xFF, (crc >> 8) & 0xFF)) def check_crc(frame: bytes | bytearray) -> bool: if len(frame) < 4: return False expected = frame[-2] | (frame[-1] << 8) return crc16_modbus(frame[:-2]) == expected def to_s16(value: int) -> int: value &= 0xFFFF return value - 0x10000 if value & 0x8000 else value def u16(value: int) -> int: if not 0 <= value <= 0xFFFF: raise TerminalError(f"value out of uint16 range: {value}") return value def u32_from_words(lo: int, hi: int) -> int: return (lo & 0xFFFF) | ((hi & 0xFFFF) << 16) def parse_int(text: str) -> int: try: return int(text, 0) except ValueError as exc: raise TerminalError(f"not an integer: {text}") from exc def argparse_int(text: str) -> int: try: return parse_int(text) except TerminalError as exc: raise argparse.ArgumentTypeError(str(exc)) from exc def normalize_key(text: str) -> str: return text.strip().lower().replace("-", "_").replace(".", "_") def parse_register_address(text: str) -> int: key = normalize_key(text) if key in REGISTER_BY_NAME: return REGISTER_BY_NAME[key] if key in REGISTER_ALIASES: return REGISTER_ALIASES[key] if key.startswith("ad_modbus_reg_"): key = key.removeprefix("ad_modbus_reg_") if key in REGISTER_BY_NAME: return REGISTER_BY_NAME[key] if key in REGISTER_ALIASES: return REGISTER_ALIASES[key] match = re.fullmatch(r"h?0*([0-9]{1,3})", key) if match: value = int(match.group(1), 10) else: value = parse_int(text) if 40001 <= value <= 40066: value -= 40001 if not 0 <= value <= 0xFFFF: raise TerminalError(f"register address out of range: {text}") return value def parse_u16_value(text: str) -> int: return u16(parse_int(text)) def parse_mode(text: str) -> int: key = normalize_key(text) if key in MODE_ALIASES: return MODE_ALIASES[key] for value, name in MODES.items(): if key == name.lower(): return value value = parse_int(text) if value not in MODES: raise TerminalError(f"unknown AD mode: {text}") return value def parse_timing(text: str) -> int: key = normalize_key(text) if key in ("up", "edge", "0"): return 0 if key in ("center", "centre", "1"): return 1 raise TerminalError(f"unknown PWM timing mode: {text}") def parse_motor_type(text: str) -> int: key = normalize_key(text) if key in ("ad", "sine", "ad_sine", "0"): return 0 if key in ("bldc", "6step", "six_step", "bldc_6step", "1"): return 1 raise TerminalError(f"unknown motor control type: {text}") def scaled_u16(value: float, factor: float, field: str) -> int: raw = int(round(value / factor)) if not 0 <= raw <= 0xFFFF: raise TerminalError(f"{field} out of register range: {value}") return raw def pole_pairs_u16(value: float) -> int: raw = float(value) rounded = int(raw) if raw != float(rounded): raise TerminalError(f"pole pairs must be an integer: {value}") if not 0 <= rounded <= 64: raise TerminalError(f"pole pairs out of range 0..64: {value}") return rounded def format_float(value: float, factor: float) -> str: if factor >= 1: decimals = 0 else: decimals = min(6, max(0, len(str(factor).split(".")[-1].rstrip("0")))) return f"{value:.{decimals}f}" def decode_flags(value: int, names: dict[int, str]) -> str: active = [name for bit, name in sorted(names.items()) if value & (1 << bit)] return ", ".join(active) if active else "none" def format_reg_value(addr: int, value: int) -> str: reg = REGISTER_BY_ADDR.get(addr) if reg is None: return f"0x{value:04X}" raw = to_s16(value) if reg.signed else value parts: list[str] = [] if reg.factor is not None: parts.append(f"{format_float(raw * reg.factor, reg.factor)} {reg.unit}".rstrip()) elif reg.unit and reg.unit not in ("uH", "nkg*m2", "nNm*s"): parts.append(f"{raw} {reg.unit}") if reg.enum: parts.append(reg.enum.get(value, "unknown")) if reg.flags: parts.append(decode_flags(value, reg.flags)) if not parts: if reg.name == "device_id": parts.append(f"0x{value:04X}") else: parts.append(str(raw)) return " | ".join(parts) def print_table(headers: list[str], rows: Iterable[Iterable[object]]) -> None: materialized = [[str(cell) for cell in row] for row in rows] widths = [len(header) for header in headers] for row in materialized: for idx, cell in enumerate(row): widths[idx] = max(widths[idx], len(cell)) print(" ".join(header.ljust(widths[idx]) for idx, header in enumerate(headers))) print(" ".join("-" * width for width in widths)) for row in materialized: print(" ".join(cell.ljust(widths[idx]) for idx, cell in enumerate(row))) class ModbusRTUClient: def __init__( self, port: str, baudrate: int = DEFAULT_BAUD, slave: int = DEFAULT_SLAVE, timeout: float = DEFAULT_TIMEOUT, debug: bool = False, ) -> None: self.port = port self.baudrate = baudrate self.slave = slave self.timeout = timeout self.debug = debug self.serial = None def __enter__(self) -> "ModbusRTUClient": serial = require_serial_module() self.serial = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=8, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=self.timeout, write_timeout=self.timeout, ) return self def __exit__(self, exc_type, exc, tb) -> None: if self.serial is not None: self.serial.close() self.serial = None def _read_exact(self, size: int) -> bytes: assert self.serial is not None data = self.serial.read(size) if len(data) != size: raise ModbusError(f"timeout waiting for {size} bytes, got {len(data)}") return bytes(data) def _request(self, function: int, payload: bytes) -> bytes: assert self.serial is not None request = add_crc(bytes((self.slave, function)) + payload) if self.debug: print(f"> {request.hex(' ')}") self.serial.reset_input_buffer() self.serial.write(request) self.serial.flush() head = self._read_exact(2) response_function = head[1] if response_function & 0x80: response = head + self._read_exact(3) elif response_function == 0x03: byte_count = self._read_exact(1) response = head + byte_count + self._read_exact(byte_count[0] + 2) elif response_function in (0x06, 0x10): response = head + self._read_exact(6) else: raise ModbusError(f"unexpected response function: 0x{response_function:02X}") if self.debug: print(f"< {response.hex(' ')}") if not check_crc(response): raise ModbusError(f"bad CRC in response: {response.hex(' ')}") if response[0] != self.slave: raise ModbusError(f"unexpected slave address: {response[0]}") if response_function & 0x80: raise ModbusExceptionError(response_function & 0x7F, response[2]) if response_function != function: raise ModbusError( f"unexpected response function: expected 0x{function:02X}, " f"got 0x{response_function:02X}" ) return response def read_holding(self, address: int, quantity: int) -> list[int]: if not 1 <= quantity <= 60: raise TerminalError("quantity must be 1..60") payload = bytes( ( (address >> 8) & 0xFF, address & 0xFF, (quantity >> 8) & 0xFF, quantity & 0xFF, ) ) response = self._request(0x03, payload) byte_count = response[2] if byte_count != quantity * 2: raise ModbusError(f"unexpected byte count: {byte_count}") data = response[3 : 3 + byte_count] return [(data[i] << 8) | data[i + 1] for i in range(0, len(data), 2)] def write_single(self, address: int, value: int) -> None: value = u16(value) payload = bytes( ( (address >> 8) & 0xFF, address & 0xFF, (value >> 8) & 0xFF, value & 0xFF, ) ) response = self._request(0x06, payload) if response[2:6] != payload: raise ModbusError("write echo does not match request") def write_multiple(self, address: int, values: list[int]) -> None: if not values: raise TerminalError("write-many needs at least one value") if len(values) > 59: raise TerminalError("too many registers for one write") words = bytearray() for value in values: value = u16(value) words.extend(((value >> 8) & 0xFF, value & 0xFF)) quantity = len(values) payload = bytes( ( (address >> 8) & 0xFF, address & 0xFF, (quantity >> 8) & 0xFF, quantity & 0xFF, len(words), ) ) + bytes(words) response = self._request(0x10, payload) if response[2] != ((address >> 8) & 0xFF) or response[3] != (address & 0xFF): raise ModbusError("write-many address echo does not match request") echoed_quantity = (response[4] << 8) | response[5] if echoed_quantity != quantity: raise ModbusError("write-many quantity echo does not match request") def list_ports() -> None: require_serial_module() from serial.tools import list_ports as pyserial_list_ports # type: ignore[import-not-found] ports = list(pyserial_list_ports.comports()) if not ports: print("No serial ports found.") return rows = [] for port in ports: rows.append([port.device, port.description, port.hwid]) print_table(["port", "description", "hwid"], rows) def resolve_port(args: argparse.Namespace) -> str: port = getattr(args, "port", None) if not port: raise TerminalError("serial port is required; use --port COMx or --port auto") if port.lower() != "auto": return port require_serial_module() from serial.tools import list_ports as pyserial_list_ports # type: ignore[import-not-found] candidates = [item.device for item in pyserial_list_ports.comports()] for candidate in candidates: try: with ModbusRTUClient( candidate, baudrate=args.baud, slave=args.slave, timeout=args.timeout, debug=False, ) as client: regs = client.read_holding(0, 2) if regs and regs[0] == DEVICE_ID: print(f"Using {candidate}") return candidate except TerminalError: continue except Exception: continue raise TerminalError("AD board was not found on available serial ports") def open_client(args: argparse.Namespace) -> ModbusRTUClient: return ModbusRTUClient( resolve_port(args), baudrate=args.baud, slave=args.slave, timeout=args.timeout, debug=args.debug, ) def print_registers(values: list[int], start: int) -> None: rows = [] for offset, value in enumerate(values): addr = start + offset reg = REGISTER_BY_ADDR.get(addr) rows.append( [ f"{addr:04d}", f"0x{addr:04X}", f"400{addr + 1:02d}", reg.name if reg else "?", reg.access if reg else "?", f"{value}", f"0x{value:04X}", format_reg_value(addr, value), ] ) print_table(["dec", "hex", "4x", "name", "rw", "raw", "hexval", "value"], rows) def command_ping(client: ModbusRTUClient) -> None: regs = client.read_holding(0, 2) print_registers(regs, 0) if regs[0] != DEVICE_ID: raise TerminalError(f"unexpected device id: 0x{regs[0]:04X}") print("OK") def command_read(client: ModbusRTUClient, address_text: str, quantity: int) -> None: address = parse_register_address(address_text) values = client.read_holding(address, quantity) print_registers(values, address) def command_write(client: ModbusRTUClient, address_text: str, value_text: str) -> None: address = parse_register_address(address_text) value = parse_u16_value(value_text) client.write_single(address, value) print(f"Wrote {REGISTER_BY_ADDR.get(address, Register(address, '?', '?')).name} " f"(0x{address:04X}) = {value} / 0x{value:04X}") def command_write_many( client: ModbusRTUClient, address_text: str, value_texts: list[str] ) -> None: address = parse_register_address(address_text) values = [parse_u16_value(value) for value in value_texts] client.write_multiple(address, values) print(f"Wrote {len(values)} registers from 0x{address:04X}") def print_status(client: ModbusRTUClient) -> None: values = client.read_holding(0x0010, 12) status = u32_from_words(values[1], values[2]) faults = u32_from_words(values[3], values[4]) rows = [ ["mode", f"{values[0]} ({MODES.get(values[0], 'unknown')})"], ["status", f"0x{status:08X} ({decode_flags(status, STATUS_FLAGS)})"], ["faults", f"0x{faults:08X} ({decode_flags(faults, FAULT_FLAGS)})"], ["power_stage_allowed", format_reg_value(0x0015, values[5])], ["test_running", format_reg_value(0x0016, values[6])], ["pwm_running", format_reg_value(0x0017, values[7])], ["service_output", format_reg_value(0x0018, values[8])], ["id_stage", format_reg_value(0x0019, values[9])], ["pwm_timing", format_reg_value(0x001A, values[10])], ["motor_control", format_reg_value(0x001B, values[11])], ] print_table(["field", "value"], rows) def read_measurement_registers(client: ModbusRTUClient) -> list[int]: try: return client.read_holding(0x0020, 16) except ModbusExceptionError as exc: if exc.code not in (2, 3): raise try: return client.read_holding(0x0020, 15) + [0] except ModbusExceptionError as exc: if exc.code not in (2, 3): raise return client.read_holding(0x0020, 12) + [0, 0, 0, 0] def read_measurement_dict(client: ModbusRTUClient) -> dict[str, object]: values = read_measurement_registers(client) meas_status = u32_from_words(values[5], values[6]) return { "ia_A": to_s16(values[0]) * 0.001, "ib_A": to_s16(values[1]) * 0.001, "ic_A": to_s16(values[2]) * 0.001, "vdc_V": values[3] * 0.1, "temp_C": to_s16(values[4]) * 0.1, "meas_status": meas_status, "speed_rpm": to_s16(values[7]), "ia_rms_A": values[8] * 0.001, "ib_rms_A": values[9] * 0.001, "ic_rms_A": values[10] * 0.001, "torque_Nm": to_s16(values[11]) * 0.001, "ia_peak_A": values[12] * 0.001, "ib_peak_A": values[13] * 0.001, "ic_peak_A": values[14] * 0.001, "slip_percent": to_s16(values[15]) * 0.01, } def print_measurements(client: ModbusRTUClient) -> None: data = read_measurement_dict(client) rows = [ ["ia_A", f"{data['ia_A']:.3f}"], ["ib_A", f"{data['ib_A']:.3f}"], ["ic_A", f"{data['ic_A']:.3f}"], ["vdc_V", f"{data['vdc_V']:.1f}"], ["temp_C", f"{data['temp_C']:.1f}"], ["speed_rpm", data["speed_rpm"]], ["ia_rms_A", f"{data['ia_rms_A']:.3f}"], ["ib_rms_A", f"{data['ib_rms_A']:.3f}"], ["ic_rms_A", f"{data['ic_rms_A']:.3f}"], ["ia_peak_A", f"{data['ia_peak_A']:.3f}"], ["ib_peak_A", f"{data['ib_peak_A']:.3f}"], ["ic_peak_A", f"{data['ic_peak_A']:.3f}"], ["torque_Nm", f"{data['torque_Nm']:.3f}"], ["slip_percent", f"{data['slip_percent']:.2f}"], [ "meas_status", f"0x{int(data['meas_status']):08X} " f"({decode_flags(int(data['meas_status']), MEAS_STATUS_FLAGS)})", ], ] print_table(["measurement", "value"], rows) def read_parameter_registers(client: ModbusRTUClient) -> list[int]: try: return client.read_holding(0x0030, 18) except ModbusExceptionError as exc: if exc.code not in (2, 3): raise try: return client.read_holding(0x0030, 17) + [0] except ModbusExceptionError as exc: if exc.code not in (2, 3): raise return client.read_holding(0x0030, 16) + [0, 0] def print_params(client: ModbusRTUClient) -> None: values = read_parameter_registers(client) valid = u32_from_words(values[0], values[1]) params = { "Rs_ohm": values[2] * 0.001, "Ls_H": u32_from_words(values[3], values[4]) * 1e-6, "Ll_H": u32_from_words(values[5], values[6]) * 1e-6, "Rr_ohm": values[7] * 0.001, "Lr_H": u32_from_words(values[8], values[9]) * 1e-6, "Lm_H": u32_from_words(values[10], values[11]) * 1e-6, "J_kg_m2": u32_from_words(values[12], values[13]) * 1e-9, "B_Nm_s": u32_from_words(values[14], values[15]) * 1e-9, "pole_pairs": values[16], "phase_shunt_ohm": values[17] * 0.001, } rows = [["valid_mask", f"0x{valid:08X} ({decode_flags(valid, VALID_FLAGS)})"]] rows.extend([key, f"{value:.9g}"] for key, value in params.items()) print_table(["parameter", "value"], rows) def start_values(args: argparse.Namespace) -> tuple[int, list[int], list[int], int, int, int]: mode = parse_mode(args.mode) locked = 1 if args.locked_rotor or mode == 3 else 0 main_block = [ scaled_u16(args.duty, 0.0001, "duty"), scaled_u16(args.current_limit, 0.01, "current limit"), scaled_u16(args.overvoltage, 0.1, "overvoltage limit"), scaled_u16(args.undervoltage, 0.1, "undervoltage limit"), scaled_u16(args.speed_limit, 1.0, "speed limit"), scaled_u16(args.temp_limit, 0.1, "temperature limit"), scaled_u16(args.rotation_freq, 0.1, "rotation frequency"), scaled_u16(args.rotation_mod, 0.0001, "rotation modulation"), u16(args.polarity & 0x0003), ] control_block = [parse_timing(args.timing), parse_motor_type(args.motor)] pole_pairs = pole_pairs_u16(args.pole_pairs) phase_shunt = scaled_u16(args.phase_shunt_ohm, 0.001, "phase shunt") return mode, [locked], main_block + control_block, u16(args.rotation_ramp_ms), pole_pairs, phase_shunt def write_optional_single(client: ModbusRTUClient, address: int, value: int) -> bool: try: client.write_single(address, value) return True except ModbusExceptionError as exc: if exc.code in (2, 3): return False raise def stop_compatible(client: ModbusRTUClient) -> bool: client.write_single(0x0002, 0) return write_optional_single(client, 0x0005, 1) def print_optional_register_warning( stop_supported: bool, reset_supported: bool, ramp_supported: bool, pole_pairs_supported: bool, phase_shunt_supported: bool, ) -> None: missing = [] if not stop_supported: missing.append("0x0005 stop") if not reset_supported: missing.append("0x0004 reset") if not ramp_supported: missing.append("0x001E ramp") if not pole_pairs_supported: missing.append("0x0040 pole_pairs") if not phase_shunt_supported: missing.append("0x0041 phase_shunt") if missing: print(f"Warning: firmware skipped optional register(s): {', '.join(missing)}") def command_start(client: ModbusRTUClient, args: argparse.Namespace) -> None: mode, locked_values, values, ramp_value, pole_pairs_value, phase_shunt_value = start_values(args) main_values = values[:9] tail_values = values[9:] stop_supported = stop_compatible(client) reset_supported = write_optional_single(client, 0x0004, 1) client.write_single(0x0006, locked_values[0]) client.write_multiple(0x0007, main_values) client.write_multiple(0x001A, tail_values) ramp_supported = write_optional_single(client, 0x001E, ramp_value) pole_pairs_supported = write_optional_single(client, 0x0040, pole_pairs_value) phase_shunt_supported = write_optional_single(client, 0x0041, phase_shunt_value) client.write_single(0x0003, mode) client.write_single(0x0002, 1) print(f"Started mode {mode} ({MODES.get(mode, 'unknown')})") print_optional_register_warning( stop_supported, reset_supported, ramp_supported, pole_pairs_supported, phase_shunt_supported, ) def command_stop(client: ModbusRTUClient) -> None: stop_supported = stop_compatible(client) print("Stop requested") if not stop_supported: print("Warning: 0x0005 stop register is not supported; used control_enable=0") def command_reset(client: ModbusRTUClient) -> None: reset_supported = write_optional_single(client, 0x0004, 1) print("Fault reset requested") if not reset_supported: print("Warning: 0x0004 reset register is not supported") def command_log(client: ModbusRTUClient, args: argparse.Namespace) -> None: headers = [ "host_time_s", "ia_A", "ib_A", "ic_A", "vdc_V", "temp_C", "speed_rpm", "ia_rms_A", "ib_rms_A", "ic_rms_A", "ia_peak_A", "ib_peak_A", "ic_peak_A", "torque_Nm", "slip_percent", "meas_status", ] csv_file = None writer = None if args.csv: csv_path = Path(args.csv) csv_file = csv_path.open("w", newline="", encoding="utf-8") writer = csv.DictWriter(csv_file, fieldnames=headers) writer.writeheader() print(f"Writing CSV to {csv_path}") else: print(",".join(headers)) try: count = 0 while args.count == 0 or count < args.count: data = read_measurement_dict(client) row = { "host_time_s": f"{time.time():.3f}", "ia_A": f"{float(data['ia_A']):.3f}", "ib_A": f"{float(data['ib_A']):.3f}", "ic_A": f"{float(data['ic_A']):.3f}", "vdc_V": f"{float(data['vdc_V']):.1f}", "temp_C": f"{float(data['temp_C']):.1f}", "speed_rpm": data["speed_rpm"], "ia_rms_A": f"{float(data['ia_rms_A']):.3f}", "ib_rms_A": f"{float(data['ib_rms_A']):.3f}", "ic_rms_A": f"{float(data['ic_rms_A']):.3f}", "ia_peak_A": f"{float(data['ia_peak_A']):.3f}", "ib_peak_A": f"{float(data['ib_peak_A']):.3f}", "ic_peak_A": f"{float(data['ic_peak_A']):.3f}", "torque_Nm": f"{float(data['torque_Nm']):.3f}", "slip_percent": f"{float(data['slip_percent']):.2f}", "meas_status": f"0x{int(data['meas_status']):08X}", } if writer: writer.writerow(row) csv_file.flush() else: print(",".join(str(row[key]) for key in headers)) count += 1 time.sleep(args.interval) except KeyboardInterrupt: print("\nLog stopped") finally: if csv_file: csv_file.close() def command_regs(filter_text: str | None) -> None: rows = [] needle = normalize_key(filter_text) if filter_text else None for reg in REGISTERS: haystack = f"{reg.addr} {reg.name} {reg.access} {reg.note}".lower() if needle and needle not in haystack: continue rows.append( [ f"{reg.addr:04d}", f"0x{reg.addr:04X}", f"400{reg.addr + 1:02d}", reg.name, reg.access, reg.unit, reg.note, ] ) print_table(["dec", "hex", "4x", "name", "rw", "unit", "note"], rows) class ADShell(cmd.Cmd): intro = "AD Modbus terminal. Type help or ? to list commands." prompt = "ad> " def __init__(self, client: ModbusRTUClient) -> None: super().__init__() self.client = client def _run(self, func, *args) -> None: try: func(*args) except TerminalError as exc: print(f"error: {exc}") def do_ping(self, line: str) -> None: "ping: read device id and protocol version" self._run(command_ping, self.client) def do_read(self, line: str) -> None: "read ADDR [QTY]: read holding registers" args = shlex.split(line) if not args: print("usage: read ADDR [QTY]") return try: quantity = parse_int(args[1]) if len(args) > 1 else 1 except TerminalError as exc: print(f"error: {exc}") return self._run(command_read, self.client, args[0], quantity) def do_write(self, line: str) -> None: "write ADDR VALUE: write one holding register" args = shlex.split(line) if len(args) != 2: print("usage: write ADDR VALUE") return self._run(command_write, self.client, args[0], args[1]) def do_status(self, line: str) -> None: "status: show AD mode, flags and inverter state" self._run(print_status, self.client) def do_measure(self, line: str) -> None: "measure: show current measurements" self._run(print_measurements, self.client) def do_params(self, line: str) -> None: "params: show identified motor parameters" self._run(print_params, self.client) def do_stop(self, line: str) -> None: "stop: request stop" self._run(command_stop, self.client) def do_reset(self, line: str) -> None: "reset: reset latched AD faults" self._run(command_reset, self.client) def do_regs(self, line: str) -> None: "regs [FILTER]: list known holding registers" args = shlex.split(line) command_regs(args[0] if args else None) def do_start(self, line: str) -> None: "start MODE [key=value ...]: start AD test mode" try: args = parse_shell_start(line) except TerminalError as exc: print(f"error: {exc}") return self._run(command_start, self.client, args) def do_log(self, line: str) -> None: "log [INTERVAL] [COUNT]: stream measurements to stdout" parts = shlex.split(line) try: args = argparse.Namespace( interval=float(parts[0]) if len(parts) >= 1 else 0.5, count=int(parts[1]) if len(parts) >= 2 else 0, csv=None, ) except ValueError as exc: print(f"error: {exc}") return self._run(command_log, self.client, args) def do_quit(self, line: str) -> bool: "quit: leave terminal" return True def do_exit(self, line: str) -> bool: "exit: leave terminal" return True def do_EOF(self, line: str) -> bool: print() return True def parse_shell_start(line: str) -> argparse.Namespace: parts = shlex.split(line) if not parts: raise TerminalError("usage: start MODE [duty=0.08] [current=10] [...]") values = { "mode": parts[0], "duty": 0.08, "current_limit": 10.0, "overvoltage": 60.0, "undervoltage": 0.0, "speed_limit": 1000.0, "temp_limit": 85.0, "rotation_freq": 3.0, "rotation_mod": 0.35, "rotation_ramp_ms": 3000, "polarity": DEFAULT_POLARITY, "timing": "center", "motor": "ad", "pole_pairs": DEFAULT_POLE_PAIRS, "phase_shunt_ohm": DEFAULT_PHASE_SHUNT_OHM, "locked_rotor": False, } for token in parts[1:]: if "=" not in token: values["duty"] = float(token) continue key, raw_value = token.split("=", 1) key = normalize_key(key) if key in ("current", "current_limit"): values["current_limit"] = float(raw_value) elif key in ("ov", "overvoltage"): values["overvoltage"] = float(raw_value) elif key in ("uv", "undervoltage"): values["undervoltage"] = float(raw_value) elif key in ("speed", "speed_limit"): values["speed_limit"] = float(raw_value) elif key in ("temp", "temperature", "temp_limit"): values["temp_limit"] = float(raw_value) elif key in ("freq", "rotation_freq"): values["rotation_freq"] = float(raw_value) elif key in ("mod", "rotation_mod"): values["rotation_mod"] = float(raw_value) elif key in ("ramp", "ramp_ms", "rotation_ramp", "rotation_ramp_ms"): values["rotation_ramp_ms"] = parse_int(raw_value) elif key == "duty": values["duty"] = float(raw_value) elif key == "polarity": values["polarity"] = parse_int(raw_value) elif key == "timing": values["timing"] = raw_value elif key == "motor": values["motor"] = raw_value elif key in ("pole_pairs", "poles", "pairs"): values["pole_pairs"] = float(raw_value) elif key in ("phase_shunt", "phase_shunt_ohm", "rshunt", "shunt"): values["phase_shunt_ohm"] = float(raw_value) elif key in ("locked", "locked_rotor"): values["locked_rotor"] = raw_value.lower() in ("1", "yes", "true", "on") else: raise TerminalError(f"unknown start option: {key}") return argparse.Namespace(**values) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="AD project Modbus RTU terminal (USART2/ST-LINK VCP, 115200 8N1)." ) parser.add_argument("--port", help="serial port, for example COM7; use auto to probe") parser.add_argument("--baud", type=int, default=DEFAULT_BAUD, help="baudrate") parser.add_argument("--slave", type=int, default=DEFAULT_SLAVE, help="Modbus slave id") parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, help="serial timeout, s") parser.add_argument("--debug", action="store_true", help="print raw Modbus frames") sub = parser.add_subparsers(dest="command") sub.add_parser("ports", help="list serial ports") regs = sub.add_parser("regs", help="list known AD holding registers") regs.add_argument("filter", nargs="?") sub.add_parser("ping", help="read device id and protocol version") read = sub.add_parser("read", help="read holding registers") read.add_argument("address") read.add_argument("quantity", nargs="?", type=int, default=1) write = sub.add_parser("write", help="write one holding register") write.add_argument("address") write.add_argument("value") write_many = sub.add_parser("write-many", help="write sequential holding registers") write_many.add_argument("address") write_many.add_argument("values", nargs="+") sub.add_parser("status", help="show AD status and faults") sub.add_parser("measure", help="show measurements") sub.add_parser("params", help="show identified motor parameters") sub.add_parser("stop", help="request stop") sub.add_parser("reset", help="reset faults") start = sub.add_parser("start", help="prepare limits and start mode") start.add_argument("mode", help="mode number/name, e.g. logging, uh, auto, rotation") start.add_argument("--duty", type=float, default=0.08) start.add_argument("--current-limit", type=float, default=10.0) start.add_argument("--overvoltage", type=float, default=60.0) start.add_argument("--undervoltage", type=float, default=0.0) start.add_argument("--speed-limit", type=float, default=1000.0) start.add_argument("--temp-limit", type=float, default=85.0) start.add_argument("--rotation-freq", type=float, default=3.0) start.add_argument("--rotation-mod", type=float, default=0.35) start.add_argument("--rotation-ramp-ms", type=argparse_int, default=3000) start.add_argument("--polarity", type=argparse_int, default=DEFAULT_POLARITY) start.add_argument("--timing", default="center", help="up or center") start.add_argument("--motor", default="ad", help="ad or bldc") start.add_argument("--pole-pairs", type=float, default=DEFAULT_POLE_PAIRS) start.add_argument("--phase-shunt-ohm", type=float, default=DEFAULT_PHASE_SHUNT_OHM) start.add_argument("--locked-rotor", action="store_true") log = sub.add_parser("log", help="stream measurements") log.add_argument("--interval", type=float, default=0.5) log.add_argument("--count", type=int, default=0, help="0 means forever") log.add_argument("--csv", help="write CSV file instead of stdout") sub.add_parser("shell", help="interactive terminal") return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) try: if args.command == "ports": list_ports() return 0 if args.command == "regs": command_regs(args.filter) return 0 if args.command is None: if args.port: args.command = "shell" else: parser.print_help() return 0 with open_client(args) as client: if args.command == "ping": command_ping(client) elif args.command == "read": command_read(client, args.address, args.quantity) elif args.command == "write": command_write(client, args.address, args.value) elif args.command == "write-many": command_write_many(client, args.address, args.values) elif args.command == "status": print_status(client) elif args.command == "measure": print_measurements(client) elif args.command == "params": print_params(client) elif args.command == "start": command_start(client, args) elif args.command == "stop": command_stop(client) elif args.command == "reset": command_reset(client) elif args.command == "log": command_log(client, args) elif args.command == "shell": ADShell(client).cmdloop() else: parser.error(f"unknown command: {args.command}") except TerminalError as exc: print(f"error: {exc}", file=sys.stderr) return 2 except KeyboardInterrupt: print("\nInterrupted", file=sys.stderr) return 130 return 0 if __name__ == "__main__": raise SystemExit(main())