jfsposeidon/jfs_main01.py

307 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from poseidon_main import MainWindow, Thread
import sys, time
import serial
from PyQt5 import QtCore, QtWidgets
import pyqtgraph as pg
class MainWindow2( MainWindow):
leftsignal = QtCore.pyqtSignal(float)
def __init__(self):
super(MainWindow2,self).__init__()
self.setWindowTitle('Pyshi´ne drag and drop')
#print(self.globports)
#print(self.microstepping)
self.threadx = {}
## grapfic output
self.timeBase=50
self.x = []
self.y = []
self.grafwg = pg.PlotWidget()
self.ui.gridLayout_2.addWidget(self.grafwg, 0, 1, 1, 1)
self.data_line = self.grafwg.plot(self.x,self.y,pen=(0,255,0))
## limit to stop the flow
self.wait2Stopflow = False
self.stopFlowLimit = 0
## serail things
if self.globports == []:
self.populate_ports()
self.ui.rheo_port_comboBox.addItems(self.globports)
self.ui.comboBox.addItems(self.globports)
#self.ui.rheo_port_comboBox.currentIndexChanged['QString'].connect(self.setRheoPort)
self.ui.openRheo_btn.clicked.connect(self.connectESP32)
self.ui.closeRheo_btn.clicked.connect(self.disconnectESP32)
self.ui.clearDisplay.clicked.connect(self.clearPlot)
self.ui.start_btn.clicked.connect(self.startesp32)
self.ui.stop_btn.clicked.connect(self.stopesp32)
self.ui.comboBox_tb.setCurrentText(str(self.timeBase))
self.ui.comboBox_tb.currentIndexChanged['QString'].connect(self.setTimeBase)
self.ui.checkBox.stateChanged.connect(self.clickedBox)
self.ui.lineEdit.returnPressed.connect(self.setFrequency)
self.ui.checkBox_2.stateChanged.connect(self.flowBox)
self.ui.pushButton.clicked.connect(self.connectRheo)
self.ui.pushButton_2.clicked.connect(self.disconnectRheo)
self.ui.checkBox_3.stateChanged.connect(self.clickedBox3)
self.ui.pushButton_3.clicked.connect(self.sendRheoHome)
self.ui.pushButton_4.clicked.connect(self.sendRheoInject)
self.leftsignal.connect(self.setwhatsleft)
### ESP32
def setFrequency(self):
s = self.ui.lineEdit.text()
cmd='o'+str(s)+'\n'
thread = Thread(self.sendToEsp32, cmd)
thread.finished.connect(lambda:self.thread_finished(thread))
thread.start()
def sendToEsp32(self, sendStr):
self.esp32_serial.write(sendStr.encode('utf-8'))
self.esp32_serial.flushInput()
def clickedBox(self,state):
if state == QtCore.Qt.Checked:
print('checkes')
self.threadx[2] =ThC_2(parent=None,port = self.esp32_serial)
self.threadx[2].start()
else:
print('no')
self.threadx[2].stop()
def flowBox(self,state):
if state == QtCore.Qt.Checked:
print('checkes')
self.wait2Stopflow=True
self.stopFlowLimit = float(self.ui.lineEdit_2.text())
else:
print('no')
self.wait2Stopflow=False
def startesp32(self):
self.threadx[1] = ThC(parent=None,index=1,port = self.esp32_serial,base=self.timeBase)
self.threadx[1].start()
self.threadx[1].datasignal.connect(self.updateData)
self.ui.start_btn.setEnabled(False)
self.ui.stop_btn.setEnabled(True)
def stopesp32(self):
self.threadx[1].stop()
self.ui.start_btn.setEnabled(True)
self.ui.stop_btn.setEnabled(False)
def setTimeBase(self):
self.timeBase=int(self.ui.comboBox_tb.currentText())
def connectRheo(self):
self.rheo_port = self.ui.comboBox.currentText()
self.statusBar().showMessage("You clicked CONNECT TO Rheo Ventil")
try:
port_declared = self.port in vars()
try:
print(self.rheo_port)
self.rheo_serial = serial.Serial()
self.rheo_serial.port = self.rheo_port
self.rheo_serial.baudrate = 115200
self.rheo_serial.parity = serial.PARITY_NONE
self.rheo_serial.stopbits = serial.STOPBITS_ONE
self.rheo_serial.bytesize = serial.EIGHTBITS
self.rheo_serial.timeout = 1
self.rheo_serial.open()
# self.ui.closeRheo_btn.setEnabled(True)
# self.ui.openRheo_btn.setEnabled(False)
# self.ui.start_btn.setEnabled(True)
# self.ui.stop_btn.setEnabled(False)
# self.ui.connect_BTN.setEnabled(False)
time.sleep(1)
self.statusBar().showMessage("Successfully connected to Rheo board.")
except:
self.statusBar().showMessage("Cannot connect to board. Try again..")
#raise CannotConnectException
except AttributeError:
self.statusBar().showMessage("Please plug in the board and select a proper port, then press connect.")
def disconnectRheo(self):
self.statusBar().showMessage("You clicked DISCONNECT FROM Rheo")
print("Disconnecting from board..")
time.sleep(0.1)
self.rheo_serial.close()
print("Board has been disconnected")
def clickedBox3(self,state):
if state == QtCore.Qt.Checked:
print('checkes')
self.sendRheoLoad()
else:
print('no')
self.sendRheoVentil()
def sendRheoInject(self):
xx = int(self.ui.lineEdit_3.text())
if (xx > 7) or (xx<0):
cmd=str(xx)+'\r\n'
self.sendToRheo(cmd)
def sendRheoHome(self):
cmd=str(3)+'\r\n'
self.sendToRheo(cmd)
def sendRheoLoad(self):
cmd=str(1)+'\r\n'
self.sendToRheo(cmd)
def sendRheoVentil(self):
cmd=str(2)+'\r\n'
self.sendToRheo(cmd)
def sendToRheo(self,cmd):
thread = Thread(self.sendRheo, cmd)
thread.finished.connect(lambda:self.thread_finished(thread))
thread.start()
def setwhatsleft(self,xx):
print (f'that left {xx} ')
def sendRheo(self, sendStr):
self.rheo_serial.flushInput()
self.rheo_serial.write(sendStr.encode('utf-8'))
cmd=str(5)+'\r\n'
self.rheo_serial.write(sendStr.encode('utf-8'))
inp = self.rheo_serial.readline().decode('utf')
self.leftsignal.emit(float(inp))
def connectESP32(self):
self.esp32_port = self.ui.rheo_port_comboBox.currentText()
self.statusBar().showMessage("You clicked CONNECT TO Esp32")
try:
port_declared = self.port in vars()
try:
print(self.esp32_port)
self.esp32_serial = serial.Serial()
self.esp32_serial.port = self.esp32_port
self.esp32_serial.baudrate = 115200
self.esp32_serial.parity = serial.PARITY_NONE
self.esp32_serial.stopbits = serial.STOPBITS_ONE
self.esp32_serial.bytesize = serial.EIGHTBITS
self.esp32_serial.timeout = 1
self.esp32_serial.open()
self.ui.closeRheo_btn.setEnabled(True)
self.ui.openRheo_btn.setEnabled(False)
self.ui.start_btn.setEnabled(True)
self.ui.stop_btn.setEnabled(False)
# self.ui.connect_BTN.setEnabled(False)
time.sleep(1)
self.statusBar().showMessage("Successfully connected to esp32 board.")
except:
self.statusBar().showMessage("Cannot connect to board. Try again..")
#raise CannotConnectException
except AttributeError:
self.statusBar().showMessage("Please plug in the board and select a proper port, then press connect.")
def disconnectESP32(self):
self.statusBar().showMessage("You clicked DISCONNECT FROM ESP32")
print("Disconnecting from board..")
time.sleep(0.1)
self.esp32_serial.close()
print("Board has been disconnected")
self.ui.openRheo_btn.setEnabled(True)
self.ui.closeRheo_btn.setEnabled(False)
self.ui.start_btn.setEnabled(False)
self.ui.stop_btn.setEnabled(False)
def checkForStopFlow(self):
if self.checkForStopFlow :
if self.y[-1] > self.stopFlowLimit :
self.ui.pause_BTN.clicked.emit()
def updateData(self,x,y):
self.x.append(x)
self.y.append(y)
self.data_line.setData(self.x,self.y)
if self.wait2Stopflow == True:
if (self.y[-1] >= self.stopFlowLimit):
self.ui.pause_BTN.clicked.emit()
self.wait2Stopflow = False
self.ui.checkBox_2.setChecked(False)
def clearPlot(self):
self.x.clear()
self.y.clear()
self.data_line.setData(self.x,self.y)
def sendToEsp32(self, sendStr):
self.esp32_serial.write(sendStr.encode())
self.esp32_serial.flushInput()
class ThC(QtCore.QThread):
datasignal = QtCore.pyqtSignal(int,float)
def __init__(self,parent=None,port=None,index=0,base=100):
super(ThC, self).__init__(parent)
self.index = index
self.is_running = True
self.port = port
self.base = base
def run(self):
print(f"Thread {self.index} started")
self.port.flushOutput()
self.port.flushInput()
i = self.base*1000
cmd = 's'+str(i)+'\n'
self.port.write(cmd.encode('utf-8'))
cnt = 0;
while True:
cnt+=1
inp = self.port.readline()
if inp[0] == 62 and inp[-3] == 60 :
inp = inp[1:-3]
y = float(inp.decode('utf-8'))
self.datasignal.emit(cnt,y)
time.sleep(0.001)
def stop(self):
self.is_running=False
print(f"Thread {self.index} stopped")
self.terminate()
class ThC_2(QtCore.QThread):
def __init__(self,parent=None,port=None):
super(ThC_2,self).__init__(parent)
self.is_running = True
self.port=port
def run(self):
cnt=0
while True:
cnt+= 1
if cnt >= 100 : cnt = 1
cmd = 'o'+str(cnt*10000)+'\n'
print(cmd)
self.port.write(cmd.encode('utf-8'))
time.sleep(1)
def stop(self):
self.is_running=False
print(f'Thread ThC-2 stopped')
self.terminate()
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
window = MainWindow2()
window.show()
sys.exit(app.exec_())