import os
import json
from time import time, sleep

from PyQt4 import QtGui, QtCore, uic
import tango

import magic_numbers
from labels import cm
from threads import ThreadWrapper
from grouping import GroupingWindow
from seroutput import SERWindow

base_dir = os.path.dirname(os.path.realpath(__file__))


class MainWindow(QtGui.QWidget):
    """Main application window.

    Loads its layout from ``ui/main.ui``, keeps one ``tango.DeviceProxy``
    per configured device in ``self.devices`` and, on every tick of
    ``self.update_timer``, re-reads every attribute registered in
    ``self.cache`` and emits the ``"update"`` signal with that cache.

    Signals emitted (old-style PyQt4 short-circuit signals):
        ``"loadstatus"`` (str)  -- progress message while loading.
        ``"loadingdone"``       -- configuration fully loaded.
        ``"update"`` (dict)     -- ``self.cache`` after a refresh cycle.
    """

    def __init__(self, offline=False, parent=None):
        """Build the window, its child windows and all signal wiring.

        :param offline: when true, no ``tango.DeviceProxy`` is created for
            the configured devices (see ``addDevice``).
        :param parent: optional parent widget, forwarded to ``QWidget``.
        """
        QtGui.QWidget.__init__(self, parent)
        uic.loadUi("%s/ui/main.ui" % base_dir, self)

        self.offline = offline
        self.devices = {}   # label -> tango.DeviceProxy
        self.units = None   # filled by load() from conf["units"]
        self.settings = {}  # filled by load() from conf["settings"]
        self.cache = {}     # label -> {attribute name -> last read value}
        # Timing statistics of the refresh cycle; the underscore-prefixed
        # keys are accumulators used to compute "avg_delay".
        self.virtual = {
            "delay": 0,
            "avg_delay": 0,
            "_delay_sum": 0,
            "_delay_n": 0
        }

        self.grouping = GroupingWindow(self)
        self.ser_output = SERWindow(self)

        # Buttons sending a software event through the "EVG" device.
        self.connect(self.FOFBONButton, QtCore.SIGNAL("clicked()"),
            lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_ON)
        )
        self.connect(self.FOFBOFFButton, QtCore.SIGNAL("clicked()"),
            lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_OFF)
        )
        self.connect(self.FOFBStandbyButton, QtCore.SIGNAL("clicked()"),
            lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_STANDBY)
        )
        self.connect(self.SwitchBuffersButton, QtCore.SIGNAL("clicked()"),
            lambda: self.send_tim_event("EVG", magic_numbers.EventID.UPDATE_DOUBLE_BUFFERS)
        )
        # The event id is read from the spin box at click time.
        self.connect(self.SendEventButton, QtCore.SIGNAL("clicked()"),
            lambda: self.send_tim_event("EVG", self.EventSpinBox.value())
        )

        # Buttons opening the secondary windows.
        self.connect(self.GroupingButton, QtCore.SIGNAL("clicked()"), self.grouping.show)
        self.connect(self.SERButton, QtCore.SIGNAL("clicked()"), self.ser_output.show)

        # X-axis command buttons.
        self.connect(self.XInterlockResetButton, QtCore.SIGNAL("clicked()"),
            lambda: self.execute_ireg_command_on_axis("x", "GdxFOFBIlkReset")
        )
        self.connect(self.XReconnectButton, QtCore.SIGNAL("clicked()"),
            lambda: self.execute_ireg_command_on_axis("x", "GdxFOFBReconnect")
        )
        self.connect(self.XResetPIButton, QtCore.SIGNAL("clicked()"),
            lambda: self.execute_ireg_command_on_axis("x", "GdxFOFBPIReset")
        )
        self.connect(self.XSaturationResetButton, QtCore.SIGNAL("clicked()"),
            lambda: self.execute_ireg_command_on_axis("x", "GdxFOFBSaturationReset")
        )

        # Y-axis command buttons (currently disabled).
        # self.connect(self.YInterlockResetButton, QtCore.SIGNAL("clicked()"),
        #     lambda: self.execute_ireg_command_on_axis("y", "GdxFOFBIlkReset")
        # )
        # self.connect(self.YReconnectButton, QtCore.SIGNAL("clicked()"),
        #     lambda: self.execute_ireg_command_on_axis("y", "GdxFOFBReconnect")
        # )
        # self.connect(self.YResetPIButton, QtCore.SIGNAL("clicked()"),
        #     lambda: self.execute_ireg_command_on_axis("y", "GdxFOFBPIReset")
        # )
        # self.connect(self.YSaturationResetButton, QtCore.SIGNAL("clicked()"),
        #     lambda: self.execute_ireg_command_on_axis("y", "GdxFOFBSaturationReset")
        # )

        # Periodic refresh; the timer is started in loading_done().
        self.update_timer = QtCore.QTimer()
        self.connect(self.update_timer, QtCore.SIGNAL("timeout()"), self.update)

        self.connect(self, QtCore.SIGNAL("update"), self.grouping.update)
        self.connect(self, QtCore.SIGNAL("update"), self.ser_output.update)
        self.connect(self, QtCore.SIGNAL("loadingdone"), self.loading_done)
        self.connect(self, QtCore.SIGNAL("update"), self.update_virtual_in_gui)

    def setModel(self):
        """Bind the status labels to their devices/attributes.

        Requires ``self.settings`` to have been loaded (it reads
        ``self.settings["x"]``).
        """
        self.EVGStateLabel.setModel("EVG")

        x_dev = self.settings["x"]
        self.XStateLabel.setModel(x_dev)
        self.XInterlockLabel.setModel(x_dev, "GdxFOFBIlk")
        self.XIDLabel.setText(x_dev)
        self.XFOFBStateLabel.setModel(x_dev, "GdxFOFBState")
        self.XFOFBStateLabel.setColorMap(cm.FOFB_STATE)
        # self.XBuffersLabel.setModel()
        # One label per element (index 0..3) of "GdxFOFBSaturation".
        self.XS1Label.setModel(x_dev, "GdxFOFBSaturation", 0)
        self.XS1Label.setColorMap(cm.RED_BOOL)
        self.XS2Label.setModel(x_dev, "GdxFOFBSaturation", 1)
        self.XS2Label.setColorMap(cm.RED_BOOL)
        self.XS3Label.setModel(x_dev, "GdxFOFBSaturation", 2)
        self.XS3Label.setColorMap(cm.RED_BOOL)
        self.XS4Label.setModel(x_dev, "GdxFOFBSaturation", 3)
        self.XS4Label.setColorMap(cm.RED_BOOL)

        # Y-axis labels (currently disabled).
        # self.YStateLabel.setModel(self.settings["y"])
        # self.YInterlockLabel.setModel(self.settings["y"], "GdxFOFBIlk")
        # self.YIDLabel.setText(self.settings["y"])
        # self.YFOFBStateLabel.setModel(self.settings["y"], "GdxFOFBState")
        # self.YFOFBStateLabel.setColorMap(cm.FOFB_STATE)
        # # self.XBuffersLabel.setModel()
        # self.YS1Label.setModel(self.settings["y"], "GdxFOFBSaturation", 0)
        # self.YS1Label.setColorMap(cm.RED_BOOL)
        # self.YS2Label.setModel(self.settings["y"], "GdxFOFBSaturation", 1)
        # self.YS2Label.setColorMap(cm.RED_BOOL)
        # self.YS3Label.setModel(self.settings["y"], "GdxFOFBSaturation", 2)
        # self.YS3Label.setColorMap(cm.RED_BOOL)
        # self.YS4Label.setModel(self.settings["y"], "GdxFOFBSaturation", 3)
        # self.YS4Label.setColorMap(cm.RED_BOOL)

    def start(self, fn, config=None):
        """Run ``load(fn, config)`` through a ``ThreadWrapper``."""
        ThreadWrapper(self.load, args=(fn, config)).start()

    def loading_done(self):
        """Slot for ``"loadingdone"``: show the window and start refreshing."""
        # show all startup windows
        self.show()

        # fill grouping and magnets
        for dev in sorted(self.devices):
            if dev.upper() != "EVG":
                self.grouping.addLibera(dev)
                self.ser_output.addLibera(dev)
        self.grouping.addGlobal(self.settings["global_orbit"],
                                self.settings["global_magnet"])

        # start updates
        self.update_timer.start(magic_numbers.MASTER_TIMER)

    def closeEvent(self, evt):
        """Close the secondary windows together with the main one."""
        self.grouping.close()
        self.ser_output.close()
        evt.accept()

    def setVersion(self, version):
        """Display ``version`` in the version label as ``v.<version>``."""
        self.VersionLabel.setText("v.%s" % version)

    def load(self, fn, cfg=None):
        """Load the JSON configuration and initialise devices and labels.

        :param fn: path of a JSON file; when falsy, ``cfg`` is used instead.
        :param cfg: JSON document as a string, parsed when ``fn`` is falsy.

        The configuration must provide the keys ``"devices"`` (mapping
        label -> device name), ``"units"`` and ``"settings"``.  Emits
        ``"loadingdone"`` once everything has been applied.

        NOTE(review): ``start()`` runs this method through ``ThreadWrapper``
        and ``setModel()`` touches widgets -- confirm that this is safe with
        respect to Qt's GUI-thread rules.
        """
        if fn:
            with open(fn) as f:
                conf = json.load(f)
        else:
            conf = json.loads(cfg)

        for label, dev in conf["devices"].items():
            self.addDevice(label, dev)
        self.units = conf["units"]
        self.settings = conf["settings"]

        self.setModel()
        self.applySettings()
        self.emit(QtCore.SIGNAL("loadingdone"))

    def addDevice(self, label, dev):
        """Register device ``dev`` under ``label``.

        A ``tango.DeviceProxy`` is only created when not in offline mode;
        the (empty) cache slot for the label is created in either case.
        """
        self.emit(QtCore.SIGNAL("loadstatus"), "Loading %s = %s" % (label, dev))
        if not self.offline:
            self.devices[label] = tango.DeviceProxy(dev)
        self.cache.setdefault(label, {})

    def applySettings(self):
        """Hook called by ``load()`` after ``setModel()``; does nothing."""
        pass

    def addAttribute(self, dev, attr):
        """Register attribute ``attr`` of device label ``dev`` for refresh.

        The cached value is (re)set to ``None`` until the next ``update()``.
        """
        self.emit(QtCore.SIGNAL("loadstatus"),
                  "Add attribute %s of device %s" % (attr, dev))
        self.cache.setdefault(dev, {})[attr] = None

    def update(self):
        """Re-read every cached attribute and emit ``"update"``.

        One ``ThreadWrapper`` is started per (device, attribute) pair; all
        of them are joined before the elapsed time is recorded through
        ``update_virtual``.  ``"update"`` is emitted even when the refresh
        failed, carrying whatever the cache holds at that point.

        NOTE(review): this method shadows ``QWidget.update()``; confirm that
        nothing relies on the Qt repaint slot of this widget.
        """
        print("[ INFO]: update")
        try:
            start = time()

            # Start all reads first so that they run concurrently ...
            threads = {}
            for dev, attrs in self.cache.items():
                threads[dev] = {}
                for attr in list(attrs):
                    worker = ThreadWrapper(self.read_attribute, args=(dev, attr))
                    threads[dev][attr] = worker
                    worker.start()

            # ... then collect the results.
            for dev, attrs in self.cache.items():
                for attr in list(attrs):
                    worker = threads[dev][attr]
                    worker.join()
                    self.cache[dev][attr] = worker.result

            self.update_virtual(time() - start)
        except Exception as e:
            print("[ERROR]: update failed: %s" % str(e))
        self.emit(QtCore.SIGNAL("update"), self.cache)

    def read_attribute(self, dev, attr):
        """Read one attribute and return its (converted) value.

        :param dev: device label, key of ``self.devices``.
        :param attr: attribute name.
        :returns: the attribute value, multiplied by
            ``self.units[dev][attr]["conversion"]`` when the value is a
            plain ``int`` or ``float`` and such a factor is configured;
            ``None`` when the read failed for any reason.
        """
        try:
            value = self.devices[dev].read_attribute(attr).value
            # Exact type match on purpose: bool and other subclasses are
            # returned unconverted.
            if type(value) in (int, float):
                try:
                    conv = self.units[dev][attr]["conversion"]
                except (KeyError, TypeError):
                    # No factor configured (or no units loaded at all).
                    conv = 1
                value = value * conv
            # special case for machine state attribute
            if dev == "statemachine" and attr == "MachineState":
                if not value:
                    value = "UNKNOWN !!!"
            return value
        except Exception as e:
            print("[ERROR]: could not read attribute %s of device %s: %s"
                  % (attr, dev, str(e)))
            return None

    def update_virtual(self, fetch_time):
        """Record the duration of one refresh cycle.

        :param fetch_time: elapsed time in seconds; stored in milliseconds
            and folded into the running average.
        """
        stats = self.virtual
        stats["delay"] = fetch_time * 1000.0  # convert to ms
        stats["_delay_sum"] += stats["delay"]
        stats["_delay_n"] += 1
        stats["avg_delay"] = stats["_delay_sum"] / stats["_delay_n"]

    @staticmethod
    def _delay_style(delay):
        """Return the stylesheet for a delay (ms) relative to MASTER_TIMER."""
        if delay > magic_numbers.MASTER_TIMER:
            return "background:red; color:white"
        if delay > magic_numbers.MASTER_TIMER / 2.0:
            return "background:orange; color:white"
        return ""

    def update_virtual_in_gui(self):
        """Show the last and average refresh delay, coloured by severity."""
        # This whole thing is a big, big technical debt
        delay = self.virtual["delay"]
        self.DelayLabel.setText("%.02f ms" % delay)
        self.DelayLabel.setStyleSheet(self._delay_style(delay))

        avg_delay = self.virtual["avg_delay"]
        self.AvgDelayLabel.setText("avg: %.02f ms" % avg_delay)
        self.AvgDelayLabel.setStyleSheet(self._delay_style(avg_delay))

    def execute_ireg_command(self, dev, cmd):
        """Write ``magic_numbers.IREG_EXECUTE`` to attribute ``cmd`` of ``dev``.

        Failures are printed and otherwise ignored.
        """
        try:
            print("[ INFO]: ireg_execute %s/%s" % (dev, cmd))
            self.devices[dev].write_attribute(cmd, magic_numbers.IREG_EXECUTE)
        except Exception as e:
            print(str(e))

    def execute_ireg_command_on_axis(self, axis, cmd):
        """Run ``execute_ireg_command`` on the device configured for ``axis``.

        :param axis: key of ``self.settings`` naming the device (e.g. "x").
        """
        self.execute_ireg_command(self.settings[axis], cmd)

    def send_tim_event(self, dev, evt):
        """Send software event ``evt`` through device ``dev``.

        Failures (including a device that is not available, e.g. in offline
        mode) are printed and otherwise ignored, matching
        ``execute_ireg_command``.
        """
        try:
            print("[ INFO]: sw event %s/%d" % (dev, evt))
            self.devices[dev].SendSoftwareEvent(evt)
        except Exception as e:
            print("[ERROR]: could not send sw event %s/%s: %s"
                  % (dev, evt, str(e)))