jfsposeidon/jfs_main01.py
2022-11-03 18:20:26 +01:00

389 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from poseidon_main import MainWindow, Thread
import sys, time, os
import serial
from PyQt5 import QtCore, QtWidgets
import pyqtgraph as pg
class MainWindow2( MainWindow):
#leftsignal = QtCore.pyqtSignal(float)
def __init__(self):
super(MainWindow2,self).__init__()
self.setWindowTitle('Pyshi´ne drag and drop')
#print(self.globports)
#print(self.microstepping)
self.threadx = {}
## grapfic output
self.timeBase=50
self.x = []
self.y = []
self.grafwg = pg.PlotWidget()
self.ui.gridLayout_2.addWidget(self.grafwg, 0, 1, 1, 1)
self.data_line = self.grafwg.plot(self.x,self.y,pen=(0,255,0))
## limit to stop the flow
self.wait2Stopflow = False
self.stopFlowLimit = 0
## serail things
if self.globports == []:
self.populate_ports()
self.ui.rheo_port_comboBox.addItems(self.globports)
self.ui.comboBox.addItems(self.globports)
#self.ui.rheo_port_comboBox.currentIndexChanged['QString'].connect(self.setRheoPort)
self.ui.openRheo_btn.clicked.connect(self.connectESP32)
self.ui.closeRheo_btn.clicked.connect(self.disconnectESP32)
self.ui.clearDisplay.clicked.connect(self.clearPlot)
self.ui.start_btn.clicked.connect(self.startesp32)
self.ui.stop_btn.clicked.connect(self.stopesp32)
self.ui.comboBox_tb.setCurrentText(str(self.timeBase))
self.ui.comboBox_tb.currentIndexChanged['QString'].connect(self.setTimeBase)
self.ui.checkBox.stateChanged.connect(self.clickedBox)
self.ui.lineEdit.returnPressed.connect(self.setFrequency)
self.ui.checkBox_2.stateChanged.connect(self.flowBox)
self.ui.pushButton.clicked.connect(self.connectRheo)
self.ui.pushButton_2.clicked.connect(self.disconnectRheo)
self.ui.checkBox_3.stateChanged.connect(self.clickedBox3)
self.ui.pushButton_3.clicked.connect(self.sendRheoHome)
self.ui.pushButton_4.clicked.connect(self.sendRheoInject)
#self.leftsignal.connect(self.setwhatsleft)
### Link zur Doku
#self.webEngineView.setUrl(QtCore.QUrl("file:///C:/Users/jens/ownCloud/www/jfs/git2022/jfsPoseidon/test.html"))
file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "doku\\testMd.html"))
file_path = file_path.replace("\\",'/')
file_path = 'file:///'+file_path
self.ui.webEngineView.setUrl(QtCore.QUrl(file_path))
### init GUI
#ESP32
self.ui.start_btn.setEnabled(False)
self.ui.stop_btn.setEnabled(False)
self.ui.closeRheo_btn.setEnabled(False)
self.ui.checkBox.setEnabled(False)
self.ui.lineEdit.setEnabled(False)
#Rheo
self.ui.pushButton.setEnabled(True)
self.ui.pushButton_2.setEnabled(False)
self.ui.checkBox_3.setEnabled(False)
self.ui.pushButton_3.setEnabled(False)
self.ui.pushButton_4.setEnabled(False)
### ESP32
def setFrequency(self):
s = self.ui.lineEdit.text()
cmd='o'+str(s)+'\n'
thread = Thread(self.sendToEsp32, cmd)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
def sendToEsp32(self, sendStr):
print(f" sending {sendStr}")
self.esp32_serial.write(sendStr.encode('utf-8'))
self.esp32_serial.flushInput()
def clickedBox(self,state):
if state == QtCore.Qt.Checked:
print('checkes')
#self.threadx[2] =ThC_2(parent=None,port = self.esp32_serial)
#self.threadx[2].start()
self.threadx[2] = ThC_2(port = self.esp32_serial)
self.threadpool.start(self.threadx[2])
else:
print('no')
self.threadx[2].stop()
def flowBox(self,state):
if state == QtCore.Qt.Checked:
print('checkes')
self.wait2Stopflow=True
self.stopFlowLimit = float(self.ui.lineEdit_2.text())
else:
print('no')
self.wait2Stopflow=False
def startesp32(self):
self.clearPlot()
self.threadx[1] = ThC(index=1,port = self.esp32_serial,base=self.timeBase)
#self.threadx[1].start()
self.threadpool.start(self.threadx[1])
self.threadx[1].signals.datasignal.connect(self.updateData)
self.ui.start_btn.setEnabled(False)
self.ui.stop_btn.setEnabled(True)
self.ui.checkBox.setEnabled(True)
self.ui.comboBox_tb.setEnabled(False)
def stopesp32(self):
self.threadx[1].stop()
self.ui.start_btn.setEnabled(True)
self.ui.stop_btn.setEnabled(False)
self.ui.comboBox_tb.setEnabled(True)
def setTimeBase(self):
self.timeBase=int(self.ui.comboBox_tb.currentText())
def connectRheo(self):
self.rheo_port = self.ui.comboBox.currentText()
self.statusBar().showMessage("You clicked CONNECT TO Rheo Ventil")
try:
port_declared = self.port in vars()
try:
print(self.rheo_port)
self.rheo_serial = serial.Serial()
self.rheo_serial.port = self.rheo_port
self.rheo_serial.baudrate = 115200
self.rheo_serial.parity = serial.PARITY_NONE
self.rheo_serial.stopbits = serial.STOPBITS_ONE
self.rheo_serial.bytesize = serial.EIGHTBITS
self.rheo_serial.timeout = 1
self.rheo_serial.open()
self.ui.pushButton.setEnabled(False)
self.ui.pushButton_2.setEnabled(True)
self.ui.checkBox_3.setEnabled(True)
self.ui.checkBox_3.setCheckState(False)
self.ui.pushButton_3.setEnabled(True)
self.ui.pushButton_4.setEnabled(True)
time.sleep(1)
self.statusBar().showMessage("Successfully connected to Rheo board.")
self.getFromRheo()
except:
self.statusBar().showMessage("Cannot connect to board. Try again..")
#raise CannotConnectException
except AttributeError:
self.statusBar().showMessage("Please plug in the board and select a proper port, then press connect.")
def disconnectRheo(self):
self.statusBar().showMessage("You clicked DISCONNECT FROM Rheo")
print("Disconnecting from board..")
time.sleep(0.1)
self.rheo_serial.close()
self.ui.pushButton.setEnabled(True)
self.ui.pushButton_2.setEnabled(False)
self.ui.checkBox_3.setEnabled(False)
self.ui.pushButton_3.setEnabled(False)
self.ui.pushButton_4.setEnabled(False)
print("Board has been disconnected")
def clickedBox3(self,state):
if state == QtCore.Qt.Checked:
print('checkes')
self.sendRheoLoad()
else:
print('no')
self.sendRheoVentil()
def sendRheoInject(self):
xx = int(self.ui.lineEdit_3.text())
if (xx > 7) or (xx<0):
cmd=str(xx)+'\r\n'
self.sendToRheo(cmd)
self.getFromRheo()
def sendRheoHome(self):
cmd=str(3)+'\r\n'
self.sendToRheo(cmd)
self.getFromRheo()
def sendRheoLoad(self):
cmd=str(1)+'\r\n'
self.sendToRheo(cmd)
def sendRheoVentil(self):
cmd=str(2)+'\r\n'
self.sendToRheo(cmd)
def sendToRheo(self,cmd):
thread = Thread(self.sendRheo, cmd)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
self.threadpool.start(thread)
def setwhatsleft(self,xx):
#print (f'that left {xx} ')
self.ui.label_26.setText(f'left {xx}')
def sendRheo(self, sendStr):
self.rheo_serial.flushInput()
self.rheo_serial.write(sendStr.encode('utf-8'))
#cmd=str(5)+'\r\n'
#time.sleep(1)
#self.rheo_serial.write(cmd.encode('utf-8'))
#time.sleep(1)
#inp = self.rheo_serial.readline().decode('utf-8')
#print(inp)
#self.leftsignal.emit(float(inp))
def getFromRheo(self):
cmd = ""
thread = Thread(self.getRheoVolume,cmd)
thread.signals.finished.connect(lambda:self.thread_finished(thread))
thread.signals.result.connect(self.setwhatsleft)
self.threadpool.start(thread)
def getRheoVolume(self,cmd):
time.sleep(1)
self.rheo_serial.flushOutput()
self.rheo_serial.flushInput()
cmd = str(5)+'\r\n'
self.rheo_serial.write(cmd.encode('utf-8'))
time.sleep(1)
inp = self.rheo_serial.readline().decode('utf-8')
inp = inp.rstrip()
print(f" >{inp}<")
#inp = 200
return inp
def connectESP32(self):
self.esp32_port = self.ui.rheo_port_comboBox.currentText()
self.statusBar().showMessage("You clicked CONNECT TO Esp32")
try:
port_declared = self.port in vars()
try:
print(self.esp32_port)
self.esp32_serial = serial.Serial()
self.esp32_serial.port = self.esp32_port
self.esp32_serial.baudrate = 115200
self.esp32_serial.parity = serial.PARITY_NONE
self.esp32_serial.stopbits = serial.STOPBITS_ONE
self.esp32_serial.bytesize = serial.EIGHTBITS
self.esp32_serial.timeout = 1
self.esp32_serial.open()
self.ui.closeRheo_btn.setEnabled(True)
self.ui.openRheo_btn.setEnabled(False)
self.ui.start_btn.setEnabled(True)
self.ui.stop_btn.setEnabled(False)
self.ui.checkBox.setEnabled(True)
self.ui.lineEdit.setEnabled(True)
# self.ui.connect_BTN.setEnabled(False)
time.sleep(1)
self.statusBar().showMessage("Successfully connected to esp32 board.")
except:
self.statusBar().showMessage("Cannot connect to board. Try again..")
#raise CannotConnectException
except AttributeError:
self.statusBar().showMessage("Please plug in the board and select a proper port, then press connect.")
def disconnectESP32(self):
self.statusBar().showMessage("You clicked DISCONNECT FROM ESP32")
print("Disconnecting from board..")
time.sleep(0.1)
self.esp32_serial.close()
print("Board has been disconnected")
self.ui.openRheo_btn.setEnabled(True)
self.ui.closeRheo_btn.setEnabled(False)
self.ui.start_btn.setEnabled(False)
self.ui.stop_btn.setEnabled(False)
self.ui.checkBox.setEnabled(False)
self.ui.lineEdit.setEnabled(False)
def checkForStopFlow(self):
if self.checkForStopFlow :
if self.y[-1] > self.stopFlowLimit :
self.ui.pause_BTN.clicked.emit()
def updateData(self,x,y):
self.x.append(x)
self.y.append(y)
self.data_line.setData(self.x,self.y)
if self.wait2Stopflow == True:
if (self.y[-1] >= self.stopFlowLimit):
self.ui.pause_BTN.clicked.emit()
self.wait2Stopflow = False
self.ui.checkBox_2.setChecked(False)
def clearPlot(self):
self.x.clear()
self.y.clear()
self.data_line.setData(self.x,self.y)
def sendToEsp32(self, sendStr):
self.esp32_serial.write(sendStr.encode())
self.esp32_serial.flushInput()
class WorkerKilledException(Exception):
pass
class ThC_Signals(QtCore.QObject):
datasignal = QtCore.pyqtSignal(int,float)
class ThC(QtCore.QRunnable):
#datasignal = QtCore.pyqtSignal(int,float)
def __init__(self,port=None,index=0,base=100):
super().__init__()
self.index = index
self.is_running = True
self.port = port
self.base = base
self.signals = ThC_Signals()
def run(self):
print(f"Thread {self.index} started")
self.port.flushOutput()
self.port.flushInput()
i = self.base*1000
cmd = 's'+str(i)+'\n'
self.port.write(cmd.encode('utf-8'))
cnt = 0;
while True:
cnt+=1
inp = self.port.readline()
try:
if inp[0] == 62 and inp[-3] == 60 :
inp = inp[1:-3]
y = float(inp.decode('utf-8'))
self.signals.datasignal.emit(cnt,y)
except IndexError:
pass
time.sleep(0.001)
if self.is_running==False:
raise WorkerKilledException
def stop(self):
self.is_running=False
print(f"Thread {self.index} stopped")
#self.terminate()
class ThC_2(QtCore.QRunnable):
def __init__(self,port=None):
super().__init__()
self.is_running = True
self.port=port
def run(self):
cnt=0
while True:
cnt+= 1
if cnt >= 100 : cnt = 1
cmd = 'o'+str(cnt*10000)+'\n'
print(cmd)
self.port.write(cmd.encode('utf-8'))
time.sleep(1)
if self.is_running==False:
raise WorkerKilledException
def stop(self):
self.is_running=False
print(f'Thread ThC-2 stopped')
if __name__ == '__main__':
app = QtWidgets.QApplication(sys.argv)
window = MainWindow2()
window.show()
sys.exit(app.exec_())