250 lines
8.8 KiB
Python
250 lines
8.8 KiB
Python
from PySide6.QtWidgets import (
|
|
QApplication, QMainWindow, QWidget, QLabel,
|
|
QLineEdit, QComboBox, QPushButton, QVBoxLayout,
|
|
QHBoxLayout, QFormLayout, QGroupBox, QMessageBox
|
|
)
|
|
from PySide6.QtCore import Qt, QThread, Signal
|
|
import serial.tools.list_ports
|
|
import json
|
|
import os
|
|
from socket_server import TcpServer
|
|
|
|
|
|
class ServerThread(QThread):
|
|
error_occurred = Signal(str)
|
|
|
|
def __init__(self, config):
|
|
super().__init__()
|
|
self.config = config
|
|
self.server = None
|
|
|
|
def run(self):
|
|
try:
|
|
QApplication.instance().quit()
|
|
self.server = TcpServer(
|
|
server_config={
|
|
"host": self.config['socket_host'],
|
|
"port": self.config['socket_port']
|
|
},
|
|
modbus_config={
|
|
"com_port": self.config["com_port"],
|
|
"baud_rate": self.config["baud_rate"]
|
|
}
|
|
)
|
|
self.server.start()
|
|
except Exception as e:
|
|
self.error_occurred.emit(str(e))
|
|
|
|
def stop(self):
|
|
if self.server:
|
|
self.server.stop()
|
|
|
|
class ConfigWindow(QMainWindow):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.setWindowTitle("Communication Server Configuration")
|
|
self.setMinimumWidth(400)
|
|
|
|
self.server_thread = None
|
|
|
|
# Load existing config
|
|
self.config = self.load_config()
|
|
|
|
# Create central widget and main layout
|
|
central_widget = QWidget()
|
|
self.setCentralWidget(central_widget)
|
|
main_layout = QVBoxLayout(central_widget)
|
|
|
|
# Create socket server group
|
|
socket_group = QGroupBox("Socket Server Configuration")
|
|
socket_layout = QFormLayout()
|
|
|
|
# Socket server widgets
|
|
self.host_edit = QLineEdit(self.config.get('socket_host', '0.0.0.0'))
|
|
self.port_edit = QLineEdit(str(self.config.get('socket_port', 5000)))
|
|
|
|
socket_layout.addRow("Host:", self.host_edit)
|
|
socket_layout.addRow("Port:", self.port_edit)
|
|
socket_group.setLayout(socket_layout)
|
|
|
|
# Create serial port group
|
|
serial_group = QGroupBox("Serial Port Configuration")
|
|
serial_layout = QFormLayout()
|
|
|
|
# Serial port widgets
|
|
self.com_combo = QComboBox()
|
|
self.refresh_ports()
|
|
|
|
self.baud_combo = QComboBox()
|
|
self.baud_combo.addItems(['9600', '19200', '38400', '57600', '115200'])
|
|
self.baud_combo.setCurrentText(str(self.config.get('baud_rate', '9600')))
|
|
|
|
self.data_bits_combo = QComboBox()
|
|
self.data_bits_combo.addItems(['7', '8'])
|
|
self.data_bits_combo.setCurrentText(str(self.config.get('data_bits', '8')))
|
|
|
|
self.parity_combo = QComboBox()
|
|
self.parity_combo.addItems(['N', 'E', 'O'])
|
|
self.parity_combo.setCurrentText(self.config.get('parity', 'N'))
|
|
|
|
self.stop_bits_combo = QComboBox()
|
|
self.stop_bits_combo.addItems(['1', '2'])
|
|
self.stop_bits_combo.setCurrentText(str(self.config.get('stop_bits', '1')))
|
|
|
|
serial_layout.addRow("COM Port:", self.com_combo)
|
|
serial_layout.addRow("Baud Rate:", self.baud_combo)
|
|
serial_layout.addRow("Data Bits:", self.data_bits_combo)
|
|
serial_layout.addRow("Parity:", self.parity_combo)
|
|
serial_layout.addRow("Stop Bits:", self.stop_bits_combo)
|
|
serial_group.setLayout(serial_layout)
|
|
|
|
# Create button layout
|
|
button_layout = QHBoxLayout()
|
|
|
|
self.save_button = QPushButton("Save")
|
|
self.save_button.clicked.connect(self.save_config)
|
|
|
|
self.refresh_button = QPushButton("Refresh Ports")
|
|
self.refresh_button.clicked.connect(self.refresh_ports)
|
|
|
|
self.start_button = QPushButton("Start Server")
|
|
self.start_button.clicked.connect(self.toggle_server)
|
|
|
|
button_layout.addWidget(self.save_button)
|
|
button_layout.addWidget(self.refresh_button)
|
|
button_layout.addWidget(self.start_button)
|
|
|
|
# Add status label
|
|
self.status_label = QLabel("Server Status: Stopped")
|
|
self.status_label.setAlignment(Qt.AlignCenter)
|
|
|
|
# Add all widgets to main layout
|
|
main_layout.addWidget(socket_group)
|
|
main_layout.addWidget(serial_group)
|
|
main_layout.addLayout(button_layout)
|
|
main_layout.addWidget(self.status_label)
|
|
|
|
# Set some spacing
|
|
main_layout.setSpacing(20)
|
|
main_layout.setContentsMargins(20, 20, 20, 20)
|
|
|
|
def toggle_server(self):
|
|
if self.server_thread is None or not self.server_thread.isRunning():
|
|
self.start_server()
|
|
else:
|
|
self.stop_server()
|
|
|
|
def start_server(self):
|
|
try:
|
|
# Save current configuration before starting server
|
|
self.save_config()
|
|
|
|
# Create and start server thread
|
|
self.server_thread = ServerThread(self.config)
|
|
self.server_thread.error_occurred.connect(self.handle_server_error)
|
|
self.server_thread.start()
|
|
|
|
# Update UI
|
|
self.start_button.setText("Stop Server")
|
|
self.status_label.setText("Server Status: Running")
|
|
self.disable_config_inputs(True)
|
|
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", f"Failed to start server: {str(e)}")
|
|
|
|
def stop_server(self):
|
|
if self.server_thread:
|
|
self.server_thread.stop()
|
|
self.server_thread.wait()
|
|
self.server_thread = None
|
|
|
|
# Update UI
|
|
self.start_button.setText("Start Server")
|
|
self.status_label.setText("Server Status: Stopped")
|
|
self.disable_config_inputs(False)
|
|
|
|
def handle_server_error(self, error_msg):
|
|
QMessageBox.critical(self, "Server Error", error_msg)
|
|
self.stop_server()
|
|
|
|
def disable_config_inputs(self, disabled):
|
|
"""Disable/enable configuration inputs while server is running"""
|
|
self.host_edit.setEnabled(not disabled)
|
|
self.port_edit.setEnabled(not disabled)
|
|
self.com_combo.setEnabled(not disabled)
|
|
self.baud_combo.setEnabled(not disabled)
|
|
self.data_bits_combo.setEnabled(not disabled)
|
|
self.parity_combo.setEnabled(not disabled)
|
|
self.stop_bits_combo.setEnabled(not disabled)
|
|
self.save_button.setEnabled(not disabled)
|
|
self.refresh_button.setEnabled(not disabled)
|
|
|
|
def refresh_ports(self):
|
|
"""Refresh the list of available COM ports"""
|
|
current_port = self.com_combo.currentText()
|
|
self.com_combo.clear()
|
|
|
|
ports = [port.device for port in serial.tools.list_ports.comports()]
|
|
self.com_combo.addItems(ports)
|
|
|
|
if ports:
|
|
if current_port in ports:
|
|
self.com_combo.setCurrentText(current_port)
|
|
else:
|
|
self.com_combo.setCurrentText(ports[0])
|
|
|
|
def load_config(self):
|
|
"""Load configuration from file"""
|
|
config_file = 'server_config.json'
|
|
if os.path.exists(config_file):
|
|
try:
|
|
with open(config_file, 'r') as f:
|
|
return json.load(f)
|
|
except:
|
|
return {}
|
|
return {}
|
|
|
|
def save_config(self):
|
|
"""Save configuration to file"""
|
|
try:
|
|
self.config = {
|
|
'socket_host': self.host_edit.text(),
|
|
'socket_port': int(self.port_edit.text()),
|
|
'com_port': self.com_combo.currentText(),
|
|
'baud_rate': int(self.baud_combo.currentText()),
|
|
'data_bits': int(self.data_bits_combo.currentText()),
|
|
'parity': self.parity_combo.currentText(),
|
|
'stop_bits': int(self.stop_bits_combo.currentText())
|
|
}
|
|
|
|
with open('server_config.json', 'w') as f:
|
|
json.dump(self.config, f, indent=4)
|
|
|
|
QMessageBox.information(self, "Success", "Configuration saved successfully!")
|
|
return True
|
|
|
|
except ValueError as e:
|
|
QMessageBox.warning(self, "Error", "Invalid port number!")
|
|
except Exception as e:
|
|
QMessageBox.critical(self, "Error", f"Failed to save configuration: {str(e)}")
|
|
return False
|
|
|
|
def closeEvent(self, event):
|
|
"""Handle application closing"""
|
|
if self.server_thread and self.server_thread.isRunning():
|
|
self.stop_server()
|
|
event.accept()
|
|
|
|
def main():
|
|
app = QApplication([])
|
|
|
|
# Set application style
|
|
app.setStyle('Fusion')
|
|
|
|
window = ConfigWindow()
|
|
window.show()
|
|
|
|
app.exec()
|
|
|
|
if __name__ == "__main__":
|
|
main() |