app_socket_server/config_gui.py
cuijingwei@brisonus.com 8493db2dae 首次提交
2025-02-21 16:00:12 +08:00

250 lines
8.8 KiB
Python

from PySide6.QtWidgets import (
QApplication, QMainWindow, QWidget, QLabel,
QLineEdit, QComboBox, QPushButton, QVBoxLayout,
QHBoxLayout, QFormLayout, QGroupBox, QMessageBox
)
from PySide6.QtCore import Qt, QThread, Signal
import serial.tools.list_ports
import json
import os
from socket_server import TcpServer
class ServerThread(QThread):
    """Run a ``TcpServer`` inside a ``QThread``.

    Signals:
        error_occurred(str): emitted with ``str(exception)`` when creating or
            starting the server raises.
    """

    error_occurred = Signal(str)

    def __init__(self, config):
        """Remember the configuration; the server itself is created in ``run``.

        Args:
            config: mapping providing the keys ``socket_host``,
                ``socket_port``, ``com_port`` and ``baud_rate``.
        """
        super().__init__()
        self.config = config
        self.server = None

    def run(self):
        """Thread entry point: build the ``TcpServer`` and start it.

        Any exception is reported through ``error_occurred`` instead of
        propagating out of the thread.
        """
        try:
            # The application must NOT be quit here: ending the Qt event loop
            # from the worker thread would close the configuration window
            # (and with it this server) the moment "Start Server" is pressed.
            self.server = TcpServer(
                server_config={
                    "host": self.config['socket_host'],
                    "port": self.config['socket_port'],
                },
                modbus_config={
                    "com_port": self.config["com_port"],
                    "baud_rate": self.config["baud_rate"],
                },
            )
            # Presumably blocks until stop() is called -- confirm against
            # socket_server.TcpServer.
            self.server.start()
        except Exception as e:
            self.error_occurred.emit(str(e))

    def stop(self):
        """Ask the server to stop, if one has been created."""
        if self.server:
            self.server.stop()
class ConfigWindow(QMainWindow):
    """Main window for editing, saving and applying the server configuration.

    The window exposes the socket host/port and the serial-port parameters,
    persists them as JSON in ``CONFIG_FILE`` and starts/stops a
    ``ServerThread`` built from the current settings.
    """

    # Settings file, resolved relative to the current working directory.
    CONFIG_FILE = 'server_config.json'

    def __init__(self):
        """Build the widgets and pre-fill them from the saved configuration."""
        super().__init__()
        self.setWindowTitle("Communication Server Configuration")
        self.setMinimumWidth(400)
        self.server_thread = None

        # Load existing config
        self.config = self.load_config()

        # Create central widget and main layout
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout(central_widget)

        # Socket server group
        socket_group = QGroupBox("Socket Server Configuration")
        socket_layout = QFormLayout()
        # str() guards against a hand-edited config holding a non-string host.
        self.host_edit = QLineEdit(str(self.config.get('socket_host', '0.0.0.0')))
        self.port_edit = QLineEdit(str(self.config.get('socket_port', 5000)))
        socket_layout.addRow("Host:", self.host_edit)
        socket_layout.addRow("Port:", self.port_edit)
        socket_group.setLayout(socket_layout)

        # Serial port group
        serial_group = QGroupBox("Serial Port Configuration")
        serial_layout = QFormLayout()

        self.com_combo = QComboBox()
        self.refresh_ports()
        # Re-select the previously saved COM port when it is still present.
        saved_port = self.config.get('com_port')
        if saved_port and self.com_combo.findText(str(saved_port)) >= 0:
            self.com_combo.setCurrentText(str(saved_port))

        self.baud_combo = QComboBox()
        self.baud_combo.addItems(['9600', '19200', '38400', '57600', '115200'])
        self.baud_combo.setCurrentText(str(self.config.get('baud_rate', '9600')))

        self.data_bits_combo = QComboBox()
        self.data_bits_combo.addItems(['7', '8'])
        self.data_bits_combo.setCurrentText(str(self.config.get('data_bits', '8')))

        self.parity_combo = QComboBox()
        self.parity_combo.addItems(['N', 'E', 'O'])
        self.parity_combo.setCurrentText(str(self.config.get('parity', 'N')))

        self.stop_bits_combo = QComboBox()
        self.stop_bits_combo.addItems(['1', '2'])
        self.stop_bits_combo.setCurrentText(str(self.config.get('stop_bits', '1')))

        serial_layout.addRow("COM Port:", self.com_combo)
        serial_layout.addRow("Baud Rate:", self.baud_combo)
        serial_layout.addRow("Data Bits:", self.data_bits_combo)
        serial_layout.addRow("Parity:", self.parity_combo)
        serial_layout.addRow("Stop Bits:", self.stop_bits_combo)
        serial_group.setLayout(serial_layout)

        # Button row
        button_layout = QHBoxLayout()
        self.save_button = QPushButton("Save")
        self.save_button.clicked.connect(self.save_config)
        self.refresh_button = QPushButton("Refresh Ports")
        self.refresh_button.clicked.connect(self.refresh_ports)
        self.start_button = QPushButton("Start Server")
        self.start_button.clicked.connect(self.toggle_server)
        button_layout.addWidget(self.save_button)
        button_layout.addWidget(self.refresh_button)
        button_layout.addWidget(self.start_button)

        # Status label
        self.status_label = QLabel("Server Status: Stopped")
        self.status_label.setAlignment(Qt.AlignCenter)

        # Assemble the main layout
        main_layout.addWidget(socket_group)
        main_layout.addWidget(serial_group)
        main_layout.addLayout(button_layout)
        main_layout.addWidget(self.status_label)
        main_layout.setSpacing(20)
        main_layout.setContentsMargins(20, 20, 20, 20)

    def toggle_server(self):
        """Start the server when it is not running, otherwise stop it."""
        if self.server_thread is None or not self.server_thread.isRunning():
            self.start_server()
        else:
            self.stop_server()

    def start_server(self):
        """Validate and save the settings, then launch the server thread.

        The server is not started when the inputs are invalid; previously a
        failed validation was ignored and the thread ran with a stale or
        empty configuration. A failure to write the settings file is reported
        by ``save_config`` but does not prevent the start.
        """
        try:
            # Validate first so bad input never reaches the server thread.
            try:
                self.config = self._collect_config()
            except ValueError:
                QMessageBox.warning(self, "Error", "Invalid port number!")
                return

            # Persist the configuration (reports its own success/failure).
            self.save_config()

            # Create and start server thread
            self.server_thread = ServerThread(self.config)
            self.server_thread.error_occurred.connect(self.handle_server_error)
            self.server_thread.start()

            # Update UI
            self.start_button.setText("Stop Server")
            self.status_label.setText("Server Status: Running")
            self.disable_config_inputs(True)
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Failed to start server: {str(e)}")

    def stop_server(self):
        """Stop the server thread (waiting for it to finish) and reset the UI."""
        if self.server_thread:
            self.server_thread.stop()
            self.server_thread.wait()
            self.server_thread = None
        # Update UI
        self.start_button.setText("Start Server")
        self.status_label.setText("Server Status: Stopped")
        self.disable_config_inputs(False)

    def handle_server_error(self, error_msg):
        """Show an error reported by the server thread, then stop the server.

        Args:
            error_msg: text emitted by ``ServerThread.error_occurred``.
        """
        QMessageBox.critical(self, "Server Error", error_msg)
        self.stop_server()

    def disable_config_inputs(self, disabled):
        """Disable/enable configuration inputs while server is running.

        Args:
            disabled: ``True`` to lock the inputs, ``False`` to unlock them.
        """
        enabled = not disabled
        for widget in (
            self.host_edit,
            self.port_edit,
            self.com_combo,
            self.baud_combo,
            self.data_bits_combo,
            self.parity_combo,
            self.stop_bits_combo,
            self.save_button,
            self.refresh_button,
        ):
            widget.setEnabled(enabled)

    def refresh_ports(self):
        """Refresh the list of available COM ports, keeping the selection if possible."""
        current_port = self.com_combo.currentText()
        self.com_combo.clear()
        ports = [port.device for port in serial.tools.list_ports.comports()]
        self.com_combo.addItems(ports)
        if ports:
            if current_port in ports:
                self.com_combo.setCurrentText(current_port)
            else:
                self.com_combo.setCurrentText(ports[0])

    def load_config(self):
        """Load configuration from file.

        Returns:
            dict: the stored settings, or ``{}`` when the file is missing,
            unreadable, not valid JSON, or does not contain a JSON object.
        """
        if not os.path.exists(self.CONFIG_FILE):
            return {}
        try:
            with open(self.CONFIG_FILE, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {}
        # The rest of the class calls .get() on the result, so only accept dicts.
        return loaded if isinstance(loaded, dict) else {}

    def _collect_config(self):
        """Build a configuration dict from the current widget values.

        Returns:
            dict: settings keyed exactly as they are stored in the JSON file.

        Raises:
            ValueError: if the port field is not an integer in 0..65535.
        """
        port = int(self.port_edit.text())
        if not 0 <= port <= 65535:
            raise ValueError(f"port out of range: {port}")
        return {
            'socket_host': self.host_edit.text(),
            'socket_port': port,
            'com_port': self.com_combo.currentText(),
            'baud_rate': int(self.baud_combo.currentText()),
            'data_bits': int(self.data_bits_combo.currentText()),
            'parity': self.parity_combo.currentText(),
            'stop_bits': int(self.stop_bits_combo.currentText()),
        }

    def save_config(self):
        """Save configuration to file.

        Returns:
            bool: ``True`` when the settings were validated and written,
            ``False`` otherwise (a message box explains the failure).
        """
        try:
            self.config = self._collect_config()
            with open(self.CONFIG_FILE, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, indent=4)
            QMessageBox.information(self, "Success", "Configuration saved successfully!")
            return True
        except ValueError:
            QMessageBox.warning(self, "Error", "Invalid port number!")
        except Exception as e:
            QMessageBox.critical(self, "Error", f"Failed to save configuration: {str(e)}")
        return False

    def closeEvent(self, event):
        """Handle application closing: stop a running server, then accept."""
        if self.server_thread and self.server_thread.isRunning():
            self.stop_server()
        event.accept()
def main():
    """Create the Qt application, show the configuration window and run the event loop.

    Returns:
        int: the exit code returned by the Qt event loop.
    """
    app = QApplication([])
    # Set application style
    app.setStyle('Fusion')
    window = ConfigWindow()
    window.show()
    # Hand the event-loop result back instead of discarding it.
    return app.exec()


if __name__ == "__main__":
    import sys

    # Propagate the Qt exit code to the operating system.
    sys.exit(main())