"""Qt GUI for an Arduino-based pH controller with a dosing pump.

The window talks to the Arduino over a serial port using short framed
commands (``<NN>``), shows the current pH and pump state, records a pH
history plot and can run automatic dosing and step-wise titration.
"""

import datetime
import math
import sys
import time
from collections import deque

import serial
import serial.tools.list_ports
# NOTE(review): PyQt6 is deliberately imported before the matplotlib Qt
# backend, as in the original file, so matplotlib can bind to the Qt binding
# that is already loaded -- confirm against matplotlib's Qt backend docs
# before reordering these imports.
from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout,
                             QHBoxLayout, QWidget, QLabel, QPushButton,
                             QComboBox, QDoubleSpinBox, QGroupBox, QStatusBar,
                             QMessageBox, QCheckBox)
from PyQt6.QtCore import QTimer, Qt
from PyQt6.QtGui import QFont
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
import numpy as np

# Serial settings
BAUD_RATE = 9600

# Framed commands sent to the Arduino
CMD_PUMP_ON = "<11>"
CMD_PUMP_OFF = "<12>"
CMD_GET_TUBE = "<13>"
CMD_GET_PH = "<14>"
CMD_CAL_PH4 = "<15>"
CMD_CAL_PH7 = "<16>"
CMD_GET_FLOW_RATE = "<18>"
CMD_DEBUG = "<19>"

# Entries of the tube selection combo box; the combo index is what gets sent
# to (and received from) the Arduino as the tube id.
TUBE_SIZES = (
    "0.13 mm", "0.19 mm", "0.25 mm", "0.38 mm", "0.44 mm", "0.51 mm",
    "0.57 mm", "0.64 mm", "0.76 mm", "0.89 mm", "0.95 mm", "1.02 mm",
    "1.09 mm", "1.14 mm", "1.22 mm", "1.30 mm", "1.42 mm", "1.52 mm",
    "1.65 mm", "1.75 mm", "1.85 mm", "2.06 mm", "2.29 mm", "2.54 mm",
    "2.79 mm", "3.17 mm",
)

# Polling intervals in milliseconds
READ_INTERVAL_MS = 100
PH_POLL_INTERVAL_MS = 2000
TUBE_POLL_INTERVAL_MS = 5000
FLOW_POLL_INTERVAL_MS = 1000
DOSE_CHECK_INTERVAL_MS = 500

# Displayed pH axis range
PH_MIN = 0
PH_MAX = 14
PH_AXIS_MARGIN = 0.5


def _format_minutes(x, pos):
    """Tick formatter: seconds -> minutes with one decimal."""
    return f'{x/60:.1f}'


def _format_hours(x, pos):
    """Tick formatter: seconds -> hours with one decimal."""
    return f'{x/3600:.1f}'


def _set_ph_ylim(ax, ph_values):
    """Scale the y-axis around *ph_values*, clamped to the 0-14 pH range.

    Limits are only applied when they form a valid (lower < upper) interval,
    so out-of-range sensor readings cannot invert or collapse the axis.
    """
    lower = max(PH_MIN, min(ph_values) - PH_AXIS_MARGIN)
    upper = min(PH_MAX, max(ph_values) + PH_AXIS_MARGIN)
    if lower < upper:
        ax.set_ylim(lower, upper)


class PHControllerGUI(QMainWindow):
    """Main window: serial connection, pump control, pH display and plots."""

    def __init__(self):
        super().__init__()
        self.serial_connection = None
        # Flag to prevent echoing a tube command back while applying a tube
        # index that was reported by the Arduino.
        self.updating_tube_combo = False
        self._init_state()

        self.setWindowTitle("Arduino pH Controller")
        self.setGeometry(100, 100, 900, 800)  # Larger window for the plots

        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout(central_widget)
        main_layout.addWidget(self._build_connection_group())
        main_layout.addWidget(self._build_status_group())
        main_layout.addWidget(self._build_control_group())
        main_layout.addWidget(self._build_ph_plot_group())
        main_layout.addWidget(self._build_titration_group())

        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)
        self.status_bar.showMessage("Bereit")

        self._init_timers()

        # Controls stay disabled until a connection is opened
        self.set_controls_enabled(False)

    # ------------------------------------------------------------------
    # Construction helpers
    # ------------------------------------------------------------------
    def _init_state(self):
        """Initialise all non-widget state to its disconnected defaults."""
        # Pump / volume tracking
        self.pump_is_running = False
        self.total_volume = 0.0  # Total volume pumped in ml
        self.last_flow_rate = 0.0  # Last known flow rate

        # pH filtering for motor interference
        self.ph_history = deque(maxlen=5)  # Last 5 raw pH values
        self.motor_state_changed_time = None  # When the pump was last switched
        self.motor_interference_duration = 3.0  # Seconds of enhanced filtering
        self.ph_filter_enabled = True
        self.outlier_threshold = 2.0  # pH units, normal operation
        self.interference_threshold = 1.0  # pH units, during interference

        # Auto dosing
        self.auto_dosing = False
        self.target_volume = 0.0

        # Titration tracking
        self.titration_active = False
        self.titration_start_volume = 0.0
        self.auto_titration_active = False
        self.titration_step_size = 0.5  # ml per step
        self.titration_wait_time = 10.0  # Seconds to wait between steps
        self.titration_current_step = 0
        self.titration_target_volume = 0.0

    def _init_timers(self):
        """Create all timers; only the serial read timer starts immediately."""
        # Reads incoming serial data
        self.read_timer = QTimer(self)
        self.read_timer.timeout.connect(self.read_serial_data)
        self.read_timer.start(READ_INTERVAL_MS)

        # Periodic requests, started on connect / pump start
        self.ph_request_timer = QTimer(self)
        self.ph_request_timer.timeout.connect(self.request_ph_value)
        self.tube_request_timer = QTimer(self)
        self.tube_request_timer.timeout.connect(self.request_tube_id)
        self.flow_rate_timer = QTimer(self)
        self.flow_rate_timer.timeout.connect(self.request_flow_rate)

        # Monitors dosing progress while auto dosing
        self.auto_dose_timer = QTimer(self)
        self.auto_dose_timer.timeout.connect(self.check_auto_dose_progress)

        # Delays the next titration step; single-shot so that stop() can
        # cancel a pending step.
        self.titration_timer = QTimer(self)
        self.titration_timer.setSingleShot(True)
        self.titration_timer.timeout.connect(self.titration_step)

    @staticmethod
    def _make_status_label(text, point_size, hidden=False):
        """Create a centred Arial label, optionally hidden from the start."""
        label = QLabel(text)
        label.setFont(QFont('Arial', point_size))
        label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        if hidden:
            # Only ever hide here: showing a parentless widget would pop it
            # up as its own window.
            label.setVisible(False)
        return label

    def _build_connection_group(self):
        """Build the port selection / connect button group."""
        group = QGroupBox("Verbindung")
        layout = QHBoxLayout()

        self.port_combo = QComboBox()
        self.refresh_ports()
        self.connect_btn = QPushButton("Verbinden")
        self.connect_btn.clicked.connect(self.toggle_connection)

        layout.addWidget(QLabel("Port:"))
        layout.addWidget(self.port_combo, 1)
        layout.addWidget(self.connect_btn)
        group.setLayout(layout)
        return group

    def _build_status_group(self):
        """Build the pH / pump / flow rate / total volume display group."""
        group = QGroupBox("Status")
        layout = QHBoxLayout()

        self.ph_label = self._make_status_label("pH: --", 24)
        self.pump_status = self._make_status_label("Pumpe: Aus", 16)
        self.flow_rate_label = self._make_status_label(
            "ml/sec: --", 14, hidden=True)
        self.total_volume_label = self._make_status_label(
            "Gesamt: 0.0 ml", 12, hidden=True)

        for label in (self.ph_label, self.pump_status,
                      self.flow_rate_label, self.total_volume_label):
            layout.addWidget(label)
        group.setLayout(layout)
        return group

    def _build_control_group(self):
        """Build pump, tube, calibration and auto-dosing controls."""
        group = QGroupBox("Steuerung")
        layout = QVBoxLayout()

        # Pump control
        pump_layout = QHBoxLayout()
        self.pump_on_btn = QPushButton("Pumpe Ein")
        self.pump_on_btn.clicked.connect(lambda: self.send_pump_command(1))
        self.pump_off_btn = QPushButton("Pumpe Aus")
        self.pump_off_btn.clicked.connect(lambda: self.send_pump_command(0))
        self.reset_volume_checkbox = QCheckBox("Volumen bei Start zurücksetzen")
        self.reset_volume_checkbox.setChecked(True)  # Default: reset volume
        pump_layout.addWidget(self.pump_on_btn)
        pump_layout.addWidget(self.pump_off_btn)
        pump_layout.addWidget(self.reset_volume_checkbox)

        # Tube selection (signal connected after filling, so no command is
        # sent while the list is populated)
        tube_layout = QHBoxLayout()
        self.tube_combo = QComboBox()
        self.tube_combo.addItems(list(TUBE_SIZES))
        self.tube_combo.currentIndexChanged.connect(self.send_tube_command)
        tube_layout.addWidget(QLabel("Schlauch:"))
        tube_layout.addWidget(self.tube_combo, 1)

        # Calibration
        calibration_layout = QHBoxLayout()
        self.cal_ph4_btn = QPushButton("Kalibrieren pH4")
        self.cal_ph7_btn = QPushButton("Kalibrieren pH7")
        self.debug_btn = QPushButton("Debug Info")
        self.cal_ph4_btn.clicked.connect(
            lambda: self.send_calibration_command(4))
        self.cal_ph7_btn.clicked.connect(
            lambda: self.send_calibration_command(7))
        self.debug_btn.clicked.connect(self.send_debug_command)
        calibration_layout.addWidget(self.cal_ph4_btn)
        calibration_layout.addWidget(self.cal_ph7_btn)
        calibration_layout.addWidget(self.debug_btn)

        # Auto dosing
        auto_layout = QHBoxLayout()
        self.volume_spin = QDoubleSpinBox()
        self.volume_spin.setRange(0.1, 1000)
        self.volume_spin.setValue(10)
        self.volume_spin.setSuffix(" ml")
        self.auto_dose_btn = QPushButton("Automatisch dosieren")
        self.auto_dose_btn.clicked.connect(self.start_auto_dose)
        auto_layout.addWidget(QLabel("Menge:"))
        auto_layout.addWidget(self.volume_spin)
        auto_layout.addWidget(self.auto_dose_btn)

        for sub_layout in (pump_layout, tube_layout,
                           calibration_layout, auto_layout):
            layout.addLayout(sub_layout)
        group.setLayout(layout)
        return group

    def _build_ph_plot_group(self):
        """Build the pH history plot together with its controls."""
        group = QGroupBox("pH-Verlauf")
        layout = QVBoxLayout()

        controls = QHBoxLayout()
        self.clear_plot_btn = QPushButton("Diagramm löschen")
        self.clear_plot_btn.clicked.connect(self.clear_ph_plot)
        self.plot_enabled_checkbox = QCheckBox("pH-Aufzeichnung")
        self.plot_enabled_checkbox.setChecked(True)
        controls.addWidget(self.plot_enabled_checkbox)
        controls.addWidget(self.clear_plot_btn)

        self.ph_filter_checkbox = QCheckBox("pH-Filter (Motorstörungen)")
        self.ph_filter_checkbox.setChecked(True)
        self.ph_filter_checkbox.setToolTip(
            "Filtert pH-Sprünge beim Motor Ein-/Ausschalten")
        self.ph_filter_checkbox.toggled.connect(self.toggle_ph_filter)
        controls.addWidget(self.ph_filter_checkbox)
        controls.addStretch()

        self.ph_plot = PHPlotWidget()

        layout.addLayout(controls)
        layout.addWidget(self.ph_plot)
        group.setLayout(layout)
        return group

    def _build_titration_group(self):
        """Build the titration plot together with its parameter controls."""
        group = QGroupBox("Titration")
        layout = QVBoxLayout()
        controls = QHBoxLayout()

        self.titration_step_spin = QDoubleSpinBox()
        self.titration_step_spin.setRange(0.1, 10.0)
        self.titration_step_spin.setValue(0.5)
        self.titration_step_spin.setSuffix(" ml")
        self.titration_step_spin.setDecimals(1)

        self.titration_wait_spin = QDoubleSpinBox()
        self.titration_wait_spin.setRange(5.0, 60.0)
        self.titration_wait_spin.setValue(10.0)
        self.titration_wait_spin.setSuffix(" s")
        self.titration_wait_spin.setDecimals(1)

        self.start_titration_btn = QPushButton("Automatische Titration starten")
        self.start_titration_btn.clicked.connect(self.toggle_auto_titration)
        self.clear_titration_btn = QPushButton("Titrationsdaten löschen")
        self.clear_titration_btn.clicked.connect(self.clear_titration_plot)

        controls.addWidget(QLabel("Schritt:"))
        controls.addWidget(self.titration_step_spin)
        controls.addWidget(QLabel("Warten:"))
        controls.addWidget(self.titration_wait_spin)
        controls.addWidget(self.start_titration_btn)
        controls.addWidget(self.clear_titration_btn)

        self.titration_plot = TitrationPlotWidget()

        layout.addLayout(controls)
        layout.addWidget(self.titration_plot)
        group.setLayout(layout)
        return group

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def _is_connected(self):
        """Return True when a serial connection exists and is open."""
        return bool(self.serial_connection and self.serial_connection.is_open)

    def refresh_ports(self):
        """Repopulate the port combo box with the available serial ports."""
        self.port_combo.clear()
        for port in serial.tools.list_ports.comports():
            self.port_combo.addItem(port.device)

    def toggle_connection(self):
        """Open the connection if closed, close it if open."""
        if self._is_connected():
            self.close_connection()
        else:
            self.open_connection()

    def open_connection(self):
        """Open the selected port and start the periodic polling timers."""
        port = self.port_combo.currentText()
        if not port:
            QMessageBox.critical(self, "Fehler", "Kein Port ausgewählt!")
            return

        try:
            self.serial_connection = serial.Serial(port, BAUD_RATE, timeout=1)
        except (serial.SerialException, OSError, ValueError) as e:
            QMessageBox.critical(self, "Fehler",
                                 f"Verbindungsfehler: {str(e)}")
            return

        self.connect_btn.setText("Trennen")
        self.status_bar.showMessage(f"Verbunden mit {port}")
        self.set_controls_enabled(True)

        self.ph_request_timer.start(PH_POLL_INTERVAL_MS)
        self.tube_request_timer.start(TUBE_POLL_INTERVAL_MS)

        # Request initial data after a short delay so the Arduino is ready
        QTimer.singleShot(500, self.request_initial_data)

    def close_connection(self):
        """Switch the pump off, close the port and reset the UI and state."""
        if self._is_connected():
            # Safety: pump off before the port goes away. send_command
            # reports write errors itself, so nothing to catch here.
            self.send_command(CMD_PUMP_OFF)
            time.sleep(0.1)  # Give the Arduino time to process the command
            self.serial_connection.close()
        self.serial_connection = None

        for timer in (self.ph_request_timer, self.tube_request_timer,
                      self.flow_rate_timer, self.auto_dose_timer,
                      self.titration_timer):
            timer.stop()

        # Reset run state so a later reconnect starts clean
        self.auto_dosing = False
        self.auto_titration_active = False
        self.titration_active = False
        self.pump_is_running = False
        self.total_volume = 0.0
        self.last_flow_rate = 0.0

        # Reset captions that a running dosing/titration may have changed
        self.connect_btn.setText("Verbinden")
        self.auto_dose_btn.setText("Automatisch dosieren")
        self.start_titration_btn.setText("Automatische Titration starten")
        self.status_bar.showMessage("Getrennt")
        self.set_controls_enabled(False)
        self.ph_label.setText("pH: --")
        self.pump_status.setText("Pumpe: --")
        self.flow_rate_label.setVisible(False)
        self.total_volume_label.setVisible(False)

    def set_controls_enabled(self, enabled):
        """Enable or disable every control that needs an open connection."""
        controls = (
            self.pump_on_btn, self.pump_off_btn, self.tube_combo,
            self.cal_ph4_btn, self.cal_ph7_btn, self.debug_btn,
            self.auto_dose_btn, self.volume_spin, self.reset_volume_checkbox,
            self.clear_plot_btn, self.plot_enabled_checkbox,
            self.start_titration_btn, self.clear_titration_btn,
            self.titration_step_spin, self.titration_wait_spin,
            self.ph_filter_checkbox,
        )
        for control in controls:
            control.setEnabled(enabled)

    # ------------------------------------------------------------------
    # Outgoing commands
    # ------------------------------------------------------------------
    def send_command(self, command):
        """Write *command* to the serial port; no-op when not connected.

        Write errors are reported in the status bar rather than raised.
        """
        print(command)
        if not self._is_connected():
            return
        try:
            self.serial_connection.write(command.encode('utf-8'))
        except Exception as e:
            # Boundary handler: a failed write must not crash the GUI
            self.status_bar.showMessage(f"Fehler beim Senden: {str(e)}")

    def request_initial_data(self):
        """Request initial data from the Arduino after connecting."""
        if self._is_connected():
            self.send_command(CMD_GET_TUBE)
            self.status_bar.showMessage("Initiale Daten angefragt...")

    def request_ph_value(self):
        """Request the current pH value (called by the pH polling timer)."""
        if self._is_connected():
            self.send_command(CMD_GET_PH)

    def request_tube_id(self):
        """Request the current tube id (called by the tube polling timer)."""
        if self._is_connected():
            self.send_command(CMD_GET_TUBE)

    def request_flow_rate(self):
        """Request the current flow rate while the pump is running."""
        if self._is_connected() and self.pump_is_running:
            self.send_command(CMD_GET_FLOW_RATE)

    def update_total_volume(self, flow_rate):
        """Add one polling interval worth of *flow_rate* to the total volume.

        The flow rate is treated as ml/sec and each reading is counted as
        exactly one second of pumping (the flow polling interval).
        """
        if self.pump_is_running:
            self.total_volume += flow_rate * 1.0  # 1 second interval
            self.total_volume_label.setText(
                f"Gesamt: {self.total_volume:.2f} ml")
            self.last_flow_rate = flow_rate

    def send_pump_command(self, state):
        """Switch the pump on (``state == 1``) or off (any other value)."""
        # Record the switch time for the motor interference filter
        self.motor_state_changed_time = datetime.datetime.now()

        if state == 1:
            self.send_command(CMD_PUMP_ON)
            self.pump_status.setText("Pumpe: Ein")
            self.pump_is_running = True
            self.flow_rate_label.setVisible(True)
            self.total_volume_label.setVisible(True)

            # Reset the counter when requested or when auto dosing, but
            # never during a titration (volume accumulates across steps).
            reset_requested = (self.reset_volume_checkbox.isChecked()
                               or self.auto_dosing)
            if reset_requested and not self.auto_titration_active:
                self.total_volume = 0.0
                self.total_volume_label.setText("Gesamt: 0.0 ml")
            else:
                self.total_volume_label.setText(
                    f"Gesamt: {self.total_volume:.2f} ml")

            self.flow_rate_timer.start(FLOW_POLL_INTERVAL_MS)
            return

        self.send_command(CMD_PUMP_OFF)
        self.pump_status.setText("Pumpe: Aus")
        self.pump_is_running = False
        self.flow_rate_label.setVisible(False)
        # total_volume_label stays visible to show the final amount
        self.flow_rate_timer.stop()

        # Safety net: the pump was switched off while dosing was still
        # flagged as running.
        if self.auto_dosing:
            self.auto_dosing = False
            self.auto_dose_timer.stop()
            self.auto_dose_btn.setText("Automatisch dosieren")
            self.auto_dose_btn.setEnabled(True)
            if not self.auto_titration_active:
                # start_auto_dose disabled these; hand them back
                self.pump_on_btn.setEnabled(True)
                self.pump_off_btn.setEnabled(True)
            QMessageBox.information(
                self, "Automatische Dosierung",
                f"Dosierung abgeschlossen!\nGepumpte Menge: {self.total_volume:.2f} ml")

    def send_tube_command(self, index):
        """Send the selected tube index unless it came from the Arduino."""
        if self.updating_tube_combo:
            return
        # Command format <17{tube id}>
        self.send_command(f"<17{index}>")
        self.status_bar.showMessage(
            f"Schlauchgröße {self.tube_combo.itemText(index)} ausgewählt")

    def send_calibration_command(self, ph):
        """Start the pH4 calibration for ``ph == 4``, otherwise pH7."""
        if ph == 4:
            self.send_command(CMD_CAL_PH4)
            QMessageBox.information(
                self, "Kalibrierung",
                "pH4 Kalibrierung gestartet. Sensor in pH4.01-Lösung tauchen und warten bis der Wert stabil ist.")
        else:
            self.send_command(CMD_CAL_PH7)
            QMessageBox.information(
                self, "Kalibrierung",
                "pH7 Kalibrierung gestartet. Sensor in pH6.86-Lösung tauchen und warten bis der Wert stabil ist.")

    def send_debug_command(self):
        """Request debug / calibration information from the Arduino."""
        self.send_command(CMD_DEBUG)
        self.status_bar.showMessage("Debug-Informationen angefragt...")

    # ------------------------------------------------------------------
    # Auto dosing
    # ------------------------------------------------------------------
    def start_auto_dose(self):
        """Toggle automatic dosing of the volume set in the spin box."""
        if self.auto_dosing:
            # Second click: stop dosing. Clear the flag first so that
            # send_pump_command does not report a completed dosing.
            self.auto_dosing = False
            self.auto_dose_timer.stop()
            self.send_pump_command(0)
            self.auto_dose_btn.setText("Automatisch dosieren")
            self.pump_on_btn.setEnabled(True)
            self.pump_off_btn.setEnabled(True)
            return

        if self.auto_titration_active:
            QMessageBox.warning(self, "Warnung",
                                "Automatische Titration läuft bereits!")
            return

        volume = self.volume_spin.value()
        self.target_volume = volume
        self.auto_dosing = True

        # Manual pump control is locked while dosing runs
        self.auto_dose_btn.setText("Stop Dosierung")
        self.pump_on_btn.setEnabled(False)
        self.pump_off_btn.setEnabled(False)

        # Start from zero for an accurate measurement
        self.total_volume = 0.0
        self.send_pump_command(1)
        self.auto_dose_timer.start(DOSE_CHECK_INTERVAL_MS)

        self.status_bar.showMessage(
            f"Automatische Dosierung läuft... Ziel: {volume} ml")

    # ------------------------------------------------------------------
    # pH filtering
    # ------------------------------------------------------------------
    def toggle_ph_filter(self, enabled):
        """Enable or disable pH filtering."""
        self.ph_filter_enabled = enabled
        if enabled:
            self.status_bar.showMessage(
                "pH-Filter aktiviert - Motorstörungen werden gefiltert")
        else:
            self.status_bar.showMessage(
                "pH-Filter deaktiviert - Rohe pH-Werte werden angezeigt")
            # Stale history must not influence a later re-enable
            self.ph_history.clear()

    def configure_ph_filter(self, interference_duration=3.0,
                            outlier_threshold=2.0, interference_threshold=1.0):
        """Configure the pH filter parameters.

        Args:
            interference_duration: Seconds of enhanced filtering after a
                motor state change.
            outlier_threshold: pH units; jump size treated as an outlier in
                normal operation.
            interference_threshold: pH units; stored threshold for the
                interference period.
        """
        self.motor_interference_duration = interference_duration
        self.outlier_threshold = outlier_threshold
        self.interference_threshold = interference_threshold

        if self.ph_filter_enabled:
            self.status_bar.showMessage(
                f"pH-Filter konfiguriert: Störzeit={interference_duration}s, "
                f"Normal={outlier_threshold}pH, Störung={interference_threshold}pH")

    def _motor_interference_active(self, now=None):
        """Return True while the last pump switch is within the filter window."""
        if not self.motor_state_changed_time:
            return False
        if now is None:
            now = datetime.datetime.now()
        elapsed = (now - self.motor_state_changed_time).total_seconds()
        return elapsed < self.motor_interference_duration

    def filter_ph_value(self, ph_value):
        """Filter a raw pH reading to suppress motor interference and outliers.

        Returns the raw value when filtering is disabled or there is not
        enough history; otherwise a median (shortly after a pump switch) or,
        for a large jump in normal operation, the mean of the previous
        readings.
        """
        if not self.ph_filter_enabled:
            return ph_value

        self.ph_history.append(ph_value)
        history = list(self.ph_history)

        if self._motor_interference_active():
            if len(history) < 3:
                # Not enough history yet, pass the reading through
                return ph_value
            # Median of the last three readings removes single spikes.
            # NOTE(review): interference_threshold does not affect this
            # result; the original returned the median on both sides of
            # that comparison.
            return sorted(history[-3:])[1]

        # Normal operation: light outlier suppression
        if len(history) >= 3:
            jump = abs(ph_value - history[-2])
            if jump > self.outlier_threshold:
                earlier = history[:-1]  # Exclude the suspected outlier
                return sum(earlier) / len(earlier)
        return ph_value

    # ------------------------------------------------------------------
    # Incoming data
    # ------------------------------------------------------------------
    def read_serial_data(self):
        """Drain pending serial lines and dispatch every framed message."""
        if not self._is_connected():
            return

        try:
            # Re-check the connection each round: a handler may open a modal
            # dialog during which the connection can be closed.
            while self._is_connected() and self.serial_connection.in_waiting:
                raw_line = self.serial_connection.readline()
                # Undecodable bytes must not abort draining the buffer
                line = raw_line.decode('utf-8', errors='replace').strip()
                print(line)
                if line.startswith('<') and line.endswith('>'):
                    self._handle_message(line[1:-1])
        except Exception as e:
            # Boundary handler for the timer callback
            self.status_bar.showMessage(f"Lesefehler: {str(e)}")

    def _handle_message(self, data):
        """Dispatch one unframed message by its leading type marker."""
        if data.startswith('#'):
            self._handle_ph_message(data[1:])
        elif data.startswith('P'):
            pump_state = data[1:]
            self.pump_status.setText(
                f"Pumpe: {'Ein' if pump_state == '1' else 'Aus'}")
        elif data.startswith('T'):
            self._handle_tube_message(data[1:])
        elif data.startswith('F'):
            self._handle_flow_message(data[1:])
        elif data.startswith('DEBUG'):
            QMessageBox.information(self, "Debug Information",
                                    f"Kalibrierungsdaten:\n{data}")
        elif data.startswith('CAL'):
            self.status_bar.showMessage(f"Kalibrierung abgeschlossen: {data}")
            QMessageBox.information(self, "Kalibrierung",
                                    f"Kalibrierung erfolgreich:\n{data}")

    def _handle_ph_message(self, payload):
        """Parse, filter, display and record one pH reading."""
        try:
            raw_ph_value = float(payload)
        except ValueError:
            return
        if not math.isfinite(raw_ph_value):
            # nan/inf would break the axis limits on every later redraw
            return

        filtered_ph_value = self.filter_ph_value(raw_ph_value)
        if filtered_ph_value is None:
            return

        self.ph_label.setText(f"pH: {filtered_ph_value:.2f}")

        # Tell the user when the filter visibly changed the reading
        if (self._motor_interference_active()
                and abs(raw_ph_value - filtered_ph_value) > 0.1):
            self.status_bar.showMessage(
                f"pH gefiltert: {raw_ph_value:.2f} → {filtered_ph_value:.2f} (Motorstörung)")

        if self.plot_enabled_checkbox.isChecked():
            self.ph_plot.add_ph_value(filtered_ph_value)

        if self.titration_active or self.auto_titration_active:
            titrated_volume = self.total_volume - self.titration_start_volume
            self.titration_plot.add_titration_point(titrated_volume,
                                                    filtered_ph_value)

    def _handle_tube_message(self, payload):
        """Apply a tube index reported by the Arduino to the combo box."""
        try:
            tube_index = int(payload)
        except ValueError:
            return
        if not 0 <= tube_index < self.tube_combo.count():
            return

        # Suppress send_tube_command while mirroring the Arduino's value
        self.updating_tube_combo = True
        try:
            self.tube_combo.setCurrentIndex(tube_index)
        finally:
            self.updating_tube_combo = False
        self.status_bar.showMessage(
            f"Schlauchgröße {self.tube_combo.currentText()} geladen")

    def _handle_flow_message(self, payload):
        """Display a flow rate reading and add it to the total volume."""
        try:
            flow_rate = float(payload)
        except ValueError:
            return
        if not math.isfinite(flow_rate):
            return
        self.flow_rate_label.setText(f"ml/sec: {flow_rate:.4f}")
        self.update_total_volume(flow_rate)

    def closeEvent(self, event):
        """Stop polling and close the connection (pump off) before exit."""
        self.read_timer.stop()
        # close_connection sends the pump-off command and stops all other
        # timers.
        self.close_connection()
        event.accept()

    # ------------------------------------------------------------------
    # Plots and titration
    # ------------------------------------------------------------------
    def clear_ph_plot(self):
        """Clear the pH plot data."""
        self.ph_plot.clear_data()
        self.status_bar.showMessage("pH-Diagramm gelöscht")

    def toggle_titration(self):
        """Start or stop manual titration recording."""
        if self.titration_active:
            self.titration_active = False
            self.start_titration_btn.setText("Titration starten")
            self.status_bar.showMessage("Titration gestoppt")
        else:
            self.titration_active = True
            self.titration_start_volume = self.total_volume
            self.start_titration_btn.setText("Titration stoppen")
            self.status_bar.showMessage("Titration gestartet")

    def toggle_auto_titration(self):
        """Start or stop the automatic step-wise titration."""
        if self.auto_titration_active:
            self.auto_titration_active = False
            self.titration_timer.stop()  # Cancels a pending next step
            # Clear the dosing state before switching the pump off so that
            # send_pump_command does not report a completed dosing.
            self.auto_dosing = False
            self.auto_dose_timer.stop()
            self.send_pump_command(0)
            self.start_titration_btn.setText("Automatische Titration starten")

            self.pump_on_btn.setEnabled(True)
            self.pump_off_btn.setEnabled(True)
            self.auto_dose_btn.setEnabled(True)

            self.status_bar.showMessage("Automatische Titration gestoppt")
            return

        if self.auto_dosing:
            QMessageBox.warning(self, "Warnung",
                                "Automatische Dosierung läuft bereits!")
            return

        self.titration_step_size = self.titration_step_spin.value()
        self.titration_wait_time = self.titration_wait_spin.value()

        self.auto_titration_active = True
        self.titration_start_volume = self.total_volume  # Starting volume
        self.titration_current_step = 0

        self.start_titration_btn.setText("Automatische Titration stoppen")
        self.pump_on_btn.setEnabled(False)
        self.pump_off_btn.setEnabled(False)
        self.auto_dose_btn.setEnabled(False)

        self.titration_step()

        self.status_bar.showMessage(
            f"Automatische Titration gestartet - Schritt: {self.titration_step_size} ml, Warten: {self.titration_wait_time} s")

    def titration_step(self):
        """Perform one titration step by dosing one more step volume."""
        if not self.auto_titration_active:
            return

        self.titration_current_step += 1

        step_target_volume = self.titration_step_size
        total_target_titrated = (self.titration_current_step
                                 * self.titration_step_size)

        # Reuse the auto dosing mechanism with an absolute target volume
        self.auto_dosing = True
        self.target_volume = (self.titration_start_volume
                              + total_target_titrated)
        self.send_pump_command(1)
        self.auto_dose_timer.start(DOSE_CHECK_INTERVAL_MS)

        self.status_bar.showMessage(
            f"Titration Schritt {self.titration_current_step}: Dosiere {step_target_volume} ml (Titration gesamt: {total_target_titrated} ml)")

    def check_auto_dose_progress(self):
        """Stop the pump once the target volume is reached, else show progress."""
        if not self.auto_dosing:
            return

        if self.total_volume < self.target_volume:
            self._show_dose_progress()
            return

        # Target reached: clear the flag before switching the pump off
        self.auto_dosing = False
        self.auto_dose_timer.stop()
        self.send_pump_command(0)

        if self.auto_titration_active:
            # Let the pH settle, then run the next step. Using the member
            # timer keeps the pending step cancellable.
            wait_time_ms = int(self.titration_wait_time * 1000)
            self.titration_timer.start(wait_time_ms)
            self.status_bar.showMessage(
                f"Warte {self.titration_wait_time} s auf pH-Stabilisierung...")
            return

        # Regular auto dosing finished
        self.pump_on_btn.setEnabled(True)
        self.pump_off_btn.setEnabled(True)
        self.auto_dose_btn.setText("Automatisch dosieren")

        actual_volume = self.total_volume
        difference = actual_volume - self.target_volume
        message = f"Dosierung abgeschlossen!\n"
        message += f"Zielvolumen: {self.target_volume:.2f} ml\n"
        message += f"Tatsächliches Volumen: {actual_volume:.2f} ml\n"
        message += f"Abweichung: {difference:+.2f} ml"

        QMessageBox.information(self, "Automatische Dosierung", message)
        self.status_bar.showMessage(
            f"Dosierung abgeschlossen: {actual_volume:.2f} ml von {self.target_volume:.2f} ml")

    def _show_dose_progress(self):
        """Show the current dosing / titration progress in the status bar."""
        progress = (self.total_volume / self.target_volume) * 100
        if self.auto_titration_active:
            current_titrated_volume = (self.total_volume
                                       - self.titration_start_volume)
            target_titrated_volume = (self.titration_current_step
                                      * self.titration_step_size)
            self.status_bar.showMessage(
                f"Titration Schritt {self.titration_current_step}: Gesamt {current_titrated_volume:.2f}/{target_titrated_volume:.2f} ml titriert ({progress:.1f}%)")
        else:
            self.status_bar.showMessage(
                f"Dosierung: {self.total_volume:.2f}/{self.target_volume:.2f} ml ({progress:.1f}%)")

    def clear_titration_plot(self):
        """Clear the titration plot and reset the manual titration state."""
        try:
            self.titration_plot.clear_data()

            self.titration_active = False
            self.titration_start_volume = 0.0

            # Leave the caption alone while an automatic titration runs
            if not self.auto_titration_active:
                self.start_titration_btn.setText(
                    "Automatische Titration starten")

            self.status_bar.showMessage("Titrationsdaten gelöscht")
        except Exception as e:
            self.status_bar.showMessage(
                f"Fehler beim Löschen der Titrationsdaten: {str(e)}")
            QMessageBox.critical(
                self, "Fehler",
                f"Fehler beim Löschen der Titrationsdaten:\n{str(e)}")


class PHPlotWidget(QWidget):
    """Matplotlib canvas showing pH over time since the first reading."""

    def __init__(self):
        super().__init__()

        self.figure = Figure(figsize=(8, 4))
        self.canvas = FigureCanvas(self.figure)
        self.ax = self.figure.add_subplot(111)

        # Keep the last 300 points (10 minutes at 2-second intervals)
        self.times = deque(maxlen=300)
        self.ph_values = deque(maxlen=300)
        self.start_time = None  # Set when the first value arrives

        self.ax.set_xlabel('Zeit')
        self.ax.set_ylabel('pH-Wert')
        self.ax.set_title('pH-Verlauf')
        self.ax.grid(True, alpha=0.3)
        self.ax.set_ylim(PH_MIN, PH_MAX)

        layout = QVBoxLayout()
        layout.addWidget(self.canvas)
        self.setLayout(layout)

        self.line, = self.ax.plot([], [], 'b-', linewidth=2, label='pH-Wert')
        self.ax.legend()

        self.canvas.draw()

    def add_ph_value(self, ph_value):
        """Append *ph_value* at the current time and redraw."""
        current_time = datetime.datetime.now()
        if self.start_time is None:
            self.start_time = current_time

        # x values are seconds since the first recorded value
        relative_time = (current_time - self.start_time).total_seconds()
        self.times.append(relative_time)
        self.ph_values.append(ph_value)

        self.update_plot()

    def update_plot(self):
        """Redraw the line and rescale both axes to the stored data."""
        if not self.times or not self.ph_values:
            return

        self.line.set_data(list(self.times), list(self.ph_values))

        # A single point has no x-extent to scale to
        if len(self.times) > 1:
            self.ax.set_xlim(min(self.times), max(self.times))

        _set_ph_ylim(self.ax, self.ph_values)
        self.format_time_axis()
        self.canvas.draw()

    def format_time_axis(self):
        """Label the x-axis in seconds, minutes or hours depending on range."""
        if not self.times:
            return

        max_time = max(self.times)
        if max_time < 120:  # Less than 2 minutes
            self.ax.set_xlabel('Zeit (Sekunden)')
            # Tick values already are seconds; formatter left untouched
        elif max_time < 7200:  # Less than 2 hours
            self.ax.set_xlabel('Zeit (Minuten)')
            self.ax.xaxis.set_major_formatter(
                ticker.FuncFormatter(_format_minutes))
        else:
            self.ax.set_xlabel('Zeit (Stunden)')
            self.ax.xaxis.set_major_formatter(
                ticker.FuncFormatter(_format_hours))

    def clear_data(self):
        """Remove all points and restore the initial axes."""
        self.times.clear()
        self.ph_values.clear()
        self.start_time = None
        self.line.set_data([], [])
        self.ax.set_xlim(0, 1)
        self.ax.set_ylim(PH_MIN, PH_MAX)
        self.ax.set_xlabel('Zeit')
        # Back to plain numeric ticks
        self.ax.xaxis.set_major_formatter(ticker.ScalarFormatter())
        self.canvas.draw()


class TitrationPlotWidget(QWidget):
    """Matplotlib canvas showing pH against dosed volume."""

    def __init__(self):
        super().__init__()

        self.figure = Figure(figsize=(8, 4))
        self.canvas = FigureCanvas(self.figure)
        self.ax = self.figure.add_subplot(111)

        self.volumes = []
        self.ph_values = []

        self.ax.set_xlabel('Dosierte Menge (ml)')
        self.ax.set_ylabel('pH-Wert')
        self.ax.set_title('Titrationskurve')
        self.ax.grid(True, alpha=0.3)
        self.ax.set_ylim(PH_MIN, PH_MAX)
        self.ax.set_xlim(0, 1)  # Initial volume range

        layout = QVBoxLayout()
        layout.addWidget(self.canvas)
        self.setLayout(layout)

        self.line, = self.ax.plot([], [], 'r-', linewidth=2,
                                  label='pH-Verlauf')
        self.points = self.ax.scatter([], [], c='red', s=30, zorder=5,
                                      label='Messpunkte')
        self.ax.legend()

        self.canvas.draw()

    def add_titration_point(self, volume, ph_value):
        """Append one (volume, pH) point and redraw."""
        self.volumes.append(volume)
        self.ph_values.append(ph_value)
        self.update_plot()

    def update_plot(self):
        """Redraw line and markers and rescale both axes to the data."""
        if not self.volumes or not self.ph_values:
            return

        self.line.set_data(self.volumes, self.ph_values)
        self.points.set_offsets(list(zip(self.volumes, self.ph_values)))

        if len(self.volumes) > 1:
            max_vol = max(self.volumes)
            # While nothing has been dosed yet every point sits at 0 ml;
            # keep the current range instead of collapsing it to (0, 0).
            if max_vol > 0:
                self.ax.set_xlim(0, max_vol * 1.1)  # 10% margin

        _set_ph_ylim(self.ax, self.ph_values)
        self.canvas.draw()

    def clear_data(self):
        """Remove all points and restore the initial axes."""
        try:
            self.volumes.clear()
            self.ph_values.clear()

            self.line.set_data([], [])
            # The scatter collection expects an (N, 2) array, also when empty
            self.points.set_offsets(np.empty((0, 2)))

            self.ax.set_xlim(0, 1)
            self.ax.set_ylim(PH_MIN, PH_MAX)

            self.canvas.draw()
        except Exception as e:
            print(f"Fehler beim Löschen der Titrationsdaten: {str(e)}")


def main():
    """Create the application and main window and run the Qt event loop."""
    app = QApplication(sys.argv)
    window = PHControllerGUI()
    window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()