arduino_ph_controller/gui/main.py
2025-07-10 23:25:42 +02:00

648 lines
27 KiB
Python

import sys
import serial
import serial.tools.list_ports
from PyQt6.QtWidgets import (QApplication, QMainWindow, QVBoxLayout, QHBoxLayout,
QWidget, QLabel, QPushButton, QComboBox, QDoubleSpinBox,
QGroupBox, QStatusBar, QMessageBox, QCheckBox)
from PyQt6.QtCore import QTimer, Qt
from PyQt6.QtGui import QFont
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
import datetime
from collections import deque
class PHControllerGUI(QMainWindow):
def __init__(self):
super().__init__()
self.serial_connection = None
self.updating_tube_combo = False # Flag to prevent recursive tube commands
self.setWindowTitle("Arduino pH Controller")
self.setGeometry(100, 100, 900, 800) # Larger window for plot
# Central Widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QVBoxLayout(central_widget)
# Connection Group
connection_group = QGroupBox("Verbindung")
connection_layout = QHBoxLayout()
self.port_combo = QComboBox()
self.refresh_ports()
self.connect_btn = QPushButton("Verbinden")
self.connect_btn.clicked.connect(self.toggle_connection)
connection_layout.addWidget(QLabel("Port:"))
connection_layout.addWidget(self.port_combo, 1)
connection_layout.addWidget(self.connect_btn)
connection_group.setLayout(connection_layout)
# Status Group
status_group = QGroupBox("Status")
status_layout = QHBoxLayout()
self.ph_label = QLabel("pH: --")
self.ph_label.setFont(QFont('Arial', 24))
self.ph_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.pump_status = QLabel("Pumpe: Aus")
self.pump_status.setFont(QFont('Arial', 16))
self.pump_status.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.flow_rate_label = QLabel("ml/sec: --")
self.flow_rate_label.setFont(QFont('Arial', 14))
self.flow_rate_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.flow_rate_label.setVisible(False) # Initially hidden
self.total_volume_label = QLabel("Gesamt: 0.0 ml")
self.total_volume_label.setFont(QFont('Arial', 12))
self.total_volume_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.total_volume_label.setVisible(False) # Initially hidden
status_layout.addWidget(self.ph_label)
status_layout.addWidget(self.pump_status)
status_layout.addWidget(self.flow_rate_label)
status_layout.addWidget(self.total_volume_label)
status_group.setLayout(status_layout)
# Control Group
control_group = QGroupBox("Steuerung")
control_layout = QVBoxLayout()
# Pump Control
pump_control_layout = QHBoxLayout()
self.pump_on_btn = QPushButton("Pumpe Ein")
self.pump_on_btn.clicked.connect(lambda: self.send_pump_command(1))
self.pump_off_btn = QPushButton("Pumpe Aus")
self.pump_off_btn.clicked.connect(lambda: self.send_pump_command(0))
# Volume reset option
self.reset_volume_checkbox = QCheckBox("Volumen bei Start zurücksetzen")
self.reset_volume_checkbox.setChecked(True) # Default: reset volume
pump_control_layout.addWidget(self.pump_on_btn)
pump_control_layout.addWidget(self.pump_off_btn)
pump_control_layout.addWidget(self.reset_volume_checkbox)
# Tube Selection
tube_layout = QHBoxLayout()
self.tube_combo = QComboBox()
self.tube_combo.addItems([
"0.13 mm", "0.19 mm", "0.25 mm", "0.38 mm", "0.44 mm",
"0.51 mm", "0.57 mm", "0.64 mm", "0.76 mm", "0.89 mm",
"0.95 mm", "1.02 mm", "1.09 mm", "1.14 mm", "1.22 mm",
"1.30 mm", "1.42 mm", "1.52 mm", "1.65 mm", "1.75 mm",
"1.85 mm", "2.06 mm", "2.29 mm", "2.54 mm", "2.79 mm", "3.17 mm"
])
self.tube_combo.currentIndexChanged.connect(self.send_tube_command)
tube_layout.addWidget(QLabel("Schlauch:"))
tube_layout.addWidget(self.tube_combo, 1)
# Calibration
calibration_layout = QHBoxLayout()
self.cal_ph4_btn = QPushButton("Kalibrieren pH4")
self.cal_ph7_btn = QPushButton("Kalibrieren pH7")
self.debug_btn = QPushButton("Debug Info")
self.cal_ph4_btn.clicked.connect(lambda: self.send_calibration_command(4))
self.cal_ph7_btn.clicked.connect(lambda: self.send_calibration_command(7))
self.debug_btn.clicked.connect(self.send_debug_command)
calibration_layout.addWidget(self.cal_ph4_btn)
calibration_layout.addWidget(self.cal_ph7_btn)
calibration_layout.addWidget(self.debug_btn)
# Auto Mode
auto_mode_layout = QHBoxLayout()
self.volume_spin = QDoubleSpinBox()
self.volume_spin.setRange(0.1, 1000)
self.volume_spin.setValue(10)
self.volume_spin.setSuffix(" ml")
self.auto_dose_btn = QPushButton("Automatisch dosieren")
self.auto_dose_btn.clicked.connect(self.start_auto_dose)
auto_mode_layout.addWidget(QLabel("Menge:"))
auto_mode_layout.addWidget(self.volume_spin)
auto_mode_layout.addWidget(self.auto_dose_btn)
# Add to control layout
control_layout.addLayout(pump_control_layout)
control_layout.addLayout(tube_layout)
control_layout.addLayout(calibration_layout)
control_layout.addLayout(auto_mode_layout)
control_group.setLayout(control_layout)
# Add groups to main layout
main_layout.addWidget(connection_group)
main_layout.addWidget(status_group)
main_layout.addWidget(control_group)
# pH Plot Group
plot_group = QGroupBox("pH-Verlauf")
plot_layout = QVBoxLayout()
# Plot controls
plot_controls_layout = QHBoxLayout()
self.clear_plot_btn = QPushButton("Diagramm löschen")
self.clear_plot_btn.clicked.connect(self.clear_ph_plot)
self.plot_enabled_checkbox = QCheckBox("pH-Aufzeichnung")
self.plot_enabled_checkbox.setChecked(True)
plot_controls_layout.addWidget(self.plot_enabled_checkbox)
plot_controls_layout.addWidget(self.clear_plot_btn)
plot_controls_layout.addStretch()
# pH Plot Widget
self.ph_plot = PHPlotWidget()
plot_layout.addLayout(plot_controls_layout)
plot_layout.addWidget(self.ph_plot)
plot_group.setLayout(plot_layout)
main_layout.addWidget(plot_group)
# Status Bar
self.status_bar = QStatusBar()
self.setStatusBar(self.status_bar)
self.status_bar.showMessage("Bereit")
# Timer for reading data
self.read_timer = QTimer()
self.read_timer.timeout.connect(self.read_serial_data)
self.read_timer.start(100) # Read incoming data every 100ms
# Timer for pH value requests
self.ph_request_timer = QTimer()
self.ph_request_timer.timeout.connect(self.request_ph_value)
# Timer for tube ID requests (every 5 seconds)
self.tube_request_timer = QTimer()
self.tube_request_timer.timeout.connect(self.request_tube_id)
# Timer for flow rate requests (when pump is running)
self.flow_rate_timer = QTimer()
self.flow_rate_timer.timeout.connect(self.request_flow_rate)
# Track pump state
self.pump_is_running = False
# Volume tracking
self.total_volume = 0.0 # Total volume pumped in ml
self.last_flow_rate = 0.0 # Last known flow rate
# Auto dosing
self.auto_dosing = False
self.target_volume = 0.0
self.auto_dose_timer = QTimer()
self.auto_dose_timer.timeout.connect(self.check_auto_dose_progress)
# Enable/disable controls based on connection
self.set_controls_enabled(False)
def refresh_ports(self):
self.port_combo.clear()
ports = serial.tools.list_ports.comports()
for port in ports:
self.port_combo.addItem(port.device)
def toggle_connection(self):
if self.serial_connection and self.serial_connection.is_open:
self.close_connection()
else:
self.open_connection()
def open_connection(self):
port = self.port_combo.currentText()
if not port:
QMessageBox.critical(self, "Fehler", "Kein Port ausgewählt!")
return
try:
self.serial_connection = serial.Serial(port, 9600, timeout=1)
self.connect_btn.setText("Trennen")
self.status_bar.showMessage(f"Verbunden mit {port}")
self.set_controls_enabled(True)
# Start pH value polling every 2 seconds
self.ph_request_timer.start(2000)
# Start tube ID polling every 5 seconds
self.tube_request_timer.start(5000)
# Request initial data after a short delay to ensure Arduino is ready
QTimer.singleShot(500, self.request_initial_data)
except Exception as e:
QMessageBox.critical(self, "Fehler", f"Verbindungsfehler: {str(e)}")
def close_connection(self):
# Turn off pump before closing connection for safety
if self.serial_connection and self.serial_connection.is_open:
try:
self.send_command("<12>") # Pump off for safety
# Give Arduino time to process the command
import time
time.sleep(0.1)
except:
pass # Ignore errors during shutdown
if self.serial_connection and self.serial_connection.is_open:
self.serial_connection.close()
self.serial_connection = None
# Stop pH polling
self.ph_request_timer.stop()
# Stop tube ID polling
self.tube_request_timer.stop()
# Stop flow rate polling
self.flow_rate_timer.stop()
# Stop auto dose timer
self.auto_dose_timer.stop()
self.auto_dosing = False
self.connect_btn.setText("Verbinden")
self.status_bar.showMessage("Getrennt")
self.set_controls_enabled(False)
self.ph_label.setText("pH: --")
self.pump_status.setText("Pumpe: --")
self.flow_rate_label.setVisible(False)
self.total_volume_label.setVisible(False)
self.pump_is_running = False
self.total_volume = 0.0
self.last_flow_rate = 0.0
self.auto_dosing = False
self.auto_dose_timer.stop()
def set_controls_enabled(self, enabled):
self.pump_on_btn.setEnabled(enabled)
self.pump_off_btn.setEnabled(enabled)
self.tube_combo.setEnabled(enabled)
self.cal_ph4_btn.setEnabled(enabled)
self.cal_ph7_btn.setEnabled(enabled)
self.debug_btn.setEnabled(enabled)
self.auto_dose_btn.setEnabled(enabled)
self.volume_spin.setEnabled(enabled)
self.reset_volume_checkbox.setEnabled(enabled)
self.clear_plot_btn.setEnabled(enabled)
self.plot_enabled_checkbox.setEnabled(enabled)
def send_command(self, command):
print(command)
if self.serial_connection and self.serial_connection.is_open:
try:
self.serial_connection.write(command.encode('utf-8'))
except Exception as e:
self.status_bar.showMessage(f"Fehler beim Senden: {str(e)}")
def request_initial_data(self):
"""Request initial data from Arduino after connection"""
if self.serial_connection and self.serial_connection.is_open:
# self.send_command("<14>") # Request current pH value
self.send_command("<13>") # Request current tube size
self.status_bar.showMessage("Initiale Daten angefragt...")
def request_ph_value(self):
"""Request pH value from Arduino every 2 seconds"""
if self.serial_connection and self.serial_connection.is_open:
self.send_command("<14>") # Request current pH value
def request_tube_id(self):
"""Request tube ID from Arduino every 5 seconds"""
if self.serial_connection and self.serial_connection.is_open:
self.send_command("<13>") # Request current tube size
def request_flow_rate(self):
"""Request flow rate from Arduino when pump is running"""
if self.serial_connection and self.serial_connection.is_open and self.pump_is_running:
self.send_command("<18>") # Request current flow rate
def update_total_volume(self, flow_rate):
"""Update the total volume based on current flow rate"""
if self.pump_is_running:
# Add volume from the last second (flow_rate is ml/sec)
self.total_volume += flow_rate * 1.0 # 1 second interval
self.total_volume_label.setText(f"Gesamt: {self.total_volume:.2f} ml")
self.last_flow_rate = flow_rate
def send_pump_command(self, state):
if state == 1:
self.send_command("<11>") # Pump on
self.pump_status.setText("Pumpe: Ein")
self.pump_is_running = True
self.flow_rate_label.setVisible(True)
self.total_volume_label.setVisible(True)
# Check if volume should be reset or continue accumulating
if self.reset_volume_checkbox.isChecked() or self.auto_dosing:
# Reset volume counter (always reset for auto dosing)
self.total_volume = 0.0
self.total_volume_label.setText("Gesamt: 0.0 ml")
else:
# Continue with existing volume - just update display
self.total_volume_label.setText(f"Gesamt: {self.total_volume:.2f} ml")
self.flow_rate_timer.start(1000) # Request flow rate every second
else:
self.send_command("<12>") # Pump off
self.pump_status.setText("Pumpe: Aus")
self.pump_is_running = False
self.flow_rate_label.setVisible(False)
# Keep total_volume_label visible to show final amount
self.flow_rate_timer.stop()
# Stop auto dosing if it was running
if self.auto_dosing:
self.auto_dosing = False
self.auto_dose_timer.stop()
self.auto_dose_btn.setText("Automatisch dosieren")
self.auto_dose_btn.setEnabled(True)
QMessageBox.information(self, "Automatische Dosierung",
f"Dosierung abgeschlossen!\nGepumpte Menge: {self.total_volume:.2f} ml")
def send_tube_command(self, index):
# Only send command if not updating from Arduino response
if not self.updating_tube_combo:
# Send command in format <17{tubid}> for tube selection
self.send_command(f"<17{index}>")
self.status_bar.showMessage(f"Schlauchgröße {self.tube_combo.itemText(index)} ausgewählt")
def send_calibration_command(self, ph):
if ph == 4:
self.send_command("<15>") # Start pH4 calibration
QMessageBox.information(self, "Kalibrierung",
"pH4 Kalibrierung gestartet. Sensor in pH4.01-Lösung tauchen und warten bis der Wert stabil ist.")
else:
self.send_command("<16>") # Start pH7 calibration
QMessageBox.information(self, "Kalibrierung",
"pH7 Kalibrierung gestartet. Sensor in pH6.86-Lösung tauchen und warten bis der Wert stabil ist.")
def send_debug_command(self):
"""Send debug command to get calibration values"""
self.send_command("<19>") # Request debug info
self.status_bar.showMessage("Debug-Informationen angefragt...")
def start_auto_dose(self):
if self.auto_dosing:
# Stop auto dosing
self.auto_dosing = False
self.auto_dose_timer.stop()
self.send_pump_command(0) # Turn off pump
self.auto_dose_btn.setText("Automatisch dosieren")
# Re-enable manual pump controls
self.pump_on_btn.setEnabled(True)
self.pump_off_btn.setEnabled(True)
return
volume = self.volume_spin.value()
self.target_volume = volume
self.auto_dosing = True
# Update button text and disable manual pump controls during auto dosing
self.auto_dose_btn.setText("Stop Dosierung")
self.pump_on_btn.setEnabled(False)
self.pump_off_btn.setEnabled(False)
# Reset volume counter for accurate measurement
self.total_volume = 0.0
# Start pumping
self.send_pump_command(1)
# Start monitoring timer (check every 0.5 seconds for better precision)
self.auto_dose_timer.start(500)
self.status_bar.showMessage(f"Automatische Dosierung läuft... Ziel: {volume} ml")
def check_auto_dose_progress(self):
"""Check if target volume has been reached during auto dosing"""
if not self.auto_dosing:
return
# Check if we've reached or exceeded the target volume
if self.total_volume >= self.target_volume:
# Stop dosing
self.auto_dosing = False
self.auto_dose_timer.stop()
self.send_pump_command(0) # Turn off pump
# Re-enable manual controls
self.pump_on_btn.setEnabled(True)
self.pump_off_btn.setEnabled(True)
self.auto_dose_btn.setText("Automatisch dosieren")
# Show completion message
actual_volume = self.total_volume
difference = actual_volume - self.target_volume
message = f"Dosierung abgeschlossen!\n"
message += f"Zielvolumen: {self.target_volume:.2f} ml\n"
message += f"Tatsächliches Volumen: {actual_volume:.2f} ml\n"
message += f"Abweichung: {difference:+.2f} ml"
QMessageBox.information(self, "Automatische Dosierung", message)
self.status_bar.showMessage(f"Dosierung abgeschlossen: {actual_volume:.2f} ml von {self.target_volume:.2f} ml")
else:
# Update progress in status bar
progress = (self.total_volume / self.target_volume) * 100
self.status_bar.showMessage(f"Dosierung: {self.total_volume:.2f}/{self.target_volume:.2f} ml ({progress:.1f}%)")
def read_serial_data(self):
if not self.serial_connection or not self.serial_connection.is_open:
return
try:
while self.serial_connection.in_waiting:
line = self.serial_connection.readline().decode('utf-8').strip()
print(line)
if line.startswith('<') and line.endswith('>'):
data = line[1:-1]
if data.startswith('#'): # pH value
try:
ph_value = float(data[1:])
self.ph_label.setText(f"pH: {ph_value:.2f}")
# Add to plot if recording is enabled
if self.plot_enabled_checkbox.isChecked():
self.ph_plot.add_ph_value(ph_value)
except ValueError:
pass
elif data.startswith('P'): # Pump status
pump_state = data[1:]
self.pump_status.setText(f"Pumpe: {'Ein' if pump_state == '1' else 'Aus'}")
elif data.startswith('T'): # Tube index
try:
tube_index = int(data[1:])
# Ensure tube index is within valid range (0-25)
if 0 <= tube_index <= 25:
self.updating_tube_combo = True # Prevent recursive commands
self.tube_combo.setCurrentIndex(tube_index)
self.updating_tube_combo = False
self.status_bar.showMessage(f"Schlauchgröße {self.tube_combo.currentText()} geladen")
except ValueError:
pass
elif data.startswith('F'): # Flow rate (ml/sec)
try:
flow_rate = float(data[1:])
self.flow_rate_label.setText(f"ml/sec: {flow_rate:.4f}")
# Update total volume
self.update_total_volume(flow_rate)
except ValueError:
pass
elif data.startswith('DEBUG'): # Debug information
# Show debug info in a message box
QMessageBox.information(self, "Debug Information",
f"Kalibrierungsdaten:\n{data}")
elif data.startswith('CAL'): # Calibration confirmation with details
# Show calibration info in status bar and message box
self.status_bar.showMessage(f"Kalibrierung abgeschlossen: {data}")
QMessageBox.information(self, "Kalibrierung",
f"Kalibrierung erfolgreich:\n{data}")
except Exception as e:
self.status_bar.showMessage(f"Lesefehler: {str(e)}")
def closeEvent(self, event):
# Safety: Turn off pump before closing
if self.serial_connection and self.serial_connection.is_open:
try:
self.send_command("<12>") # Ensure pump is off
# Give Arduino time to process the command
import time
time.sleep(0.1)
except:
pass # Ignore errors during shutdown
# Stop all timers
self.read_timer.stop()
self.ph_request_timer.stop()
self.tube_request_timer.stop()
self.flow_rate_timer.stop()
self.auto_dose_timer.stop()
self.close_connection()
event.accept()
def clear_ph_plot(self):
"""Clear the pH plot data"""
self.ph_plot.clear_data()
self.status_bar.showMessage("pH-Diagramm gelöscht")
class PHPlotWidget(QWidget):
def __init__(self):
super().__init__()
self.figure = Figure(figsize=(8, 4))
self.canvas = FigureCanvas(self.figure)
self.ax = self.figure.add_subplot(111)
# Data storage (keep last 300 points = 10 minutes at 2-second intervals)
self.times = deque(maxlen=300)
self.ph_values = deque(maxlen=300)
self.start_time = None # Track when recording started
# Setup plot
self.ax.set_xlabel('Zeit')
self.ax.set_ylabel('pH-Wert')
self.ax.set_title('pH-Verlauf')
self.ax.grid(True, alpha=0.3)
self.ax.set_ylim(0, 14) # pH range 0-14
# Layout
layout = QVBoxLayout()
layout.addWidget(self.canvas)
self.setLayout(layout)
# Plot line
self.line, = self.ax.plot([], [], 'b-', linewidth=2, label='pH-Wert')
self.ax.legend()
# Initial draw
self.canvas.draw()
def add_ph_value(self, ph_value):
"""Add new pH value with current timestamp"""
current_time = datetime.datetime.now()
# Set start time on first value
if self.start_time is None:
self.start_time = current_time
# Calculate relative time in seconds
relative_time = (current_time - self.start_time).total_seconds()
self.times.append(relative_time)
self.ph_values.append(ph_value)
self.update_plot()
def update_plot(self):
"""Update the plot with current data"""
if len(self.times) > 0 and len(self.ph_values) > 0:
# Update line data
self.line.set_data(self.times, self.ph_values)
# Auto-scale x-axis to show recent data
if len(self.times) > 1:
self.ax.set_xlim(min(self.times), max(self.times))
# Auto-scale y-axis around data with some margin
if len(self.ph_values) > 0:
min_ph = min(self.ph_values)
max_ph = max(self.ph_values)
margin = 0.5
self.ax.set_ylim(max(0, min_ph - margin), min(14, max_ph + margin))
# Format x-axis for time display (seconds/minutes/hours)
self.format_time_axis()
# Redraw
self.canvas.draw()
def format_time_axis(self):
"""Format the x-axis to show time in appropriate units"""
if len(self.times) == 0:
return
max_time = max(self.times)
if max_time < 120: # Less than 2 minutes - show seconds
self.ax.set_xlabel('Zeit (Sekunden)')
# No need to change tick formatting for seconds
elif max_time < 7200: # Less than 2 hours - show minutes
self.ax.set_xlabel('Zeit (Minuten)')
# Convert tick labels to minutes
import matplotlib.ticker as ticker
def format_minutes(x, pos):
return f'{x/60:.1f}'
self.ax.xaxis.set_major_formatter(ticker.FuncFormatter(format_minutes))
else: # Show hours
self.ax.set_xlabel('Zeit (Stunden)')
# Convert tick labels to hours
import matplotlib.ticker as ticker
def format_hours(x, pos):
return f'{x/3600:.1f}'
self.ax.xaxis.set_major_formatter(ticker.FuncFormatter(format_hours))
def clear_data(self):
"""Clear all data points"""
self.times.clear()
self.ph_values.clear()
self.start_time = None # Reset start time
self.line.set_data([], [])
self.ax.set_xlim(0, 1)
self.ax.set_ylim(0, 14)
self.ax.set_xlabel('Zeit') # Reset to default label
# Reset tick formatter to default
import matplotlib.ticker as ticker
self.ax.xaxis.set_major_formatter(ticker.ScalarFormatter())
self.canvas.draw()
def main():
app = QApplication(sys.argv)
window = PHControllerGUI()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()