jfsposeidon/poseidon_main.py

1534 lines
49 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import serial
import time
import glob
import sys
from datetime import datetime
import time
import os
# This gets the Qt stuff
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtWidgets import QMainWindow, QApplication, QFileDialog
import cv2
#from PyQt5 import cv2
# note, had to use version 3.2.0.8 otherwise it had its own
# pyqt packages that conflicted with mine
import numpy as np
from decimal import Decimal
# This is our window from QtCreator
import gui2
import pdb
import traceback, sys
# ##############################
# MULTITHREADING : SIGNALS CLASS
# ##############################
class WorkerSignals(QtCore.QObject):
'''
Defines the signals available from a running worker thread.
Supported signals are:
finished
No data
error
`tuple` (exctype, value, traceback.format_exc() )
result
`object` data returned from processing, anything
'''
finished = QtCore.pyqtSignal()
error = QtCore.pyqtSignal(tuple)
result = QtCore.pyqtSignal(object)
progress = QtCore.pyqtSignal(int)
datasignal = QtCore.pyqtSignal(int,float)
# #############################
# MULTITHREADING : WORKER CLASS
# #############################
class Thread(QtCore.QRunnable):
def __init__(self, fn, *args, **kwargs):
#parent = None
super().__init__()
self.runs = True
self.fn = fn
self.args = args
self.kwargs = kwargs
self.signals = WorkerSignals()
def run(self):
try:
#self.serial.flushInput()
#self.serial.flushOutput()
result = self.fn(*self.args, **self.kwargs)
except:
traceback.print_exc()
exctype, value = sys.exc_info()[:2]
self.signals.error.emit((exctype, value, traceback.format_exc()))
else:
self.signals.result.emit(result) # Return the result of the processing
finally:
self.signals.finished.emit() # Done
self.stop()
#print("Job completed")
def stop(self):
self.runs = False
# #####################################
# ERROR HANDLING : CANNOT CONNECT CLASS
# #####################################
class CannotConnectException(Exception):
pass
# #######################
# GUI : MAIN WINDOW CLASS
# #######################
class MainWindow(QtWidgets.QMainWindow, gui2.Ui_MainWindow):
# =======================================================
# INITIALIZING : The UI and setting some needed variables
# =======================================================
def __init__(self):
# Setting the UI to a class variable and connecting all GUI Components
super(MainWindow, self).__init__()
self.ui = gui2.Ui_MainWindow()
self.ui.setupUi(self)
#====================================== Jfs Display try
# aktueller Content in the syringe
self.p1_left = 0
self.p2_left = 0
self.p3_left = 0
self.globports =[]
#======================================================
# Put comments here
self.populate_microstepping()
self.populate_syringe_sizes()
self.populate_pump_jog_delta()
self.populate_pump_units()
self.setting_variables()
if self.globports == []:
self.populate_ports()
self.set_port()
self.connect_all_gui_components()
self.grey_out_components()
# Declaring start, mid, and end marker for sending code to Arduino
self.startMarker = 60# <
self.endMarker = 62 # ,F,0.0>
self.midMarker = 44 # ,
# Initializing multithreading to allow parallel operations
self.threadpool = QtCore.QThreadPool()
print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())
# Camera setup
self.timer = QtCore.QTimer()
self.timer.setInterval(1000)
self.timer.timeout.connect(self.recurring_timer)
self.timer.start()
self.counter = 0
# Random other things I need
self.image = None
#self.microstepping = 1
#print(self.microstepping)
self.threadpool = QtCore.QThreadPool()
def recurring_timer(self):
self.counter +=1
# =============================
# SETTING : important variables
# =============================
def setting_variables(self):
self.set_p1_syringe()
self.set_p2_syringe()
self.set_p3_syringe()
#self.set_p1_units()
#self.set_p2_units()
#self.set_p3_units()
self.is_p1_active = False
self.is_p2_active = False
self.is_p3_active = False
self.experiment_notes = ""
def thread_finished(self, th):
print("Your thread has completed. Now terminating..")
th.stop()
print("Thread has been terminated.")
print("=============================\n\n")
# here is where you need to end the thread
# ===================================
# CONNECTING : all the GUI Components
# ===================================
def connect_all_gui_components(self):
# ~~~~~~~~~~~~~~~
# MAIN : MENU BAR
# ~~~~~~~~~~~~~~~
self.ui.load_settings_BTN.triggered.connect(self.load_settings)
self.ui.save_settings_BTN.triggered.connect(self.save_settings)
# ~~~~~~~~~~~~~~~~
# TAB : Controller
# ~~~~~~~~~~~~~~~~
# Px active checkboxes
self.ui.p1_activate_CHECKBOX.stateChanged.connect(self.toggle_p1_activation)
self.ui.p2_activate_CHECKBOX.stateChanged.connect(self.toggle_p2_activation)
self.ui.p3_activate_CHECKBOX.stateChanged.connect(self.toggle_p3_activation)
# Px display (TODO)
# Px syringe display
self.ui.p1_syringe_DROPDOWN.currentIndexChanged.connect(self.display_p1_syringe)
self.ui.p2_syringe_DROPDOWN.currentIndexChanged.connect(self.display_p2_syringe)
self.ui.p3_syringe_DROPDOWN.currentIndexChanged.connect(self.display_p3_syringe)
# Px speed display
self.ui.p1_units_DROPDOWN.currentIndexChanged.connect(self.display_p1_speed)
self.ui.p2_units_DROPDOWN.currentIndexChanged.connect(self.display_p2_speed)
self.ui.p3_units_DROPDOWN.currentIndexChanged.connect(self.display_p3_speed)
#self.populate_pump_units()
# Px amount
self.ui.p1_amount_INPUT.valueChanged.connect(self.set_p1_amount)
self.ui.p2_amount_INPUT.valueChanged.connect(self.set_p2_amount)
self.ui.p3_amount_INPUT.valueChanged.connect(self.set_p3_amount)
# Px jog delta
#self.ui.p1_jog_delta_INPUT.valueChanged.connect(self.set_p1_jog_delta)
#self.ui.p2_jog_delta_INPUT.valueChanged.connect(self.set_p2_jog_delta)
#self.ui.p3_jog_delta_INPUT.valueChanged.connect(self.set_p3_jog_delta)
# Action buttons
self.ui.run_BTN.clicked.connect(self.run)
self.ui.pause_BTN.clicked.connect(self.pause)
self.ui.zero_BTN.clicked.connect(self.zero)
self.ui.stop_BTN.clicked.connect(self.stop)
self.ui.jog_plus_BTN.clicked.connect(lambda:self.jog(self.ui.jog_plus_BTN))
self.ui.jog_minus_BTN.clicked.connect(lambda:self.jog(self.ui.jog_minus_BTN))
# Set coordinate system
self.ui.absolute_RADIO.toggled.connect(lambda:self.set_coordinate(self.ui.absolute_RADIO))
self.ui.incremental_RADIO.toggled.connect(lambda:self.set_coordinate(self.ui.incremental_RADIO))
# ~~~~~~~~~~~~
# TAB : Camera
# ~~~~~~~~~~~~
# Setting camera action buttons
self.ui.camera_connect_BTN.clicked.connect(self.start_camera)
self.ui.camera_disconnect_BTN.clicked.connect(self.stop_camera)
self.ui.camera_capture_image_BTN.clicked.connect(self.save_image)
# ~~~~~~~~~~~
# TAB : Setup
# ~~~~~~~~~~~
# Port, first populate it then connect it (population done earlier)
self.ui.refresh_ports_BTN.clicked.connect(self.refresh_ports)
self.ui.port_DROPDOWN.currentIndexChanged.connect(self.set_port)
self.ui.experiment_notes.editingFinished.connect(self.set_experiment_notes)
# Set the microstepping value, default is 1
self.ui.microstepping_DROPDOWN.currentIndexChanged.connect(self.set_microstepping)
# Set the log file name
self.date_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.date_string = self.date_string.replace(":","_") # Replace semicolons with underscores
# Px syringe size, populate then connect (population done earlier)
self.ui.p1_syringe_DROPDOWN.currentIndexChanged.connect(self.set_p1_syringe)
self.ui.p2_syringe_DROPDOWN.currentIndexChanged.connect(self.set_p2_syringe)
self.ui.p3_syringe_DROPDOWN.currentIndexChanged.connect(self.set_p3_syringe)
# warning to send the info to the controller
self.ui.p1_syringe_DROPDOWN.currentIndexChanged.connect(self.send_p1_warning)
self.ui.p2_syringe_DROPDOWN.currentIndexChanged.connect(self.send_p2_warning)
self.ui.p3_syringe_DROPDOWN.currentIndexChanged.connect(self.send_p3_warning)
# Px units
self.ui.p1_units_DROPDOWN.currentIndexChanged.connect(self.set_p1_units)
self.ui.p2_units_DROPDOWN.currentIndexChanged.connect(self.set_p2_units)
self.ui.p3_units_DROPDOWN.currentIndexChanged.connect(self.set_p3_units)
# warning to send the info to the controller
self.ui.p1_units_DROPDOWN.currentIndexChanged.connect(self.send_p1_warning)
self.ui.p2_units_DROPDOWN.currentIndexChanged.connect(self.send_p2_warning)
self.ui.p3_units_DROPDOWN.currentIndexChanged.connect(self.send_p3_warning)
# Px speed
self.ui.p1_speed_INPUT.valueChanged.connect(self.set_p1_speed)
self.ui.p2_speed_INPUT.valueChanged.connect(self.set_p2_speed)
self.ui.p3_speed_INPUT.valueChanged.connect(self.set_p3_speed)
# warning to send the info to the controller
self.ui.p1_speed_INPUT.valueChanged.connect(self.send_p1_warning)
self.ui.p2_speed_INPUT.valueChanged.connect(self.send_p2_warning)
self.ui.p3_speed_INPUT.valueChanged.connect(self.send_p3_warning)
# Px accel
self.ui.p1_accel_INPUT.valueChanged.connect(self.set_p1_accel)
self.ui.p2_accel_INPUT.valueChanged.connect(self.set_p2_accel)
self.ui.p3_accel_INPUT.valueChanged.connect(self.set_p3_accel)
# warning to send the info to the controller
self.ui.p1_accel_INPUT.valueChanged.connect(self.send_p1_warning)
self.ui.p2_accel_INPUT.valueChanged.connect(self.send_p2_warning)
self.ui.p3_accel_INPUT.valueChanged.connect(self.send_p3_warning)
# Px jog delta (setup)
self.ui.p1_setup_jog_delta_INPUT.currentIndexChanged.connect(self.set_p1_setup_jog_delta)
self.ui.p2_setup_jog_delta_INPUT.currentIndexChanged.connect(self.set_p2_setup_jog_delta)
self.ui.p3_setup_jog_delta_INPUT.currentIndexChanged.connect(self.set_p3_setup_jog_delta)
# warning to send the info to the contorller
self.ui.p1_setup_jog_delta_INPUT.currentIndexChanged.connect(self.send_p1_warning)
self.ui.p2_setup_jog_delta_INPUT.currentIndexChanged.connect(self.send_p2_warning)
self.ui.p3_setup_jog_delta_INPUT.currentIndexChanged.connect(self.send_p3_warning)
# Px send settings
self.ui.p1_setup_send_BTN.clicked.connect(self.send_p1_settings)
self.ui.p2_setup_send_BTN.clicked.connect(self.send_p2_settings)
self.ui.p3_setup_send_BTN.clicked.connect(self.send_p3_settings)
# remove warning to send settings
self.ui.p1_setup_send_BTN.clicked.connect(self.send_p1_success)
self.ui.p2_setup_send_BTN.clicked.connect(self.send_p2_success)
self.ui.p3_setup_send_BTN.clicked.connect(self.send_p3_success)
# Connect to arduino
self.ui.connect_BTN.clicked.connect(self.connect)
self.ui.disconnect_BTN.clicked.connect(self.disconnect)
# Send all the settings at once
self.ui.send_all_BTN.clicked.connect(self.send_all)
def send_p1_warning(self):
self.ui.p1_setup_send_BTN.setStyleSheet("background-color: green; color: black")
def send_p2_warning(self):
self.ui.p2_setup_send_BTN.setStyleSheet("background-color: green; color: black")
def send_p3_warning(self):
self.ui.p3_setup_send_BTN.setStyleSheet("background-color: green; color: black")
def send_p1_success(self):
self.ui.p1_setup_send_BTN.setStyleSheet("background-color: none")
def send_p2_success(self):
self.ui.p2_setup_send_BTN.setStyleSheet("background-color: none")
def send_p3_success(self):
self.ui.p3_setup_send_BTN.setStyleSheet("background-color: none")
def grey_out_components(self):
# ~~~~~~~~~~~~~~~~
# TAB : Controller
# ~~~~~~~~~~~~~~~~
self.ui.run_BTN.setEnabled(False)
self.ui.pause_BTN.setEnabled(False)
self.ui.zero_BTN.setEnabled(False)
self.ui.stop_BTN.setEnabled(False)
self.ui.jog_plus_BTN.setEnabled(False)
self.ui.jog_minus_BTN.setEnabled(False)
# ~~~~~~~~~~~~~~~~
# TAB : Setup
# ~~~~~~~~~~~~~~~~
self.ui.p1_setup_send_BTN.setEnabled(False)
self.ui.p2_setup_send_BTN.setEnabled(False)
self.ui.p3_setup_send_BTN.setEnabled(False)
self.ui.send_all_BTN.setEnabled(False)
def ungrey_out_components(self):
# ~~~~~~~~~~~~~~~~
# TAB : Controller
# ~~~~~~~~~~~~~~~~
self.ui.run_BTN.setEnabled(True)
self.ui.pause_BTN.setEnabled(True)
self.ui.zero_BTN.setEnabled(True)
self.ui.stop_BTN.setEnabled(True)
self.ui.jog_plus_BTN.setEnabled(True)
self.ui.jog_minus_BTN.setEnabled(True)
self.ui.run_BTN.setStyleSheet("background-color: green; color: black")
self.ui.pause_BTN.setStyleSheet("background-color: yellow; color: black")
self.ui.stop_BTN.setStyleSheet("background-color: red; color: black")
# ~~~~~~~~~~~~~~~~
# TAB : Setup
# ~~~~~~~~~~~~~~~~
self.ui.p1_setup_send_BTN.setEnabled(True)
self.ui.p2_setup_send_BTN.setEnabled(True)
self.ui.p3_setup_send_BTN.setEnabled(True)
self.ui.send_all_BTN.setEnabled(True)
# ======================
# FUNCTIONS : Controller
# ======================
def toggle_p1_activation(self):
if self.ui.p1_activate_CHECKBOX.isChecked():
self.is_p1_active = True
else:
self.is_p1_active = False
def toggle_p2_activation(self):
if self.ui.p2_activate_CHECKBOX.isChecked():
self.is_p2_active = True
else:
self.is_p2_active = False
def toggle_p3_activation(self):
if self.ui.p3_activate_CHECKBOX.isChecked():
self.is_p3_active = True
else:
self.is_p3_active = False
# Get a list of active pumps (IDK if this is the best way to do this)
def get_active_pumps(self):
pumps_list = [self.is_p1_active, self.is_p2_active, self.is_p3_active]
active_pumps = [i+1 for i in range(len(pumps_list)) if pumps_list[i]]
return active_pumps
def display_p1_syringe(self):
self.ui.p1_syringe_LABEL.setText(self.ui.p1_syringe_DROPDOWN.currentText())
def display_p2_syringe(self):
self.ui.p2_syringe_LABEL.setText(self.ui.p2_syringe_DROPDOWN.currentText())
def display_p3_syringe(self):
self.ui.p3_syringe_LABEL.setText(self.ui.p3_syringe_DROPDOWN.currentText())
def display_p1_speed(self):
self.ui.p1_units_LABEL.setText(str(self.p1_speed) + " " + self.ui.p1_units_DROPDOWN.currentText())
def display_p2_speed(self):
self.ui.p2_units_LABEL.setText(str(self.p2_speed) + " " + self.ui.p2_units_DROPDOWN.currentText())
def display_p3_speed(self):
self.ui.p3_units_LABEL.setText(str(self.p3_speed) + " " + self.ui.p3_units_DROPDOWN.currentText())
# Set Px distance to move
def set_p1_amount(self):
self.p1_amount = self.ui.p1_amount_INPUT.value()
def set_p2_amount(self):
self.p2_amount = self.ui.p2_amount_INPUT.value()
def set_p3_amount(self):
self.p3_amount = self.ui.p3_amount_INPUT.value()
# Set Px jog delta
#def set_p1_jog_delta(self):
# self.p1_jog_delta = self.ui.p1_jog_delta_INPUT.value()
#def set_p2_jog_delta(self):
# self.p2_jog_delta = self.ui.p2_jog_delta_INPUT.value()
#def set_p3_jog_delta(self):
# self.p3_jog_delta = self.ui.p3_jog_delta_INPUT.value()
# Set the coordinate system for the pump
def set_coordinate(self, radio):
if radio.text() == "Abs.":
if radio.isChecked():
self.coordinate = "absolute"
if radio.text() == "Incr.":
if radio.isChecked():
self.coordinate = "incremental"
def run(self):
self.statusBar().showMessage("You clicked RUN")
testData = []
active_pumps = self.get_active_pumps()
if len(active_pumps) > 0:
p1_input_displacement = str(self.convert_displacement(self.p1_amount, self.p1_units, self.p1_syringe_area, self.microstepping))
p2_input_displacement = str(self.convert_displacement(self.p2_amount, self.p2_units, self.p2_syringe_area, self.microstepping))
p3_input_displacement = str(self.convert_displacement(self.p3_amount, self.p3_units, self.p3_syringe_area, self.microstepping))
pumps_2_run = ''.join(map(str,active_pumps))
cmd = "<RUN,DIST,"+pumps_2_run+",0.0,F," + p1_input_displacement + "," + p2_input_displacement + "," + p3_input_displacement + ">"
testData.append(cmd)
print("Sending RUN command..")
thread = Thread(self.runTest, testData)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("RUN command sent.")
else:
self.statusBar().showMessage("No pumps enabled.")
# Clean up this text
def pause(self):
active_pumps = self.get_active_pumps()
pumps_2_run = ''.join(map(str,active_pumps))
if self.ui.pause_BTN.text() == "Pause":
self.statusBar().showMessage("You clicked PAUSE")
testData = []
cmd = "<PAUSE,BLAH," + pumps_2_run + ",BLAH,F,0.0,0.0,0.0>"
testData.append(cmd)
print("Sending PAUSE command..")
thread = Thread(self.runTest, testData)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("PAUSE command sent.")
self.ui.pause_BTN.setText("Resume")
elif self.ui.pause_BTN.text() == "Resume":
self.statusBar().showMessage("You clicked RESUME")
testData = []
cmd = "<RESUME,BLAH," + pumps_2_run + ",BLAH,F,0.0,0.0,0.0>"
testData.append(cmd)
print("Sending RESUME command..")
thread = Thread(self.runTest, testData)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("RESUME command sent.")
self.ui.pause_BTN.setText("Pause")
# fix
def zero(self):
self.statusBar().showMessage("You clicked ZERO")
testData = []
cmd = "<ZERO,BLAH,BLAH,BLAH,F,0.0,0.0,0.0>"
print("Sending ZERO command..")
thread = Thread(self.runTest, testData)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
##jfs
self.p1_left=int(self.p1_syringe.split(sep=' ')[1])
self.ui.p1_remain_DISP.display(self.p1_left)
self.p2_left=int(self.p2_syringe.split(sep=' ')[1])
self.ui.p2_remain_DISP.display(self.p2_left)
self.p3_left=int(self.p3_syringe.split(sep=' ')[1])
self.ui.p3_remain_DISP.display(self.p3_left)
##jfs
print("ZERO command sent.")
def stop(self):
self.statusBar().showMessage("You clicked STOP")
cmd = "<STOP,BLAH,BLAH,BLAH,F,0.0,0.0,0.0>"
print("Sending STOP command..")
thread = Thread(self.send_single_command, cmd)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("STOP command sent.")
def jog(self, btn):
self.statusBar().showMessage("You clicked JOG")
#self.serial.flushInput()
testData = []
active_pumps = self.get_active_pumps()
if len(active_pumps) > 0:
pumps_2_run = ''.join(map(str,active_pumps))
one_jog = str(self.p1_setup_jog_delta_to_send)
two_jog = str(self.p2_setup_jog_delta_to_send)
three_jog = str(self.p3_setup_jog_delta_to_send)
if btn.text() == "Jog +":
self.statusBar().showMessage("You clicked JOG +")
f_cmd = "<RUN,DIST," + pumps_2_run +",0,F," + one_jog + "," + two_jog + "," + three_jog + ">"
testData.append(f_cmd)
print("Sending JOG command..")
thread = Thread(self.runTest, testData)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("JOG command sent.")
elif btn.text() == "Jog -":
self.statusBar().showMessage("You clicked JOG -")
b_cmd = "<RUN,DIST," + pumps_2_run +",0,B," + one_jog + "," + two_jog + "," + three_jog + ">"
testData.append(b_cmd)
print("Sending JOG command..")
thread = Thread(self.runTest, testData)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("JOG command sent.")
else:
self.statusBar().showMessage("No pumps enabled.")
# ======================
# FUNCTIONS : Camera
# ======================
# Initialize the camera
def start_camera(self):
self.statusBar().showMessage("You clicked START CAMERA")
camera_port = 0
self.capture = cv2.VideoCapture(camera_port)
#TODO check the native resolution of the camera and scale the size down here
self.capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 800)
self.capture.set(cv2.CAP_PROP_FRAME_WIDTH, 400)
self.timer = QtCore.QTimer(self)
self.timer.timeout.connect(self.update_frame)
self.timer.start(5)
# Update frame function
def update_frame(self):
ret, self.image = self.capture.read()
self.image = cv2.flip(self.image, 1)
self.display_image(self.image, 1)
# Display image in frame
def display_image(self, image, window=1):
qformat = QtGui.QImage.Format_Indexed8
if len(image.shape) == 3: #
if image.shape[2] == 4:
qformat = QtGui.QImage.Format_RGBA8888
else:
qformat = QtGui.QImage.Format_RGB888
#print(image.shape[0], image.shape[1], image.shape[2])
self.img_2_display = QtGui.QImage(image, image.shape[1], image.shape[0], image.strides[0], qformat)
self.img_2_display = QtGui.QImage.rgbSwapped(self.img_2_display)
if window == 1:
self.ui.imgLabel.setPixmap(QtGui.QPixmap.fromImage(self.img_2_display))
self.ui.imgLabel.setScaledContents(False)
# Save image to set location
def save_image(self):
if not os.path.exists("./images"):
os.mkdir("images")
self.date_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Replace semicolons with underscores
self.date_string = self.date_string.replace(":","_")
self.write_image_loc = './images/'+self.date_string + '.png'
cv2.imwrite(self.write_image_loc, self.image)
self.statusBar().showMessage("Captured Image, saved to: " + self.write_image_loc)
# Stop camera
def stop_camera(self):
self.timer.stop()
# ======================
# FUNCTIONS : Setup
# ======================
# Populate the available ports
def populate_ports(self):
"""
:raises EnvironmentError:
On unsupported or unknown platforms
:returns:
A list of the serial ports available on the system
"""
print("Populating ports..")
if sys.platform.startswith('win'):
ports = ['COM%s' % (i + 1) for i in range(256)]
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
# this excludes your current terminal "/dev/tty"
ports = glob.glob('/dev/tty[A-Za-z]*')
elif sys.platform.startswith('darwin'):
ports = glob.glob('/dev/tty.*')
else:
raise EnvironmentError('Unsupported platform')
result = []
for port in ports:
try:
s = serial.Serial(port)
s.close()
result.append(port)
except (OSError, serial.SerialException):
pass
self.ui.port_DROPDOWN.addItems(result)
self.globports = result
print("Ports have been populated.",self.globports)
# Refresh the list of ports
def refresh_ports(self):
self.statusBar().showMessage("You clicked REFRESH PORTS")
self.ui.port_DROPDOWN.clear()
self.populate_ports()
self.set_port()
# Set the port that is selected from the dropdown menu
def set_port(self):
self.port = self.ui.port_DROPDOWN.currentText()
# Set the microstepping amount from the dropdown menu
# TODO: There is definitely a better way of updating different variables
# after there is a change of some input from the user. need to figure out.
def set_microstepping(self):
self.microstepping = int(self.ui.microstepping_DROPDOWN.currentText())
self.set_p1_units()
self.set_p1_speed()
self.set_p1_accel()
self.set_p1_setup_jog_delta()
self.set_p1_amount()
self.set_p2_units()
self.set_p2_speed()
self.set_p2_accel()
self.set_p2_setup_jog_delta()
self.set_p2_amount()
self.set_p3_units()
self.set_p3_speed()
self.set_p3_accel()
self.set_p3_setup_jog_delta()
self.set_p3_amount()
print(self.microstepping)
def set_experiment_notes(self):
self.experiment_notes = self.ui.experiment_notes.text()
# Set the name of the log file
# Can probably delete
def set_log_file_name(self):
"""
Sets the file name for the current test run, enables us to log data to the file.
Callback setter method from the 'self.ui.logFileNameInput' to set the
name of the log file. The log file name is of the form
label_Year-Month-Date hour_min_sec.txt
"""
# Create a date string
self.date_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Replace semicolons with underscores
self.date_string = self.date_string.replace(":","_")
self.log_file_name = self.ui.log_file_name_INPUT.text() + "_" + self.date_string + ".png"
def save_settings(self):
# TODO: if you cancel then it gives error, fix this
# TODO: add comment
name, _ = QFileDialog.getSaveFileName(self,'Save File', options=QFileDialog.DontUseNativeDialog)
# Create a date string
self.date_string = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# Replace semicolons with underscores
self.date_string = self.date_string.replace(":","_")
date_string = self.date_string
# write all of the settings here
## Settings for pump 1
p1_syringe = str(self.p1_syringe)
p1_units = str(self.p1_units)
p1_speed = str(self.p1_speed)
p1_accel = str(self.p1_accel)
p1_setup_jog_delta = str(self.p1_setup_jog_delta)
## Settings for pump 2
p2_syringe = str(self.p2_syringe)
p2_units = str(self.p2_units)
p2_speed = str(self.p2_speed)
p2_accel = str(self.p2_accel)
p2_setup_jog_delta = str(self.p2_setup_jog_delta)
## Settings for pump 3
p3_syringe = str(self.p3_syringe)
p3_units = str(self.p3_units)
p3_speed = str(self.p3_speed)
p3_accel = str(self.p3_accel)
p3_setup_jog_delta = str(self.p3_setup_jog_delta)
## Experiment Notes
experiment_notes = self.experiment_notes
text = []
text.append("File name: " + name + ".txt" + "\n") # line 0
text.append("Date time: " + date_string + "\n") # line 1
text.append(":================================ \n") # line 2
text.append("P1 Syrin: " + p1_syringe + "\n") # line 3
text.append("P1 Units: " + p1_units + "\n") # line 4
text.append("P1 Speed: " + p1_speed + "\n") # line 5
text.append("P1 Accel: " + p1_accel + "\n") # line 6
text.append("P1 Jog D: " + p1_setup_jog_delta + "\n") # line 7
text.append(":================================ \n") # line 8
text.append("P2 Syrin: " + p2_syringe + "\n") # line 9
text.append("P2 Units: " + p2_units + "\n") # line 10
text.append("P2 Speed: " + p2_speed + "\n") # line 11
text.append("P2 Accel: " + p2_accel + "\n") # line 12
text.append("P2 Jog D: " + p2_setup_jog_delta + "\n") # line 13
text.append(":================================ \n") # line 14
text.append("P3 Syrin: " + p3_syringe + "\n") # line 15
text.append("P3 Units: " + p3_units + "\n") # line 16
text.append("P3 Speed: " + p3_speed + "\n") # line 17
text.append("P3 Accel: " + p3_accel + "\n") # line 18
text.append("P3 Jog D: " + p3_setup_jog_delta + "\n") # line 19
text.append("Exp Note: " + experiment_notes) # line 20
if name:
with open(name + ".txt", 'w') as f:
f.writelines(text)
self.statusBar().showMessage("Settings saved in " + name + ".txt")
def load_settings(self):
# need to make name an tuple otherwise i had an error and app crashed
name, _ = QFileDialog.getOpenFileName(self, 'Open File', options=QFileDialog.DontUseNativeDialog, filter = "Text (*.txt)")
if name:
with open(name, 'r') as f:
text = f.readlines()
# here is where you load all of your variables
# reformatting the text
text = [line.split(':')[-1].strip('\n')[1:] for line in text]
fname = text[0]
date_string = text[1]
p1_syringe = text[3]
p1_units = text[4]
p1_speed = text[5]
p1_accel = text[6]
p1_setup_jog_delta = text[7]
p2_syringe = text[9]
p2_units = text[10]
p2_speed = text[11]
p2_accel = text[12]
p2_setup_jog_delta = text[13]
p3_syringe = text[15]
p3_units = text[16]
p3_speed = text[17]
p3_accel = text[18]
p3_setup_jog_delta = text[19]
experiment_notes = text[20]
#print(fname, date_string, p1_syringe, p1_units, p1_speed, p1_accel, p1_setup_jog_delta)
# Here we are setting all of the values as given by the settings file
p1_syringe_index = self.ui.p1_syringe_DROPDOWN.findText(p1_syringe, QtCore.Qt.MatchFixedString)
self.ui.p1_syringe_DROPDOWN.setCurrentIndex(p1_syringe_index)
p1_units_index = self.ui.p1_units_DROPDOWN.findText(p1_units, QtCore.Qt.MatchFixedString)
self.ui.p1_units_DROPDOWN.setCurrentIndex(p1_units_index)
self.ui.p1_speed_INPUT.setValue(float(p1_speed))
self.ui.p1_accel_INPUT.setValue(float(p1_accel))
AllItems = [self.ui.p1_setup_jog_delta_INPUT.itemText(i) for i in range(self.ui.p1_setup_jog_delta_INPUT.count())]
p1_setup_jog_delta_index = self.ui.p1_setup_jog_delta_INPUT.findText(p1_setup_jog_delta, QtCore.Qt.MatchFixedString)
self.ui.p1_setup_jog_delta_INPUT.setCurrentIndex(p1_setup_jog_delta_index)
p2_syringe_index = self.ui.p2_syringe_DROPDOWN.findText(p2_syringe, QtCore.Qt.MatchFixedString)
self.ui.p2_syringe_DROPDOWN.setCurrentIndex(p2_syringe_index)
p2_units_index = self.ui.p2_units_DROPDOWN.findText(p2_units, QtCore.Qt.MatchFixedString)
self.ui.p2_units_DROPDOWN.setCurrentIndex(p2_units_index)
self.ui.p2_speed_INPUT.setValue(float(p2_speed))
self.ui.p2_accel_INPUT.setValue(float(p2_accel))
p2_setup_jog_delta_index = self.ui.p2_setup_jog_delta_INPUT.findText(p2_setup_jog_delta, QtCore.Qt.MatchFixedString)
self.ui.p2_setup_jog_delta_INPUT.setCurrentIndex(p2_setup_jog_delta_index)
p3_syringe_index = self.ui.p3_syringe_DROPDOWN.findText(p3_syringe, QtCore.Qt.MatchFixedString)
self.ui.p3_syringe_DROPDOWN.setCurrentIndex(p3_syringe_index)
p3_units_index = self.ui.p3_units_DROPDOWN.findText(p3_units, QtCore.Qt.MatchFixedString)
self.ui.p3_units_DROPDOWN.setCurrentIndex(p3_units_index)
self.ui.p3_speed_INPUT.setValue(float(p3_speed))
self.ui.p3_accel_INPUT.setValue(float(p3_accel))
p3_setup_jog_delta_index = self.ui.p3_setup_jog_delta_INPUT.findText(p3_setup_jog_delta, QtCore.Qt.MatchFixedString)
self.ui.p3_setup_jog_delta_INPUT.setCurrentIndex(p3_setup_jog_delta_index)
self.ui.experiment_notes.setText(experiment_notes)
self.statusBar().showMessage("Settings loaded from: " + text[1])
else:
self.statusBar().showMessage("No file selected.")
# Populate the microstepping amounts for the dropdown menu
def populate_microstepping(self):
self.microstepping_values = ['1', '2', '4', '8', '16', '32']
self.ui.microstepping_DROPDOWN.addItems(self.microstepping_values)
self.microstepping = 1
# Populate the list of possible syringes to the dropdown menus
def populate_syringe_sizes(self):
self.syringe_options = ["BD 1 mL", "BD 3 mL", "BD 5 mL", "BD 10 mL", "BD 20 mL", "BD 30 mL", "BD 60 mL"]
self.syringe_volumes = [1, 3, 5, 10, 20, 30, 60]
self.syringe_areas = [17.34206347, 57.88559215, 112.9089185, 163.539454, 285.022957, 366.0961536, 554.0462538]
self.ui.p1_syringe_DROPDOWN.addItems(self.syringe_options)
self.ui.p2_syringe_DROPDOWN.addItems(self.syringe_options)
self.ui.p3_syringe_DROPDOWN.addItems(self.syringe_options)
# Set Px syringe
def set_p1_syringe(self):
self.p1_syringe = self.ui.p1_syringe_DROPDOWN.currentText()
self.p1_syringe_area = self.syringe_areas[self.syringe_options.index(self.p1_syringe)]
self.display_p1_syringe()
self.set_p1_units()
self.set_p1_speed()
self.set_p1_accel()
self.set_p1_setup_jog_delta()
self.set_p1_amount()
def set_p2_syringe(self):
self.p2_syringe = self.ui.p2_syringe_DROPDOWN.currentText()
self.p2_syringe_area = self.syringe_areas[self.syringe_options.index(self.p2_syringe)]
self.display_p2_syringe()
self.set_p2_units()
self.set_p2_speed()
self.set_p2_accel()
self.set_p2_setup_jog_delta()
self.set_p2_amount()
def set_p3_syringe(self):
self.p3_syringe = self.ui.p3_syringe_DROPDOWN.currentText()
self.p3_syringe_area = self.syringe_areas[self.syringe_options.index(self.p3_syringe)]
self.display_p3_syringe()
self.set_p3_units()
self.set_p3_speed()
self.set_p3_accel()
self.set_p3_setup_jog_delta()
self.set_p3_amount()
# Set Px units
def set_p1_units(self):
self.p1_units = self.ui.p1_units_DROPDOWN.currentText()
length = self.p1_units.split("/")[0]
self.ui.p1_units_LABEL_2.setText(length)
self.set_p1_speed()
self.set_p1_accel()
self.set_p1_setup_jog_delta()
self.set_p1_amount()
def set_p2_units(self):
self.p2_units = self.ui.p2_units_DROPDOWN.currentText()
length = self.p2_units.split("/")[0]
self.ui.p2_units_LABEL_2.setText(length)
self.set_p2_speed()
self.set_p2_accel()
self.set_p2_setup_jog_delta()
self.set_p2_amount()
def set_p3_units(self):
self.p3_units = self.ui.p3_units_DROPDOWN.currentText()
length = self.p3_units.split("/")[0]
self.ui.p3_units_LABEL_2.setText(length)
self.set_p3_speed()
self.set_p3_accel()
self.set_p3_setup_jog_delta()
self.set_p3_amount()
def populate_pump_units(self):
self.units = ['mm/s', 'mL/s', 'mL/hr', 'µL/hr']
self.ui.p1_units_DROPDOWN.addItems(self.units)
self.ui.p2_units_DROPDOWN.addItems(self.units)
self.ui.p3_units_DROPDOWN.addItems(self.units)
def populate_pump_jog_delta(self):
self.jog_delta = ['0.01', '0.1', '1.0', '10.0']
self.ui.p1_setup_jog_delta_INPUT.addItems(self.jog_delta)
self.ui.p2_setup_jog_delta_INPUT.addItems(self.jog_delta)
self.ui.p3_setup_jog_delta_INPUT.addItems(self.jog_delta)
# Set Px speed
def set_p1_speed(self):
self.p1_speed = self.ui.p1_speed_INPUT.value()
self.ui.p1_units_LABEL.setText(str(self.p1_speed) + " " + self.ui.p1_units_DROPDOWN.currentText())
self.p1_speed_to_send = self.convert_speed(self.p1_speed, self.p1_units, self.p1_syringe_area, self.microstepping)
def set_p2_speed(self):
self.p2_speed = self.ui.p2_speed_INPUT.value()
self.ui.p2_units_LABEL.setText(str(self.p2_speed) + " " + self.ui.p2_units_DROPDOWN.currentText())
self.p2_speed_to_send = self.convert_speed(self.p2_speed, self.p2_units, self.p2_syringe_area, self.microstepping)
def set_p3_speed(self):
self.p3_speed = self.ui.p3_speed_INPUT.value()
self.ui.p3_units_LABEL.setText(str(self.p3_speed) + " " + self.ui.p3_units_DROPDOWN.currentText())
self.p3_speed_to_send = self.convert_speed(self.p3_speed, self.p3_units, self.p3_syringe_area, self.microstepping)
# Set Px accel
def set_p1_accel(self):
self.p1_accel = self.ui.p1_accel_INPUT.value()
self.p1_accel_to_send = self.convert_accel(self.p1_accel, self.p1_units, self.p1_syringe_area, self.microstepping)
def set_p2_accel(self):
self.p2_accel = self.ui.p2_accel_INPUT.value()
self.p2_accel_to_send = self.convert_accel(self.p2_accel, self.p2_units, self.p2_syringe_area, self.microstepping)
def set_p3_accel(self):
self.p3_accel = self.ui.p3_accel_INPUT.value()
self.p3_accel_to_send = self.convert_accel(self.p3_accel, self.p3_units, self.p3_syringe_area, self.microstepping)
# Set Px jog delta (setup)
def set_p1_setup_jog_delta(self):
self.p1_setup_jog_delta = self.ui.p1_setup_jog_delta_INPUT.currentText()
self.p1_setup_jog_delta = float(self.ui.p1_setup_jog_delta_INPUT.currentText())
self.p1_setup_jog_delta_to_send = self.convert_displacement(self.p1_setup_jog_delta, self.p1_units, self.p1_syringe_area, self.microstepping)
def set_p2_setup_jog_delta(self):
self.p2_setup_jog_delta = float(self.ui.p2_setup_jog_delta_INPUT.currentText())
self.p2_setup_jog_delta_to_send = self.convert_displacement(self.p2_setup_jog_delta, self.p2_units, self.p2_syringe_area, self.microstepping)
def set_p3_setup_jog_delta(self):
self.p3_setup_jog_delta = float(self.ui.p3_setup_jog_delta_INPUT.currentText())
self.p3_setup_jog_delta_to_send = self.convert_displacement(self.p3_setup_jog_delta, self.p3_units, self.p3_syringe_area, self.microstepping)
# Send Px settings
def send_p1_settings(self):
self.statusBar().showMessage("You clicked SEND P1 SETTINGS")
self.p1_settings = []
self.p1_settings.append("<SETTING,SPEED,1," + str(self.p1_speed_to_send) + ",F,0.0,0.0,0.0>")
self.p1_settings.append("<SETTING,ACCEL,1," + str(self.p1_accel_to_send) + ",F,0.0,0.0,0.0>")
self.p1_settings.append("<SETTING,DELTA,1," + str(self.p1_setup_jog_delta_to_send) + ",F,0.0,0.0,0.0>")
print("Sending P1 SETTINGS..")
thread = Thread(self.runTest, self.p1_settings)
thread.finishesignal.d.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("P1 SETTINGS sent.")
def send_p2_settings(self):
self.statusBar().showMessage("You clicked SEND P2 SETTINGS")
self.p2_settings = []
self.p2_settings.append("<SETTING,SPEED,2," + str(self.p2_speed_to_send) + ",F,0.0,0.0,0.0>")
self.p2_settings.append("<SETTING,ACCEL,2," + str(self.p2_accel_to_send) + ",F,0.0,0.0,0.0>")
self.p2_settings.append("<SETTING,DELTA,2," + str(self.p2_setup_jog_delta_to_send) + ",F,0.0,0.0,0.0>")
print("Sending P2 SETTINGS..")
thread = Thread(self.runTest, self.p2_settings)
thread.finishesignal.d.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("P2 SETTINGS sent.")
def send_p3_settings(self):
self.statusBar().showMessage("You clicked SEND P3 SETTINGS")
self.p3_settings = []
self.p3_settings.append("<SETTING,SPEED,3," + str(self.p3_speed_to_send) + ",F,0.0,0.0,0.0>")
self.p3_settings.append("<SETTING,ACCEL,3," + str(self.p3_accel_to_send) + ",F,0.0,0.0,0.0>")
self.p3_settings.append("<SETTING,DELTA,3," + str(self.p3_setup_jog_delta_to_send) + ",F,0.0,0.0,0.0>")
print("Sending P3 SETTINGS..")
thread = Thread(self.runTest, self.p3_settings)
thread.finishesignal.d.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
print("P3 SETTINGS sent.")
# Connect to the Arduino board
def connect(self):
#self.port_nano = '/dev/cu.usbserial-A9M11B77'
#self.port_uno = "/dev/cu.usbmodem1411"
#self.baudrate = baudrate
self.statusBar().showMessage("You clicked CONNECT TO CONTROLLER")
try:
port_declared = self.port in vars()
try:
self.serial = serial.Serial()
self.serial.port = self.port
self.serial.baudrate = 230400
self.serial.parity = serial.PARITY_NONE
self.serial.stopbits = serial.STOPBITS_ONE
self.serial.bytesize = serial.EIGHTBITS
self.serial.timeout = 1
self.serial.open()
#self.serial.flushInput()
# This is a thread that always runs and listens to commands from the Arduino
#self.global_listener_thread = Thread(self.listening)
#self.global_listener_thread.finished.connect(lambda:self.self.thread_finished(self.global_listenersignal._thread))
#self.global_listener_self.threadpool.start(thread)
# ~~~~~~~~~~~~~~~~
# TAB : Setup
# ~~~~~~~~~~~~~~~~
self.ui.disconnect_BTN.setEnabled(True)
self.ui.p1_setup_send_BTN.setEnabled(True)
self.ui.p2_setup_send_BTN.setEnabled(True)
self.ui.p3_setup_send_BTN.setEnabled(True)
self.ui.send_all_BTN.setEnabled(True)
self.ui.connect_BTN.setEnabled(False)
time.sleep(1)
self.statusBar().showMessage("Successfully connected to board.")
except:
self.statusBar().showMessage("Cannot connect to board. Try again..")
#raise CannotConnectException
except AttributeError:
self.statusBar().showMessage("Please plug in the board and select a proper port, then press connect.")
# Disconnect from the Arduino board
# TODO: figure out how to handle error..
def disconnect(self):
self.statusBar().showMessage("You clicked DISCONNECT FROM BOARD")
print("Disconnecting from board..")
#self.global_listener_thread.stop()
time.sleep(3)
self.serial.close()
print("Board has been disconnected")
self.grey_out_components()
self.ui.connect_BTN.setEnabled(True)
self.ui.disconnect_BTN.setEnabled(False)
# Send all settings
def send_all(self):
self.statusBar().showMessage("You clicked SEND ALL SETTINGS")
self.settings = []
self.settings.append("<SETTING,SPEED,1,"+str(self.p1_speed_to_send)+",F,0.0,0.0,0.0>")
self.settings.append("<SETTING,ACCEL,1,"+str(self.p1_accel_to_send)+",F,0.0,0.0,0.0>")
self.settings.append("<SETTING,DELTA,1,"+str(self.p1_setup_jog_delta_to_send)+",F,0.0,0.0,0.0>")
self.settings.append("<SETTING,SPEED,2,"+str(self.p2_speed_to_send)+",F,0.0,0.0,0.0>")
self.settings.append("<SETTING,ACCEL,2,"+str(self.p2_accel_to_send)+",F,0.0,0.0,0.0>")
self.settings.append("<SETTING,DELTA,2,"+str(self.p2_setup_jog_delta_to_send)+",F,0.0,0.0,0.0>")
self.settings.append("<SETTING,SPEED,3,"+str(self.p3_speed_to_send)+",F,0.0,0.0,0.0>")
self.settings.append("<SETTING,ACCEL,3,"+str(self.p3_accel_to_send)+",F,0.0,0.0,0.0>")
self.settings.append("<SETTING,DELTA,3,"+str(self.p3_setup_jog_delta_to_send)+",F,0.0,0.0,0.0>")
print("Sending all settings..")
thread = Thread(self.runTest, self.settings)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
self.ui.p1_setup_send_BTN.setStyleSheet("background-color: none")
self.ui.p2_setup_send_BTN.setStyleSheet("background-color: none")
self.ui.p3_setup_send_BTN.setStyleSheet("background-color: none")
self.ungrey_out_components()
# =======================
# MISC : Functions I need
# =======================
def steps2mm(self, steps):
# 200 steps per rev
# one rev is 0.8mm dist
#mm = steps/200/32*0.8
mm = steps/200/self.microstepping*0.8
return mm
def steps2mL(self, steps, syringe_area):
mL = self.mm32mL(self.steps2mm(steps)*syringe_area)
return mL
def steps2uL(self, steps, syringe_area):
uL = self.mm32uL(self.steps2mm(steps)*syringe_area,self.microstepping)
return uL
def mm2steps(self, mm, microsteps):
steps = mm/0.8*200*microsteps
#steps = mm*200/0.8
return steps
def mL2steps(self, mL, syringe_area, microsteps):
# note syringe_area is in mm^2
steps = self.mm2steps(self.mL2mm3(mL)/syringe_area, microsteps)
return steps
def uL2steps(self, uL, syringe_area, microsteps):
steps = self.mm2steps(self.uL2mm3(uL)/syringe_area, microsteps)
return steps
def mL2uL(self, mL):
return mL*1000.0
def mL2mm3(self, mL):
return mL*1000.0
def uL2mL(self, uL):
return uL/1000.0
def uL2mm3(self, uL):
return uL
def mm32mL(self, mm3):
return mm3/1000.0
def mm32uL(self, mm3):
return mm3
def persec2permin(self, value_per_sec):
value_per_min = value_per_sec*60.0
return value_per_min
def persec2perhour(self, value_per_sec):
value_per_hour = value_per_sec*60.0*60.0
return value_per_hour
def permin2perhour(self, value_per_min):
value_per_hour = value_per_min*60.0
return value_per_hour
def permin2persec(self, value_per_min):
value_per_sec = value_per_min/60.0
return value_per_sec
def perhour2permin(self, value_per_hour):
value_per_min = value_per_hour/60.0
return value_per_min
def perhour2persec(self, value_per_hour):
value_per_sec = value_per_hour/60.0/60.0
return value_per_sec
def convert_displacement(self, displacement, units, syringe_area, microsteps):
length = units.split("/")[0]
time = units.split("/")[1]
inp_displacement = displacement
# convert length first
if length == "mm":
displacement = self.mm2steps(displacement, microsteps)
elif length == "mL":
displacement = self.mL2steps(displacement, syringe_area, microsteps)
elif length == "µL":
displacement = self.uL2steps(displacement, syringe_area, microsteps)
print('______________________________')
print("INPUT DISPLACEMENT: " + str(inp_displacement) + ' ' + length)
print("OUTPUT DISPLACEMENT: " + str(displacement) + ' steps')
print('\n############################################################\n')
return displacement
def convert_speed(self, inp_speed, units, syringe_area, microsteps):
length = units.split("/")[0]
time = units.split("/")[1]
# convert length first
if length == "mm":
speed = self.mm2steps(inp_speed, microsteps)
elif length == "mL":
speed = self.mL2steps(inp_speed, syringe_area, microsteps)
elif length == "µL":
speed = self.uL2steps(inp_speed, syringe_area, microsteps)
# convert time next
if time == "s":
pass
elif time == "min":
speed = self.permin2persec(speed)
elif time == "hr":
speed = self.perhour2persec(speed)
print("INPUT SPEED: " + str(inp_speed) + ' ' + units)
print("OUTPUT SPEED: " + str(speed) + ' steps/s')
return speed
def convert_accel(self, accel, units, syringe_area, microsteps):
length = units.split("/")[0]
time = units.split("/")[1]
inp_accel = accel
accel = accel
# convert length first
if length == "mm":
accel = self.mm2steps(accel, microsteps)
elif length == "mL":
accel = self.mL2steps(accel, syringe_area, microsteps)
elif length == "µL":
accel = self.uL2steps(accel, syringe_area, microsteps)
# convert time next
if time == "s":
pass
elif time == "min":
accel = self.permin2persec(self.permin2persec(accel))
elif time == "hr":
accel = self.perhour2persec(self.perhour2persec(accel))
print('______________________________')
print("INPUT ACCEL: " + str(inp_accel) + ' ' + units + '/' + time)
print("OUTPUT ACCEL: " + str(accel) + ' steps/s/s')
return accel
'''
Syringe Volume (mL) | Syringe Area (mm^2)
-----------------------------------------------
1 | 17.34206347
3 | 57.88559215
5 | 112.9089185
10 | 163.539454
20 | 285.022957
30 | 366.0961536
60 | 554.0462538
IMPORTANT: These are for BD Plastic syringes ONLY!! Others will vary.
'''
def convert2volume(self, steps, units, syringe_area):
length = units.split("/")[0]
inp_steps = float(steps)
# convert length first
if length == "ul":
displacement = self.steps2uL(inp_steps, syringe_area)
else:
displacement = self.steps2mL(inp_steps, syringe_area)
return displacement
#====================================== Reading and Writing to Arduino
def sendToArduino(self, sendStr):
self.serial.write(sendStr.encode())
self.serial.flushInput()
def recvPositionArduino(self):
startMarker = self.startMarker
midMarker = self.midMarker
endMarker = self.endMarker
ck = ""
x = "z" # any value that is not an end- or startMarker
# wait for the start character
while ord(x) != startMarker:
x = self.serial.read()
# save data until the end marker is found
while ord(x) != endMarker:
if ord(x) == midMarker:
print('midMarker jfs ',ck)
if ck.startswith('p1') and self.is_p1_active:
out = self.convert2volume(ck.split(sep=':')[1],self.p1_units, self.p1_syringe_area)
self.p1_left -= out
self.ui.p1_remain_DISP.display(self.p1_left)
if ck.startswith('p2') and self.is_p2_active:
out = self.convert2volume(ck.split(sep=':')[1],self.p2_units, self.p2_syringe_area)
self.p2_left -= out
self.ui.p2_remain_DISP.display(self.p2_left)
if ck.startswith('p3') and self.is_p3_active:
out = self.convert2volume(ck.split(sep=':')[1],self.p3_units, self.p3_syringe_area)
self.p3_left -= out
self.ui.p3_remain_DISP.display(self.p3_left)
ck = ""
x = self.serial.read()
if ord(x) != startMarker:
#print(x)
ck = ck + x.decode()
x = self.serial.read()
return(ck)
#============================
def runTest(self, td):
numLoops = len(td)
waitingForReply = False
n = 0
while n < numLoops:
teststr = td[n]
if waitingForReply == False:
self.sendToArduino(teststr)
print("Sent from PC -- " + teststr)
waitingForReply = True
if waitingForReply == True:
while self.serial.inWaiting() == 0:
pass
dataRecvd = self.recvPositionArduino()
print("Reply Received -- " + dataRecvd)
n += 1
waitingForReply = False
time.sleep(0.1)
print("Send and receive complete\n\n")
def send_single_command(self, command):
waiting_for_reply = False
if waiting_for_reply == False:
self.sendToArduino(command)
print("Sent from PC -- STR " + command)
waiting_for_reply = True
if waiting_for_reply == True:
while self.serial.inWaiting() == 0:
pass
#data_received = self.recvFromArduino2()
data_received = "from other func"
print("Reply Received -- " + data_received)
waiting_for_reply = False
print("=============================\n\n")
print("Sent a single command")
# def listening(self):
# startMarker = self.startMarker
# midMarker = self.midMarker
# endMarker = self.endMarker
# posMarker = ord('?')
# i = 0
# while (True):
# self.serial.flushInput()
# x = "z"
# ck = ""
# isDisplay = "asdf"
# while self.serial.inWaiting() == 0:
# pass
# while not x or ord(x) != startMarker:
# x = self.serial.read()
# #if ord(x) == posMarker:
# # return self.get_position()
# while ord(x) != endMarker:
# if ord(x) == midMarker:
# i += 1
# print(ck)
# #isDisplay = ck
# #if i % 100 == 0:
# # self.ui.p1_absolute_DISP.display(ck)
# ck = ""
# x = self.serial.read()
# if ord(x) != startMarker:
# ck = ck + x.decode()
# x = self.serial.read()
# # TODO
# #if isDisplay == "START":
# # print("This is ck: " + ck)
# #motorID = int(ck)
# #self.is_p1_running = True
# #run thread(self.display_position, motorID)
# #toDisp = self.steps2mm(float(ck))
# #print("Pump num " + toDisp + " is now running.")i
# #self.ui.p1_absolute_DISP.display(toDisp)
# #isDisplay = ""
# #self.serial.flushInput()
# #print(self.serial.read(self.serial.inWaiting()).decode('ascii'))
# print(ck)
# print("\n")
# TODO
# def display_position(self, motorID):
# if motorID == 1:
# seconds = 0
# p1_speed = self.p1_speed_to_send
# p1_dist = 0
# p1_time = p1_dist/p1_speed
# time_start = time.start()
# while self.is_p1_running:
# pass
# def get_position(self):
# ck = ""
# x = self.serial.read()
# while ord(x) != self.endMarker:
# if ord(x) == self.midMarker:
# print(ck)
# ck = ""
# x = self.serial.read()
# ck = ck + x.decode()
# x = self.serial.read()
# print(ck)
# return (ck)
def closeEvent(self, event):
try:
#self.global_listener_thread.stop()
self.serial.close()
#self.threadpool.end()
except AttributeError:
pass
sys.exit()
# I feel better having one of these
def main():
# a new app instance
app = QtWidgets.QApplication(sys.argv)
window = MainWindow()
window.setWindowTitle("Poseidon Pumps Controller - Pachter Lab Caltech 2018")
window.show()
# without this, the script exits immediately.
sys.exit(app.exec_())
if __name__ == "__main__":
main()