1534 lines
49 KiB
Python
1534 lines
49 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import serial
|
|
import time
|
|
import glob
|
|
import sys
|
|
from datetime import datetime
|
|
import time
|
|
import os
|
|
# This gets the Qt stuff
|
|
|
|
from PyQt5 import QtCore, QtGui, QtWidgets
|
|
from PyQt5.QtWidgets import QMainWindow, QApplication, QFileDialog
|
|
|
|
#import cv2
|
|
#from PyQt5 import cv2
|
|
# note, had to use version 3.2.0.8 otherwise it had its own
|
|
# pyqt packages that conflicted with mine
|
|
|
|
import numpy as np
|
|
from decimal import Decimal
|
|
# This is our window from QtCreator
|
|
import gui2
|
|
import pdb
|
|
import traceback, sys
|
|
|
|
# ##############################
|
|
# MULTITHREADING : SIGNALS CLASS
|
|
# ##############################
|
|
class WorkerSignals(QtCore.QObject):
|
|
'''
|
|
Defines the signals available from a running worker thread.
|
|
|
|
Supported signals are:
|
|
|
|
finished
|
|
No data
|
|
|
|
error
|
|
`tuple` (exctype, value, traceback.format_exc() )
|
|
|
|
result
|
|
`object` data returned from processing, anything
|
|
|
|
'''
|
|
finished = QtCore.pyqtSignal()
|
|
error = QtCore.pyqtSignal(tuple)
|
|
result = QtCore.pyqtSignal(object)
|
|
progress = QtCore.pyqtSignal(int)
|
|
datasignal = QtCore.pyqtSignal(int,float)
|
|
|
|
# #############################
|
|
# MULTITHREADING : WORKER CLASS
|
|
# #############################
|
|
|
|
|
|
class Thread(QtCore.QRunnable):
|
|
def __init__(self, fn, *args, **kwargs):
|
|
#parent = None
|
|
super().__init__()
|
|
self.runs = True
|
|
self.fn = fn
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
self.signals = WorkerSignals()
|
|
|
|
def run(self):
|
|
try:
|
|
#self.serial.flushInput()
|
|
#self.serial.flushOutput()
|
|
result = self.fn(*self.args, **self.kwargs)
|
|
except:
|
|
traceback.print_exc()
|
|
exctype, value = sys.exc_info()[:2]
|
|
self.signals.error.emit((exctype, value, traceback.format_exc()))
|
|
else:
|
|
self.signals.result.emit(result) # Return the result of the processing
|
|
finally:
|
|
self.signals.finished.emit() # Done
|
|
self.stop()
|
|
|
|
#print("Job completed")
|
|
|
|
def stop(self):
|
|
self.runs = False
|
|
|
|
|
|
|
|
|
|
# #####################################
|
|
# ERROR HANDLING : CANNOT CONNECT CLASS
|
|
# #####################################
|
|
class CannotConnectException(Exception):
|
|
pass
|
|
|
|
# #######################
|
|
# GUI : MAIN WINDOW CLASS
|
|
# #######################
|
|
class MainWindow(QtWidgets.QMainWindow, gui2.Ui_MainWindow):
|
|
|
|
# =======================================================
|
|
# INITIALIZING : The UI and setting some needed variables
|
|
# =======================================================
|
|
def __init__(self):
|
|
|
|
# Setting the UI to a class variable and connecting all GUI Components
|
|
super(MainWindow, self).__init__()
|
|
self.ui = gui2.Ui_MainWindow()
|
|
self.ui.setupUi(self)
|
|
|
|
#====================================== Jfs Display try
|
|
# aktueller Content in the syringe
|
|
self.p1_left = 0
|
|
self.p2_left = 0
|
|
self.p3_left = 0
|
|
self.globports =[]
|
|
#======================================================
|
|
|
|
# Put comments here
|
|
self.populate_microstepping()
|
|
self.populate_syringe_sizes()
|
|
self.populate_pump_jog_delta()
|
|
self.populate_pump_units()
|
|
self.setting_variables()
|
|
if self.globports == []:
|
|
self.populate_ports()
|
|
self.set_port()
|
|
|
|
|
|
|
|
self.connect_all_gui_components()
|
|
self.grey_out_components()
|
|
|
|
# Declaring start, mid, and end marker for sending code to Arduino
|
|
self.startMarker = 60# <
|
|
self.endMarker = 62 # ,F,0.0>
|
|
self.midMarker = 44 # ,
|
|
|
|
# Initializing multithreading to allow parallel operations
|
|
self.threadpool = QtCore.QThreadPool()
|
|
print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
|
|
|
|
|
|
# Camera setup
|
|
self.timer = QtCore.QTimer()
|
|
self.timer.setInterval(1000)
|
|
self.timer.timeout.connect(self.recurring_timer)
|
|
self.timer.start()
|
|
self.counter = 0
|
|
|
|
# Random other things I need
|
|
self.image = None
|
|
#self.microstepping = 1
|
|
#print(self.microstepping)
|
|
|
|
self.threadpool = QtCore.QThreadPool()
|
|
|
|
|
|
def recurring_timer(self):
|
|
self.counter +=1
|
|
|
|
# =============================
|
|
# SETTING : important variables
|
|
# =============================
|
|
def setting_variables(self):
|
|
|
|
self.set_p1_syringe()
|
|
self.set_p2_syringe()
|
|
self.set_p3_syringe()
|
|
|
|
|
|
|
|
#self.set_p1_units()
|
|
#self.set_p2_units()
|
|
#self.set_p3_units()
|
|
|
|
self.is_p1_active = False
|
|
self.is_p2_active = False
|
|
self.is_p3_active = False
|
|
|
|
self.experiment_notes = ""
|
|
|
|
def thread_finished(self, th):
|
|
print("Your thread has completed. Now terminating..")
|
|
th.stop()
|
|
print("Thread has been terminated.")
|
|
print("=============================\n\n")
|
|
# here is where you need to end the thread
|
|
|
|
|
|
# ===================================
|
|
# CONNECTING : all the GUI Components
|
|
# ===================================
|
|
def connect_all_gui_components(self):
|
|
|
|
# ~~~~~~~~~~~~~~~
|
|
# MAIN : MENU BAR
|
|
# ~~~~~~~~~~~~~~~
|
|
self.ui.load_settings_BTN.triggered.connect(self.load_settings)
|
|
self.ui.save_settings_BTN.triggered.connect(self.save_settings)
|
|
|
|
# ~~~~~~~~~~~~~~~~
|
|
# TAB : Controller
|
|
# ~~~~~~~~~~~~~~~~
|
|
|
|
# Px active checkboxes
|
|
self.ui.p1_activate_CHECKBOX.stateChanged.connect(self.toggle_p1_activation)
|
|
self.ui.p2_activate_CHECKBOX.stateChanged.connect(self.toggle_p2_activation)
|
|
self.ui.p3_activate_CHECKBOX.stateChanged.connect(self.toggle_p3_activation)
|
|
|
|
# Px display (TODO)
|
|
|
|
# Px syringe display
|
|
self.ui.p1_syringe_DROPDOWN.currentIndexChanged.connect(self.display_p1_syringe)
|
|
self.ui.p2_syringe_DROPDOWN.currentIndexChanged.connect(self.display_p2_syringe)
|
|
self.ui.p3_syringe_DROPDOWN.currentIndexChanged.connect(self.display_p3_syringe)
|
|
|
|
|
|
# Px speed display
|
|
self.ui.p1_units_DROPDOWN.currentIndexChanged.connect(self.display_p1_speed)
|
|
self.ui.p2_units_DROPDOWN.currentIndexChanged.connect(self.display_p2_speed)
|
|
self.ui.p3_units_DROPDOWN.currentIndexChanged.connect(self.display_p3_speed)
|
|
|
|
|
|
|
|
#self.populate_pump_units()
|
|
|
|
# Px amount
|
|
self.ui.p1_amount_INPUT.valueChanged.connect(self.set_p1_amount)
|
|
self.ui.p2_amount_INPUT.valueChanged.connect(self.set_p2_amount)
|
|
self.ui.p3_amount_INPUT.valueChanged.connect(self.set_p3_amount)
|
|
|
|
# Px jog delta
|
|
#self.ui.p1_jog_delta_INPUT.valueChanged.connect(self.set_p1_jog_delta)
|
|
#self.ui.p2_jog_delta_INPUT.valueChanged.connect(self.set_p2_jog_delta)
|
|
#self.ui.p3_jog_delta_INPUT.valueChanged.connect(self.set_p3_jog_delta)
|
|
|
|
# Action buttons
|
|
self.ui.run_BTN.clicked.connect(self.run)
|
|
|
|
|
|
self.ui.pause_BTN.clicked.connect(self.pause)
|
|
|
|
|
|
self.ui.zero_BTN.clicked.connect(self.zero)
|
|
self.ui.stop_BTN.clicked.connect(self.stop)
|
|
|
|
|
|
self.ui.jog_plus_BTN.clicked.connect(lambda:self.jog(self.ui.jog_plus_BTN))
|
|
self.ui.jog_minus_BTN.clicked.connect(lambda:self.jog(self.ui.jog_minus_BTN))
|
|
|
|
# Set coordinate system
|
|
self.ui.absolute_RADIO.toggled.connect(lambda:self.set_coordinate(self.ui.absolute_RADIO))
|
|
self.ui.incremental_RADIO.toggled.connect(lambda:self.set_coordinate(self.ui.incremental_RADIO))
|
|
|
|
# ~~~~~~~~~~~~
|
|
# TAB : Camera
|
|
# ~~~~~~~~~~~~
|
|
|
|
# Setting camera action buttons
|
|
self.ui.camera_connect_BTN.clicked.connect(self.start_camera)
|
|
self.ui.camera_disconnect_BTN.clicked.connect(self.stop_camera)
|
|
self.ui.camera_capture_image_BTN.clicked.connect(self.save_image)
|
|
|
|
# ~~~~~~~~~~~
|
|
# TAB : Setup
|
|
# ~~~~~~~~~~~
|
|
|
|
# Port, first populate it then connect it (population done earlier)
|
|
self.ui.refresh_ports_BTN.clicked.connect(self.refresh_ports)
|
|
self.ui.port_DROPDOWN.currentIndexChanged.connect(self.set_port)
|
|
|
|
self.ui.experiment_notes.editingFinished.connect(self.set_experiment_notes)
|
|
|
|
# Set the microstepping value, default is 1
|
|
self.ui.microstepping_DROPDOWN.currentIndexChanged.connect(self.set_microstepping)
|
|
|
|
# Set the log file name
|
|
self.date_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
self.date_string = self.date_string.replace(":","_") # Replace semicolons with underscores
|
|
|
|
# Px syringe size, populate then connect (population done earlier)
|
|
self.ui.p1_syringe_DROPDOWN.currentIndexChanged.connect(self.set_p1_syringe)
|
|
self.ui.p2_syringe_DROPDOWN.currentIndexChanged.connect(self.set_p2_syringe)
|
|
self.ui.p3_syringe_DROPDOWN.currentIndexChanged.connect(self.set_p3_syringe)
|
|
# warning to send the info to the controller
|
|
self.ui.p1_syringe_DROPDOWN.currentIndexChanged.connect(self.send_p1_warning)
|
|
self.ui.p2_syringe_DROPDOWN.currentIndexChanged.connect(self.send_p2_warning)
|
|
self.ui.p3_syringe_DROPDOWN.currentIndexChanged.connect(self.send_p3_warning)
|
|
|
|
# Px units
|
|
self.ui.p1_units_DROPDOWN.currentIndexChanged.connect(self.set_p1_units)
|
|
self.ui.p2_units_DROPDOWN.currentIndexChanged.connect(self.set_p2_units)
|
|
self.ui.p3_units_DROPDOWN.currentIndexChanged.connect(self.set_p3_units)
|
|
# warning to send the info to the controller
|
|
self.ui.p1_units_DROPDOWN.currentIndexChanged.connect(self.send_p1_warning)
|
|
self.ui.p2_units_DROPDOWN.currentIndexChanged.connect(self.send_p2_warning)
|
|
self.ui.p3_units_DROPDOWN.currentIndexChanged.connect(self.send_p3_warning)
|
|
|
|
# Px speed
|
|
self.ui.p1_speed_INPUT.valueChanged.connect(self.set_p1_speed)
|
|
self.ui.p2_speed_INPUT.valueChanged.connect(self.set_p2_speed)
|
|
self.ui.p3_speed_INPUT.valueChanged.connect(self.set_p3_speed)
|
|
# warning to send the info to the controller
|
|
self.ui.p1_speed_INPUT.valueChanged.connect(self.send_p1_warning)
|
|
self.ui.p2_speed_INPUT.valueChanged.connect(self.send_p2_warning)
|
|
self.ui.p3_speed_INPUT.valueChanged.connect(self.send_p3_warning)
|
|
|
|
# Px accel
|
|
self.ui.p1_accel_INPUT.valueChanged.connect(self.set_p1_accel)
|
|
self.ui.p2_accel_INPUT.valueChanged.connect(self.set_p2_accel)
|
|
self.ui.p3_accel_INPUT.valueChanged.connect(self.set_p3_accel)
|
|
# warning to send the info to the controller
|
|
self.ui.p1_accel_INPUT.valueChanged.connect(self.send_p1_warning)
|
|
self.ui.p2_accel_INPUT.valueChanged.connect(self.send_p2_warning)
|
|
self.ui.p3_accel_INPUT.valueChanged.connect(self.send_p3_warning)
|
|
|
|
# Px jog delta (setup)
|
|
self.ui.p1_setup_jog_delta_INPUT.currentIndexChanged.connect(self.set_p1_setup_jog_delta)
|
|
self.ui.p2_setup_jog_delta_INPUT.currentIndexChanged.connect(self.set_p2_setup_jog_delta)
|
|
self.ui.p3_setup_jog_delta_INPUT.currentIndexChanged.connect(self.set_p3_setup_jog_delta)
|
|
# warning to send the info to the contorller
|
|
self.ui.p1_setup_jog_delta_INPUT.currentIndexChanged.connect(self.send_p1_warning)
|
|
self.ui.p2_setup_jog_delta_INPUT.currentIndexChanged.connect(self.send_p2_warning)
|
|
self.ui.p3_setup_jog_delta_INPUT.currentIndexChanged.connect(self.send_p3_warning)
|
|
|
|
|
|
# Px send settings
|
|
self.ui.p1_setup_send_BTN.clicked.connect(self.send_p1_settings)
|
|
self.ui.p2_setup_send_BTN.clicked.connect(self.send_p2_settings)
|
|
self.ui.p3_setup_send_BTN.clicked.connect(self.send_p3_settings)
|
|
# remove warning to send settings
|
|
self.ui.p1_setup_send_BTN.clicked.connect(self.send_p1_success)
|
|
self.ui.p2_setup_send_BTN.clicked.connect(self.send_p2_success)
|
|
self.ui.p3_setup_send_BTN.clicked.connect(self.send_p3_success)
|
|
|
|
# Connect to arduino
|
|
self.ui.connect_BTN.clicked.connect(self.connect)
|
|
self.ui.disconnect_BTN.clicked.connect(self.disconnect)
|
|
|
|
# Send all the settings at once
|
|
self.ui.send_all_BTN.clicked.connect(self.send_all)
|
|
|
|
def send_p1_warning(self):
|
|
self.ui.p1_setup_send_BTN.setStyleSheet("background-color: green; color: black")
|
|
|
|
def send_p2_warning(self):
|
|
self.ui.p2_setup_send_BTN.setStyleSheet("background-color: green; color: black")
|
|
|
|
def send_p3_warning(self):
|
|
self.ui.p3_setup_send_BTN.setStyleSheet("background-color: green; color: black")
|
|
|
|
def send_p1_success(self):
|
|
self.ui.p1_setup_send_BTN.setStyleSheet("background-color: none")
|
|
|
|
def send_p2_success(self):
|
|
self.ui.p2_setup_send_BTN.setStyleSheet("background-color: none")
|
|
|
|
def send_p3_success(self):
|
|
self.ui.p3_setup_send_BTN.setStyleSheet("background-color: none")
|
|
|
|
def grey_out_components(self):
|
|
# ~~~~~~~~~~~~~~~~
|
|
# TAB : Controller
|
|
# ~~~~~~~~~~~~~~~~
|
|
self.ui.run_BTN.setEnabled(False)
|
|
self.ui.pause_BTN.setEnabled(False)
|
|
self.ui.zero_BTN.setEnabled(False)
|
|
self.ui.stop_BTN.setEnabled(False)
|
|
self.ui.jog_plus_BTN.setEnabled(False)
|
|
self.ui.jog_minus_BTN.setEnabled(False)
|
|
|
|
# ~~~~~~~~~~~~~~~~
|
|
# TAB : Setup
|
|
# ~~~~~~~~~~~~~~~~
|
|
self.ui.p1_setup_send_BTN.setEnabled(False)
|
|
self.ui.p2_setup_send_BTN.setEnabled(False)
|
|
self.ui.p3_setup_send_BTN.setEnabled(False)
|
|
self.ui.send_all_BTN.setEnabled(False)
|
|
|
|
def ungrey_out_components(self):
|
|
# ~~~~~~~~~~~~~~~~
|
|
# TAB : Controller
|
|
# ~~~~~~~~~~~~~~~~
|
|
self.ui.run_BTN.setEnabled(True)
|
|
self.ui.pause_BTN.setEnabled(True)
|
|
self.ui.zero_BTN.setEnabled(True)
|
|
self.ui.stop_BTN.setEnabled(True)
|
|
self.ui.jog_plus_BTN.setEnabled(True)
|
|
self.ui.jog_minus_BTN.setEnabled(True)
|
|
|
|
self.ui.run_BTN.setStyleSheet("background-color: green; color: black")
|
|
self.ui.pause_BTN.setStyleSheet("background-color: yellow; color: black")
|
|
self.ui.stop_BTN.setStyleSheet("background-color: red; color: black")
|
|
|
|
# ~~~~~~~~~~~~~~~~
|
|
# TAB : Setup
|
|
# ~~~~~~~~~~~~~~~~
|
|
self.ui.p1_setup_send_BTN.setEnabled(True)
|
|
self.ui.p2_setup_send_BTN.setEnabled(True)
|
|
self.ui.p3_setup_send_BTN.setEnabled(True)
|
|
self.ui.send_all_BTN.setEnabled(True)
|
|
# ======================
|
|
# FUNCTIONS : Controller
|
|
# ======================
|
|
|
|
def toggle_p1_activation(self):
|
|
if self.ui.p1_activate_CHECKBOX.isChecked():
|
|
self.is_p1_active = True
|
|
else:
|
|
self.is_p1_active = False
|
|
|
|
def toggle_p2_activation(self):
|
|
if self.ui.p2_activate_CHECKBOX.isChecked():
|
|
self.is_p2_active = True
|
|
else:
|
|
self.is_p2_active = False
|
|
|
|
def toggle_p3_activation(self):
|
|
if self.ui.p3_activate_CHECKBOX.isChecked():
|
|
self.is_p3_active = True
|
|
else:
|
|
self.is_p3_active = False
|
|
|
|
# Get a list of active pumps (IDK if this is the best way to do this)
|
|
def get_active_pumps(self):
|
|
pumps_list = [self.is_p1_active, self.is_p2_active, self.is_p3_active]
|
|
active_pumps = [i+1 for i in range(len(pumps_list)) if pumps_list[i]]
|
|
return active_pumps
|
|
|
|
def display_p1_syringe(self):
|
|
self.ui.p1_syringe_LABEL.setText(self.ui.p1_syringe_DROPDOWN.currentText())
|
|
def display_p2_syringe(self):
|
|
self.ui.p2_syringe_LABEL.setText(self.ui.p2_syringe_DROPDOWN.currentText())
|
|
def display_p3_syringe(self):
|
|
self.ui.p3_syringe_LABEL.setText(self.ui.p3_syringe_DROPDOWN.currentText())
|
|
|
|
def display_p1_speed(self):
|
|
self.ui.p1_units_LABEL.setText(str(self.p1_speed) + " " + self.ui.p1_units_DROPDOWN.currentText())
|
|
def display_p2_speed(self):
|
|
self.ui.p2_units_LABEL.setText(str(self.p2_speed) + " " + self.ui.p2_units_DROPDOWN.currentText())
|
|
def display_p3_speed(self):
|
|
self.ui.p3_units_LABEL.setText(str(self.p3_speed) + " " + self.ui.p3_units_DROPDOWN.currentText())
|
|
|
|
# Set Px distance to move
|
|
def set_p1_amount(self):
|
|
self.p1_amount = self.ui.p1_amount_INPUT.value()
|
|
def set_p2_amount(self):
|
|
self.p2_amount = self.ui.p2_amount_INPUT.value()
|
|
def set_p3_amount(self):
|
|
self.p3_amount = self.ui.p3_amount_INPUT.value()
|
|
|
|
# Set Px jog delta
|
|
#def set_p1_jog_delta(self):
|
|
# self.p1_jog_delta = self.ui.p1_jog_delta_INPUT.value()
|
|
#def set_p2_jog_delta(self):
|
|
# self.p2_jog_delta = self.ui.p2_jog_delta_INPUT.value()
|
|
#def set_p3_jog_delta(self):
|
|
# self.p3_jog_delta = self.ui.p3_jog_delta_INPUT.value()
|
|
|
|
# Set the coordinate system for the pump
|
|
def set_coordinate(self, radio):
|
|
if radio.text() == "Abs.":
|
|
if radio.isChecked():
|
|
self.coordinate = "absolute"
|
|
if radio.text() == "Incr.":
|
|
if radio.isChecked():
|
|
self.coordinate = "incremental"
|
|
|
|
def run(self):
|
|
self.statusBar().showMessage("You clicked RUN")
|
|
testData = []
|
|
|
|
active_pumps = self.get_active_pumps()
|
|
if len(active_pumps) > 0:
|
|
|
|
p1_input_displacement = str(self.convert_displacement(self.p1_amount, self.p1_units, self.p1_syringe_area, self.microstepping))
|
|
p2_input_displacement = str(self.convert_displacement(self.p2_amount, self.p2_units, self.p2_syringe_area, self.microstepping))
|
|
p3_input_displacement = str(self.convert_displacement(self.p3_amount, self.p3_units, self.p3_syringe_area, self.microstepping))
|
|
|
|
pumps_2_run = ''.join(map(str,active_pumps))
|
|
|
|
cmd = "<RUN,DIST,"+pumps_2_run+",0.0,F," + p1_input_displacement + "," + p2_input_displacement + "," + p3_input_displacement + ">"
|
|
|
|
testData.append(cmd)
|
|
|
|
print("Sending RUN command..")
|
|
thread = Thread(self.runTest, testData)
|
|
thread.signals.finished.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("RUN command sent.")
|
|
else:
|
|
self.statusBar().showMessage("No pumps enabled.")
|
|
|
|
# Clean up this text
|
|
def pause(self):
|
|
active_pumps = self.get_active_pumps()
|
|
pumps_2_run = ''.join(map(str,active_pumps))
|
|
|
|
if self.ui.pause_BTN.text() == "Pause":
|
|
self.statusBar().showMessage("You clicked PAUSE")
|
|
testData = []
|
|
cmd = "<PAUSE,BLAH," + pumps_2_run + ",BLAH,F,0.0,0.0,0.0>"
|
|
testData.append(cmd)
|
|
|
|
print("Sending PAUSE command..")
|
|
thread = Thread(self.runTest, testData)
|
|
thread.signals.finished.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("PAUSE command sent.")
|
|
|
|
self.ui.pause_BTN.setText("Resume")
|
|
|
|
elif self.ui.pause_BTN.text() == "Resume":
|
|
self.statusBar().showMessage("You clicked RESUME")
|
|
testData = []
|
|
cmd = "<RESUME,BLAH," + pumps_2_run + ",BLAH,F,0.0,0.0,0.0>"
|
|
testData.append(cmd)
|
|
|
|
print("Sending RESUME command..")
|
|
thread = Thread(self.runTest, testData)
|
|
thread.signals.finished.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("RESUME command sent.")
|
|
|
|
self.ui.pause_BTN.setText("Pause")
|
|
|
|
# fix
|
|
def zero(self):
|
|
self.statusBar().showMessage("You clicked ZERO")
|
|
testData = []
|
|
|
|
cmd = "<ZERO,BLAH,BLAH,BLAH,F,0.0,0.0,0.0>"
|
|
|
|
print("Sending ZERO command..")
|
|
thread = Thread(self.runTest, testData)
|
|
thread.signals.finished.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
##jfs
|
|
self.p1_left=int(self.p1_syringe.split(sep=' ')[1])
|
|
self.ui.p1_remain_DISP.display(self.p1_left)
|
|
self.p2_left=int(self.p2_syringe.split(sep=' ')[1])
|
|
self.ui.p2_remain_DISP.display(self.p2_left)
|
|
self.p3_left=int(self.p3_syringe.split(sep=' ')[1])
|
|
self.ui.p3_remain_DISP.display(self.p3_left)
|
|
|
|
##jfs
|
|
print("ZERO command sent.")
|
|
|
|
|
|
def stop(self):
|
|
self.statusBar().showMessage("You clicked STOP")
|
|
cmd = "<STOP,BLAH,BLAH,BLAH,F,0.0,0.0,0.0>"
|
|
|
|
print("Sending STOP command..")
|
|
thread = Thread(self.send_single_command, cmd)
|
|
thread.signals.finished.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("STOP command sent.")
|
|
|
|
def jog(self, btn):
|
|
self.statusBar().showMessage("You clicked JOG")
|
|
#self.serial.flushInput()
|
|
testData = []
|
|
active_pumps = self.get_active_pumps()
|
|
if len(active_pumps) > 0:
|
|
pumps_2_run = ''.join(map(str,active_pumps))
|
|
|
|
one_jog = str(self.p1_setup_jog_delta_to_send)
|
|
two_jog = str(self.p2_setup_jog_delta_to_send)
|
|
three_jog = str(self.p3_setup_jog_delta_to_send)
|
|
|
|
if btn.text() == "Jog +":
|
|
self.statusBar().showMessage("You clicked JOG +")
|
|
f_cmd = "<RUN,DIST," + pumps_2_run +",0,F," + one_jog + "," + two_jog + "," + three_jog + ">"
|
|
testData.append(f_cmd)
|
|
|
|
print("Sending JOG command..")
|
|
|
|
thread = Thread(self.runTest, testData)
|
|
thread.signals.finished.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("JOG command sent.")
|
|
|
|
elif btn.text() == "Jog -":
|
|
self.statusBar().showMessage("You clicked JOG -")
|
|
b_cmd = "<RUN,DIST," + pumps_2_run +",0,B," + one_jog + "," + two_jog + "," + three_jog + ">"
|
|
testData.append(b_cmd)
|
|
|
|
print("Sending JOG command..")
|
|
thread = Thread(self.runTest, testData)
|
|
thread.signals.finished.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("JOG command sent.")
|
|
else:
|
|
self.statusBar().showMessage("No pumps enabled.")
|
|
|
|
# ======================
|
|
# FUNCTIONS : Camera
|
|
# ======================
|
|
|
|
# Initialize the camera
|
|
def start_camera(self):
|
|
self.statusBar().showMessage("You clicked START CAMERA")
|
|
camera_port = 0
|
|
self.capture = cv2.VideoCapture(camera_port)
|
|
#TODO check the native resolution of the camera and scale the size down here
|
|
self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 800)
|
|
self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 400)
|
|
|
|
self.timer = QtCore.QTimer(self)
|
|
self.timer.timeout.connect(self.update_frame)
|
|
self.timer.start(5)
|
|
|
|
# Update frame function
|
|
def update_frame(self):
|
|
ret, self.image = self.capture.read()
|
|
self.image = cv2.flip(self.image, 1)
|
|
self.display_image(self.image, 1)
|
|
|
|
# Display image in frame
|
|
def display_image(self, image, window=1):
|
|
qformat = QtGui.QImage.Format_Indexed8
|
|
if len(image.shape) == 3: #
|
|
if image.shape[2] == 4:
|
|
qformat = QtGui.QImage.Format_RGBA8888
|
|
|
|
else:
|
|
qformat = QtGui.QImage.Format_RGB888
|
|
#print(image.shape[0], image.shape[1], image.shape[2])
|
|
self.img_2_display = QtGui.QImage(image, image.shape[1], image.shape[0], image.strides[0], qformat)
|
|
self.img_2_display = QtGui.QImage.rgbSwapped(self.img_2_display)
|
|
|
|
if window == 1:
|
|
self.ui.imgLabel.setPixmap(QtGui.QPixmap.fromImage(self.img_2_display))
|
|
self.ui.imgLabel.setScaledContents(False)
|
|
|
|
# Save image to set location
|
|
def save_image(self):
|
|
if not os.path.exists("./images"):
|
|
os.mkdir("images")
|
|
|
|
self.date_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
# Replace semicolons with underscores
|
|
self.date_string = self.date_string.replace(":","_")
|
|
self.write_image_loc = './images/'+self.date_string + '.png'
|
|
cv2.imwrite(self.write_image_loc, self.image)
|
|
self.statusBar().showMessage("Captured Image, saved to: " + self.write_image_loc)
|
|
|
|
|
|
# Stop camera
|
|
def stop_camera(self):
|
|
self.timer.stop()
|
|
|
|
# ======================
|
|
# FUNCTIONS : Setup
|
|
# ======================
|
|
|
|
|
|
# Populate the available ports
|
|
def populate_ports(self):
|
|
"""
|
|
:raises EnvironmentError:
|
|
On unsupported or unknown platforms
|
|
:returns:
|
|
A list of the serial ports available on the system
|
|
"""
|
|
print("Populating ports..")
|
|
if sys.platform.startswith('win'):
|
|
ports = ['COM%s' % (i + 1) for i in range(256)]
|
|
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
|
|
# this excludes your current terminal "/dev/tty"
|
|
ports = glob.glob('/dev/tty[A-Za-z]*')
|
|
elif sys.platform.startswith('darwin'):
|
|
ports = glob.glob('/dev/tty.*')
|
|
else:
|
|
raise EnvironmentError('Unsupported platform')
|
|
|
|
result = []
|
|
for port in ports:
|
|
try:
|
|
s = serial.Serial(port)
|
|
s.close()
|
|
result.append(port)
|
|
except (OSError, serial.SerialException):
|
|
pass
|
|
self.ui.port_DROPDOWN.addItems(result)
|
|
self.globports = result
|
|
print("Ports have been populated.",self.globports)
|
|
|
|
# Refresh the list of ports
|
|
def refresh_ports(self):
|
|
self.statusBar().showMessage("You clicked REFRESH PORTS")
|
|
self.ui.port_DROPDOWN.clear()
|
|
self.populate_ports()
|
|
self.set_port()
|
|
|
|
# Set the port that is selected from the dropdown menu
|
|
def set_port(self):
|
|
self.port = self.ui.port_DROPDOWN.currentText()
|
|
|
|
# Set the microstepping amount from the dropdown menu
|
|
# TODO: There is definitely a better way of updating different variables
|
|
# after there is a change of some input from the user. need to figure out.
|
|
def set_microstepping(self):
|
|
self.microstepping = int(self.ui.microstepping_DROPDOWN.currentText())
|
|
self.set_p1_units()
|
|
self.set_p1_speed()
|
|
self.set_p1_accel()
|
|
self.set_p1_setup_jog_delta()
|
|
self.set_p1_amount()
|
|
|
|
self.set_p2_units()
|
|
self.set_p2_speed()
|
|
self.set_p2_accel()
|
|
self.set_p2_setup_jog_delta()
|
|
self.set_p2_amount()
|
|
|
|
self.set_p3_units()
|
|
self.set_p3_speed()
|
|
self.set_p3_accel()
|
|
self.set_p3_setup_jog_delta()
|
|
self.set_p3_amount()
|
|
|
|
print(self.microstepping)
|
|
|
|
def set_experiment_notes(self):
|
|
self.experiment_notes = self.ui.experiment_notes.text()
|
|
|
|
# Set the name of the log file
|
|
# Can probably delete
|
|
def set_log_file_name(self):
|
|
"""
|
|
Sets the file name for the current test run, enables us to log data to the file.
|
|
|
|
Callback setter method from the 'self.ui.logFileNameInput' to set the
|
|
name of the log file. The log file name is of the form
|
|
label_Year-Month-Date hour_min_sec.txt
|
|
"""
|
|
# Create a date string
|
|
self.date_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
# Replace semicolons with underscores
|
|
self.date_string = self.date_string.replace(":","_")
|
|
self.log_file_name = self.ui.log_file_name_INPUT.text() + "_" + self.date_string + ".png"
|
|
|
|
def save_settings(self):
|
|
# TODO: if you cancel then it gives error, fix this
|
|
# TODO: add comment
|
|
name, _ = QFileDialog.getSaveFileName(self,'Save File', options=QFileDialog.DontUseNativeDialog)
|
|
|
|
# Create a date string
|
|
self.date_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
# Replace semicolons with underscores
|
|
self.date_string = self.date_string.replace(":","_")
|
|
|
|
date_string = self.date_string
|
|
|
|
# write all of the settings here
|
|
## Settings for pump 1
|
|
p1_syringe = str(self.p1_syringe)
|
|
p1_units = str(self.p1_units)
|
|
p1_speed = str(self.p1_speed)
|
|
p1_accel = str(self.p1_accel)
|
|
p1_setup_jog_delta = str(self.p1_setup_jog_delta)
|
|
|
|
## Settings for pump 2
|
|
p2_syringe = str(self.p2_syringe)
|
|
p2_units = str(self.p2_units)
|
|
p2_speed = str(self.p2_speed)
|
|
p2_accel = str(self.p2_accel)
|
|
p2_setup_jog_delta = str(self.p2_setup_jog_delta)
|
|
|
|
## Settings for pump 3
|
|
p3_syringe = str(self.p3_syringe)
|
|
p3_units = str(self.p3_units)
|
|
p3_speed = str(self.p3_speed)
|
|
p3_accel = str(self.p3_accel)
|
|
p3_setup_jog_delta = str(self.p3_setup_jog_delta)
|
|
|
|
## Experiment Notes
|
|
experiment_notes = self.experiment_notes
|
|
|
|
text = []
|
|
text.append("File name: " + name + ".txt" + "\n") # line 0
|
|
text.append("Date time: " + date_string + "\n") # line 1
|
|
|
|
text.append(":================================ \n") # line 2
|
|
text.append("P1 Syrin: " + p1_syringe + "\n") # line 3
|
|
text.append("P1 Units: " + p1_units + "\n") # line 4
|
|
text.append("P1 Speed: " + p1_speed + "\n") # line 5
|
|
text.append("P1 Accel: " + p1_accel + "\n") # line 6
|
|
text.append("P1 Jog D: " + p1_setup_jog_delta + "\n") # line 7
|
|
text.append(":================================ \n") # line 8
|
|
text.append("P2 Syrin: " + p2_syringe + "\n") # line 9
|
|
text.append("P2 Units: " + p2_units + "\n") # line 10
|
|
text.append("P2 Speed: " + p2_speed + "\n") # line 11
|
|
text.append("P2 Accel: " + p2_accel + "\n") # line 12
|
|
text.append("P2 Jog D: " + p2_setup_jog_delta + "\n") # line 13
|
|
text.append(":================================ \n") # line 14
|
|
text.append("P3 Syrin: " + p3_syringe + "\n") # line 15
|
|
text.append("P3 Units: " + p3_units + "\n") # line 16
|
|
text.append("P3 Speed: " + p3_speed + "\n") # line 17
|
|
text.append("P3 Accel: " + p3_accel + "\n") # line 18
|
|
text.append("P3 Jog D: " + p3_setup_jog_delta + "\n") # line 19
|
|
text.append("Exp Note: " + experiment_notes) # line 20
|
|
|
|
if name:
|
|
with open(name + ".txt", 'w') as f:
|
|
f.writelines(text)
|
|
self.statusBar().showMessage("Settings saved in " + name + ".txt")
|
|
|
|
def load_settings(self):
|
|
# need to make name an tuple otherwise i had an error and app crashed
|
|
name, _ = QFileDialog.getOpenFileName(self, 'Open File', options=QFileDialog.DontUseNativeDialog, filter = "Text (*.txt)")
|
|
|
|
if name:
|
|
with open(name, 'r') as f:
|
|
text = f.readlines()
|
|
# here is where you load all of your variables
|
|
# reformatting the text
|
|
text = [line.split(':')[-1].strip('\n')[1:] for line in text]
|
|
fname = text[0]
|
|
date_string = text[1]
|
|
|
|
p1_syringe = text[3]
|
|
p1_units = text[4]
|
|
p1_speed = text[5]
|
|
p1_accel = text[6]
|
|
p1_setup_jog_delta = text[7]
|
|
|
|
p2_syringe = text[9]
|
|
p2_units = text[10]
|
|
p2_speed = text[11]
|
|
p2_accel = text[12]
|
|
p2_setup_jog_delta = text[13]
|
|
|
|
p3_syringe = text[15]
|
|
p3_units = text[16]
|
|
p3_speed = text[17]
|
|
p3_accel = text[18]
|
|
p3_setup_jog_delta = text[19]
|
|
|
|
experiment_notes = text[20]
|
|
|
|
#print(fname, date_string, p1_syringe, p1_units, p1_speed, p1_accel, p1_setup_jog_delta)
|
|
|
|
# Here we are setting all of the values as given by the settings file
|
|
p1_syringe_index = self.ui.p1_syringe_DROPDOWN.findText(p1_syringe, QtCore.Qt.MatchFixedString)
|
|
self.ui.p1_syringe_DROPDOWN.setCurrentIndex(p1_syringe_index)
|
|
p1_units_index = self.ui.p1_units_DROPDOWN.findText(p1_units, QtCore.Qt.MatchFixedString)
|
|
self.ui.p1_units_DROPDOWN.setCurrentIndex(p1_units_index)
|
|
self.ui.p1_speed_INPUT.setValue(float(p1_speed))
|
|
self.ui.p1_accel_INPUT.setValue(float(p1_accel))
|
|
|
|
AllItems = [self.ui.p1_setup_jog_delta_INPUT.itemText(i) for i in range(self.ui.p1_setup_jog_delta_INPUT.count())]
|
|
|
|
p1_setup_jog_delta_index = self.ui.p1_setup_jog_delta_INPUT.findText(p1_setup_jog_delta, QtCore.Qt.MatchFixedString)
|
|
self.ui.p1_setup_jog_delta_INPUT.setCurrentIndex(p1_setup_jog_delta_index)
|
|
|
|
|
|
p2_syringe_index = self.ui.p2_syringe_DROPDOWN.findText(p2_syringe, QtCore.Qt.MatchFixedString)
|
|
self.ui.p2_syringe_DROPDOWN.setCurrentIndex(p2_syringe_index)
|
|
p2_units_index = self.ui.p2_units_DROPDOWN.findText(p2_units, QtCore.Qt.MatchFixedString)
|
|
self.ui.p2_units_DROPDOWN.setCurrentIndex(p2_units_index)
|
|
self.ui.p2_speed_INPUT.setValue(float(p2_speed))
|
|
self.ui.p2_accel_INPUT.setValue(float(p2_accel))
|
|
|
|
p2_setup_jog_delta_index = self.ui.p2_setup_jog_delta_INPUT.findText(p2_setup_jog_delta, QtCore.Qt.MatchFixedString)
|
|
self.ui.p2_setup_jog_delta_INPUT.setCurrentIndex(p2_setup_jog_delta_index)
|
|
|
|
p3_syringe_index = self.ui.p3_syringe_DROPDOWN.findText(p3_syringe, QtCore.Qt.MatchFixedString)
|
|
self.ui.p3_syringe_DROPDOWN.setCurrentIndex(p3_syringe_index)
|
|
p3_units_index = self.ui.p3_units_DROPDOWN.findText(p3_units, QtCore.Qt.MatchFixedString)
|
|
self.ui.p3_units_DROPDOWN.setCurrentIndex(p3_units_index)
|
|
self.ui.p3_speed_INPUT.setValue(float(p3_speed))
|
|
self.ui.p3_accel_INPUT.setValue(float(p3_accel))
|
|
|
|
p3_setup_jog_delta_index = self.ui.p3_setup_jog_delta_INPUT.findText(p3_setup_jog_delta, QtCore.Qt.MatchFixedString)
|
|
self.ui.p3_setup_jog_delta_INPUT.setCurrentIndex(p3_setup_jog_delta_index)
|
|
|
|
self.ui.experiment_notes.setText(experiment_notes)
|
|
|
|
self.statusBar().showMessage("Settings loaded from: " + text[1])
|
|
else:
|
|
self.statusBar().showMessage("No file selected.")
|
|
|
|
# Populate the microstepping amounts for the dropdown menu
|
|
def populate_microstepping(self):
|
|
self.microstepping_values = ['1', '2', '4', '8', '16', '32']
|
|
self.ui.microstepping_DROPDOWN.addItems(self.microstepping_values)
|
|
self.microstepping = 1
|
|
|
|
# Populate the list of possible syringes to the dropdown menus
|
|
def populate_syringe_sizes(self):
|
|
self.syringe_options = ["BD 1 mL", "BD 3 mL", "BD 5 mL", "BD 10 mL", "BD 20 mL", "BD 30 mL", "BD 60 mL"]
|
|
self.syringe_volumes = [1, 3, 5, 10, 20, 30, 60]
|
|
self.syringe_areas = [17.34206347, 57.88559215, 112.9089185, 163.539454, 285.022957, 366.0961536, 554.0462538]
|
|
|
|
self.ui.p1_syringe_DROPDOWN.addItems(self.syringe_options)
|
|
self.ui.p2_syringe_DROPDOWN.addItems(self.syringe_options)
|
|
self.ui.p3_syringe_DROPDOWN.addItems(self.syringe_options)
|
|
|
|
# Set Px syringe
|
|
def set_p1_syringe(self):
|
|
self.p1_syringe = self.ui.p1_syringe_DROPDOWN.currentText()
|
|
self.p1_syringe_area = self.syringe_areas[self.syringe_options.index(self.p1_syringe)]
|
|
self.display_p1_syringe()
|
|
|
|
self.set_p1_units()
|
|
self.set_p1_speed()
|
|
self.set_p1_accel()
|
|
self.set_p1_setup_jog_delta()
|
|
self.set_p1_amount()
|
|
|
|
def set_p2_syringe(self):
|
|
self.p2_syringe = self.ui.p2_syringe_DROPDOWN.currentText()
|
|
self.p2_syringe_area = self.syringe_areas[self.syringe_options.index(self.p2_syringe)]
|
|
self.display_p2_syringe()
|
|
|
|
self.set_p2_units()
|
|
self.set_p2_speed()
|
|
self.set_p2_accel()
|
|
self.set_p2_setup_jog_delta()
|
|
self.set_p2_amount()
|
|
|
|
def set_p3_syringe(self):
|
|
self.p3_syringe = self.ui.p3_syringe_DROPDOWN.currentText()
|
|
self.p3_syringe_area = self.syringe_areas[self.syringe_options.index(self.p3_syringe)]
|
|
self.display_p3_syringe()
|
|
|
|
self.set_p3_units()
|
|
self.set_p3_speed()
|
|
self.set_p3_accel()
|
|
self.set_p3_setup_jog_delta()
|
|
self.set_p3_amount()
|
|
|
|
# Set Px units
|
|
def set_p1_units(self):
|
|
self.p1_units = self.ui.p1_units_DROPDOWN.currentText()
|
|
|
|
length = self.p1_units.split("/")[0]
|
|
self.ui.p1_units_LABEL_2.setText(length)
|
|
|
|
self.set_p1_speed()
|
|
self.set_p1_accel()
|
|
self.set_p1_setup_jog_delta()
|
|
self.set_p1_amount()
|
|
|
|
def set_p2_units(self):
|
|
self.p2_units = self.ui.p2_units_DROPDOWN.currentText()
|
|
|
|
length = self.p2_units.split("/")[0]
|
|
self.ui.p2_units_LABEL_2.setText(length)
|
|
|
|
self.set_p2_speed()
|
|
self.set_p2_accel()
|
|
self.set_p2_setup_jog_delta()
|
|
self.set_p2_amount()
|
|
|
|
def set_p3_units(self):
|
|
self.p3_units = self.ui.p3_units_DROPDOWN.currentText()
|
|
|
|
length = self.p3_units.split("/")[0]
|
|
self.ui.p3_units_LABEL_2.setText(length)
|
|
|
|
self.set_p3_speed()
|
|
self.set_p3_accel()
|
|
self.set_p3_setup_jog_delta()
|
|
self.set_p3_amount()
|
|
|
|
|
|
def populate_pump_units(self):
|
|
self.units = ['mm/s', 'mL/s', 'mL/hr', 'µL/hr']
|
|
self.ui.p1_units_DROPDOWN.addItems(self.units)
|
|
self.ui.p2_units_DROPDOWN.addItems(self.units)
|
|
self.ui.p3_units_DROPDOWN.addItems(self.units)
|
|
|
|
def populate_pump_jog_delta(self):
|
|
self.jog_delta = ['0.01', '0.1', '1.0', '10.0']
|
|
self.ui.p1_setup_jog_delta_INPUT.addItems(self.jog_delta)
|
|
self.ui.p2_setup_jog_delta_INPUT.addItems(self.jog_delta)
|
|
self.ui.p3_setup_jog_delta_INPUT.addItems(self.jog_delta)
|
|
|
|
# Set Px speed
|
|
def set_p1_speed(self):
|
|
self.p1_speed = self.ui.p1_speed_INPUT.value()
|
|
self.ui.p1_units_LABEL.setText(str(self.p1_speed) + " " + self.ui.p1_units_DROPDOWN.currentText())
|
|
self.p1_speed_to_send = self.convert_speed(self.p1_speed, self.p1_units, self.p1_syringe_area, self.microstepping)
|
|
|
|
def set_p2_speed(self):
|
|
self.p2_speed = self.ui.p2_speed_INPUT.value()
|
|
self.ui.p2_units_LABEL.setText(str(self.p2_speed) + " " + self.ui.p2_units_DROPDOWN.currentText())
|
|
self.p2_speed_to_send = self.convert_speed(self.p2_speed, self.p2_units, self.p2_syringe_area, self.microstepping)
|
|
|
|
def set_p3_speed(self):
|
|
self.p3_speed = self.ui.p3_speed_INPUT.value()
|
|
self.ui.p3_units_LABEL.setText(str(self.p3_speed) + " " + self.ui.p3_units_DROPDOWN.currentText())
|
|
self.p3_speed_to_send = self.convert_speed(self.p3_speed, self.p3_units, self.p3_syringe_area, self.microstepping)
|
|
|
|
# Set Px accel
|
|
def set_p1_accel(self):
|
|
self.p1_accel = self.ui.p1_accel_INPUT.value()
|
|
self.p1_accel_to_send = self.convert_accel(self.p1_accel, self.p1_units, self.p1_syringe_area, self.microstepping)
|
|
|
|
def set_p2_accel(self):
|
|
self.p2_accel = self.ui.p2_accel_INPUT.value()
|
|
self.p2_accel_to_send = self.convert_accel(self.p2_accel, self.p2_units, self.p2_syringe_area, self.microstepping)
|
|
|
|
def set_p3_accel(self):
|
|
self.p3_accel = self.ui.p3_accel_INPUT.value()
|
|
self.p3_accel_to_send = self.convert_accel(self.p3_accel, self.p3_units, self.p3_syringe_area, self.microstepping)
|
|
|
|
# Set Px jog delta (setup)
|
|
def set_p1_setup_jog_delta(self):
|
|
self.p1_setup_jog_delta = self.ui.p1_setup_jog_delta_INPUT.currentText()
|
|
self.p1_setup_jog_delta = float(self.ui.p1_setup_jog_delta_INPUT.currentText())
|
|
self.p1_setup_jog_delta_to_send = self.convert_displacement(self.p1_setup_jog_delta, self.p1_units, self.p1_syringe_area, self.microstepping)
|
|
|
|
def set_p2_setup_jog_delta(self):
|
|
self.p2_setup_jog_delta = float(self.ui.p2_setup_jog_delta_INPUT.currentText())
|
|
self.p2_setup_jog_delta_to_send = self.convert_displacement(self.p2_setup_jog_delta, self.p2_units, self.p2_syringe_area, self.microstepping)
|
|
|
|
def set_p3_setup_jog_delta(self):
|
|
self.p3_setup_jog_delta = float(self.ui.p3_setup_jog_delta_INPUT.currentText())
|
|
self.p3_setup_jog_delta_to_send = self.convert_displacement(self.p3_setup_jog_delta, self.p3_units, self.p3_syringe_area, self.microstepping)
|
|
|
|
# Send Px settings
|
|
def send_p1_settings(self):
|
|
self.statusBar().showMessage("You clicked SEND P1 SETTINGS")
|
|
self.p1_settings = []
|
|
self.p1_settings.append("<SETTING,SPEED,1," + str(self.p1_speed_to_send) + ",F,0.0,0.0,0.0>")
|
|
self.p1_settings.append("<SETTING,ACCEL,1," + str(self.p1_accel_to_send) + ",F,0.0,0.0,0.0>")
|
|
self.p1_settings.append("<SETTING,DELTA,1," + str(self.p1_setup_jog_delta_to_send) + ",F,0.0,0.0,0.0>")
|
|
|
|
print("Sending P1 SETTINGS..")
|
|
thread = Thread(self.runTest, self.p1_settings)
|
|
thread.finishesignal.d.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("P1 SETTINGS sent.")
|
|
|
|
def send_p2_settings(self):
|
|
self.statusBar().showMessage("You clicked SEND P2 SETTINGS")
|
|
self.p2_settings = []
|
|
self.p2_settings.append("<SETTING,SPEED,2," + str(self.p2_speed_to_send) + ",F,0.0,0.0,0.0>")
|
|
self.p2_settings.append("<SETTING,ACCEL,2," + str(self.p2_accel_to_send) + ",F,0.0,0.0,0.0>")
|
|
self.p2_settings.append("<SETTING,DELTA,2," + str(self.p2_setup_jog_delta_to_send) + ",F,0.0,0.0,0.0>")
|
|
|
|
print("Sending P2 SETTINGS..")
|
|
thread = Thread(self.runTest, self.p2_settings)
|
|
thread.finishesignal.d.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("P2 SETTINGS sent.")
|
|
|
|
def send_p3_settings(self):
|
|
self.statusBar().showMessage("You clicked SEND P3 SETTINGS")
|
|
self.p3_settings = []
|
|
self.p3_settings.append("<SETTING,SPEED,3," + str(self.p3_speed_to_send) + ",F,0.0,0.0,0.0>")
|
|
self.p3_settings.append("<SETTING,ACCEL,3," + str(self.p3_accel_to_send) + ",F,0.0,0.0,0.0>")
|
|
self.p3_settings.append("<SETTING,DELTA,3," + str(self.p3_setup_jog_delta_to_send) + ",F,0.0,0.0,0.0>")
|
|
|
|
print("Sending P3 SETTINGS..")
|
|
thread = Thread(self.runTest, self.p3_settings)
|
|
thread.finishesignal.d.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
print("P3 SETTINGS sent.")
|
|
|
|
# Connect to the Arduino board
|
|
def connect(self):
|
|
#self.port_nano = '/dev/cu.usbserial-A9M11B77'
|
|
#self.port_uno = "/dev/cu.usbmodem1411"
|
|
#self.baudrate = baudrate
|
|
self.statusBar().showMessage("You clicked CONNECT TO CONTROLLER")
|
|
try:
|
|
port_declared = self.port in vars()
|
|
try:
|
|
self.serial = serial.Serial()
|
|
self.serial.port = self.port
|
|
self.serial.baudrate = 230400
|
|
self.serial.parity = serial.PARITY_NONE
|
|
self.serial.stopbits = serial.STOPBITS_ONE
|
|
self.serial.bytesize = serial.EIGHTBITS
|
|
self.serial.timeout = 1
|
|
self.serial.open()
|
|
#self.serial.flushInput()
|
|
|
|
# This is a thread that always runs and listens to commands from the Arduino
|
|
#self.global_listener_thread = Thread(self.listening)
|
|
#self.global_listener_thread.finished.connect(lambda:self.self.thread_finished(self.global_listenersignal._thread))
|
|
#self.global_listener_self.threadpool.start(thread)
|
|
|
|
# ~~~~~~~~~~~~~~~~
|
|
# TAB : Setup
|
|
# ~~~~~~~~~~~~~~~~
|
|
self.ui.disconnect_BTN.setEnabled(True)
|
|
self.ui.p1_setup_send_BTN.setEnabled(True)
|
|
self.ui.p2_setup_send_BTN.setEnabled(True)
|
|
self.ui.p3_setup_send_BTN.setEnabled(True)
|
|
self.ui.send_all_BTN.setEnabled(True)
|
|
|
|
self.ui.connect_BTN.setEnabled(False)
|
|
time.sleep(1)
|
|
self.statusBar().showMessage("Successfully connected to board.")
|
|
except:
|
|
self.statusBar().showMessage("Cannot connect to board. Try again..")
|
|
#raise CannotConnectException
|
|
except AttributeError:
|
|
self.statusBar().showMessage("Please plug in the board and select a proper port, then press connect.")
|
|
|
|
|
|
|
|
# Disconnect from the Arduino board
|
|
# TODO: figure out how to handle error..
|
|
def disconnect(self):
|
|
self.statusBar().showMessage("You clicked DISCONNECT FROM BOARD")
|
|
print("Disconnecting from board..")
|
|
#self.global_listener_thread.stop()
|
|
time.sleep(3)
|
|
self.serial.close()
|
|
print("Board has been disconnected")
|
|
|
|
self.grey_out_components()
|
|
self.ui.connect_BTN.setEnabled(True)
|
|
self.ui.disconnect_BTN.setEnabled(False)
|
|
|
|
# Send all settings
|
|
def send_all(self):
|
|
self.statusBar().showMessage("You clicked SEND ALL SETTINGS")
|
|
|
|
self.settings = []
|
|
self.settings.append("<SETTING,SPEED,1,"+str(self.p1_speed_to_send)+",F,0.0,0.0,0.0>")
|
|
self.settings.append("<SETTING,ACCEL,1,"+str(self.p1_accel_to_send)+",F,0.0,0.0,0.0>")
|
|
self.settings.append("<SETTING,DELTA,1,"+str(self.p1_setup_jog_delta_to_send)+",F,0.0,0.0,0.0>")
|
|
|
|
self.settings.append("<SETTING,SPEED,2,"+str(self.p2_speed_to_send)+",F,0.0,0.0,0.0>")
|
|
self.settings.append("<SETTING,ACCEL,2,"+str(self.p2_accel_to_send)+",F,0.0,0.0,0.0>")
|
|
self.settings.append("<SETTING,DELTA,2,"+str(self.p2_setup_jog_delta_to_send)+",F,0.0,0.0,0.0>")
|
|
|
|
self.settings.append("<SETTING,SPEED,3,"+str(self.p3_speed_to_send)+",F,0.0,0.0,0.0>")
|
|
self.settings.append("<SETTING,ACCEL,3,"+str(self.p3_accel_to_send)+",F,0.0,0.0,0.0>")
|
|
self.settings.append("<SETTING,DELTA,3,"+str(self.p3_setup_jog_delta_to_send)+",F,0.0,0.0,0.0>")
|
|
|
|
print("Sending all settings..")
|
|
thread = Thread(self.runTest, self.settings)
|
|
thread.signals.finished.connect(lambda:self.thread_finished(thread))
|
|
self.threadpool.start(thread)
|
|
|
|
self.ui.p1_setup_send_BTN.setStyleSheet("background-color: none")
|
|
self.ui.p2_setup_send_BTN.setStyleSheet("background-color: none")
|
|
self.ui.p3_setup_send_BTN.setStyleSheet("background-color: none")
|
|
|
|
self.ungrey_out_components()
|
|
|
|
|
|
|
|
# =======================
|
|
# MISC : Functions I need
|
|
# =======================
|
|
|
|
def steps2mm(self, steps):
|
|
# 200 steps per rev
|
|
# one rev is 0.8mm dist
|
|
#mm = steps/200/32*0.8
|
|
mm = steps/200/self.microstepping*0.8
|
|
return mm
|
|
|
|
def steps2mL(self, steps, syringe_area):
|
|
mL = self.mm32mL(self.steps2mm(steps)*syringe_area)
|
|
return mL
|
|
|
|
def steps2uL(self, steps, syringe_area):
|
|
uL = self.mm32uL(self.steps2mm(steps)*syringe_area,self.microstepping)
|
|
return uL
|
|
|
|
|
|
def mm2steps(self, mm, microsteps):
|
|
steps = mm/0.8*200*microsteps
|
|
#steps = mm*200/0.8
|
|
return steps
|
|
|
|
def mL2steps(self, mL, syringe_area, microsteps):
|
|
# note syringe_area is in mm^2
|
|
steps = self.mm2steps(self.mL2mm3(mL)/syringe_area, microsteps)
|
|
return steps
|
|
|
|
def uL2steps(self, uL, syringe_area, microsteps):
|
|
steps = self.mm2steps(self.uL2mm3(uL)/syringe_area, microsteps)
|
|
return steps
|
|
|
|
|
|
def mL2uL(self, mL):
|
|
return mL*1000.0
|
|
|
|
def mL2mm3(self, mL):
|
|
return mL*1000.0
|
|
|
|
|
|
def uL2mL(self, uL):
|
|
return uL/1000.0
|
|
|
|
def uL2mm3(self, uL):
|
|
return uL
|
|
|
|
|
|
def mm32mL(self, mm3):
|
|
return mm3/1000.0
|
|
|
|
def mm32uL(self, mm3):
|
|
return mm3
|
|
|
|
def persec2permin(self, value_per_sec):
|
|
value_per_min = value_per_sec*60.0
|
|
return value_per_min
|
|
|
|
def persec2perhour(self, value_per_sec):
|
|
value_per_hour = value_per_sec*60.0*60.0
|
|
return value_per_hour
|
|
|
|
|
|
def permin2perhour(self, value_per_min):
|
|
value_per_hour = value_per_min*60.0
|
|
return value_per_hour
|
|
|
|
def permin2persec(self, value_per_min):
|
|
value_per_sec = value_per_min/60.0
|
|
return value_per_sec
|
|
|
|
|
|
def perhour2permin(self, value_per_hour):
|
|
value_per_min = value_per_hour/60.0
|
|
return value_per_min
|
|
|
|
def perhour2persec(self, value_per_hour):
|
|
value_per_sec = value_per_hour/60.0/60.0
|
|
return value_per_sec
|
|
|
|
def convert_displacement(self, displacement, units, syringe_area, microsteps):
|
|
length = units.split("/")[0]
|
|
time = units.split("/")[1]
|
|
inp_displacement = displacement
|
|
# convert length first
|
|
if length == "mm":
|
|
displacement = self.mm2steps(displacement, microsteps)
|
|
elif length == "mL":
|
|
displacement = self.mL2steps(displacement, syringe_area, microsteps)
|
|
elif length == "µL":
|
|
displacement = self.uL2steps(displacement, syringe_area, microsteps)
|
|
|
|
print('______________________________')
|
|
print("INPUT DISPLACEMENT: " + str(inp_displacement) + ' ' + length)
|
|
print("OUTPUT DISPLACEMENT: " + str(displacement) + ' steps')
|
|
print('\n############################################################\n')
|
|
return displacement
|
|
|
|
def convert_speed(self, inp_speed, units, syringe_area, microsteps):
|
|
length = units.split("/")[0]
|
|
time = units.split("/")[1]
|
|
|
|
|
|
# convert length first
|
|
if length == "mm":
|
|
speed = self.mm2steps(inp_speed, microsteps)
|
|
elif length == "mL":
|
|
speed = self.mL2steps(inp_speed, syringe_area, microsteps)
|
|
elif length == "µL":
|
|
speed = self.uL2steps(inp_speed, syringe_area, microsteps)
|
|
|
|
|
|
# convert time next
|
|
if time == "s":
|
|
pass
|
|
elif time == "min":
|
|
speed = self.permin2persec(speed)
|
|
elif time == "hr":
|
|
speed = self.perhour2persec(speed)
|
|
|
|
|
|
|
|
print("INPUT SPEED: " + str(inp_speed) + ' ' + units)
|
|
print("OUTPUT SPEED: " + str(speed) + ' steps/s')
|
|
return speed
|
|
|
|
def convert_accel(self, accel, units, syringe_area, microsteps):
|
|
length = units.split("/")[0]
|
|
time = units.split("/")[1]
|
|
inp_accel = accel
|
|
accel = accel
|
|
|
|
# convert length first
|
|
if length == "mm":
|
|
accel = self.mm2steps(accel, microsteps)
|
|
elif length == "mL":
|
|
accel = self.mL2steps(accel, syringe_area, microsteps)
|
|
elif length == "µL":
|
|
accel = self.uL2steps(accel, syringe_area, microsteps)
|
|
|
|
# convert time next
|
|
if time == "s":
|
|
pass
|
|
elif time == "min":
|
|
accel = self.permin2persec(self.permin2persec(accel))
|
|
elif time == "hr":
|
|
accel = self.perhour2persec(self.perhour2persec(accel))
|
|
|
|
print('______________________________')
|
|
print("INPUT ACCEL: " + str(inp_accel) + ' ' + units + '/' + time)
|
|
print("OUTPUT ACCEL: " + str(accel) + ' steps/s/s')
|
|
return accel
|
|
|
|
|
|
'''
|
|
Syringe Volume (mL) | Syringe Area (mm^2)
|
|
-----------------------------------------------
|
|
1 | 17.34206347
|
|
3 | 57.88559215
|
|
5 | 112.9089185
|
|
10 | 163.539454
|
|
20 | 285.022957
|
|
30 | 366.0961536
|
|
60 | 554.0462538
|
|
|
|
IMPORTANT: These are for BD Plastic syringes ONLY!! Others will vary.
|
|
'''
|
|
|
|
def convert2volume(self, steps, units, syringe_area):
|
|
length = units.split("/")[0]
|
|
inp_steps = float(steps)
|
|
# convert length first
|
|
if length == "ul":
|
|
displacement = self.steps2uL(inp_steps, syringe_area)
|
|
else:
|
|
displacement = self.steps2mL(inp_steps, syringe_area)
|
|
return displacement
|
|
|
|
|
|
#====================================== Reading and Writing to Arduino
|
|
|
|
def sendToArduino(self, sendStr):
|
|
self.serial.write(sendStr.encode())
|
|
self.serial.flushInput()
|
|
|
|
|
|
def recvPositionArduino(self):
|
|
startMarker = self.startMarker
|
|
midMarker = self.midMarker
|
|
endMarker = self.endMarker
|
|
|
|
ck = ""
|
|
x = "z" # any value that is not an end- or startMarker
|
|
|
|
# wait for the start character
|
|
while ord(x) != startMarker:
|
|
x = self.serial.read()
|
|
|
|
# save data until the end marker is found
|
|
while ord(x) != endMarker:
|
|
if ord(x) == midMarker:
|
|
print('midMarker jfs ',ck)
|
|
if ck.startswith('p1') and self.is_p1_active:
|
|
out = self.convert2volume(ck.split(sep=':')[1],self.p1_units, self.p1_syringe_area)
|
|
self.p1_left -= out
|
|
self.ui.p1_remain_DISP.display(self.p1_left)
|
|
if ck.startswith('p2') and self.is_p2_active:
|
|
out = self.convert2volume(ck.split(sep=':')[1],self.p2_units, self.p2_syringe_area)
|
|
self.p2_left -= out
|
|
self.ui.p2_remain_DISP.display(self.p2_left)
|
|
if ck.startswith('p3') and self.is_p3_active:
|
|
out = self.convert2volume(ck.split(sep=':')[1],self.p3_units, self.p3_syringe_area)
|
|
self.p3_left -= out
|
|
self.ui.p3_remain_DISP.display(self.p3_left)
|
|
|
|
ck = ""
|
|
x = self.serial.read()
|
|
|
|
if ord(x) != startMarker:
|
|
#print(x)
|
|
ck = ck + x.decode()
|
|
|
|
x = self.serial.read()
|
|
|
|
return(ck)
|
|
|
|
#============================
|
|
|
|
|
|
def runTest(self, td):
|
|
numLoops = len(td)
|
|
waitingForReply = False
|
|
n = 0
|
|
|
|
while n < numLoops:
|
|
teststr = td[n]
|
|
|
|
if waitingForReply == False:
|
|
self.sendToArduino(teststr)
|
|
print("Sent from PC -- " + teststr)
|
|
waitingForReply = True
|
|
|
|
if waitingForReply == True:
|
|
|
|
while self.serial.inWaiting() == 0:
|
|
pass
|
|
|
|
dataRecvd = self.recvPositionArduino()
|
|
print("Reply Received -- " + dataRecvd)
|
|
n += 1
|
|
waitingForReply = False
|
|
|
|
|
|
time.sleep(0.1)
|
|
print("Send and receive complete\n\n")
|
|
|
|
def send_single_command(self, command):
|
|
waiting_for_reply = False
|
|
if waiting_for_reply == False:
|
|
self.sendToArduino(command)
|
|
print("Sent from PC -- STR " + command)
|
|
waiting_for_reply = True
|
|
if waiting_for_reply == True:
|
|
while self.serial.inWaiting() == 0:
|
|
pass
|
|
#data_received = self.recvFromArduino2()
|
|
data_received = "from other func"
|
|
print("Reply Received -- " + data_received)
|
|
waiting_for_reply = False
|
|
print("=============================\n\n")
|
|
print("Sent a single command")
|
|
|
|
|
|
# def listening(self):
|
|
# startMarker = self.startMarker
|
|
# midMarker = self.midMarker
|
|
# endMarker = self.endMarker
|
|
# posMarker = ord('?')
|
|
# i = 0
|
|
|
|
# while (True):
|
|
# self.serial.flushInput()
|
|
# x = "z"
|
|
# ck = ""
|
|
# isDisplay = "asdf"
|
|
# while self.serial.inWaiting() == 0:
|
|
# pass
|
|
# while not x or ord(x) != startMarker:
|
|
# x = self.serial.read()
|
|
# #if ord(x) == posMarker:
|
|
# # return self.get_position()
|
|
# while ord(x) != endMarker:
|
|
# if ord(x) == midMarker:
|
|
# i += 1
|
|
# print(ck)
|
|
# #isDisplay = ck
|
|
# #if i % 100 == 0:
|
|
# # self.ui.p1_absolute_DISP.display(ck)
|
|
# ck = ""
|
|
# x = self.serial.read()
|
|
|
|
# if ord(x) != startMarker:
|
|
# ck = ck + x.decode()
|
|
|
|
# x = self.serial.read()
|
|
# # TODO
|
|
# #if isDisplay == "START":
|
|
# # print("This is ck: " + ck)
|
|
# #motorID = int(ck)
|
|
# #self.is_p1_running = True
|
|
# #run thread(self.display_position, motorID)
|
|
|
|
# #toDisp = self.steps2mm(float(ck))
|
|
# #print("Pump num " + toDisp + " is now running.")i
|
|
# #self.ui.p1_absolute_DISP.display(toDisp)
|
|
# #isDisplay = ""
|
|
|
|
# #self.serial.flushInput()
|
|
# #print(self.serial.read(self.serial.inWaiting()).decode('ascii'))
|
|
# print(ck)
|
|
# print("\n")
|
|
|
|
# TODO
|
|
# def display_position(self, motorID):
|
|
# if motorID == 1:
|
|
|
|
# seconds = 0
|
|
# p1_speed = self.p1_speed_to_send
|
|
# p1_dist = 0
|
|
# p1_time = p1_dist/p1_speed
|
|
|
|
# time_start = time.start()
|
|
# while self.is_p1_running:
|
|
# pass
|
|
|
|
|
|
|
|
# def get_position(self):
|
|
# ck = ""
|
|
# x = self.serial.read()
|
|
|
|
# while ord(x) != self.endMarker:
|
|
# if ord(x) == self.midMarker:
|
|
# print(ck)
|
|
# ck = ""
|
|
# x = self.serial.read()
|
|
# ck = ck + x.decode()
|
|
# x = self.serial.read()
|
|
# print(ck)
|
|
# return (ck)
|
|
|
|
|
|
|
|
def closeEvent(self, event):
|
|
try:
|
|
#self.global_listener_thread.stop()
|
|
self.serial.close()
|
|
#self.threadpool.end()
|
|
|
|
except AttributeError:
|
|
pass
|
|
sys.exit()
|
|
|
|
# I feel better having one of these
|
|
def main():
|
|
# a new app instance
|
|
app = QtWidgets.QApplication(sys.argv)
|
|
window = MainWindow()
|
|
window.setWindowTitle("Poseidon Pumps Controller - Pachter Lab Caltech 2018")
|
|
window.show()
|
|
# without this, the script exits immediately.
|
|
sys.exit(app.exec_())
|
|
|
|
if __name__ == "__main__":
|
|
main()
|