from PySide6.QtWidgets import ( QApplication, QMainWindow, QWidget, QLabel, QLineEdit, QComboBox, QPushButton, QVBoxLayout, QHBoxLayout, QFormLayout, QGroupBox, QMessageBox ) from PySide6.QtCore import Qt, QThread, Signal import serial.tools.list_ports import json import os from socket_server import TcpServer class ServerThread(QThread): error_occurred = Signal(str) def __init__(self, config): super().__init__() self.config = config self.server = None def run(self): try: QApplication.instance().quit() self.server = TcpServer( server_config={ "host": self.config['socket_host'], "port": self.config['socket_port'] }, modbus_config={ "com_port": self.config["com_port"], "baud_rate": self.config["baud_rate"] } ) self.server.start() except Exception as e: self.error_occurred.emit(str(e)) def stop(self): if self.server: self.server.stop() class ConfigWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Communication Server Configuration") self.setMinimumWidth(400) self.server_thread = None # Load existing config self.config = self.load_config() # Create central widget and main layout central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # Create socket server group socket_group = QGroupBox("Socket Server Configuration") socket_layout = QFormLayout() # Socket server widgets self.host_edit = QLineEdit(self.config.get('socket_host', '0.0.0.0')) self.port_edit = QLineEdit(str(self.config.get('socket_port', 5000))) socket_layout.addRow("Host:", self.host_edit) socket_layout.addRow("Port:", self.port_edit) socket_group.setLayout(socket_layout) # Create serial port group serial_group = QGroupBox("Serial Port Configuration") serial_layout = QFormLayout() # Serial port widgets self.com_combo = QComboBox() self.refresh_ports() self.baud_combo = QComboBox() self.baud_combo.addItems(['9600', '19200', '38400', '57600', '115200']) self.baud_combo.setCurrentText(str(self.config.get('baud_rate', '9600'))) self.data_bits_combo = QComboBox() self.data_bits_combo.addItems(['7', '8']) self.data_bits_combo.setCurrentText(str(self.config.get('data_bits', '8'))) self.parity_combo = QComboBox() self.parity_combo.addItems(['N', 'E', 'O']) self.parity_combo.setCurrentText(self.config.get('parity', 'N')) self.stop_bits_combo = QComboBox() self.stop_bits_combo.addItems(['1', '2']) self.stop_bits_combo.setCurrentText(str(self.config.get('stop_bits', '1'))) serial_layout.addRow("COM Port:", self.com_combo) serial_layout.addRow("Baud Rate:", self.baud_combo) serial_layout.addRow("Data Bits:", self.data_bits_combo) serial_layout.addRow("Parity:", self.parity_combo) serial_layout.addRow("Stop Bits:", self.stop_bits_combo) serial_group.setLayout(serial_layout) # Create button layout button_layout = QHBoxLayout() self.save_button = QPushButton("Save") self.save_button.clicked.connect(self.save_config) self.refresh_button = QPushButton("Refresh Ports") self.refresh_button.clicked.connect(self.refresh_ports) self.start_button = QPushButton("Start Server") self.start_button.clicked.connect(self.toggle_server) button_layout.addWidget(self.save_button) button_layout.addWidget(self.refresh_button) button_layout.addWidget(self.start_button) # Add status label self.status_label = QLabel("Server Status: Stopped") self.status_label.setAlignment(Qt.AlignCenter) # Add all widgets to main layout main_layout.addWidget(socket_group) main_layout.addWidget(serial_group) main_layout.addLayout(button_layout) main_layout.addWidget(self.status_label) # Set some spacing main_layout.setSpacing(20) main_layout.setContentsMargins(20, 20, 20, 20) def toggle_server(self): if self.server_thread is None or not self.server_thread.isRunning(): self.start_server() else: self.stop_server() def start_server(self): try: # Save current configuration before starting server self.save_config() # Create and start server thread self.server_thread = ServerThread(self.config) self.server_thread.error_occurred.connect(self.handle_server_error) self.server_thread.start() # Update UI self.start_button.setText("Stop Server") self.status_label.setText("Server Status: Running") self.disable_config_inputs(True) except Exception as e: QMessageBox.critical(self, "Error", f"Failed to start server: {str(e)}") def stop_server(self): if self.server_thread: self.server_thread.stop() self.server_thread.wait() self.server_thread = None # Update UI self.start_button.setText("Start Server") self.status_label.setText("Server Status: Stopped") self.disable_config_inputs(False) def handle_server_error(self, error_msg): QMessageBox.critical(self, "Server Error", error_msg) self.stop_server() def disable_config_inputs(self, disabled): """Disable/enable configuration inputs while server is running""" self.host_edit.setEnabled(not disabled) self.port_edit.setEnabled(not disabled) self.com_combo.setEnabled(not disabled) self.baud_combo.setEnabled(not disabled) self.data_bits_combo.setEnabled(not disabled) self.parity_combo.setEnabled(not disabled) self.stop_bits_combo.setEnabled(not disabled) self.save_button.setEnabled(not disabled) self.refresh_button.setEnabled(not disabled) def refresh_ports(self): """Refresh the list of available COM ports""" current_port = self.com_combo.currentText() self.com_combo.clear() ports = [port.device for port in serial.tools.list_ports.comports()] self.com_combo.addItems(ports) if ports: if current_port in ports: self.com_combo.setCurrentText(current_port) else: self.com_combo.setCurrentText(ports[0]) def load_config(self): """Load configuration from file""" config_file = 'server_config.json' if os.path.exists(config_file): try: with open(config_file, 'r') as f: return json.load(f) except: return {} return {} def save_config(self): """Save configuration to file""" try: self.config = { 'socket_host': self.host_edit.text(), 'socket_port': int(self.port_edit.text()), 'com_port': self.com_combo.currentText(), 'baud_rate': int(self.baud_combo.currentText()), 'data_bits': int(self.data_bits_combo.currentText()), 'parity': self.parity_combo.currentText(), 'stop_bits': int(self.stop_bits_combo.currentText()) } with open('server_config.json', 'w') as f: json.dump(self.config, f, indent=4) QMessageBox.information(self, "Success", "Configuration saved successfully!") return True except ValueError as e: QMessageBox.warning(self, "Error", "Invalid port number!") except Exception as e: QMessageBox.critical(self, "Error", f"Failed to save configuration: {str(e)}") return False def closeEvent(self, event): """Handle application closing""" if self.server_thread and self.server_thread.isRunning(): self.stop_server() event.accept() def main(): app = QApplication([]) # Set application style app.setStyle('Fusion') window = ConfigWindow() window.show() app.exec() if __name__ == "__main__": main()