import sys import serial import serial.tools.list_ports from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout, QWidget, QLabel, QPushButton, QComboBox, QDoubleSpinBox, QGroupBox, QStatusBar, QMessageBox, QCheckBox) from PyQt6.QtCore import QTimer, Qt from PyQt6.QtGui import QFont import matplotlib.pyplot as plt from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure import datetime from collections import deque class PHControllerGUI(QMainWindow): def __init__(self): super().__init__() self.serial_connection = None self.updating_tube_combo = False # Flag to prevent recursive tube commands self.setWindowTitle("Arduino pH Controller") self.setGeometry(100, 100, 900, 800) # Larger window for plot # Central Widget central_widget = QWidget() self.setCentralWidget(central_widget) main_layout = QVBoxLayout(central_widget) # Connection Group connection_group = QGroupBox("Verbindung") connection_layout = QHBoxLayout() self.port_combo = QComboBox() self.refresh_ports() self.connect_btn = QPushButton("Verbinden") self.connect_btn.clicked.connect(self.toggle_connection) connection_layout.addWidget(QLabel("Port:")) connection_layout.addWidget(self.port_combo, 1) connection_layout.addWidget(self.connect_btn) connection_group.setLayout(connection_layout) # Status Group status_group = QGroupBox("Status") status_layout = QHBoxLayout() self.ph_label = QLabel("pH: --") self.ph_label.setFont(QFont('Arial', 24)) self.ph_label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.pump_status = QLabel("Pumpe: Aus") self.pump_status.setFont(QFont('Arial', 16)) self.pump_status.setAlignment(Qt.AlignmentFlag.AlignCenter) self.flow_rate_label = QLabel("ml/sec: --") self.flow_rate_label.setFont(QFont('Arial', 14)) self.flow_rate_label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.flow_rate_label.setVisible(False) # Initially hidden self.total_volume_label = QLabel("Gesamt: 0.0 ml") self.total_volume_label.setFont(QFont('Arial', 12)) self.total_volume_label.setAlignment(Qt.AlignmentFlag.AlignCenter) self.total_volume_label.setVisible(False) # Initially hidden status_layout.addWidget(self.ph_label) status_layout.addWidget(self.pump_status) status_layout.addWidget(self.flow_rate_label) status_layout.addWidget(self.total_volume_label) status_group.setLayout(status_layout) # Control Group control_group = QGroupBox("Steuerung") control_layout = QVBoxLayout() # Pump Control pump_control_layout = QHBoxLayout() self.pump_on_btn = QPushButton("Pumpe Ein") self.pump_on_btn.clicked.connect(lambda: self.send_pump_command(1)) self.pump_off_btn = QPushButton("Pumpe Aus") self.pump_off_btn.clicked.connect(lambda: self.send_pump_command(0)) # Volume reset option self.reset_volume_checkbox = QCheckBox("Volumen bei Start zurücksetzen") self.reset_volume_checkbox.setChecked(True) # Default: reset volume pump_control_layout.addWidget(self.pump_on_btn) pump_control_layout.addWidget(self.pump_off_btn) pump_control_layout.addWidget(self.reset_volume_checkbox) # Tube Selection tube_layout = QHBoxLayout() self.tube_combo = QComboBox() self.tube_combo.addItems([ "0.13 mm", "0.19 mm", "0.25 mm", "0.38 mm", "0.44 mm", "0.51 mm", "0.57 mm", "0.64 mm", "0.76 mm", "0.89 mm", "0.95 mm", "1.02 mm", "1.09 mm", "1.14 mm", "1.22 mm", "1.30 mm", "1.42 mm", "1.52 mm", "1.65 mm", "1.75 mm", "1.85 mm", "2.06 mm", "2.29 mm", "2.54 mm", "2.79 mm", "3.17 mm" ]) self.tube_combo.currentIndexChanged.connect(self.send_tube_command) tube_layout.addWidget(QLabel("Schlauch:")) tube_layout.addWidget(self.tube_combo, 1) # Calibration calibration_layout = QHBoxLayout() self.cal_ph4_btn = QPushButton("Kalibrieren pH4") self.cal_ph7_btn = QPushButton("Kalibrieren pH7") self.debug_btn = QPushButton("Debug Info") self.cal_ph4_btn.clicked.connect(lambda: self.send_calibration_command(4)) self.cal_ph7_btn.clicked.connect(lambda: self.send_calibration_command(7)) self.debug_btn.clicked.connect(self.send_debug_command) calibration_layout.addWidget(self.cal_ph4_btn) calibration_layout.addWidget(self.cal_ph7_btn) calibration_layout.addWidget(self.debug_btn) # Auto Mode auto_mode_layout = QHBoxLayout() self.volume_spin = QDoubleSpinBox() self.volume_spin.setRange(0.1, 1000) self.volume_spin.setValue(10) self.volume_spin.setSuffix(" ml") self.auto_dose_btn = QPushButton("Automatisch dosieren") self.auto_dose_btn.clicked.connect(self.start_auto_dose) auto_mode_layout.addWidget(QLabel("Menge:")) auto_mode_layout.addWidget(self.volume_spin) auto_mode_layout.addWidget(self.auto_dose_btn) # Add to control layout control_layout.addLayout(pump_control_layout) control_layout.addLayout(tube_layout) control_layout.addLayout(calibration_layout) control_layout.addLayout(auto_mode_layout) control_group.setLayout(control_layout) # Add groups to main layout main_layout.addWidget(connection_group) main_layout.addWidget(status_group) main_layout.addWidget(control_group) # pH Plot Group plot_group = QGroupBox("pH-Verlauf") plot_layout = QVBoxLayout() # Plot controls plot_controls_layout = QHBoxLayout() self.clear_plot_btn = QPushButton("Diagramm löschen") self.clear_plot_btn.clicked.connect(self.clear_ph_plot) self.plot_enabled_checkbox = QCheckBox("pH-Aufzeichnung") self.plot_enabled_checkbox.setChecked(True) plot_controls_layout.addWidget(self.plot_enabled_checkbox) plot_controls_layout.addWidget(self.clear_plot_btn) plot_controls_layout.addStretch() # pH Plot Widget self.ph_plot = PHPlotWidget() plot_layout.addLayout(plot_controls_layout) plot_layout.addWidget(self.ph_plot) plot_group.setLayout(plot_layout) main_layout.addWidget(plot_group) # Status Bar self.status_bar = QStatusBar() self.setStatusBar(self.status_bar) self.status_bar.showMessage("Bereit") # Timer for reading data self.read_timer = QTimer() self.read_timer.timeout.connect(self.read_serial_data) self.read_timer.start(100) # Read incoming data every 100ms # Timer for pH value requests self.ph_request_timer = QTimer() self.ph_request_timer.timeout.connect(self.request_ph_value) # Timer for tube ID requests (every 5 seconds) self.tube_request_timer = QTimer() self.tube_request_timer.timeout.connect(self.request_tube_id) # Timer for flow rate requests (when pump is running) self.flow_rate_timer = QTimer() self.flow_rate_timer.timeout.connect(self.request_flow_rate) # Track pump state self.pump_is_running = False # Volume tracking self.total_volume = 0.0 # Total volume pumped in ml self.last_flow_rate = 0.0 # Last known flow rate # Auto dosing self.auto_dosing = False self.target_volume = 0.0 self.auto_dose_timer = QTimer() self.auto_dose_timer.timeout.connect(self.check_auto_dose_progress) # Enable/disable controls based on connection self.set_controls_enabled(False) def refresh_ports(self): self.port_combo.clear() ports = serial.tools.list_ports.comports() for port in ports: self.port_combo.addItem(port.device) def toggle_connection(self): if self.serial_connection and self.serial_connection.is_open: self.close_connection() else: self.open_connection() def open_connection(self): port = self.port_combo.currentText() if not port: QMessageBox.critical(self, "Fehler", "Kein Port ausgewählt!") return try: self.serial_connection = serial.Serial(port, 9600, timeout=1) self.connect_btn.setText("Trennen") self.status_bar.showMessage(f"Verbunden mit {port}") self.set_controls_enabled(True) # Start pH value polling every 2 seconds self.ph_request_timer.start(2000) # Start tube ID polling every 5 seconds self.tube_request_timer.start(5000) # Request initial data after a short delay to ensure Arduino is ready QTimer.singleShot(500, self.request_initial_data) except Exception as e: QMessageBox.critical(self, "Fehler", f"Verbindungsfehler: {str(e)}") def close_connection(self): # Turn off pump before closing connection for safety if self.serial_connection and self.serial_connection.is_open: try: self.send_command("<12>") # Pump off for safety # Give Arduino time to process the command import time time.sleep(0.1) except: pass # Ignore errors during shutdown if self.serial_connection and self.serial_connection.is_open: self.serial_connection.close() self.serial_connection = None # Stop pH polling self.ph_request_timer.stop() # Stop tube ID polling self.tube_request_timer.stop() # Stop flow rate polling self.flow_rate_timer.stop() # Stop auto dose timer self.auto_dose_timer.stop() self.auto_dosing = False self.connect_btn.setText("Verbinden") self.status_bar.showMessage("Getrennt") self.set_controls_enabled(False) self.ph_label.setText("pH: --") self.pump_status.setText("Pumpe: --") self.flow_rate_label.setVisible(False) self.total_volume_label.setVisible(False) self.pump_is_running = False self.total_volume = 0.0 self.last_flow_rate = 0.0 self.auto_dosing = False self.auto_dose_timer.stop() def set_controls_enabled(self, enabled): self.pump_on_btn.setEnabled(enabled) self.pump_off_btn.setEnabled(enabled) self.tube_combo.setEnabled(enabled) self.cal_ph4_btn.setEnabled(enabled) self.cal_ph7_btn.setEnabled(enabled) self.debug_btn.setEnabled(enabled) self.auto_dose_btn.setEnabled(enabled) self.volume_spin.setEnabled(enabled) self.reset_volume_checkbox.setEnabled(enabled) self.clear_plot_btn.setEnabled(enabled) self.plot_enabled_checkbox.setEnabled(enabled) def send_command(self, command): print(command) if self.serial_connection and self.serial_connection.is_open: try: self.serial_connection.write(command.encode('utf-8')) except Exception as e: self.status_bar.showMessage(f"Fehler beim Senden: {str(e)}") def request_initial_data(self): """Request initial data from Arduino after connection""" if self.serial_connection and self.serial_connection.is_open: # self.send_command("<14>") # Request current pH value self.send_command("<13>") # Request current tube size self.status_bar.showMessage("Initiale Daten angefragt...") def request_ph_value(self): """Request pH value from Arduino every 2 seconds""" if self.serial_connection and self.serial_connection.is_open: self.send_command("<14>") # Request current pH value def request_tube_id(self): """Request tube ID from Arduino every 5 seconds""" if self.serial_connection and self.serial_connection.is_open: self.send_command("<13>") # Request current tube size def request_flow_rate(self): """Request flow rate from Arduino when pump is running""" if self.serial_connection and self.serial_connection.is_open and self.pump_is_running: self.send_command("<18>") # Request current flow rate def update_total_volume(self, flow_rate): """Update the total volume based on current flow rate""" if self.pump_is_running: # Add volume from the last second (flow_rate is ml/sec) self.total_volume += flow_rate * 1.0 # 1 second interval self.total_volume_label.setText(f"Gesamt: {self.total_volume:.2f} ml") self.last_flow_rate = flow_rate def send_pump_command(self, state): if state == 1: self.send_command("<11>") # Pump on self.pump_status.setText("Pumpe: Ein") self.pump_is_running = True self.flow_rate_label.setVisible(True) self.total_volume_label.setVisible(True) # Check if volume should be reset or continue accumulating if self.reset_volume_checkbox.isChecked() or self.auto_dosing: # Reset volume counter (always reset for auto dosing) self.total_volume = 0.0 self.total_volume_label.setText("Gesamt: 0.0 ml") else: # Continue with existing volume - just update display self.total_volume_label.setText(f"Gesamt: {self.total_volume:.2f} ml") self.flow_rate_timer.start(1000) # Request flow rate every second else: self.send_command("<12>") # Pump off self.pump_status.setText("Pumpe: Aus") self.pump_is_running = False self.flow_rate_label.setVisible(False) # Keep total_volume_label visible to show final amount self.flow_rate_timer.stop() # Stop auto dosing if it was running if self.auto_dosing: self.auto_dosing = False self.auto_dose_timer.stop() self.auto_dose_btn.setText("Automatisch dosieren") self.auto_dose_btn.setEnabled(True) QMessageBox.information(self, "Automatische Dosierung", f"Dosierung abgeschlossen!\nGepumpte Menge: {self.total_volume:.2f} ml") def send_tube_command(self, index): # Only send command if not updating from Arduino response if not self.updating_tube_combo: # Send command in format <17{tubid}> for tube selection self.send_command(f"<17{index}>") self.status_bar.showMessage(f"Schlauchgröße {self.tube_combo.itemText(index)} ausgewählt") def send_calibration_command(self, ph): if ph == 4: self.send_command("<15>") # Start pH4 calibration QMessageBox.information(self, "Kalibrierung", "pH4 Kalibrierung gestartet. Sensor in pH4.01-Lösung tauchen und warten bis der Wert stabil ist.") else: self.send_command("<16>") # Start pH7 calibration QMessageBox.information(self, "Kalibrierung", "pH7 Kalibrierung gestartet. Sensor in pH6.86-Lösung tauchen und warten bis der Wert stabil ist.") def send_debug_command(self): """Send debug command to get calibration values""" self.send_command("<19>") # Request debug info self.status_bar.showMessage("Debug-Informationen angefragt...") def start_auto_dose(self): if self.auto_dosing: # Stop auto dosing self.auto_dosing = False self.auto_dose_timer.stop() self.send_pump_command(0) # Turn off pump self.auto_dose_btn.setText("Automatisch dosieren") return volume = self.volume_spin.value() self.target_volume = volume self.auto_dosing = True # Update button text and disable manual pump controls during auto dosing self.auto_dose_btn.setText("Stop Dosierung") self.pump_on_btn.setEnabled(False) self.pump_off_btn.setEnabled(False) # Reset volume counter for accurate measurement self.total_volume = 0.0 # Start pumping self.send_pump_command(1) # Start monitoring timer (check every 0.5 seconds for better precision) self.auto_dose_timer.start(500) self.status_bar.showMessage(f"Automatische Dosierung läuft... Ziel: {volume} ml") def check_auto_dose_progress(self): """Check if target volume has been reached during auto dosing""" if not self.auto_dosing: return # Check if we've reached or exceeded the target volume if self.total_volume >= self.target_volume: # Stop dosing self.auto_dosing = False self.auto_dose_timer.stop() self.send_pump_command(0) # Turn off pump # Re-enable manual controls self.pump_on_btn.setEnabled(True) self.pump_off_btn.setEnabled(True) self.auto_dose_btn.setText("Automatisch dosieren") # Show completion message actual_volume = self.total_volume difference = actual_volume - self.target_volume message = f"Dosierung abgeschlossen!\n" message += f"Zielvolumen: {self.target_volume:.2f} ml\n" message += f"Tatsächliches Volumen: {actual_volume:.2f} ml\n" message += f"Abweichung: {difference:+.2f} ml" QMessageBox.information(self, "Automatische Dosierung", message) self.status_bar.showMessage(f"Dosierung abgeschlossen: {actual_volume:.2f} ml von {self.target_volume:.2f} ml") else: # Update progress in status bar progress = (self.total_volume / self.target_volume) * 100 self.status_bar.showMessage(f"Dosierung: {self.total_volume:.2f}/{self.target_volume:.2f} ml ({progress:.1f}%)") def read_serial_data(self): if not self.serial_connection or not self.serial_connection.is_open: return try: while self.serial_connection.in_waiting: line = self.serial_connection.readline().decode('utf-8').strip() print(line) if line.startswith('<') and line.endswith('>'): data = line[1:-1] if data.startswith('#'): # pH value try: ph_value = float(data[1:]) self.ph_label.setText(f"pH: {ph_value:.2f}") # Add to plot if recording is enabled if self.plot_enabled_checkbox.isChecked(): self.ph_plot.add_ph_value(ph_value) except ValueError: pass elif data.startswith('P'): # Pump status pump_state = data[1:] self.pump_status.setText(f"Pumpe: {'Ein' if pump_state == '1' else 'Aus'}") elif data.startswith('T'): # Tube index try: tube_index = int(data[1:]) # Ensure tube index is within valid range (0-25) if 0 <= tube_index <= 25: self.updating_tube_combo = True # Prevent recursive commands self.tube_combo.setCurrentIndex(tube_index) self.updating_tube_combo = False self.status_bar.showMessage(f"Schlauchgröße {self.tube_combo.currentText()} geladen") except ValueError: pass elif data.startswith('F'): # Flow rate (ml/sec) try: flow_rate = float(data[1:]) self.flow_rate_label.setText(f"ml/sec: {flow_rate:.4f}") # Update total volume self.update_total_volume(flow_rate) except ValueError: pass elif data.startswith('DEBUG'): # Debug information # Show debug info in a message box QMessageBox.information(self, "Debug Information", f"Kalibrierungsdaten:\n{data}") elif data.startswith('CAL'): # Calibration confirmation with details # Show calibration info in status bar and message box self.status_bar.showMessage(f"Kalibrierung abgeschlossen: {data}") QMessageBox.information(self, "Kalibrierung", f"Kalibrierung erfolgreich:\n{data}") except Exception as e: self.status_bar.showMessage(f"Lesefehler: {str(e)}") def closeEvent(self, event): # Safety: Turn off pump before closing if self.serial_connection and self.serial_connection.is_open: try: self.send_command("<12>") # Ensure pump is off # Give Arduino time to process the command import time time.sleep(0.1) except: pass # Ignore errors during shutdown # Stop all timers self.read_timer.stop() self.ph_request_timer.stop() self.tube_request_timer.stop() self.flow_rate_timer.stop() self.auto_dose_timer.stop() self.close_connection() event.accept() def clear_ph_plot(self): """Clear the pH plot data""" self.ph_plot.clear_data() self.status_bar.showMessage("pH-Diagramm gelöscht") class PHPlotWidget(QWidget): def __init__(self): super().__init__() self.figure = Figure(figsize=(8, 4)) self.canvas = FigureCanvas(self.figure) self.ax = self.figure.add_subplot(111) # Data storage (keep last 300 points = 10 minutes at 2-second intervals) self.times = deque(maxlen=300) self.ph_values = deque(maxlen=300) self.start_time = None # Track when recording started # Setup plot self.ax.set_xlabel('Zeit') self.ax.set_ylabel('pH-Wert') self.ax.set_title('pH-Verlauf') self.ax.grid(True, alpha=0.3) self.ax.set_ylim(0, 14) # pH range 0-14 # Layout layout = QVBoxLayout() layout.addWidget(self.canvas) self.setLayout(layout) # Plot line self.line, = self.ax.plot([], [], 'b-', linewidth=2, label='pH-Wert') self.ax.legend() # Initial draw self.canvas.draw() def add_ph_value(self, ph_value): """Add new pH value with current timestamp""" current_time = datetime.datetime.now() # Set start time on first value if self.start_time is None: self.start_time = current_time # Calculate relative time in seconds relative_time = (current_time - self.start_time).total_seconds() self.times.append(relative_time) self.ph_values.append(ph_value) self.update_plot() def update_plot(self): """Update the plot with current data""" if len(self.times) > 0 and len(self.ph_values) > 0: # Update line data self.line.set_data(self.times, self.ph_values) # Auto-scale x-axis to show recent data if len(self.times) > 1: self.ax.set_xlim(min(self.times), max(self.times)) # Auto-scale y-axis around data with some margin if len(self.ph_values) > 0: min_ph = min(self.ph_values) max_ph = max(self.ph_values) margin = 0.5 self.ax.set_ylim(max(0, min_ph - margin), min(14, max_ph + margin)) # Format x-axis for time display (seconds/minutes/hours) self.format_time_axis() # Redraw self.canvas.draw() def format_time_axis(self): """Format the x-axis to show time in appropriate units""" if len(self.times) == 0: return max_time = max(self.times) if max_time < 120: # Less than 2 minutes - show seconds self.ax.set_xlabel('Zeit (Sekunden)') # No need to change tick formatting for seconds elif max_time < 7200: # Less than 2 hours - show minutes self.ax.set_xlabel('Zeit (Minuten)') # Convert tick labels to minutes import matplotlib.ticker as ticker def format_minutes(x, pos): return f'{x/60:.1f}' self.ax.xaxis.set_major_formatter(ticker.FuncFormatter(format_minutes)) else: # Show hours self.ax.set_xlabel('Zeit (Stunden)') # Convert tick labels to hours import matplotlib.ticker as ticker def format_hours(x, pos): return f'{x/3600:.1f}' self.ax.xaxis.set_major_formatter(ticker.FuncFormatter(format_hours)) def clear_data(self): """Clear all data points""" self.times.clear() self.ph_values.clear() self.start_time = None # Reset start time self.line.set_data([], []) self.ax.set_xlim(0, 1) self.ax.set_ylim(0, 14) self.ax.set_xlabel('Zeit') # Reset to default label # Reset tick formatter to default import matplotlib.ticker as ticker self.ax.xaxis.set_major_formatter(ticker.ScalarFormatter()) self.canvas.draw() def main(): app = QApplication(sys.argv) window = PHControllerGUI() window.show() sys.exit(app.exec()) if __name__ == "__main__": main()