260 lines
8.7 KiB
Python
260 lines
8.7 KiB
Python
import os
|
|
import json
|
|
from time import time, sleep
|
|
|
|
from PyQt4 import QtGui, QtCore, uic
|
|
|
|
import tango
|
|
|
|
import magic_numbers
|
|
from labels import cm
|
|
from threads import ThreadWrapper
|
|
|
|
from grouping import GroupingWindow
|
|
from seroutput import SERWindow
|
|
|
|
base_dir = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
|
|
class MainWindow(QtGui.QWidget):
|
|
def __init__(self, offline=False, parent=None):
|
|
QtGui.QWidget.__init__(self, parent)
|
|
uic.loadUi("%s/ui/main.ui" % base_dir, self)
|
|
|
|
self.offline = offline
|
|
self.devices = {}
|
|
self.units = None
|
|
self.cache = {}
|
|
self.virtual = {
|
|
"delay": 0,
|
|
"avg_delay": 0,
|
|
"_delay_sum": 0,
|
|
"_delay_n": 0
|
|
}
|
|
|
|
self.grouping = GroupingWindow(self)
|
|
self.ser_output = SERWindow(self)
|
|
|
|
self.connect(self.FOFBONButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_ON)
|
|
)
|
|
self.connect(self.FOFBOFFButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_OFF)
|
|
)
|
|
self.connect(self.FOFBStandbyButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_STANDBY)
|
|
)
|
|
self.connect(self.SwitchBuffersButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.send_tim_event("EVG", magic_numbers.EventID.UPDATE_DOUBLE_BUFFERS)
|
|
)
|
|
self.connect(self.SendEventButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.send_tim_event("EVG", self.EventSpinBox.value())
|
|
)
|
|
|
|
self.connect(self.GroupingButton, QtCore.SIGNAL("clicked()"), self.grouping.show)
|
|
self.connect(self.SERButton, QtCore.SIGNAL("clicked()"), self.ser_output.show)
|
|
|
|
self.connect(self.XInterlockResetButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.execute_ireg_command_on_axis("x", "GdxFOFBIlkReset")
|
|
)
|
|
self.connect(self.XReconnectButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.execute_ireg_command_on_axis("x", "GdxFOFBReconnect")
|
|
)
|
|
self.connect(self.XResetPIButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.execute_ireg_command_on_axis("x", "GdxFOFBPIReset")
|
|
)
|
|
self.connect(self.XSaturationResetButton, QtCore.SIGNAL("clicked()"),
|
|
lambda: self.execute_ireg_command_on_axis("x", "GdxFOFBSaturationReset")
|
|
)
|
|
|
|
# self.connect(self.YInterlockResetButton, QtCore.SIGNAL("clicked()"),
|
|
# lambda: self.execute_ireg_command_on_axis("y", "GdxFOFBIlkReset")
|
|
# )
|
|
# self.connect(self.YReconnectButton, QtCore.SIGNAL("clicked()"),
|
|
# lambda: self.execute_ireg_command_on_axis("y", "GdxFOFBReconnect")
|
|
# )
|
|
# self.connect(self.YResetPIButton, QtCore.SIGNAL("clicked()"),
|
|
# lambda: self.execute_ireg_command_on_axis("y", "GdxFOFBPIReset")
|
|
# )
|
|
# self.connect(self.YSaturationResetButton, QtCore.SIGNAL("clicked()"),
|
|
# lambda: self.execute_ireg_command_on_axis("y", "GdxFOFBSaturationReset")
|
|
# )
|
|
|
|
self.update_timer = QtCore.QTimer()
|
|
self.connect(self.update_timer, QtCore.SIGNAL("timeout()"), self.update)
|
|
|
|
self.connect(self, QtCore.SIGNAL("update"), self.grouping.update)
|
|
self.connect(self, QtCore.SIGNAL("update"), self.ser_output.update)
|
|
|
|
self.connect(self, QtCore.SIGNAL("loadingdone"), self.loading_done)
|
|
self.connect(self, QtCore.SIGNAL("update"), self.update_virtual_in_gui)
|
|
|
|
def setModel(self):
|
|
self.EVGStateLabel.setModel("EVG")
|
|
|
|
self.XStateLabel.setModel(self.settings["x"])
|
|
self.XInterlockLabel.setModel(self.settings["x"], "GdxFOFBIlk")
|
|
self.XIDLabel.setText(self.settings["x"])
|
|
self.XFOFBStateLabel.setModel(self.settings["x"], "GdxFOFBState")
|
|
self.XFOFBStateLabel.setColorMap(cm.FOFB_STATE)
|
|
# self.XBuffersLabel.setModel()
|
|
|
|
self.XS1Label.setModel(self.settings["x"], "GdxFOFBSaturation", 0)
|
|
self.XS1Label.setColorMap(cm.RED_BOOL)
|
|
self.XS2Label.setModel(self.settings["x"], "GdxFOFBSaturation", 1)
|
|
self.XS2Label.setColorMap(cm.RED_BOOL)
|
|
self.XS3Label.setModel(self.settings["x"], "GdxFOFBSaturation", 2)
|
|
self.XS3Label.setColorMap(cm.RED_BOOL)
|
|
self.XS4Label.setModel(self.settings["x"], "GdxFOFBSaturation", 3)
|
|
self.XS4Label.setColorMap(cm.RED_BOOL)
|
|
|
|
# self.YStateLabel.setModel(self.settings["y"])
|
|
# self.YInterlockLabel.setModel(self.settings["y"], "GdxFOFBIlk")
|
|
# self.YIDLabel.setText(self.settings["y"])
|
|
# self.YFOFBStateLabel.setModel(self.settings["y"], "GdxFOFBState")
|
|
# self.YFOFBStateLabel.setColorMap(cm.FOFB_STATE)
|
|
# # self.XBuffersLabel.setModel()
|
|
|
|
# self.YS1Label.setModel(self.settings["y"], "GdxFOFBSaturation", 0)
|
|
# self.YS1Label.setColorMap(cm.RED_BOOL)
|
|
# self.YS2Label.setModel(self.settings["y"], "GdxFOFBSaturation", 1)
|
|
# self.YS2Label.setColorMap(cm.RED_BOOL)
|
|
# self.YS3Label.setModel(self.settings["y"], "GdxFOFBSaturation", 2)
|
|
# self.YS3Label.setColorMap(cm.RED_BOOL)
|
|
# self.YS4Label.setModel(self.settings["y"], "GdxFOFBSaturation", 3)
|
|
# self.YS4Label.setColorMap(cm.RED_BOOL)
|
|
|
|
def start(self, fn, config=None):
|
|
ThreadWrapper(self.load, args=(fn, config)).start()
|
|
|
|
def loading_done(self):
|
|
# show all startup windows
|
|
self.show()
|
|
# fill grouping and magnets
|
|
for dev in self.devices.keys():
|
|
if dev.upper() != "EVG":
|
|
self.grouping.addLibera(dev)
|
|
self.ser_output.addLibera(dev)
|
|
self.grouping.addGlobal(self.settings["global_orbit"], self.settings["global_magnet"])
|
|
# start updates
|
|
self.update_timer.start(magic_numbers.MASTER_TIMER)
|
|
|
|
def closeEvent(self, evt):
|
|
self.grouping.close()
|
|
self.ser_output.close()
|
|
evt.accept()
|
|
|
|
def setVersion(self, version):
|
|
self.VersionLabel.setText("v.%s" % version)
|
|
|
|
def load(self, fn, cfg=None):
|
|
if fn:
|
|
with open(fn) as f:
|
|
conf = json.load(f)
|
|
else:
|
|
conf = json.loads(cfg)
|
|
for label, dev in conf["devices"].iteritems():
|
|
self.addDevice(label, dev)
|
|
self.units = conf["units"]
|
|
self.settings = conf["settings"]
|
|
self.setModel()
|
|
self.applySettings()
|
|
self.emit(QtCore.SIGNAL("loadingdone"))
|
|
|
|
def addDevice(self, label, dev):
|
|
self.emit(QtCore.SIGNAL("loadstatus"), "Loading %s = %s" % (label, dev))
|
|
if not self.offline:
|
|
self.devices[label] = tango.DeviceProxy(dev)
|
|
if label not in self.cache:
|
|
self.cache[label] = {}
|
|
|
|
def applySettings(self):
|
|
pass
|
|
|
|
def addAttribute(self, dev, attr):
|
|
self.emit(QtCore.SIGNAL("loadstatus"), "Add attribute %s of device %s" % (attr, dev))
|
|
if dev not in self.cache:
|
|
self.cache[dev] = {}
|
|
self.cache[dev][attr] = None
|
|
|
|
def update(self):
|
|
print "[ INFO]: update"
|
|
try:
|
|
start = time()
|
|
threads = {}
|
|
for dev, attrs in self.cache.iteritems():
|
|
threads[dev] = {}
|
|
for attr in attrs.keys():
|
|
threads[dev][attr] = ThreadWrapper(self.read_attribute, args=(dev, attr))
|
|
threads[dev][attr].start()
|
|
|
|
for dev, attrs in self.cache.iteritems():
|
|
for attr in attrs.keys():
|
|
threads[dev][attr].join()
|
|
self.cache[dev][attr] = threads[dev][attr].result
|
|
|
|
self.update_virtual(time() - start)
|
|
except Exception as e:
|
|
print "[ERROR]: update failed: %s" % str(e)
|
|
self.emit(QtCore.SIGNAL("update"), self.cache)
|
|
|
|
def read_attribute(self, dev, attr):
|
|
try:
|
|
value = self.devices[dev].read_attribute(attr).value
|
|
if type(value) in (int, float):
|
|
try:
|
|
conv = self.units[dev][attr]["conversion"]
|
|
except (KeyError, TypeError):
|
|
conv = 1
|
|
value = value * conv
|
|
|
|
# special case for machine state attribute
|
|
if dev == "statemachine" and attr == "MachineState":
|
|
if not value:
|
|
value = "UNKNOWN !!!"
|
|
|
|
return value
|
|
except Exception as e:
|
|
print "[ERROR]: could not read attribute %s of device %s: %s" % (attr, dev, str(e))
|
|
return None
|
|
|
|
def update_virtual(self, fetch_time):
|
|
self.virtual["delay"] = fetch_time * 1000.0 # convert to ms
|
|
self.virtual["_delay_sum"] += self.virtual["delay"]
|
|
self.virtual["_delay_n"] += 1
|
|
self.virtual["avg_delay"] = self.virtual["_delay_sum"] / self.virtual["_delay_n"]
|
|
|
|
def update_virtual_in_gui(self):
|
|
# This whole thing is a big, big technical debt
|
|
self.DelayLabel.setText("%.02f ms" % self.virtual["delay"])
|
|
if self.virtual["delay"] > magic_numbers.MASTER_TIMER:
|
|
style = "background:red; color:white"
|
|
elif self.virtual["delay"] > magic_numbers.MASTER_TIMER / 2.0:
|
|
style = "background:orange; color:white"
|
|
else:
|
|
style = ""
|
|
self.DelayLabel.setStyleSheet(style)
|
|
|
|
self.AvgDelayLabel.setText("avg: %.02f ms" % self.virtual["avg_delay"])
|
|
if self.virtual["avg_delay"] > magic_numbers.MASTER_TIMER:
|
|
style = "background:red; color:white"
|
|
elif self.virtual["avg_delay"] > magic_numbers.MASTER_TIMER / 2.0:
|
|
style = "background:orange; color:white"
|
|
else:
|
|
style = ""
|
|
self.AvgDelayLabel.setStyleSheet(style)
|
|
|
|
def execute_ireg_command(self, dev, cmd):
|
|
try:
|
|
print "[ INFO]: ireg_execute %s/%s" % (dev, cmd)
|
|
self.devices[dev].write_attribute(cmd, magic_numbers.IREG_EXECUTE)
|
|
except Exception as e:
|
|
print str(e)
|
|
|
|
def execute_ireg_command_on_axis(self, axis, cmd):
|
|
self.execute_ireg_command(self.settings[axis], cmd)
|
|
|
|
def send_tim_event(self, dev, evt):
|
|
print "[ INFO]: sw event %s/%d" % (dev, evt)
|
|
self.devices[dev].SendSoftwareEvent(evt)
|