Skip to content

Lyse field #94

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 159 additions & 108 deletions blacs/analysis_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@


class AnalysisSubmission(object):

icon_names = {'checking': ':/qtutils/fugue/hourglass',
'online': ':/qtutils/fugue/tick',
'offline': ':/qtutils/fugue/exclamation',
'': ':/qtutils/fugue/status-offline'}

tooltips = {'checking': 'Checking...',
'online': 'Server is responding',
'offline': 'Server not responding',
'': 'Disabled'}

def __init__(self, BLACS, blacs_ui):
self.inqueue = queue.Queue()
self.BLACS = BLACS
Expand All @@ -41,18 +52,20 @@ def __init__(self, BLACS, blacs_ui):
blacs_ui.analysis.addWidget(self._ui)
self._ui.frame.setMinimumWidth(blacs_ui.queue_controls_frame.sizeHint().width())
elide_label(self._ui.resend_shots_label, self._ui.failed_to_send_frame.layout(), Qt.ElideRight)

self._waiting_for_submission = {}
self.failure_reason = {}
self.send_to_server = False
self.server = ''
self.time_of_last_connectivity_check = 0
self.server_online = {}

# connect signals
self._ui.send_to_server.toggled.connect(lambda state: self._set_send_to_server(state))
self._ui.server.editingFinished.connect(lambda: self._set_server(self._ui.server.text()))
self._ui.clear_unsent_shots_button.clicked.connect(lambda _: self.clear_waiting_files())
self._ui.retry_button.clicked.connect(lambda _: self.check_retry())

self._waiting_for_submission = []
self.failure_reason = None
self.server_online = 'offline'
self.send_to_server = False
self.server = ''
self.time_of_last_connectivity_check = 0

self.mainloop_thread = threading.Thread(target=self.mainloop)
self.mainloop_thread.daemon = True
Expand All @@ -64,16 +77,23 @@ def restore_save_data(self,data):
if "send_to_server" in data:
self.send_to_server = data["send_to_server"]
if "waiting_for_submission" in data:
self._waiting_for_submission = list(data["waiting_for_submission"])
self.inqueue.put(['save data restored', None])
self._waiting_for_submission = dict(data["waiting_for_submission"])
self.inqueue.put(['save data restored', None, None])
self.check_retry()

def get_save_data(self):
return {"waiting_for_submission":list(self._waiting_for_submission),
"server":self.server,
"send_to_server":self.send_to_server
return {"waiting_for_submission": dict(self._waiting_for_submission),
"server": self.server,
"send_to_server": self.send_to_server
}

def _waiting_for_submission_len(self):
length = 0
for k, v in enumerate(self._waiting_for_submission):
length += len(v)

return length

def _set_send_to_server(self,value):
self.send_to_server = value

Expand Down Expand Up @@ -118,24 +138,36 @@ def server_online(self):

@server_online.setter
@inmain_decorator(True)
def server_online(self,value):
self._server_online = str(value)

icon_names = {'checking': ':/qtutils/fugue/hourglass',
'online': ':/qtutils/fugue/tick',
'offline': ':/qtutils/fugue/exclamation',
'': ':/qtutils/fugue/status-offline'}
def server_online(self, value):

tooltips = {'checking': 'Checking...',
'online': 'Server is responding',
'offline': 'Server not responding',
'': 'Disabled'}
self._server_online = value

icon = QIcon(icon_names.get(self._server_online, ':/qtutils/fugue/exclamation-red'))
status = 'online'
tooltip = ''
for server in self._waiting_for_submission:

if server not in value:
value[server] = ''

v = value[server]

if v == 'offline':
status = 'offline'
if tooltip != '':
tooltip += '\n'

tip = self.tooltips.get(status, 'Invalid message {}'.format(status))
tooltip += 'Server {} status: {}'.format(server, tip)

if server not in self.failure_reason:
self.failure_reason[server] = None
tooltip += 'Server not checked yet'

if self.failure_reason[server] is not None:
tooltip += '[[{}]]'.format(self.failure_reason[server])

icon = QIcon(self.icon_names.get(status, ':/qtutils/fugue/exclamation-red'))
pixmap = icon.pixmap(QSize(16, 16))
tooltip = tooltips.get(self._server_online, "Invalid server status: %s" % self._server_online)
if self.failure_reason is not None:
tooltip += '\n' + self.failure_reason

# Update GUI:
self._ui.server_online.setPixmap(pixmap)
Expand All @@ -145,33 +177,46 @@ def server_online(self,value):

@inmain_decorator(True)
def update_waiting_files_message(self):
# if there is only one shot and we haven't encountered failure yet, do
# not show the error frame:
if (self.server_online == 'checking') and (len(self._waiting_for_submission) == 1) and not self._ui.failed_to_send_frame.isVisible():
return
if self._waiting_for_submission:

message = ''
failed = False
for server, shots in self._waiting_for_submission.items():
length = len(shots)

# The server may never have been checked
if server not in self.server_online:
self._server_online[server] = ''

# if there is only one shot and we haven't encountered failure yet, do
# not show the error frame:
if (self.server_online[server] == 'checking') and (length == 1) and not self._ui.failed_to_send_frame.isVisible():
pass
elif length:
if self.server_online[server] == 'checking':
message += 'Server {}: Sending {} shot(s)...'.format(server, length)
else:
message += 'Server {}: {} shot(s) to send...'.format(server, length)

if failed and self._waiting_for_submission_len():
self._ui.failed_to_send_frame.show()
if self.server_online == 'checking':
self._ui.retry_button.hide()
text = 'Sending %s shot(s)...' % len(self._waiting_for_submission)
else:
self._ui.retry_button.show()
text = '%s shot(s) to send' % len(self._waiting_for_submission)
self._ui.resend_shots_label.setText(text)
else:
self._ui.failed_to_send_frame.hide()

self._ui.resend_shots_label.setText(message)

self._ui.retry_button.show()

def get_queue(self):
return self.inqueue

@inmain_decorator(True)
def clear_waiting_files(self):
self._waiting_for_submission = []
self._waiting_for_submission = {}
self.update_waiting_files_message()

@inmain_decorator(True)
def check_retry(self):
self.inqueue.put(['check/retry', None])
self.inqueue.put(['check/retry', None, None])

def mainloop(self):
self._mainloop_logger = logging.getLogger('BLACS.AnalysisSubmission.mainloop')
Expand All @@ -182,39 +227,27 @@ def mainloop(self):
while True:
try:
try:
signal, data = self.inqueue.get(timeout=timeout)
signal, data, lyse_host = self.inqueue.get(timeout=timeout)
except queue.Empty:
timeout = 10
# Periodic checking of connectivity and resending of files.
# Don't trigger a re-check if we already failed a connectivity
# check within the last second:
if (time.time() - self.time_of_last_connectivity_check) > 1:
signal = 'check/retry'
else:
continue
if signal == 'check/retry':
self.check_connectivity()
if self.server_online == 'online':
self.submit_waiting_files()
elif signal == 'file':
continue

if signal == 'file':
if self.send_to_server:
self._waiting_for_submission.append(data)
if self.server_online != 'online':
# Don't stack connectivity checks if many files are
# arriving. If we failed a connectivity check less
# than a second ago then don't check again.
if (time.time() - self.time_of_last_connectivity_check) > 1:
self.check_connectivity()
else:
# But do queue up a check for when we have
# been idle for one second:
timeout = 1
if self.server_online == 'online':
self.submit_waiting_files()

lyse_host = lyse_host if lyse_host != '' else self.server

if lyse_host not in self._waiting_for_submission:
self._waiting_for_submission[lyse_host] = []

self._waiting_for_submission[lyse_host].append(data)

self.submit_waiting_files()
elif signal == 'close':
break
elif signal == 'save data restored':
continue
elif signal == 'check/retry':
self.submit_waiting_files()
else:
raise ValueError('Invalid signal: %s'%str(signal))

Expand All @@ -225,53 +258,71 @@ def mainloop(self):
self._mainloop_logger.exception("Exception in mainloop, continuing")

def check_connectivity(self):
host = self.server
send_to_server = self.send_to_server
if host and send_to_server:
self.server_online = 'checking'
try:
response = zmq_get(self.port, host, 'hello', timeout=1)
self.failure_reason = None
except (TimeoutError, gaierror, AuthenticationFailure) as e:
success = False
self.failure_reason = str(e)
else:
success = (response == 'hello')
if not success:
self.failure_reason = "unexpected reponse: %s" % str(response)

server_online = {}

for server in self._waiting_for_submission:
send_to_server = self.send_to_server
if send_to_server:
server_online[server] = 'checking'
self.server_online = server_online # update GUI

# update GUI
self.server_online = 'online' if success else 'offline'
else:
self.server_online = ''
try:
response = zmq_get(self.port, server, 'hello', timeout=1)
self.failure_reason[k] = None
except (TimeoutError, gaierror, AuthenticationFailure) as e:
success = False
self.failure_reason[k] = str(e)
else:
success = (response == 'hello')
if not success:
self.failure_reason[k] = "unexpected reponse: %s" % str(response)

server_online[server] = 'online' if success else 'offline'
else:
server_online[server] = ''

# update GUI
self.server_online = server_online

self.time_of_last_connectivity_check = time.time()

def submit_waiting_files(self):
success = True
while self._waiting_for_submission and success:
path = self._waiting_for_submission[0]
self._mainloop_logger.info('Submitting run file %s.\n'%os.path.basename(path))
data = {'filepath': labscript_utils.shared_drive.path_to_agnostic(path)}
self.server_online = 'checking'
try:
response = zmq_get(self.port, self.server, data, timeout=1)
self.failure_reason = None
except (TimeoutError, gaierror, AuthenticationFailure) as e:
success = False
self.failure_reason = str(e)
else:
success = (response == 'added successfully')
if not success:
self.failure_reason = "unexpected reponse: %s" % str(response)

server_online = {}
for server, shots in self._waiting_for_submission.items():
success = True

while shots and success:
path = shots[0]
self.server = server

self._mainloop_logger.info('Submitting run file %s.\n'%os.path.basename(path))
data = {'filepath': labscript_utils.shared_drive.path_to_agnostic(path)}

server_online[server] = 'checking'
self.server_online = server_online # update GUI

try:
self._waiting_for_submission.pop(0)
except IndexError:
# Queue has been cleared
pass
if not success:
break
response = zmq_get(self.port, server, data, timeout=1)
self.failure_reason[server] = None
except (TimeoutError, gaierror, AuthenticationFailure) as e:
success = False
self.failure_reason[server] = str(e)
else:
success = (response == 'added successfully')
if not success:
self.failure_reason[server] = "unexpected reponse: %s" % str(response)
try:
shots.pop(0)
except IndexError:
# Queue has been cleared
pass

server_online[server] = 'online' if success else 'offline'

# update GUI
self.server_online = 'online' if success else 'offline'
self.server_online = server_online

self.time_of_last_connectivity_check = time.time()

7 changes: 5 additions & 2 deletions blacs/experiment_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,10 @@ def restart_function(device_name):
data_group = hdf5_file['/'].create_group('data')
# stamp with the run time of the experiment
hdf5_file.attrs['run time'] = run_time.strftime('%Y%m%dT%H%M%S.%f')


# Get lyse host name from file if present in shot file
lyse_host = hdf5_file.attrs['lyse_host']

error_condition = False
response_list = {}
# Keep transitioning tabs to manual mode and waiting on them until they
Expand Down Expand Up @@ -956,7 +959,7 @@ def restart_function(device_name):

# Submit to the analysis server
if send_to_analysis:
self.BLACS.analysis_submission.get_queue().put(['file', path])
self.BLACS.analysis_submission.get_queue().put(['file', path, lyse_host])

##########################################################################################################################################
# Plugin callbacks #
Expand Down