Files

1680 lines
62 KiB
Python

#!/usr/bin/env python3
"""Browser GUI for the AD Modbus RTU terminal."""
from __future__ import annotations
import argparse
import base64
import hmac
import json
import os
import threading
import time
import webbrowser
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse
import ad_terminal as ad
DEFAULT_WEB_PORT = 8765
DEFAULT_AUTH_USER = "ad"
AUTH_REALM = "AD GUI"
INDEX_HTML = r"""<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>AD Modbus GUI</title>
<style>
:root {
--bg: #f5f7fa;
--surface: #ffffff;
--line: #d8dee8;
--text: #17202a;
--muted: #617084;
--blue: #1f6feb;
--blue-dark: #174ea6;
--green: #16833a;
--red: #b42318;
--amber: #946200;
--code: #0b253a;
}
* { box-sizing: border-box; }
body {
margin: 0;
font-family: "Segoe UI", Arial, sans-serif;
background: var(--bg);
color: var(--text);
letter-spacing: 0;
}
header {
height: 58px;
display: flex;
align-items: center;
gap: 14px;
padding: 0 18px;
background: #102033;
color: #fff;
border-bottom: 1px solid #07111d;
}
.mark {
width: 34px;
height: 34px;
display: grid;
place-items: center;
border-radius: 6px;
background: #2d9cdb;
color: white;
font-weight: 700;
font-size: 15px;
}
h1 {
margin: 0;
font-size: 18px;
font-weight: 650;
}
.subhead {
color: #c6d0dd;
font-size: 13px;
}
.shell {
display: grid;
grid-template-columns: 320px minmax(0, 1fr);
gap: 12px;
padding: 12px;
min-height: calc(100vh - 58px);
}
aside, main, .panel {
background: var(--surface);
border: 1px solid var(--line);
border-radius: 8px;
}
aside {
padding: 12px;
display: flex;
flex-direction: column;
gap: 12px;
}
main {
min-width: 0;
display: grid;
grid-template-rows: auto minmax(0, 1fr) 150px;
}
.panel {
padding: 12px;
}
.section-title {
display: flex;
align-items: center;
justify-content: space-between;
margin: 0 0 10px;
color: var(--code);
font-size: 13px;
font-weight: 700;
text-transform: uppercase;
}
.grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 8px;
}
.field {
display: flex;
flex-direction: column;
gap: 4px;
min-width: 0;
}
label {
color: var(--muted);
font-size: 12px;
}
input, select {
width: 100%;
height: 32px;
padding: 5px 8px;
border: 1px solid #bfccd9;
border-radius: 6px;
background: #fff;
color: var(--text);
font: inherit;
font-size: 13px;
}
input[type="checkbox"] {
width: 14px;
height: 14px;
flex: 0 0 14px;
padding: 0;
margin: 0;
}
.button-row {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 8px;
}
button {
min-height: 34px;
border: 1px solid #b8c4d2;
border-radius: 6px;
background: #f8fafc;
color: var(--text);
font: inherit;
font-size: 13px;
cursor: pointer;
}
button:hover { background: #eef3f8; }
button.primary {
background: var(--blue);
border-color: var(--blue);
color: #fff;
}
button.primary:hover { background: var(--blue-dark); }
button.danger {
color: #fff;
background: var(--red);
border-color: var(--red);
}
button.good {
color: #fff;
background: var(--green);
border-color: var(--green);
}
button:disabled {
opacity: 0.55;
cursor: default;
}
.status-line {
display: flex;
align-items: center;
gap: 8px;
min-height: 22px;
color: var(--muted);
font-size: 13px;
}
.dot {
width: 10px;
height: 10px;
border-radius: 50%;
background: #8a95a3;
}
.dot.on { background: var(--green); }
.dot.err { background: var(--red); }
.tabs {
display: flex;
gap: 2px;
padding: 10px 10px 0;
border-bottom: 1px solid var(--line);
}
.tab {
padding: 9px 14px;
border: 1px solid transparent;
border-bottom: 0;
border-radius: 7px 7px 0 0;
background: transparent;
min-height: 38px;
}
.tab.active {
background: #fff;
border-color: var(--line);
color: var(--blue-dark);
font-weight: 650;
}
.view {
display: none;
min-height: 0;
overflow: auto;
padding: 12px;
}
.view.active { display: block; }
.kv {
display: grid;
grid-template-columns: repeat(2, minmax(260px, 1fr));
gap: 10px;
}
.metric {
border: 1px solid #e0e6ee;
border-radius: 7px;
padding: 10px;
min-height: 62px;
background: #fff;
}
.metric .name {
color: var(--muted);
font-size: 12px;
margin-bottom: 5px;
}
.metric .value {
font-family: Consolas, "Cascadia Mono", monospace;
font-size: 15px;
color: var(--code);
overflow-wrap: anywhere;
}
.led-layout {
display: grid;
grid-template-columns: max-content max-content;
gap: 4px;
margin-top: 6px;
align-items: start;
}
.led-panel {
border: 1px solid #e0e6ee;
border-radius: 7px;
padding: 6px;
background: #fff;
}
.led-title {
margin-bottom: 4px;
color: var(--muted);
font-size: 12px;
font-weight: 700;
text-transform: uppercase;
}
.led-grid {
display: grid;
grid-template-columns: max-content;
grid-auto-rows: 18px;
gap: 1px;
}
.led-item {
display: flex;
align-items: center;
gap: 5px;
min-height: 18px;
color: var(--code);
font-family: Consolas, "Cascadia Mono", monospace;
font-size: 11px;
overflow-wrap: anywhere;
}
.led-square {
width: 13px;
height: 13px;
flex: 0 0 13px;
border: 1px solid #b7c3d1;
border-radius: 3px;
background: #e8edf3;
box-shadow: inset 0 0 0 2px rgba(255,255,255,0.55);
}
.led-square.on {
border-color: #0f6b32;
background: var(--green);
box-shadow: 0 0 0 2px rgba(22,131,58,0.16), inset 0 0 0 2px rgba(255,255,255,0.24);
}
.led-panel.fault .led-square.on {
border-color: #8f1c13;
background: var(--red);
box-shadow: 0 0 0 2px rgba(180,35,24,0.16), inset 0 0 0 2px rgba(255,255,255,0.2);
}
.led-bit {
color: var(--muted);
min-width: 32px;
font-family: Consolas, "Cascadia Mono", monospace;
}
table {
width: 100%;
border-collapse: collapse;
table-layout: fixed;
font-size: 13px;
}
th, td {
border-bottom: 1px solid #e2e7ee;
padding: 7px 8px;
text-align: left;
vertical-align: top;
overflow-wrap: anywhere;
}
th {
position: sticky;
top: 0;
background: #f8fafc;
color: var(--muted);
font-size: 12px;
z-index: 1;
}
tr.selected { background: #e9f2ff; }
.register-toolbar {
display: grid;
grid-template-columns: 110px 70px 110px 110px minmax(0, 1fr) 110px 110px 110px;
gap: 8px;
margin-bottom: 10px;
align-items: end;
}
.log {
height: 100%;
min-height: 120px;
overflow: auto;
padding: 10px;
background: #0f1d2b;
color: #dbe7f3;
font-family: Consolas, "Cascadia Mono", monospace;
font-size: 12px;
white-space: pre-wrap;
}
.wide { grid-column: 1 / -1; }
.run-toggles {
grid-column: 1 / -1;
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 6px;
}
.check-pill {
min-height: 28px;
display: flex;
align-items: center;
gap: 6px;
padding: 4px 8px;
border: 1px solid #b8c4d2;
border-radius: 6px;
background: #f8fafc;
color: var(--text);
font-size: 11px;
line-height: 1.15;
cursor: pointer;
}
.check-pill:hover { background: #eef3f8; }
.quick {
display: grid;
grid-template-columns: repeat(2, minmax(0, 1fr));
gap: 8px;
}
.small-note {
color: var(--muted);
font-size: 12px;
min-height: 18px;
}
@media (max-width: 900px) {
.shell { grid-template-columns: 1fr; }
.kv { grid-template-columns: 1fr; }
.led-layout { grid-template-columns: 1fr; }
.register-toolbar { grid-template-columns: 1fr 1fr; }
.run-toggles { grid-template-columns: 1fr; }
main { grid-template-rows: auto minmax(420px, 1fr) 150px; }
}
.view-toolbar {
display: flex;
align-items: center;
gap: 10px;
margin-bottom: 10px;
flex-wrap: wrap;
color: var(--muted);
font-size: 13px;
}
.view-toolbar input[type="text"] {
width: 80px;
}
</style>
</head>
<body>
<header>
<div class="mark">AD</div>
<div>
<h1>AD Modbus GUI</h1>
<div class="subhead">USART2 / ST-LINK VCP / 512000 8N1 / slave 1</div>
</div>
</header>
<div class="shell">
<aside>
<section class="panel">
<div class="section-title">Connection</div>
<div class="grid">
<div class="field wide">
<label for="port">Port</label>
<input id="port" list="ports" value="COM31" placeholder="COM31">
<datalist id="ports"></datalist>
</div>
<div class="field"><label for="baud">Baud</label><input id="baud" value="512000"></div>
<div class="field"><label for="slave">Slave</label><input id="slave" value="1"></div>
<div class="field"><label for="timeout">Timeout s</label><input id="timeout" value="0.5"></div>
<button id="refreshPorts">Ports</button>
<button id="connect" class="primary">Connect</button>
<button id="disconnect">Disconnect</button>
<button id="ping">Ping</button>
</div>
<div class="status-line" style="margin-top:10px">
<span id="connDot" class="dot"></span>
<span id="connText">Disconnected</span>
</div>
</section>
<section class="panel">
<div class="section-title">Run Control</div>
<div class="grid">
<div class="field wide"><label for="mode">Mode</label><select id="mode"></select></div>
<div class="field"><label for="duty">Duty</label><input id="duty" value="0.08"></div>
<div class="field"><label for="current">Current A</label><input id="current" value="10.0"></div>
<div class="field"><label for="overvoltage">Overvolt V</label><input id="overvoltage" value="60.0"></div>
<div class="field"><label for="undervoltage">Undervolt V</label><input id="undervoltage" value="0.0"></div>
<div class="field"><label for="speed">Speed rpm</label><input id="speed" value="1000"></div>
<div class="field"><label for="temp">Temp C</label><input id="temp" value="85.0"></div>
<div class="field"><label for="rotFreq">Rot freq Hz</label><input id="rotFreq" value="3.0"></div>
<div class="field"><label for="rotMod">Rot mod</label><input id="rotMod" value="0.35"></div>
<div class="field"><label for="rotRamp">Ramp ms</label><input id="rotRamp" value="3000"></div>
<div class="field"><label for="polarity">Polarity</label><input id="polarity" value="1"></div>
<div class="field"><label for="timing">Timing</label><select id="timing"><option>center</option><option>up</option></select></div>
<div class="field"><label for="motor">Motor</label><select id="motor"><option>ad</option><option>bldc</option></select></div>
<div class="field"><label for="polePairs">Pole pairs</label><input id="polePairs" value="1"></div>
<div class="field"><label for="phaseShunt">Shunt ohm</label><input id="phaseShunt" value="0.1"></div>
<div class="run-toggles">
<label class="check-pill"><input id="locked" type="checkbox"> Locked rotor</label>
<label class="check-pill"><input id="applyNow" type="checkbox"> Apply now</label>
</div>
</div>
<div class="button-row" style="margin-top:10px">
<button id="start" class="good">Start</button>
<button id="stop" class="danger">Stop</button>
<button id="reset">Reset faults</button>
<button id="refreshAll">Refresh</button>
</div>
</section>
<section class="panel">
<div class="section-title">Quick Modes</div>
<div class="quick" id="quickModes"></div>
</section>
<section class="panel">
<div class="section-title">Auto</div>
<div class="grid">
<label class="wide"><input id="auto" type="checkbox"> Auto refresh</label>
<div class="field"><label for="interval">Interval ms</label><input id="interval" value="500"></div>
<button id="csvStart">CSV start</button>
<button id="csvStop">CSV stop</button>
</div>
<div id="csvNote" class="small-note"></div>
</section>
</aside>
<main>
<nav class="tabs" id="tabs">
<button class="tab active" data-tab="statusView">Status</button>
<button class="tab" data-tab="measureView">Measurements</button>
<button class="tab" data-tab="paramsView">Parameters</button>
<button class="tab" data-tab="registerView">Registers</button>
</nav>
<section class="view active" id="statusView">
<div class="kv" id="statusGrid"></div>
<div class="led-layout">
<div class="led-panel">
<div class="led-title">H017_status_flags_lo</div>
<div class="led-grid" id="statusLedGrid"></div>
</div>
<div class="led-panel fault">
<div class="led-title">H019_fault_flags_lo</div>
<div class="led-grid" id="faultLedGrid"></div>
</div>
</div>
</section>
<section class="view" id="measureView">
<div class="view-toolbar">
<label><input id="measurePoll" type="checkbox"> Poll measurements</label>
<label><input id="resetCurrentPeaks" type="checkbox"> Reset current peaks</label>
<label for="measureInterval">Interval ms</label>
<input id="measureInterval" type="text" value="200">
<button id="measureRefresh">Read now</button>
</div>
<div class="kv" id="measureGrid"></div>
</section>
<section class="view" id="paramsView"><div class="kv" id="paramsGrid"></div></section>
<section class="view" id="registerView">
<div class="view-toolbar">
<label><input id="registerPoll" type="checkbox"> Poll registers</label>
<label for="registerInterval">Interval ms</label>
<input id="registerInterval" type="text" value="500">
<button id="registerRefresh">Read now</button>
</div>
<div class="register-toolbar">
<div class="field"><label for="readAddr">Read addr</label><input id="readAddr" value="0"></div>
<div class="field"><label for="readQty">Qty</label><input id="readQty" value="2"></div>
<button id="readRange">Read range</button>
<button id="readGroups">Read groups</button>
<div></div>
<div class="field"><label for="writeAddr">Write addr</label><input id="writeAddr" value="duty"></div>
<div class="field"><label for="writeValue">Value</label><input id="writeValue" value="800"></div>
<button id="writeOne">Write one</button>
</div>
<table>
<thead>
<tr>
<th style="width:70px">Dec</th>
<th style="width:90px">Hex</th>
<th style="width:230px">Name</th>
<th style="width:70px">RW</th>
<th style="width:130px">Raw</th>
<th>Decoded</th>
</tr>
</thead>
<tbody id="registerRows"></tbody>
</table>
</section>
<section class="log" id="log"></section>
</main>
</div>
<script>
const $ = (id) => document.getElementById(id);
const state = {
connected: false,
registers: [],
registerRows: new Map(),
autoTimer: null,
measureTimer: null,
registerTimer: null,
applyTimer: null,
csvRows: [],
csvLogging: false,
};
function log(message, kind = "info") {
const ts = new Date().toLocaleTimeString();
$("log").textContent += `[${ts}] ${kind.toUpperCase()}: ${message}\n`;
$("log").scrollTop = $("log").scrollHeight;
}
async function api(path, options = {}) {
const response = await fetch(path, {
headers: {"Content-Type": "application/json"},
...options,
});
const data = await response.json();
if (!response.ok || !data.ok) {
throw new Error(data.error || response.statusText);
}
return data;
}
function setConnected(connected, text) {
state.connected = connected;
$("connText").textContent = text || (connected ? "Connected" : "Disconnected");
$("connDot").classList.toggle("on", connected);
$("connDot").classList.toggle("err", !connected && text && text.includes("Error"));
}
function metric(parent, name) {
const node = document.createElement("div");
node.className = "metric";
node.innerHTML = `<div class="name"></div><div class="value">-</div>`;
node.querySelector(".name").textContent = name;
parent.appendChild(node);
return node.querySelector(".value");
}
const statusFields = {};
const measureFields = {};
const paramFields = {};
const statusLeds = {};
const faultLeds = {};
function buildMetrics() {
["mode","status","faults","power_stage_allowed","test_running","pwm_running","service_output","id_stage","pwm_timing","motor_control"]
.forEach(name => statusFields[name] = metric($("statusGrid"), name));
["ia_A","ib_A","ic_A","vdc_V","temp_C","speed_rpm","ia_rms_A","ib_rms_A","ic_rms_A","ia_peak_A","ib_peak_A","ic_peak_A","torque_Nm","slip_percent","meas_status"]
.forEach(name => measureFields[name] = metric($("measureGrid"), name));
["valid_mask","Rs_ohm","Ls_H","Ll_H","Rr_ohm","Lr_H","Lm_H","J_kg_m2","B_Nm_s","pole_pairs","phase_shunt_ohm"]
.forEach(name => paramFields[name] = metric($("paramsGrid"), name));
}
function buildLedGrid(parent, flags, store) {
parent.innerHTML = "";
Object.keys(store).forEach(key => delete store[key]);
flags.forEach(flag => {
const node = document.createElement("div");
node.className = "led-item";
node.innerHTML = `<span class="led-square"></span><span class="led-bit">bit ${flag.bit}</span><span class="led-label"></span>`;
node.querySelector(".led-label").textContent = flag.label;
parent.appendChild(node);
store[flag.bit] = {
root: node,
square: node.querySelector(".led-square"),
};
});
}
function updateLedGrid(store, flags) {
flags.forEach(flag => {
const item = store[flag.bit];
if (!item) return;
item.square.classList.toggle("on", flag.active);
item.root.title = flag.active ? "ON" : "OFF";
});
}
async function loadRegistry() {
const data = await api("/api/registry");
state.registers = data.registers;
buildLedGrid($("statusLedGrid"), data.status_flags, statusLeds);
buildLedGrid($("faultLedGrid"), data.fault_flags, faultLeds);
const mode = $("mode");
mode.innerHTML = "";
Object.entries(data.modes).forEach(([value, name]) => {
const option = document.createElement("option");
option.value = value;
option.textContent = `${value} ${name}`;
if (value === "5") option.selected = true;
mode.appendChild(option);
});
const quick = $("quickModes");
[
["Logging", "logging"], ["Auto ID", "auto"], ["Rotation", "rotation"],
["PWM UH", "uh"], ["PWM UL", "ul"], ["PWM VH", "vh"], ["PWM VL", "vl"],
["PWM WH", "wh"], ["PWM WL", "wl"], ["PWM ALL", "all"]
].forEach(([label, value]) => {
const button = document.createElement("button");
button.textContent = label;
button.onclick = () => startMode(value);
quick.appendChild(button);
});
const rows = $("registerRows");
rows.innerHTML = "";
state.registerRows.clear();
data.registers.forEach(reg => {
const tr = document.createElement("tr");
tr.dataset.addr = reg.addr;
tr.innerHTML = `<td>${reg.addr}</td><td>${reg.hex}</td><td>${reg.name}</td><td>${reg.access}</td><td>-</td><td>-</td>`;
tr.onclick = () => {
document.querySelectorAll("#registerRows tr").forEach(row => row.classList.remove("selected"));
tr.classList.add("selected");
$("readAddr").value = reg.addr;
$("writeAddr").value = reg.name;
};
rows.appendChild(tr);
state.registerRows.set(reg.addr, tr);
});
}
async function refreshPorts() {
try {
const data = await api("/api/ports");
const list = $("ports");
list.innerHTML = "";
data.ports.forEach(port => {
const option = document.createElement("option");
option.value = port.device;
option.label = port.description || port.device;
list.appendChild(option);
});
if (!$("port").value && data.ports.length) $("port").value = data.ports[0].device;
log(`Found ${data.ports.length} serial port(s)`);
} catch (error) {
log(error.message, "error");
}
}
async function connect() {
try {
const payload = {
port: $("port").value,
baud: Number($("baud").value),
slave: Number($("slave").value),
timeout: Number($("timeout").value),
};
const data = await api("/api/connect", {method: "POST", body: JSON.stringify(payload)});
setConnected(true, `Connected: ${data.port}`);
log(`Connected to ${data.port}`);
await refreshAll();
} catch (error) {
setConnected(false, `Error: ${error.message}`);
log(error.message, "error");
}
}
async function disconnect() {
try {
await api("/api/disconnect", {method: "POST", body: "{}"});
} catch (error) {
log(error.message, "error");
}
stopAuto();
stopMeasurePoll();
stopRegisterPoll();
setConnected(false, "Disconnected");
log("Disconnected");
}
async function ping() {
const data = await api("/api/ping");
updateRegisters(data.registers);
log(`Ping OK: device=${data.device_id}, protocol=${data.protocol}`);
}
function startPayload(modeOverride = null) {
return {
mode: modeOverride || $("mode").value,
duty: Number($("duty").value),
current_limit: Number($("current").value),
overvoltage: Number($("overvoltage").value),
undervoltage: Number($("undervoltage").value),
speed_limit: Number($("speed").value),
temp_limit: Number($("temp").value),
rotation_freq: Number($("rotFreq").value),
rotation_mod: Number($("rotMod").value),
rotation_ramp_ms: Number($("rotRamp").value),
polarity: Number($("polarity").value),
timing: $("timing").value,
motor: $("motor").value,
pole_pairs: Number($("polePairs").value),
phase_shunt_ohm: Number($("phaseShunt").value),
locked_rotor: $("locked").checked,
};
}
async function startMode(modeOverride = null) {
try {
const data = await api("/api/start", {method: "POST", body: JSON.stringify(startPayload(modeOverride))});
$("mode").value = String(data.mode);
log(`Started ${data.mode} ${data.mode_name}`);
if (data.warning) log(data.warning, "warn");
await refreshStatus();
} catch (error) {
log(error.message, "error");
}
}
function scheduleApplyRunControl() {
if (!$("applyNow").checked) return;
if (state.applyTimer) clearTimeout(state.applyTimer);
state.applyTimer = setTimeout(applyRunControl, 250);
}
async function applyRunControl() {
state.applyTimer = null;
if (!$("applyNow").checked) return;
try {
const data = await api("/api/apply-run-control", {
method: "POST",
body: JSON.stringify(startPayload()),
});
updateRegisters(data.registers);
log(`Applied run control: ${data.mode} ${data.mode_name}`);
} catch (error) {
log(error.message, "error");
}
}
async function stopAd() {
try {
const data = await api("/api/stop", {method: "POST", body: "{}"});
log("Stop requested");
if (data.warning) log(data.warning, "warn");
await refreshStatus();
} catch (error) {
log(error.message, "error");
}
}
async function resetFaults() {
try {
const data = await api("/api/reset", {method: "POST", body: "{}"});
log("Fault reset requested");
if (data.warning) log(data.warning, "warn");
await refreshStatus();
} catch (error) {
log(error.message, "error");
}
}
function updateRegisters(items) {
items.forEach(item => {
const row = state.registerRows.get(item.addr);
if (!row) return;
row.children[4].textContent = `${item.raw} / ${item.hex_value}`;
row.children[5].textContent = item.decoded;
});
}
async function refreshStatus() {
const data = await api("/api/status");
Object.entries(data.status).forEach(([key, value]) => {
if (statusFields[key]) statusFields[key].textContent = value;
});
updateLedGrid(statusLeds, data.status_leds);
updateLedGrid(faultLeds, data.fault_leds);
updateRegisters(data.registers);
}
async function refreshMeasurements() {
const data = await api("/api/measure");
Object.entries(data.measurements).forEach(([key, value]) => {
if (measureFields[key]) measureFields[key].textContent = value;
});
updateRegisters(data.registers);
appendCsv(data.raw);
}
async function resetCurrentPeaks() {
if (!$("resetCurrentPeaks").checked) return;
try {
const data = await api("/api/reset-current-peaks", {method: "POST", body: "{}"});
log("Current peaks reset requested");
if (data.warning) log(data.warning, "warn");
await refreshMeasurements();
} catch (error) {
log(error.message, "error");
} finally {
$("resetCurrentPeaks").checked = false;
}
}
function measurePollInterval() {
return Math.max(50, Number($("measureInterval").value) || 200);
}
function startMeasurePoll() {
stopMeasurePoll();
const interval = measurePollInterval();
state.measureTimer = setInterval(async () => {
try {
await refreshMeasurements();
} catch (error) {
log(error.message, "error");
stopMeasurePoll();
$("measurePoll").checked = false;
}
}, interval);
refreshMeasurements().catch(error => log(error.message, "error"));
log(`Measurement polling ${interval} ms`);
}
function stopMeasurePoll() {
if (state.measureTimer) {
clearInterval(state.measureTimer);
state.measureTimer = null;
}
}
async function refreshParams() {
const data = await api("/api/params");
Object.entries(data.params).forEach(([key, value]) => {
if (paramFields[key]) paramFields[key].textContent = value;
});
updateRegisters(data.registers);
}
async function refreshAll() {
await refreshStatus();
await refreshMeasurements();
await refreshParams();
log("Refreshed");
}
async function readRange() {
try {
const payload = {address: $("readAddr").value, quantity: Number($("readQty").value)};
const data = await api("/api/read", {method: "POST", body: JSON.stringify(payload)});
updateRegisters(data.registers);
log(`Read ${data.registers.length} register(s)`);
} catch (error) {
log(error.message, "error");
}
}
async function refreshRegisterGroups() {
const data = await api("/api/read-groups");
updateRegisters(data.registers);
return data;
}
async function readGroups() {
try {
await refreshRegisterGroups();
log("Register groups refreshed");
} catch (error) {
log(error.message, "error");
}
}
function registerPollInterval() {
return Math.max(100, Number($("registerInterval").value) || 500);
}
function startRegisterPoll() {
stopRegisterPoll();
const interval = registerPollInterval();
state.registerTimer = setInterval(async () => {
try {
await refreshRegisterGroups();
} catch (error) {
log(error.message, "error");
stopRegisterPoll();
$("registerPoll").checked = false;
}
}, interval);
refreshRegisterGroups().catch(error => log(error.message, "error"));
log(`Register polling ${interval} ms`);
}
function stopRegisterPoll() {
if (state.registerTimer) {
clearInterval(state.registerTimer);
state.registerTimer = null;
}
}
async function writeOne() {
try {
const payload = {address: $("writeAddr").value, value: $("writeValue").value};
const data = await api("/api/write", {method: "POST", body: JSON.stringify(payload)});
updateRegisters(data.registers);
log(`Wrote ${data.address_hex} = ${data.value}`);
} catch (error) {
log(error.message, "error");
}
}
function startAuto() {
stopAuto();
const interval = Math.max(100, Number($("interval").value) || 500);
state.autoTimer = setInterval(async () => {
try {
await refreshStatus();
await refreshMeasurements();
} catch (error) {
log(error.message, "error");
stopAuto();
$("auto").checked = false;
}
}, interval);
log(`Auto refresh ${interval} ms`);
}
function stopAuto() {
if (state.autoTimer) {
clearInterval(state.autoTimer);
state.autoTimer = null;
}
}
function csvStart() {
state.csvRows = [];
state.csvLogging = true;
$("csvNote").textContent = "CSV logging in browser memory";
if (!$("measurePoll").checked) {
$("measurePoll").checked = true;
startMeasurePoll();
}
log("CSV logging started");
}
function appendCsv(raw) {
if (!state.csvLogging) return;
state.csvRows.push({host_time_s: (Date.now() / 1000).toFixed(3), ...raw});
$("csvNote").textContent = `${state.csvRows.length} row(s)`;
}
function csvStop() {
if (!state.csvLogging) return;
state.csvLogging = false;
const fields = ["host_time_s","ia_A","ib_A","ic_A","vdc_V","temp_C","speed_rpm","ia_rms_A","ib_rms_A","ic_rms_A","ia_peak_A","ib_peak_A","ic_peak_A","torque_Nm","slip_percent","meas_status"];
const lines = [fields.join(",")];
state.csvRows.forEach(row => lines.push(fields.map(field => row[field]).join(",")));
const blob = new Blob([lines.join("\n") + "\n"], {type: "text/csv"});
const url = URL.createObjectURL(blob);
const link = document.createElement("a");
link.href = url;
link.download = `ad_log_${new Date().toISOString().replaceAll(":", "-").slice(0, 19)}.csv`;
link.click();
URL.revokeObjectURL(url);
$("csvNote").textContent = `Saved ${state.csvRows.length} row(s)`;
log("CSV logging stopped");
}
function setupTabs() {
document.querySelectorAll(".tab").forEach(button => {
button.addEventListener("click", () => {
document.querySelectorAll(".tab").forEach(item => item.classList.remove("active"));
document.querySelectorAll(".view").forEach(item => item.classList.remove("active"));
button.classList.add("active");
$(button.dataset.tab).classList.add("active");
});
});
}
function bind() {
$("refreshPorts").onclick = refreshPorts;
$("connect").onclick = connect;
$("disconnect").onclick = disconnect;
$("ping").onclick = () => ping().catch(error => log(error.message, "error"));
$("start").onclick = () => startMode();
$("stop").onclick = stopAd;
$("reset").onclick = resetFaults;
$("refreshAll").onclick = () => refreshAll().catch(error => log(error.message, "error"));
$("readRange").onclick = readRange;
$("readGroups").onclick = readGroups;
$("registerRefresh").onclick = readGroups;
$("writeOne").onclick = writeOne;
$("auto").onchange = () => $("auto").checked ? startAuto() : stopAuto();
$("csvStart").onclick = csvStart;
$("csvStop").onclick = csvStop;
$("measureRefresh").onclick = () => refreshMeasurements().catch(error => log(error.message, "error"));
$("measurePoll").onchange = () => $("measurePoll").checked ? startMeasurePoll() : stopMeasurePoll();
$("resetCurrentPeaks").onchange = resetCurrentPeaks;
$("measureInterval").onchange = () => {
if ($("measurePoll").checked) startMeasurePoll();
};
$("registerPoll").onchange = () => $("registerPoll").checked ? startRegisterPoll() : stopRegisterPoll();
$("registerInterval").onchange = () => {
if ($("registerPoll").checked) startRegisterPoll();
};
["mode","duty","current","overvoltage","undervoltage","speed","temp","rotFreq","rotMod","rotRamp","polarity","timing","motor","polePairs","locked"]
.forEach(id => $(id).addEventListener("change", scheduleApplyRunControl));
$("applyNow").addEventListener("change", () => {
if ($("applyNow").checked) scheduleApplyRunControl();
});
}
async function boot() {
setupTabs();
buildMetrics();
bind();
await loadRegistry();
await refreshPorts();
log("GUI ready");
}
boot().catch(error => log(error.message, "error"));
</script>
</body>
</html>
"""
class ServerState:
def __init__(self) -> None:
self.lock = threading.RLock()
self.client: ad.ModbusRTUClient | None = None
self.port: str | None = None
def disconnect(self) -> None:
with self.lock:
if self.client is not None:
self.client.__exit__(None, None, None)
self.client = None
self.port = None
def require_client(self) -> ad.ModbusRTUClient:
if self.client is None:
raise ad.TerminalError("Not connected")
return self.client
def register_payload(start: int, values: list[int]) -> list[dict[str, object]]:
rows = []
for offset, value in enumerate(values):
addr = start + offset
reg = ad.REGISTER_BY_ADDR.get(addr)
rows.append(
{
"addr": addr,
"hex": f"0x{addr:04X}",
"name": reg.name if reg else "?",
"raw": value,
"hex_value": f"0x{value:04X}",
"decoded": ad.format_reg_value(addr, value),
}
)
return rows
def status_payload(values: list[int]) -> dict[str, str]:
status = ad.u32_from_words(values[1], values[2])
faults = ad.u32_from_words(values[3], values[4])
return {
"mode": f"{values[0]} ({ad.MODES.get(values[0], 'unknown')})",
"status": f"0x{status:08X} ({ad.decode_flags(status, ad.STATUS_FLAGS)})",
"faults": f"0x{faults:08X} ({ad.decode_flags(faults, ad.FAULT_FLAGS)})",
"power_stage_allowed": ad.format_reg_value(0x0015, values[5]),
"test_running": ad.format_reg_value(0x0016, values[6]),
"pwm_running": ad.format_reg_value(0x0017, values[7]),
"service_output": ad.format_reg_value(0x0018, values[8]),
"id_stage": ad.format_reg_value(0x0019, values[9]),
"pwm_timing": ad.format_reg_value(0x001A, values[10]),
"motor_control": ad.format_reg_value(0x001B, values[11]),
}
def flag_definitions(names: dict[int, str], prefix: str, bit_count: int = 16) -> list[dict[str, object]]:
return [
{
"bit": bit,
"label": f"{prefix}_{names[bit]}" if bit in names else "reserved",
}
for bit in range(bit_count)
]
def flag_payload(value: int, names: dict[int, str], prefix: str, bit_count: int = 16) -> list[dict[str, object]]:
return [
{
"bit": bit,
"label": f"{prefix}_{names[bit]}" if bit in names else "reserved",
"active": (value & (1 << bit)) != 0,
}
for bit in range(bit_count)
]
def measurement_payload(data: dict[str, object]) -> tuple[dict[str, str], dict[str, object]]:
raw = {
"ia_A": f"{float(data['ia_A']):.6g}",
"ib_A": f"{float(data['ib_A']):.6g}",
"ic_A": f"{float(data['ic_A']):.6g}",
"vdc_V": f"{float(data['vdc_V']):.6g}",
"temp_C": f"{float(data['temp_C']):.6g}",
"speed_rpm": str(data["speed_rpm"]),
"ia_rms_A": f"{float(data['ia_rms_A']):.6g}",
"ib_rms_A": f"{float(data['ib_rms_A']):.6g}",
"ic_rms_A": f"{float(data['ic_rms_A']):.6g}",
"ia_peak_A": f"{float(data['ia_peak_A']):.6g}",
"ib_peak_A": f"{float(data['ib_peak_A']):.6g}",
"ic_peak_A": f"{float(data['ic_peak_A']):.6g}",
"torque_Nm": f"{float(data['torque_Nm']):.6g}",
"slip_percent": f"{float(data['slip_percent']):.6g}",
"meas_status": f"0x{int(data['meas_status']):08X}",
}
shown = dict(raw)
shown["meas_status"] += f" ({ad.decode_flags(int(data['meas_status']), ad.MEAS_STATUS_FLAGS)})"
return shown, raw
def params_payload(values: list[int]) -> dict[str, str]:
valid = ad.u32_from_words(values[0], values[1])
params = {
"valid_mask": f"0x{valid:08X} ({ad.decode_flags(valid, ad.VALID_FLAGS)})",
"Rs_ohm": f"{values[2] * 0.001:.9g}",
"Ls_H": f"{ad.u32_from_words(values[3], values[4]) * 1e-6:.9g}",
"Ll_H": f"{ad.u32_from_words(values[5], values[6]) * 1e-6:.9g}",
"Rr_ohm": f"{values[7] * 0.001:.9g}",
"Lr_H": f"{ad.u32_from_words(values[8], values[9]) * 1e-6:.9g}",
"Lm_H": f"{ad.u32_from_words(values[10], values[11]) * 1e-6:.9g}",
"J_kg_m2": f"{ad.u32_from_words(values[12], values[13]) * 1e-9:.9g}",
"B_Nm_s": f"{ad.u32_from_words(values[14], values[15]) * 1e-9:.9g}",
"pole_pairs": str(values[16]),
"phase_shunt_ohm": f"{values[17] * 0.001:.9g}",
}
return params
def try_write_optional_ramp(client: ad.ModbusRTUClient, ramp_value: int) -> bool:
try:
client.write_single(0x001E, ramp_value)
return True
except ad.ModbusExceptionError as exc:
if exc.code in (2, 3):
return False
raise
def try_write_optional_single(client: ad.ModbusRTUClient, address: int, value: int) -> bool:
try:
client.write_single(address, value)
return True
except ad.ModbusExceptionError as exc:
if exc.code in (2, 3):
return False
raise
def stop_compatible(client: ad.ModbusRTUClient) -> bool:
client.write_single(0x0002, 0)
return try_write_optional_single(client, 0x0005, 1)
def optional_register_warning(
stop_supported: bool,
reset_supported: bool,
ramp_supported: bool,
pole_pairs_supported: bool,
phase_shunt_supported: bool,
) -> str | None:
missing = []
if not stop_supported:
missing.append("0x0005 stop")
if not reset_supported:
missing.append("0x0004 reset")
if not ramp_supported:
missing.append("0x001E ramp")
if not pole_pairs_supported:
missing.append("0x0040 pole_pairs")
if not phase_shunt_supported:
missing.append("0x0041 phase_shunt")
if not missing:
return None
return f"Firmware skipped optional register(s): {', '.join(missing)}"
def parse_json_body(handler: BaseHTTPRequestHandler) -> dict[str, object]:
size = int(handler.headers.get("Content-Length", "0"))
if size <= 0:
return {}
data = handler.rfile.read(size)
return json.loads(data.decode("utf-8"))
def parse_basic_auth(value: str | None) -> tuple[str, str] | None:
if not value or not value.startswith("Basic "):
return None
try:
decoded = base64.b64decode(value[6:].strip(), validate=True).decode("utf-8")
except (ValueError, UnicodeDecodeError):
return None
if ":" not in decoded:
return None
user, password = decoded.split(":", 1)
return user, password
def make_handler(state: ServerState, auth_user: str | None = None, auth_password: str | None = None):
class Handler(BaseHTTPRequestHandler):
server_version = "ADGui/1.0"
def log_message(self, format: str, *args) -> None:
return
def auth_enabled(self) -> bool:
return bool(auth_password)
def is_authorized(self) -> bool:
if not auth_password:
return True
credentials = parse_basic_auth(self.headers.get("Authorization"))
if credentials is None:
return False
user, password = credentials
expected_user = auth_user or DEFAULT_AUTH_USER
return hmac.compare_digest(user, expected_user) and hmac.compare_digest(password, auth_password)
def require_auth(self) -> bool:
if self.is_authorized():
return True
body = b"Authentication required\n"
self.send_response(HTTPStatus.UNAUTHORIZED)
self.send_header("WWW-Authenticate", f'Basic realm="{AUTH_REALM}", charset="UTF-8"')
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
return False
def send_json(self, payload: dict[str, object], status: int = 200) -> None:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def send_error_json(self, exc: Exception, status: int = 400) -> None:
self.send_json({"ok": False, "error": str(exc)}, status)
def do_GET(self) -> None:
try:
if not self.require_auth():
return
parsed = urlparse(self.path)
if parsed.path == "/":
body = INDEX_HTML.encode("utf-8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Cache-Control", "no-store")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
elif parsed.path == "/api/registry":
self.send_json(
{
"ok": True,
"modes": {str(key): value for key, value in ad.MODES.items()},
"status_flags": flag_definitions(ad.STATUS_FLAGS, "AD_PARAM_ID_STATUS"),
"fault_flags": flag_definitions(ad.FAULT_FLAGS, "AD_PARAM_ID_FAULT"),
"registers": [
{
"addr": reg.addr,
"hex": f"0x{reg.addr:04X}",
"name": reg.name,
"access": reg.access,
"unit": reg.unit,
"note": reg.note,
}
for reg in ad.REGISTERS
],
}
)
elif parsed.path == "/api/ports":
serial = ad.require_serial_module()
from serial.tools import list_ports # type: ignore[import-not-found]
ports = [
{
"device": port.device,
"description": port.description,
"hwid": port.hwid,
}
for port in list_ports.comports()
]
self.send_json({"ok": True, "ports": ports})
elif parsed.path == "/api/state":
with state.lock:
self.send_json({"ok": True, "connected": state.client is not None, "port": state.port})
elif parsed.path == "/api/ping":
with state.lock:
client = state.require_client()
values = client.read_holding(0, 2)
if values[0] != ad.DEVICE_ID:
raise ad.TerminalError(f"Unexpected device id: 0x{values[0]:04X}")
self.send_json(
{
"ok": True,
"device_id": f"0x{values[0]:04X}",
"protocol": values[1],
"registers": register_payload(0, values),
}
)
elif parsed.path == "/api/status":
with state.lock:
values = state.require_client().read_holding(0x0010, 12)
status_value = ad.u32_from_words(values[1], values[2])
fault_value = ad.u32_from_words(values[3], values[4])
self.send_json(
{
"ok": True,
"status": status_payload(values),
"status_leds": flag_payload(
status_value, ad.STATUS_FLAGS, "AD_PARAM_ID_STATUS"
),
"fault_leds": flag_payload(
fault_value, ad.FAULT_FLAGS, "AD_PARAM_ID_FAULT"
),
"registers": register_payload(0x0010, values),
}
)
elif parsed.path == "/api/measure":
with state.lock:
client = state.require_client()
values = ad.read_measurement_registers(client)
data = {
"ia_A": ad.to_s16(values[0]) * 0.001,
"ib_A": ad.to_s16(values[1]) * 0.001,
"ic_A": ad.to_s16(values[2]) * 0.001,
"vdc_V": values[3] * 0.1,
"temp_C": ad.to_s16(values[4]) * 0.1,
"meas_status": ad.u32_from_words(values[5], values[6]),
"speed_rpm": ad.to_s16(values[7]),
"ia_rms_A": values[8] * 0.001,
"ib_rms_A": values[9] * 0.001,
"ic_rms_A": values[10] * 0.001,
"torque_Nm": ad.to_s16(values[11]) * 0.001,
"ia_peak_A": values[12] * 0.001,
"ib_peak_A": values[13] * 0.001,
"ic_peak_A": values[14] * 0.001,
"slip_percent": ad.to_s16(values[15]) * 0.01,
}
shown, raw = measurement_payload(data)
self.send_json(
{
"ok": True,
"measurements": shown,
"raw": raw,
"registers": register_payload(0x0020, values),
}
)
elif parsed.path == "/api/params":
with state.lock:
values = ad.read_parameter_registers(state.require_client())
self.send_json(
{
"ok": True,
"params": params_payload(values),
"registers": register_payload(0x0030, values),
}
)
elif parsed.path == "/api/read-groups":
rows = []
with state.lock:
client = state.require_client()
for start, qty in [(0x0000, 16), (0x0010, 12)]:
rows.extend(register_payload(start, client.read_holding(start, qty)))
rows.extend(register_payload(0x0020, ad.read_measurement_registers(client)))
rows.extend(register_payload(0x0030, ad.read_parameter_registers(client)))
self.send_json({"ok": True, "registers": rows})
else:
self.send_error_json(Exception("Not found"), HTTPStatus.NOT_FOUND)
except Exception as exc:
self.send_error_json(exc)
def do_POST(self) -> None:
try:
if not self.require_auth():
return
parsed = urlparse(self.path)
payload = parse_json_body(self)
if parsed.path == "/api/connect":
port = str(payload.get("port", "")).strip()
if not port:
raise ad.TerminalError("Port is required")
baud = int(payload.get("baud", ad.DEFAULT_BAUD))
slave = int(payload.get("slave", ad.DEFAULT_SLAVE))
timeout = float(payload.get("timeout", ad.DEFAULT_TIMEOUT))
client = ad.ModbusRTUClient(port, baudrate=baud, slave=slave, timeout=timeout)
client.__enter__()
try:
values = client.read_holding(0, 2)
if values[0] != ad.DEVICE_ID:
raise ad.TerminalError(f"Unexpected device id: 0x{values[0]:04X}")
except Exception:
client.__exit__(None, None, None)
raise
with state.lock:
state.disconnect()
state.client = client
state.port = port
self.send_json({"ok": True, "port": port})
elif parsed.path == "/api/disconnect":
state.disconnect()
self.send_json({"ok": True})
elif parsed.path == "/api/read":
address = ad.parse_register_address(str(payload.get("address", "0")))
quantity = int(payload.get("quantity", 1))
with state.lock:
values = state.require_client().read_holding(address, quantity)
self.send_json({"ok": True, "registers": register_payload(address, values)})
elif parsed.path == "/api/write":
address = ad.parse_register_address(str(payload.get("address", "0")))
value = ad.parse_u16_value(str(payload.get("value", "0")))
with state.lock:
client = state.require_client()
client.write_single(address, value)
self.send_json(
{
"ok": True,
"address": address,
"address_hex": f"0x{address:04X}",
"value": value,
"registers": register_payload(address, [value]),
}
)
elif parsed.path == "/api/start":
mode_text = str(payload.get("mode", "logging")).split()[0]
args = argparse.Namespace(
mode=mode_text,
duty=float(payload.get("duty", 0.08)),
current_limit=float(payload.get("current_limit", 10.0)),
overvoltage=float(payload.get("overvoltage", 60.0)),
undervoltage=float(payload.get("undervoltage", 0.0)),
speed_limit=float(payload.get("speed_limit", 1000.0)),
temp_limit=float(payload.get("temp_limit", 85.0)),
rotation_freq=float(payload.get("rotation_freq", 3.0)),
rotation_mod=float(payload.get("rotation_mod", 0.35)),
rotation_ramp_ms=int(payload.get("rotation_ramp_ms", 3000)),
polarity=int(payload.get("polarity", ad.DEFAULT_POLARITY)),
timing=str(payload.get("timing", "center")),
motor=str(payload.get("motor", "ad")),
pole_pairs=float(payload.get("pole_pairs", ad.DEFAULT_POLE_PAIRS)),
phase_shunt_ohm=float(payload.get("phase_shunt_ohm", ad.DEFAULT_PHASE_SHUNT_OHM)),
locked_rotor=bool(payload.get("locked_rotor", False)),
)
mode, locked_values, values, ramp_value, pole_pairs_value, phase_shunt_value = ad.start_values(args)
with state.lock:
client = state.require_client()
stop_supported = stop_compatible(client)
reset_supported = try_write_optional_single(client, 0x0004, 1)
client.write_single(0x0006, locked_values[0])
client.write_multiple(0x0007, values[:9])
client.write_multiple(0x001A, values[9:])
ramp_supported = try_write_optional_ramp(client, ramp_value)
pole_pairs_supported = try_write_optional_single(client, 0x0040, pole_pairs_value)
phase_shunt_supported = try_write_optional_single(client, 0x0041, phase_shunt_value)
client.write_single(0x0003, mode)
client.write_single(0x0002, 1)
self.send_json(
{
"ok": True,
"mode": mode,
"mode_name": ad.MODES.get(mode, "unknown"),
"ramp_supported": ramp_supported,
"warning": optional_register_warning(
stop_supported,
reset_supported,
ramp_supported,
pole_pairs_supported,
phase_shunt_supported,
),
}
)
elif parsed.path == "/api/apply-run-control":
mode_text = str(payload.get("mode", "logging")).split()[0]
args = argparse.Namespace(
mode=mode_text,
duty=float(payload.get("duty", 0.08)),
current_limit=float(payload.get("current_limit", 10.0)),
overvoltage=float(payload.get("overvoltage", 60.0)),
undervoltage=float(payload.get("undervoltage", 0.0)),
speed_limit=float(payload.get("speed_limit", 1000.0)),
temp_limit=float(payload.get("temp_limit", 85.0)),
rotation_freq=float(payload.get("rotation_freq", 3.0)),
rotation_mod=float(payload.get("rotation_mod", 0.35)),
rotation_ramp_ms=int(payload.get("rotation_ramp_ms", 3000)),
polarity=int(payload.get("polarity", ad.DEFAULT_POLARITY)),
timing=str(payload.get("timing", "center")),
motor=str(payload.get("motor", "ad")),
pole_pairs=float(payload.get("pole_pairs", ad.DEFAULT_POLE_PAIRS)),
phase_shunt_ohm=float(payload.get("phase_shunt_ohm", ad.DEFAULT_PHASE_SHUNT_OHM)),
locked_rotor=bool(payload.get("locked_rotor", False)),
)
mode, locked_values, values, ramp_value, pole_pairs_value, phase_shunt_value = ad.start_values(args)
with state.lock:
client = state.require_client()
client.write_single(0x0006, locked_values[0])
client.write_multiple(0x0007, values[:9])
client.write_multiple(0x001A, values[9:])
ramp_supported = try_write_optional_ramp(client, ramp_value)
pole_pairs_supported = try_write_optional_single(client, 0x0040, pole_pairs_value)
phase_shunt_supported = try_write_optional_single(client, 0x0041, phase_shunt_value)
client.write_single(0x0003, mode)
registers = []
registers.extend(register_payload(0x0003, [mode]))
registers.extend(register_payload(0x0006, locked_values))
registers.extend(register_payload(0x0007, values[:9]))
registers.extend(register_payload(0x001A, values[9:]))
if ramp_supported:
registers.extend(register_payload(0x001E, [ramp_value]))
if pole_pairs_supported:
registers.extend(register_payload(0x0040, [pole_pairs_value]))
if phase_shunt_supported:
registers.extend(register_payload(0x0041, [phase_shunt_value]))
self.send_json(
{
"ok": True,
"mode": mode,
"mode_name": ad.MODES.get(mode, "unknown"),
"ramp_supported": ramp_supported,
"warning": optional_register_warning(
True,
True,
ramp_supported,
pole_pairs_supported,
phase_shunt_supported,
),
"registers": registers,
}
)
elif parsed.path == "/api/stop":
with state.lock:
stop_supported = stop_compatible(state.require_client())
self.send_json(
{
"ok": True,
"stop_supported": stop_supported,
"registers": register_payload(0x0002, [0]),
"warning": None if stop_supported else "0x0005 stop register is not supported by firmware; used control_enable=0",
}
)
elif parsed.path == "/api/reset":
with state.lock:
reset_supported = try_write_optional_single(state.require_client(), 0x0004, 1)
self.send_json(
{
"ok": True,
"reset_supported": reset_supported,
"warning": None if reset_supported else "0x0004 reset register is not supported by firmware",
}
)
elif parsed.path == "/api/reset-current-peaks":
with state.lock:
reset_supported = try_write_optional_single(state.require_client(), 0x001F, 1)
self.send_json(
{
"ok": True,
"reset_supported": reset_supported,
"warning": None if reset_supported else "0x001F current peak reset register is not supported by firmware",
}
)
else:
self.send_error_json(Exception("Not found"), HTTPStatus.NOT_FOUND)
except Exception as exc:
self.send_error_json(exc)
return Handler
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="AD project browser GUI.")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=DEFAULT_WEB_PORT)
parser.add_argument("--no-open", action="store_true", help="do not open the browser")
parser.add_argument("--auth-user", default=os.environ.get("AD_GUI_USER", DEFAULT_AUTH_USER))
parser.add_argument("--password", default=os.environ.get("AD_GUI_PASSWORD"), help="enable HTTP Basic auth")
return parser
def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
state = ServerState()
server = ThreadingHTTPServer((args.host, args.port), make_handler(state, args.auth_user, args.password))
host, port = server.server_address
url = f"http://{host}:{port}/"
print(f"AD GUI running at {url}")
if args.password:
print(f"Authentication enabled. User: {args.auth_user}")
print("Press Ctrl+C to stop.")
if not args.no_open:
webbrowser.open(url)
try:
server.serve_forever()
except KeyboardInterrupt:
print()
finally:
state.disconnect()
server.server_close()
return 0
if __name__ == "__main__":
raise SystemExit(main())