Files
PySetTerminal/rawprotocol.py
Razvalyaev c287274588 init commit
Работа с сериальной шиной и макросами
2026-02-10 11:32:38 +03:00

350 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import serial
import serial.tools.list_ports
import time
from PySide2.QtWidgets import *
from PySide2.QtCore import *
from PySide2.QtGui import *
class RawProtocol:
"""НИЗКОУРОВНЕВЫЙ ПРОТОКОЛ PEEK/POKE с полной реализацией протокола контроллера"""
def __init__(self):
self.serial_port = None
self.is_connected = False
# Коды команд из кода контроллера
self.CMD_PEEK = 56 # CMD_RS232_PEEK
self.CMD_POKE = 57 # CMD_RS232_POKE
def connect(self, port_name, baudrate=115200):
"""Подключение к COM порту"""
try:
self.serial_port = serial.Serial(
port=port_name,
baudrate=baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1
)
self.is_connected = True
return True
except Exception as e:
print(f"Ошибка подключения: {e}")
return False
def disconnect(self):
"""Отключение от COM порта"""
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
self.is_connected = False
def calculate_crc(self, data, init=0xFFFF):
"""CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected)."""
crc = init
for b in data:
crc ^= b
for _ in range(8):
if crc & 1:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc & 0xFFFF
def send_command(self, address, command, data_bytes):
"""Отправка команды по протоколу контроллера"""
if not self.is_connected:
return None
try:
# Формируем сообщение без CRC
message = bytearray()
message.append(address) # Адрес устройства
message.append(command) # Код команды
message.extend(data_bytes) # Данные команды
# Добавляем CRC
crc = self.calculate_crc(message)
message.extend(crc.to_bytes(2, 'little'))
# Отправляем
self.serial_port.write(message)
self.serial_port.flush()
# Формируем hex строку с пробелами между байтами
hex_str = ' '.join(f'{b:02X}' for b in message)
print(f"Отправлено: {hex_str}")
return message
except Exception as e:
print(f"Ошибка отправки команды: {e}")
return None
def validate_response(self, response_bytes):
"""Проверка валидности ответа от контроллера"""
if len(response_bytes) < 4: # минимум адрес+команда+CRC
return False, "Слишком короткий ответ"
# Проверяем CRC
received_crc = int.from_bytes(response_bytes[-2:], 'little')
calculated_crc = self.calculate_crc(response_bytes[:-2])
if received_crc != calculated_crc:
return False, f"Ошибка CRC: получено 0x{received_crc:04X}, ожидалось 0x{calculated_crc:04X}"
# Извлекаем данные из ответа
address = response_bytes[0]
command = response_bytes[1]
data = response_bytes[2:-2]
return True, {
'address': address,
'command': command,
'data': data,
'raw': response_bytes
}
def receive_response(self, timeout=1.0):
"""Прием ответа от контроллера"""
if not self.is_connected:
return None
try:
# Читаем данные с таймаутом
self.serial_port.timeout = timeout
response = self.serial_port.read(1024)
if response:
# Формируем hex строку с пробелами между байтами
hex_str = ' '.join(f'{b:02X}' for b in response)
print(f"Получено: {hex_str}")
return response
return None
except Exception as e:
print(f"Ошибка приема: {e}")
return None
def send_and_receive(self, address, command, data_bytes, timeout=1.0):
"""Отправить команду и получить ответ от TMS"""
# Отправляем команду
sent = self.send_command(address, command, data_bytes)
if not sent:
return False, "Не удалось отправить команду"
# Ждем ответ
time.sleep(0.01) # небольшая пауза
response = self.receive_response(timeout)
if not response:
return False, "Нет ответа от устройства"
# Проверяем ответ
return self.validate_response(response)
def poke(self, mem_address, data_value, device_address=1):
"""ПОЛНАЯ команда POKE - запись в память с CRC и анализом ответа"""
# Формируем данные команды POKE
data = bytearray()
data.extend(mem_address.to_bytes(4, 'little'))
data.extend(data_value.to_bytes(4, 'little'))
# Отправляем и получаем ответ
success, result = self.send_and_receive(device_address, self.CMD_POKE, data)
if success:
# Ответ успешен, проверяем что это ответ на POKE
response_data = result['data']
if len(response_data) >= 1:
# В ответе POKE обычно есть подтверждение
return True, result
return False, result
def peek(self, mem_address, device_address=1):
"""ПОЛНАЯ команда PEEK - чтение из памяти с CRC и анализом ответа"""
# Формируем данные команды PEEK
data = bytearray()
data.extend(mem_address.to_bytes(4, 'little'))
# Отправляем и получаем ответ
success, result = self.send_and_receive(device_address, self.CMD_PEEK, data)
if success:
# Извлекаем данные из ответа PEEK
response_data = result['data']
if len(response_data) >= 2:
# Предполагаем что данные идут первыми 2 байтами
read_value = int.from_bytes(response_data[:2], 'little')
return True, {'value': read_value, 'raw_response': result}
return False, result
def send_raw(self, hex_data):
"""Отправка сырых данных (для отладки)"""
try:
hex_str = hex_data.replace(" ", "")
data = bytes.fromhex(hex_str)
if self.is_connected and self.serial_port:
self.serial_port.write(data)
self.serial_port.flush()
return True, f"Отправлено: {hex_str}"
else:
return False, "Порт не подключен"
except ValueError:
return False, "Некорректные hex данные"
def receive_raw(self, max_bytes=1024):
"""Чтение сырых данных (для отладки)"""
if self.is_connected and self.serial_port:
data = self.serial_port.read(max_bytes)
if data:
return True, data.hex()
else:
return False, "Нет данных"
else:
return False, "Порт не подключен"
class RawProtocolWidget(QWidget):
"""Виджет для отладки RAW протокола (всегда виден внизу окна)"""
def __init__(self, raw_protocol):
super().__init__()
self.raw_protocol = raw_protocol
self.init_ui()
def init_ui(self):
layout = QVBoxLayout()
# Группа для ПРЯМОГО ДОСТУПА К ПАМЯТИ (PEEK/POKE)
mem_group = QGroupBox("Прямой доступ к памяти (PEEK/POKE)")
mem_layout = QGridLayout()
mem_layout.addWidget(QLabel("Адрес памяти (hex):"), 0, 0)
self.mem_addr_edit = QLineEdit("2000")
self.mem_addr_edit.setMaximumWidth(100)
mem_layout.addWidget(self.mem_addr_edit, 0, 1)
mem_layout.addWidget(QLabel("Данные (hex):"), 1, 0)
self.mem_data_edit = QLineEdit("0000")
self.mem_data_edit.setMaximumWidth(100)
mem_layout.addWidget(self.mem_data_edit, 1, 1)
self.mem_write_btn = QPushButton("Записать (POKE)")
self.mem_write_btn.clicked.connect(self.memory_write)
mem_layout.addWidget(self.mem_write_btn, 0, 2)
self.mem_read_btn = QPushButton("Прочитать (PEEK)")
self.mem_read_btn.clicked.connect(self.memory_read)
mem_layout.addWidget(self.mem_read_btn, 1, 2)
self.mem_result_label = QLabel("---")
mem_layout.addWidget(self.mem_result_label, 1, 3)
mem_group.setLayout(mem_layout)
layout.addWidget(mem_group)
# Группа для RAW отправки
raw_group = QGroupBox("RAW отправка/чтение")
raw_layout = QVBoxLayout()
self.raw_send_edit = QLineEdit()
self.raw_send_edit.setPlaceholderText("Введите hex данные для отправки")
raw_layout.addWidget(self.raw_send_edit)
hbox = QHBoxLayout()
self.raw_send_btn = QPushButton("Отправить RAW")
self.raw_send_btn.clicked.connect(self.send_raw_data)
hbox.addWidget(self.raw_send_btn)
self.raw_read_btn = QPushButton("Прочитать RAW")
self.raw_read_btn.clicked.connect(self.read_raw_data)
hbox.addWidget(self.raw_read_btn)
raw_layout.addLayout(hbox)
self.raw_output = QTextEdit()
self.raw_output.setReadOnly(True)
self.raw_output.setMaximumHeight(100)
raw_layout.addWidget(self.raw_output)
self.raw_clear_btn = QPushButton("Очистить логи")
self.raw_clear_btn.clicked.connect(self.clear_logs)
raw_layout.addWidget(self.raw_clear_btn)
raw_group.setLayout(raw_layout)
layout.addWidget(raw_group)
self.setLayout(layout)
def memory_write(self):
"""Запись в память (POKE)"""
try:
device_addr = 10
mem_addr = int(self.mem_addr_edit.text(), 16)
data = int(self.mem_data_edit.text(), 16)
success, result = self.raw_protocol.poke(mem_addr, data, device_addr)
if success:
self.raw_output.append(f"POKE 0x{mem_addr:04X} = 0x{data:04X}")
self.raw_output.append(f"Ответ: {result['raw'].hex()}")
else:
self.raw_output.append(f"Ошибка POKE: {result}")
except ValueError as e:
self.raw_output.append(f"Ошибка в данных: {e}")
def memory_read(self):
"""Чтение из памяти (PEEK)"""
try:
device_addr = 10
mem_addr = int(self.mem_addr_edit.text(), 16)
success, result = self.raw_protocol.peek(mem_addr, device_addr)
if success:
value = result['value']
self.mem_result_label.setText(f"0x{value:04X} ({value})")
self.raw_output.append(f"PEEK 0x{mem_addr:04X} = 0x{value:04X}")
self.raw_output.append(f"Ответ: {result['raw_response']['raw'].hex()}")
else:
self.mem_result_label.setText(f"Ошибка: {result}")
self.raw_output.append(f"Ошибка PEEK: {result}")
except ValueError as e:
self.mem_result_label.setText(f"Ошибка: {e}")
self.raw_output.append(f"Ошибка в данных: {e}")
def send_raw_data(self):
"""Отправка сырых данных"""
hex_str = self.raw_send_edit.text()
success, result = self.raw_protocol.send_raw(hex_str)
if success:
self.raw_output.append(result)
else:
self.raw_output.append(f"Ошибка: {result}")
def read_raw_data(self):
"""Чтение сырых данных"""
success, result = self.raw_protocol.receive_raw()
if success:
self.raw_output.append(f"Получено: {result}")
else:
self.raw_output.append(f"Ошибка: {result}")
def append_text(self, text):
"""Добавить текст в вывод (используется SerialTab)"""
if text:
self.raw_output.append(text)
def clear_logs(self):
"""Очистить лог"""
self.raw_output.clear()