arduino_ph_controller/gui/main.py
2025-07-10 21:33:48 +02:00

384 lines
16 KiB
Python

import sys
import serial
import serial.tools.list_ports
from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QPushButton, QComboBox, QDoubleSpinBox,
QGroupBox, QStatusBar, QMessageBox, QCheckBox)
from PyQt6.QtCore import QTimer, Qt
from PyQt6.QtGui import QFont
class PHControllerGUI(QMainWindow):
    """Main window for an Arduino-based pH meter / dosing pump controller.

    The window talks to the device over a serial port. Commands are sent
    as short ASCII frames of the form ``<NN>`` (see the ``CMD_*``
    constants). Replies are parsed as ``<Xvalue>`` frames where ``X`` is
    one of ``#`` (pH value), ``P`` (pump status), ``T`` (tube index) or
    ``F`` (flow rate in ml/sec). Frames with any other prefix are ignored.
    """

    # Serial link settings.
    BAUD_RATE = 9600
    SERIAL_TIMEOUT_S = 1

    # Timer periods in milliseconds.
    READ_INTERVAL_MS = 100
    PH_POLL_INTERVAL_MS = 2000
    TUBE_POLL_INTERVAL_MS = 5000
    FLOW_POLL_INTERVAL_MS = 1000
    INITIAL_REQUEST_DELAY_MS = 500

    # Command frames understood by the device.
    CMD_PUMP_ON = "<11>"
    CMD_PUMP_OFF = "<12>"
    CMD_REQUEST_TUBE = "<13>"
    CMD_REQUEST_PH = "<14>"
    CMD_CALIBRATE_PH4 = "<15>"
    CMD_CALIBRATE_PH7 = "<16>"
    CMD_SET_TUBE_TEMPLATE = "<17{index}>"
    CMD_REQUEST_FLOW_RATE = "<18>"

    # Upper bound for an unterminated frame; anything longer is line noise.
    MAX_RX_BUFFER_BYTES = 256

    # Entries of the tube selector; the combo-box index is what is sent
    # to the device in the tube selection command.
    TUBE_SIZES = (
        "0.13 mm", "0.19 mm", "0.25 mm", "0.38 mm", "0.44 mm",
        "0.51 mm", "0.57 mm", "0.64 mm", "0.76 mm", "0.89 mm",
        "0.95 mm", "1.02 mm", "1.09 mm", "1.14 mm", "1.22 mm",
        "1.30 mm", "1.42 mm", "1.52 mm", "1.65 mm", "1.75 mm",
        "1.85 mm", "2.06 mm", "2.29 mm", "2.54 mm", "2.79 mm", "3.17 mm",
    )

    def __init__(self):
        """Build the window, create the polling timers and disable controls."""
        super().__init__()

        # Connection and protocol state.
        self.serial_connection = None
        self.updating_tube_combo = False  # Suppresses echoing a tube index back to the device
        self._rx_buffer = bytearray()  # Bytes received but not yet parsed into frames
        self._frame_handlers = {
            "#": self._handle_ph_frame,
            "P": self._handle_pump_frame,
            "T": self._handle_tube_frame,
            "F": self._handle_flow_frame,
        }

        # Pump and volume tracking.
        self.pump_is_running = False
        self.total_volume = 0.0  # Total volume pumped in ml
        self.last_flow_rate = 0.0  # Last known flow rate

        self.setWindowTitle("Arduino pH Controller")
        self.setGeometry(100, 100, 600, 400)

        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout(central_widget)
        main_layout.addWidget(self._build_connection_group())
        main_layout.addWidget(self._build_status_group())
        main_layout.addWidget(self._build_control_group())

        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)
        self.status_bar.showMessage("Bereit")

        self._create_timers()

        # Controls stay disabled until a connection is opened.
        self.set_controls_enabled(False)

    # ------------------------------------------------------------------
    # UI construction helpers
    # ------------------------------------------------------------------
    def _build_connection_group(self):
        """Create the port selector and the connect/disconnect button."""
        group = QGroupBox("Verbindung")
        layout = QHBoxLayout()

        self.port_combo = QComboBox()
        self.refresh_ports()
        self.connect_btn = QPushButton("Verbinden")
        self.connect_btn.clicked.connect(self.toggle_connection)

        layout.addWidget(QLabel("Port:"))
        layout.addWidget(self.port_combo, 1)
        layout.addWidget(self.connect_btn)
        group.setLayout(layout)
        return group

    @staticmethod
    def _make_status_label(text, point_size):
        """Return a centred Arial label with the given text and font size."""
        label = QLabel(text)
        label.setFont(QFont('Arial', point_size))
        label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        return label

    def _build_status_group(self):
        """Create the pH, pump, flow-rate and total-volume read-outs."""
        group = QGroupBox("Status")
        layout = QHBoxLayout()

        self.ph_label = self._make_status_label("pH: --", 24)
        self.pump_status = self._make_status_label("Pumpe: Aus", 16)
        self.flow_rate_label = self._make_status_label("ml/sec: --", 14)
        self.total_volume_label = self._make_status_label("Gesamt: 0.0 ml", 12)

        # Flow rate and total volume only become visible once the pump runs.
        self.flow_rate_label.setVisible(False)
        self.total_volume_label.setVisible(False)

        for label in (self.ph_label, self.pump_status,
                      self.flow_rate_label, self.total_volume_label):
            layout.addWidget(label)
        group.setLayout(layout)
        return group

    def _build_control_group(self):
        """Create pump, tube, calibration and auto-dose controls."""
        group = QGroupBox("Steuerung")
        layout = QVBoxLayout()

        # Pump control.
        pump_layout = QHBoxLayout()
        self.pump_on_btn = QPushButton("Pumpe Ein")
        self.pump_on_btn.clicked.connect(lambda: self.send_pump_command(1))
        self.pump_off_btn = QPushButton("Pumpe Aus")
        self.pump_off_btn.clicked.connect(lambda: self.send_pump_command(0))
        self.reset_volume_checkbox = QCheckBox("Volumen bei Start zurücksetzen")
        self.reset_volume_checkbox.setChecked(True)  # Default: reset volume
        pump_layout.addWidget(self.pump_on_btn)
        pump_layout.addWidget(self.pump_off_btn)
        pump_layout.addWidget(self.reset_volume_checkbox)

        # Tube selection. The signal is connected only AFTER the items are
        # added: populating the combo changes the current index, and the
        # slot must not run before the status bar exists.
        tube_layout = QHBoxLayout()
        self.tube_combo = QComboBox()
        self.tube_combo.addItems(list(self.TUBE_SIZES))
        self.tube_combo.currentIndexChanged.connect(self.send_tube_command)
        tube_layout.addWidget(QLabel("Schlauch:"))
        tube_layout.addWidget(self.tube_combo, 1)

        # Calibration.
        calibration_layout = QHBoxLayout()
        self.cal_ph4_btn = QPushButton("Kalibrieren pH4")
        self.cal_ph7_btn = QPushButton("Kalibrieren pH7")
        self.cal_ph4_btn.clicked.connect(lambda: self.send_calibration_command(4))
        self.cal_ph7_btn.clicked.connect(lambda: self.send_calibration_command(7))
        calibration_layout.addWidget(self.cal_ph4_btn)
        calibration_layout.addWidget(self.cal_ph7_btn)

        # Automatic dosing.
        auto_mode_layout = QHBoxLayout()
        self.volume_spin = QDoubleSpinBox()
        self.volume_spin.setRange(0.1, 1000)
        self.volume_spin.setValue(10)
        self.volume_spin.setSuffix(" ml")
        self.auto_dose_btn = QPushButton("Automatisch dosieren")
        self.auto_dose_btn.clicked.connect(self.start_auto_dose)
        auto_mode_layout.addWidget(QLabel("Menge:"))
        auto_mode_layout.addWidget(self.volume_spin)
        auto_mode_layout.addWidget(self.auto_dose_btn)

        for sub_layout in (pump_layout, tube_layout,
                           calibration_layout, auto_mode_layout):
            layout.addLayout(sub_layout)
        group.setLayout(layout)
        return group

    def _create_timers(self):
        """Create all timers; only the serial read timer starts immediately."""
        # Drains incoming serial data. It runs for the lifetime of the
        # window; the slot returns early while disconnected.
        self.read_timer = QTimer(self)
        self.read_timer.timeout.connect(self.read_serial_data)
        self.read_timer.start(self.READ_INTERVAL_MS)

        # Started in open_connection().
        self.ph_request_timer = QTimer(self)
        self.ph_request_timer.timeout.connect(self.request_ph_value)
        self.tube_request_timer = QTimer(self)
        self.tube_request_timer.timeout.connect(self.request_tube_id)

        # Started while the pump is running (see send_pump_command()).
        self.flow_rate_timer = QTimer(self)
        self.flow_rate_timer.timeout.connect(self.request_flow_rate)

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def _is_connected(self):
        """Return True when a serial port object exists and is open."""
        return bool(self.serial_connection and self.serial_connection.is_open)

    def refresh_ports(self):
        """Repopulate the port selector with the currently available ports."""
        self.port_combo.clear()
        for port in serial.tools.list_ports.comports():
            self.port_combo.addItem(port.device)

    def toggle_connection(self):
        """Close the connection if one is open, otherwise open one."""
        if self._is_connected():
            self.close_connection()
        else:
            self.open_connection()

    def open_connection(self):
        """Open the selected port and start the periodic polling timers.

        Shows an error dialog and leaves the state untouched when no port
        is selected or the port cannot be opened.
        """
        port = self.port_combo.currentText()
        if not port:
            QMessageBox.critical(self, "Fehler", "Kein Port ausgewählt!")
            return

        try:
            connection = serial.Serial(
                port, self.BAUD_RATE, timeout=self.SERIAL_TIMEOUT_S)
        except (serial.SerialException, OSError, ValueError) as e:
            QMessageBox.critical(self, "Fehler", f"Verbindungsfehler: {str(e)}")
            return

        self.serial_connection = connection
        self._rx_buffer.clear()  # Never mix data from a previous session
        self.connect_btn.setText("Trennen")
        self.status_bar.showMessage(f"Verbunden mit {port}")
        self.set_controls_enabled(True)

        self.ph_request_timer.start(self.PH_POLL_INTERVAL_MS)
        self.tube_request_timer.start(self.TUBE_POLL_INTERVAL_MS)
        # Request initial data after a short delay so the device is ready.
        QTimer.singleShot(self.INITIAL_REQUEST_DELAY_MS, self.request_initial_data)

    def close_connection(self):
        """Close the port, stop device polling and reset the display."""
        if self._is_connected():
            try:
                self.serial_connection.close()
            except (serial.SerialException, OSError):
                # The port may already be gone (e.g. device unplugged);
                # the UI must be reset regardless.
                pass
        self.serial_connection = None
        self._rx_buffer.clear()

        self.ph_request_timer.stop()
        self.tube_request_timer.stop()
        self.flow_rate_timer.stop()

        self.connect_btn.setText("Verbinden")
        self.status_bar.showMessage("Getrennt")
        self.set_controls_enabled(False)
        self.ph_label.setText("pH: --")
        self.pump_status.setText("Pumpe: --")
        self.flow_rate_label.setVisible(False)
        self.total_volume_label.setVisible(False)

        self.pump_is_running = False
        self.total_volume = 0.0
        self.last_flow_rate = 0.0

    def set_controls_enabled(self, enabled):
        """Enable or disable every control that requires an open connection."""
        for widget in (
            self.pump_on_btn,
            self.pump_off_btn,
            self.tube_combo,
            self.cal_ph4_btn,
            self.cal_ph7_btn,
            self.auto_dose_btn,
            self.volume_spin,
            self.reset_volume_checkbox,
        ):
            widget.setEnabled(enabled)

    # ------------------------------------------------------------------
    # Outgoing commands
    # ------------------------------------------------------------------
    def send_command(self, command):
        """Write ``command`` to the device; a no-op while disconnected.

        Write failures are reported in the status bar rather than raised.
        """
        print(command)  # Debug trace of every outgoing command
        if not self._is_connected():
            return
        try:
            self.serial_connection.write(command.encode('utf-8'))
        except (serial.SerialException, OSError) as e:
            self.status_bar.showMessage(f"Fehler beim Senden: {str(e)}")

    def request_initial_data(self):
        """Request the current tube size shortly after connecting.

        The pH value is not requested here; the pH polling timer does that.
        """
        if self._is_connected():
            self.send_command(self.CMD_REQUEST_TUBE)
            self.status_bar.showMessage("Initiale Daten angefragt...")

    def request_ph_value(self):
        """Request the current pH value (slot of the pH polling timer)."""
        if self._is_connected():
            self.send_command(self.CMD_REQUEST_PH)

    def request_tube_id(self):
        """Request the current tube index (slot of the tube polling timer)."""
        if self._is_connected():
            self.send_command(self.CMD_REQUEST_TUBE)

    def request_flow_rate(self):
        """Request the current flow rate while the pump is running."""
        if self._is_connected() and self.pump_is_running:
            self.send_command(self.CMD_REQUEST_FLOW_RATE)

    def update_total_volume(self, flow_rate):
        """Accumulate pumped volume from a flow-rate sample in ml/sec.

        Each sample is taken to cover one flow polling period, so the
        integration step is derived from the same constant that drives
        the flow-rate timer.
        """
        if self.pump_is_running:
            interval_s = self.FLOW_POLL_INTERVAL_MS / 1000.0
            self.total_volume += flow_rate * interval_s
            self.total_volume_label.setText(f"Gesamt: {self.total_volume:.2f} ml")
        self.last_flow_rate = flow_rate

    def send_pump_command(self, state):
        """Switch the pump on (``state == 1``) or off (any other value).

        The display is updated immediately, without waiting for the
        device to confirm the new state.
        """
        if state == 1:
            self.send_command(self.CMD_PUMP_ON)
            self.pump_status.setText("Pumpe: Ein")
            self.pump_is_running = True
            self.flow_rate_label.setVisible(True)
            self.total_volume_label.setVisible(True)

            if self.reset_volume_checkbox.isChecked():
                self.total_volume = 0.0
                self.total_volume_label.setText("Gesamt: 0.0 ml")
            else:
                # Keep accumulating on top of the previous run.
                self.total_volume_label.setText(f"Gesamt: {self.total_volume:.2f} ml")

            self.flow_rate_timer.start(self.FLOW_POLL_INTERVAL_MS)
        else:
            self.send_command(self.CMD_PUMP_OFF)
            self.pump_status.setText("Pumpe: Aus")
            self.pump_is_running = False
            self.flow_rate_label.setVisible(False)
            # The total volume label stays visible to show the final amount.
            self.flow_rate_timer.stop()

    def send_tube_command(self, index):
        """Send the selected tube index to the device.

        Nothing is sent while the combo box is being updated from a
        device reply, or when the combo reports no selection (index < 0).
        """
        if self.updating_tube_combo or index < 0:
            return
        self.send_command(self.CMD_SET_TUBE_TEMPLATE.format(index=index))
        self.status_bar.showMessage(f"Schlauchgröße {self.tube_combo.itemText(index)} ausgewählt")

    def send_calibration_command(self, ph):
        """Start the pH4 calibration when ``ph == 4``, otherwise pH7."""
        if ph == 4:
            self.send_command(self.CMD_CALIBRATE_PH4)
            QMessageBox.information(self, "Kalibrierung",
                                    "pH4 Kalibrierung gestartet. Sensor in pH4.01-Lösung tauchen und warten bis der Wert stabil ist.")
        else:
            self.send_command(self.CMD_CALIBRATE_PH7)
            QMessageBox.information(self, "Kalibrierung",
                                    "pH7 Kalibrierung gestartet. Sensor in pH6.86-Lösung tauchen und warten bis der Wert stabil ist.")

    def start_auto_dose(self):
        """Show the auto-dose dialog for the selected volume.

        TODO: dosing itself is not implemented yet. No command is sent to
        the device; the pump run time still has to be derived from the
        tube size and the requested volume.
        """
        volume = self.volume_spin.value()
        QMessageBox.information(self, "Automatische Dosierung",
                                f"Es werden {volume} ml dosiert. Bitte warten...")

    # ------------------------------------------------------------------
    # Incoming data
    # ------------------------------------------------------------------
    def read_serial_data(self):
        """Drain pending serial bytes and dispatch every complete frame.

        Only bytes that are already waiting are read, so this slot does
        not block the GUI thread on a partial or unterminated reply. On
        an I/O error the connection is closed and the error is shown
        once, instead of failing again on every timer tick.
        """
        if not self._is_connected():
            return
        try:
            pending = self.serial_connection.in_waiting
            if pending:
                self._rx_buffer.extend(self.serial_connection.read(pending))
        except (serial.SerialException, OSError) as e:
            self.close_connection()
            self.status_bar.showMessage(f"Lesefehler: {str(e)}")
            return

        for frame in self._extract_frames():
            print(f"<{frame}>")  # Debug trace of every received frame
            self._handle_frame(frame)

    def _extract_frames(self):
        """Remove and return the payloads of all complete ``<...>`` frames.

        Bytes outside a frame (line endings, noise) are discarded. An
        incomplete trailing frame stays buffered for the next call.
        """
        buffer = self._rx_buffer
        frames = []
        while True:
            end = buffer.find(b'>')
            if end == -1:
                break
            start = buffer.rfind(b'<', 0, end)
            if start != -1:
                # Undecodable bytes are replaced rather than raising, so
                # one corrupted frame cannot abort the whole read cycle.
                frames.append(buffer[start + 1:end].decode('utf-8', errors='replace'))
            del buffer[:end + 1]

        # Keep only a possibly incomplete frame (from the last '<' onward).
        start = buffer.rfind(b'<')
        if start == -1:
            buffer.clear()
        elif start > 0:
            del buffer[:start]
        if len(buffer) > self.MAX_RX_BUFFER_BYTES:
            buffer.clear()  # An opening '<' that is never closed is noise
        return frames

    def _handle_frame(self, data):
        """Dispatch a frame payload to its handler by its first character."""
        if not data:
            return
        handler = self._frame_handlers.get(data[0])
        if handler is not None:
            handler(data[1:])

    def _handle_ph_frame(self, payload):
        """Display a pH value; malformed values are ignored."""
        try:
            ph_value = float(payload)
        except ValueError:
            return
        self.ph_label.setText(f"pH: {ph_value:.2f}")

    def _handle_pump_frame(self, payload):
        """Display the pump state reported by the device ('1' means on)."""
        self.pump_status.setText(f"Pumpe: {'Ein' if payload == '1' else 'Aus'}")

    def _handle_tube_frame(self, payload):
        """Select the reported tube index without echoing it to the device."""
        try:
            tube_index = int(payload)
        except ValueError:
            return
        if not 0 <= tube_index < self.tube_combo.count():
            return
        self.updating_tube_combo = True  # Prevent send_tube_command from firing
        try:
            self.tube_combo.setCurrentIndex(tube_index)
        finally:
            self.updating_tube_combo = False
        self.status_bar.showMessage(f"Schlauchgröße {self.tube_combo.currentText()} geladen")

    def _handle_flow_frame(self, payload):
        """Display a flow rate (ml/sec) and add it to the total volume."""
        try:
            flow_rate = float(payload)
        except ValueError:
            return
        self.flow_rate_label.setText(f"ml/sec: {flow_rate:.4f}")
        self.update_total_volume(flow_rate)

    def closeEvent(self, event):
        """Stop all timers and close the connection before the window closes."""
        for timer in (self.read_timer, self.ph_request_timer,
                      self.tube_request_timer, self.flow_rate_timer):
            timer.stop()
        self.close_connection()
        event.accept()
def main():
    """Create the Qt application, show the controller window and run it.

    The process exits with the status returned by the Qt event loop.
    """
    application = QApplication(sys.argv)
    controller_window = PHControllerGUI()
    controller_window.show()
    exit_code = application.exec()
    sys.exit(exit_code)


if __name__ == "__main__":
    main()