import serial import serial.tools.list_ports import time from PySide2.QtWidgets import * from PySide2.QtCore import * from PySide2.QtGui import * class RawProtocol: """НИЗКОУРОВНЕВЫЙ ПРОТОКОЛ PEEK/POKE с полной реализацией протокола контроллера""" def __init__(self): self.serial_port = None self.is_connected = False # Коды команд из кода контроллера self.CMD_PEEK = 56 # CMD_RS232_PEEK self.CMD_POKE = 57 # CMD_RS232_POKE def connect(self, port_name, baudrate=115200): """Подключение к COM порту""" try: self.serial_port = serial.Serial( port=port_name, baudrate=baudrate, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=1 ) self.is_connected = True return True except Exception as e: print(f"Ошибка подключения: {e}") return False def disconnect(self): """Отключение от COM порта""" if self.serial_port and self.serial_port.is_open: self.serial_port.close() self.is_connected = False def calculate_crc(self, data, init=0xFFFF): """CRC16-IBM (aka CRC-16/ANSI, polynomial 0xA001 reflected).""" crc = init for b in data: crc ^= b for _ in range(8): if crc & 1: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 return crc & 0xFFFF def send_command(self, address, command, data_bytes): """Отправка команды по протоколу контроллера""" if not self.is_connected: return None try: # Формируем сообщение без CRC message = bytearray() message.append(address) # Адрес устройства message.append(command) # Код команды message.extend(data_bytes) # Данные команды # Добавляем CRC crc = self.calculate_crc(message) message.extend(crc.to_bytes(2, 'little')) # Отправляем self.serial_port.write(message) self.serial_port.flush() # Формируем hex строку с пробелами между байтами hex_str = ' '.join(f'{b:02X}' for b in message) print(f"Отправлено: {hex_str}") return message except Exception as e: print(f"Ошибка отправки команды: {e}") return None def validate_response(self, response_bytes): """Проверка валидности ответа от контроллера""" if len(response_bytes) < 4: # минимум адрес+команда+CRC return False, "Слишком короткий ответ" # Проверяем CRC received_crc = int.from_bytes(response_bytes[-2:], 'little') calculated_crc = self.calculate_crc(response_bytes[:-2]) if received_crc != calculated_crc: return False, f"Ошибка CRC: получено 0x{received_crc:04X}, ожидалось 0x{calculated_crc:04X}" # Извлекаем данные из ответа address = response_bytes[0] command = response_bytes[1] data = response_bytes[2:-2] return True, { 'address': address, 'command': command, 'data': data, 'raw': response_bytes } def receive_response(self, timeout=1.0): """Прием ответа от контроллера""" if not self.is_connected: return None try: # Читаем данные с таймаутом self.serial_port.timeout = timeout response = self.serial_port.read(1024) if response: # Формируем hex строку с пробелами между байтами hex_str = ' '.join(f'{b:02X}' for b in response) print(f"Получено: {hex_str}") return response return None except Exception as e: print(f"Ошибка приема: {e}") return None def send_and_receive(self, address, command, data_bytes, timeout=1.0): """Отправить команду и получить ответ от TMS""" # Отправляем команду sent = self.send_command(address, command, data_bytes) if not sent: return False, "Не удалось отправить команду" # Ждем ответ time.sleep(0.01) # небольшая пауза response = self.receive_response(timeout) if not response: return False, "Нет ответа от устройства" # Проверяем ответ return self.validate_response(response) def poke(self, mem_address, data_value, device_address=1): """ПОЛНАЯ команда POKE - запись в память с CRC и анализом ответа""" # Формируем данные команды POKE data = bytearray() data.extend(mem_address.to_bytes(4, 'little')) data.extend(data_value.to_bytes(4, 'little')) # Отправляем и получаем ответ success, result = self.send_and_receive(device_address, self.CMD_POKE, data) if success: # Ответ успешен, проверяем что это ответ на POKE response_data = result['data'] if len(response_data) >= 1: # В ответе POKE обычно есть подтверждение return True, result return False, result def peek(self, mem_address, device_address=1): """ПОЛНАЯ команда PEEK - чтение из памяти с CRC и анализом ответа""" # Формируем данные команды PEEK data = bytearray() data.extend(mem_address.to_bytes(4, 'little')) # Отправляем и получаем ответ success, result = self.send_and_receive(device_address, self.CMD_PEEK, data) if success: # Извлекаем данные из ответа PEEK response_data = result['data'] if len(response_data) >= 2: # Предполагаем что данные идут первыми 2 байтами read_value = int.from_bytes(response_data[:2], 'little') return True, {'value': read_value, 'raw_response': result} return False, result def send_raw(self, hex_data): """Отправка сырых данных (для отладки)""" try: hex_str = hex_data.replace(" ", "") data = bytes.fromhex(hex_str) if self.is_connected and self.serial_port: self.serial_port.write(data) self.serial_port.flush() return True, f"Отправлено: {hex_str}" else: return False, "Порт не подключен" except ValueError: return False, "Некорректные hex данные" def receive_raw(self, max_bytes=1024): """Чтение сырых данных (для отладки)""" if self.is_connected and self.serial_port: data = self.serial_port.read(max_bytes) if data: return True, data.hex() else: return False, "Нет данных" else: return False, "Порт не подключен" class RawProtocolWidget(QWidget): """Виджет для отладки RAW протокола (всегда виден внизу окна)""" def __init__(self, raw_protocol): super().__init__() self.raw_protocol = raw_protocol self.init_ui() def init_ui(self): layout = QVBoxLayout() # Группа для ПРЯМОГО ДОСТУПА К ПАМЯТИ (PEEK/POKE) mem_group = QGroupBox("Прямой доступ к памяти (PEEK/POKE)") mem_layout = QGridLayout() mem_layout.addWidget(QLabel("Адрес памяти (hex):"), 0, 0) self.mem_addr_edit = QLineEdit("2000") self.mem_addr_edit.setMaximumWidth(100) mem_layout.addWidget(self.mem_addr_edit, 0, 1) mem_layout.addWidget(QLabel("Данные (hex):"), 1, 0) self.mem_data_edit = QLineEdit("0000") self.mem_data_edit.setMaximumWidth(100) mem_layout.addWidget(self.mem_data_edit, 1, 1) self.mem_write_btn = QPushButton("Записать (POKE)") self.mem_write_btn.clicked.connect(self.memory_write) mem_layout.addWidget(self.mem_write_btn, 0, 2) self.mem_read_btn = QPushButton("Прочитать (PEEK)") self.mem_read_btn.clicked.connect(self.memory_read) mem_layout.addWidget(self.mem_read_btn, 1, 2) self.mem_result_label = QLabel("---") mem_layout.addWidget(self.mem_result_label, 1, 3) mem_group.setLayout(mem_layout) layout.addWidget(mem_group) # Группа для RAW отправки raw_group = QGroupBox("RAW отправка/чтение") raw_layout = QVBoxLayout() self.raw_send_edit = QLineEdit() self.raw_send_edit.setPlaceholderText("Введите hex данные для отправки") raw_layout.addWidget(self.raw_send_edit) hbox = QHBoxLayout() self.raw_send_btn = QPushButton("Отправить RAW") self.raw_send_btn.clicked.connect(self.send_raw_data) hbox.addWidget(self.raw_send_btn) self.raw_read_btn = QPushButton("Прочитать RAW") self.raw_read_btn.clicked.connect(self.read_raw_data) hbox.addWidget(self.raw_read_btn) raw_layout.addLayout(hbox) self.raw_output = QTextEdit() self.raw_output.setReadOnly(True) self.raw_output.setMaximumHeight(100) raw_layout.addWidget(self.raw_output) self.raw_clear_btn = QPushButton("Очистить логи") self.raw_clear_btn.clicked.connect(self.clear_logs) raw_layout.addWidget(self.raw_clear_btn) raw_group.setLayout(raw_layout) layout.addWidget(raw_group) self.setLayout(layout) def memory_write(self): """Запись в память (POKE)""" try: device_addr = 10 mem_addr = int(self.mem_addr_edit.text(), 16) data = int(self.mem_data_edit.text(), 16) success, result = self.raw_protocol.poke(mem_addr, data, device_addr) if success: self.raw_output.append(f"POKE 0x{mem_addr:04X} = 0x{data:04X}") self.raw_output.append(f"Ответ: {result['raw'].hex()}") else: self.raw_output.append(f"Ошибка POKE: {result}") except ValueError as e: self.raw_output.append(f"Ошибка в данных: {e}") def memory_read(self): """Чтение из памяти (PEEK)""" try: device_addr = 10 mem_addr = int(self.mem_addr_edit.text(), 16) success, result = self.raw_protocol.peek(mem_addr, device_addr) if success: value = result['value'] self.mem_result_label.setText(f"0x{value:04X} ({value})") self.raw_output.append(f"PEEK 0x{mem_addr:04X} = 0x{value:04X}") self.raw_output.append(f"Ответ: {result['raw_response']['raw'].hex()}") else: self.mem_result_label.setText(f"Ошибка: {result}") self.raw_output.append(f"Ошибка PEEK: {result}") except ValueError as e: self.mem_result_label.setText(f"Ошибка: {e}") self.raw_output.append(f"Ошибка в данных: {e}") def send_raw_data(self): """Отправка сырых данных""" hex_str = self.raw_send_edit.text() success, result = self.raw_protocol.send_raw(hex_str) if success: self.raw_output.append(result) else: self.raw_output.append(f"Ошибка: {result}") def read_raw_data(self): """Чтение сырых данных""" success, result = self.raw_protocol.receive_raw() if success: self.raw_output.append(f"Получено: {result}") else: self.raw_output.append(f"Ошибка: {result}") def append_text(self, text): """Добавить текст в вывод (используется SerialTab)""" if text: self.raw_output.append(text) def clear_logs(self): """Очистить лог""" self.raw_output.clear()