fofbcontrol/mainwindow.py

220 lines
6.6 KiB
Python

import os
import json
from time import time, sleep
from PyQt4 import QtGui, QtCore, uic
import tango
import magic_numbers
from labels import cm
from threads import ThreadWrapper
from grouping import GroupingWindow
from seroutput import SERWindow
from statuswidget import StatusWidget
base_dir = os.path.dirname(os.path.realpath(__file__))
class MainWindow(QtGui.QWidget):
def __init__(self, offline=False, parent=None):
QtGui.QWidget.__init__(self, parent)
uic.loadUi("%s/ui/main.ui" % base_dir, self)
self.offline = offline
self.devices = {}
self.units = None
self.cache = {}
self.virtual = {
"delay": 0,
"avg_delay": 0,
"_delay_sum": 0,
"_delay_n": 0
}
self.grouping = GroupingWindow(self)
self.ser_output = SERWindow(self)
self.x_list_layout = QtGui.QVBoxLayout()
self.XFrame.setLayout(self.x_list_layout)
self.y_list_layout = QtGui.QVBoxLayout()
self.YFrame.setLayout(self.y_list_layout)
self.connect(self.FOFBONButton, QtCore.SIGNAL("clicked()"),
lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_ON)
)
self.connect(self.FOFBOFFButton, QtCore.SIGNAL("clicked()"),
lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_OFF)
)
self.connect(self.FOFBStandbyButton, QtCore.SIGNAL("clicked()"),
lambda: self.send_tim_event("EVG", magic_numbers.EventID.FOFB_STANDBY)
)
self.connect(self.SwitchBuffersButton, QtCore.SIGNAL("clicked()"),
lambda: self.send_tim_event("EVG", magic_numbers.EventID.UPDATE_DOUBLE_BUFFERS)
)
self.connect(self.SendEventButton, QtCore.SIGNAL("clicked()"),
lambda: self.send_tim_event("EVG", self.EventSpinBox.value())
)
self.connect(self.GroupingButton, QtCore.SIGNAL("clicked()"), self.grouping.show)
self.connect(self.SERButton, QtCore.SIGNAL("clicked()"), self.ser_output.show)
self.update_timer = QtCore.QTimer()
self.connect(self.update_timer, QtCore.SIGNAL("timeout()"), self.update)
self.connect(self, QtCore.SIGNAL("update"), self.grouping.update)
self.connect(self, QtCore.SIGNAL("update"), self.ser_output.update)
self.connect(self, QtCore.SIGNAL("loadingdone"), self.loading_done)
self.connect(self, QtCore.SIGNAL("update"), self.update_virtual_in_gui)
def setModel(self):
self.EVGStateLabel.setModel("EVG")
def start(self, fn, config=None):
ThreadWrapper(self.load, args=(fn, config)).start()
def loading_done(self):
# show all startup windows
self.show()
# fill grouping and magnets
for dev in sorted(self.devices.keys()):
if dev.upper() != "EVG":
self.addLibera(dev)
self.grouping.addLibera(dev)
self.ser_output.addLibera(dev)
self.grouping.addGlobal(self.settings["global_orbit"], self.settings["global_magnet"])
# start updates
self.update_timer.start(magic_numbers.MASTER_TIMER)
def closeEvent(self, evt):
self.grouping.close()
self.ser_output.close()
evt.accept()
def setVersion(self, version):
self.VersionLabel.setText("v.%s" % version)
def load(self, fn, cfg=None):
if fn:
with open(fn) as f:
conf = json.load(f)
else:
conf = json.loads(cfg)
for label, dev in conf["devices"].iteritems():
self.addDevice(label, dev)
self.units = conf["units"]
self.settings = conf["settings"]
self.setModel()
self.applySettings()
self.emit(QtCore.SIGNAL("loadingdone"))
def addDevice(self, label, dev):
self.emit(QtCore.SIGNAL("loadstatus"), "Loading %s = %s" % (label, dev))
if not self.offline:
self.devices[label] = tango.DeviceProxy(dev)
if label not in self.cache:
self.cache[label] = {}
def applySettings(self):
pass
def addAttribute(self, dev, attr):
self.emit(QtCore.SIGNAL("loadstatus"), "Add attribute %s of device %s" % (attr, dev))
if dev not in self.cache:
self.cache[dev] = {}
self.cache[dev][attr] = None
def addLibera(self, dev):
print "[ INFO]: Status: add Libera %s" % dev
libera = StatusWidget(self, dev)
self.connect(self, QtCore.SIGNAL("update"), libera.update)
# self.liberas[dev] = libera
if dev in self.settings["x"]:
self.x_list_layout.addWidget(libera)
elif dev in self.settings["y"]:
self.y_list_layout.addWidget(libera)
def update(self):
print "[ INFO]: update"
try:
start = time()
threads = {}
for dev, attrs in self.cache.iteritems():
threads[dev] = {}
for attr in attrs.keys():
threads[dev][attr] = ThreadWrapper(self.read_attribute, args=(dev, attr))
threads[dev][attr].start()
for dev, attrs in self.cache.iteritems():
for attr in attrs.keys():
threads[dev][attr].join()
self.cache[dev][attr] = threads[dev][attr].result
self.update_virtual(time() - start)
except Exception as e:
print "[ERROR]: update failed: %s" % str(e)
self.emit(QtCore.SIGNAL("update"), self.cache)
def read_attribute(self, dev, attr):
try:
value = self.devices[dev].read_attribute(attr).value
if type(value) in (int, float):
try:
conv = self.units[dev][attr]["conversion"]
except (KeyError, TypeError):
conv = 1
value = value * conv
# special case for machine state attribute
if dev == "statemachine" and attr == "MachineState":
if not value:
value = "UNKNOWN !!!"
return value
except Exception as e:
print "[ERROR]: could not read attribute %s of device %s: %s" % (attr, dev, str(e))
return None
def update_virtual(self, fetch_time):
self.virtual["delay"] = fetch_time * 1000.0 # convert to ms
self.virtual["_delay_sum"] += self.virtual["delay"]
self.virtual["_delay_n"] += 1
self.virtual["avg_delay"] = self.virtual["_delay_sum"] / self.virtual["_delay_n"]
def update_virtual_in_gui(self):
# This whole thing is a big, big technical debt
self.DelayLabel.setText("%.02f ms" % self.virtual["delay"])
if self.virtual["delay"] > magic_numbers.MASTER_TIMER:
style = "background:red; color:white"
elif self.virtual["delay"] > magic_numbers.MASTER_TIMER / 2.0:
style = "background:orange; color:white"
else:
style = ""
self.DelayLabel.setStyleSheet(style)
self.AvgDelayLabel.setText("avg: %.02f ms" % self.virtual["avg_delay"])
if self.virtual["avg_delay"] > magic_numbers.MASTER_TIMER:
style = "background:red; color:white"
elif self.virtual["avg_delay"] > magic_numbers.MASTER_TIMER / 2.0:
style = "background:orange; color:white"
else:
style = ""
self.AvgDelayLabel.setStyleSheet(style)
def execute_ireg_command(self, dev, cmd):
try:
print "[ INFO]: ireg_execute %s/%s" % (dev, cmd)
self.devices[dev].write_attribute(cmd, magic_numbers.IREG_EXECUTE)
except Exception as e:
print str(e)
# def execute_ireg_command_on_axis(self, axis, cmd):
# self.execute_ireg_command(self.settings[axis], cmd)
def send_tim_event(self, dev, evt):
print "[ INFO]: sw event %s/%d" % (dev, evt)
self.devices[dev].SendSoftwareEvent(evt)