Source code for PyMca5.PyMcaGui.misc.CalculationThread

#/*##########################################################################
# Copyright (C) 2004-2014 V.A. Sole, European Synchrotron Radiation Facility
#
# This file is part of the PyMca X-ray Fluorescence Toolkit developed at
# the ESRF by the Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
__author__ = "V.A. Sole - ESRF Data Analysis"
__contact__ = "sole@esrf.fr"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
import sys
import time
try:
    from PyMca5.PyMcaGui import PyMcaQt as qt
except ImportError:
    import PyMcaQt as qt

USE_MOVE_TO_THREAD = False
QTHREAD = True
if QTHREAD:
    QThread = qt.QThread
else:
    import threading
    QThread =  threading.Thread
    # Python threads do not support moveToThread (untested)
    USE_MOVE_TO_THREAD = False

[docs]class CalculationObject(qt.QObject): finished = qt.pyqtSignal() started = qt.pyqtSignal() def __init__(self, parent=None, calculation_method=None, calculation_vars=None, calculation_kw=None, expand_vars=True, expand_kw=True): qt.QObject.__init__(self, parent) self.calculation_method = calculation_method self.calculation_vars = calculation_vars self.calculation_kw = calculation_kw self.expand_vars = expand_vars self.expand_kw = expand_kw if self.expand_vars: if not self.expand_kw: raise ValueError("Cannot expand vars without expanding kw") self.__result = None
[docs] def run(self): try: self.__result = None if self.calculation_vars is None and self.calculation_kw is None: self.__result = self.calculation_method() elif self.calculation_vars is None: if self.expand_kw: self.__result = self.calculation_method(**self.calculation_kw) else: self.__result = self.calculation_method(self.calculation_kw) elif self.calculation_kw is None: if self.expand_vars: self.__result = self.calculation_method(*self.calculation_vars) else: self.__result = self.calculation_method(self.calculation_vars) elif self.expand_vars and self.expand_kw: self.__result = self.calculation_method(*self.calculation_vars, **self.calculation_kw) elif self.expand_kw: self.__result = self.calculation_method(self.calculation_vars, **self.calculation_kw) else: print("Impossible combination of vars and kw") self._threadRunning = False raise ValueError("Impossible combination of vars and kw") except: self.__result = ("Exception",) + sys.exc_info() finally: # comment lines to allow to other call ???? self.calculation_vars = None self.calculation_kw = None self.finished.emit()
[docs] def getResult(self): return self.__result
[docs]class NewCalculationThread(qt.QObject): finished = qt.pyqtSignal() started = qt.pyqtSignal() def __init__(self, parent=None, calculation_method=None, calculation_vars=None, calculation_kw=None, expand_vars=True, expand_kw=True): qt.QObject.__init__(self, parent) self._calculationObject = CalculationObject(parent, calculation_method=calculation_method, calculation_vars=calculation_vars, calculation_kw=calculation_kw, expand_vars=expand_vars, expand_kw=expand_kw) self._calculationThread = qt.QThread() self._calculationObject.moveToThread(self._calculationThread) self._calculationObject.finished.connect(self._calculationThread.quit, qt.Qt.DirectConnection) self._calculationThread.started.connect(self._calculationObject.run, qt.Qt.DirectConnection) self._calculationThread.finished.connect(self._threadFinishedSlot, qt.Qt.DirectConnection) self._threadRunning = False
[docs] def isRunning(self):
#return self.calculationThread.isRunning() return self._threadRunning def _threadFinishedSlot(self): self._threadRunning = False self.finished.emit()
[docs] def start(self): self._threadRunning = True self._calculationThread.start() self.started.emit()
[docs] def getResult(self): return self._calculationObject.getResult()
result = property(getResult)
[docs]class OldCalculationThread(QThread): def __init__(self, parent=None, calculation_method=None, calculation_vars=None, calculation_kw=None, expand_vars=True, expand_kw=True): if QTHREAD: QThread.__init__(self, parent) else: QThread.__init__(self) self._threadRunning = False self.calculation_method = calculation_method self.calculation_vars = calculation_vars self.calculation_kw = calculation_kw self.expand_vars = expand_vars self.expand_kw = expand_kw if self.expand_vars: if not self.expand_kw: raise ValueError("Cannot expand vars without expanding kw") if not QTHREAD: def isRunning(self): return self._threadRunning
[docs] def run(self): try: self._threadRunning = True self.result = None if self.calculation_vars is None and self.calculation_kw is None: self.result = self.calculation_method() elif self.calculation_vars is None: if self.expand_kw: self.result = self.calculation_method(**self.calculation_kw) else: self.result = self.calculation_method(self.calculation_kw) elif self.calculation_kw is None: if self.expand_vars: self.result = self.calculation_method(*self.calculation_vars) else: self.result = self.calculation_method(self.calculation_vars) elif self.expand_vars and self.expand_kw: self.result = self.calculation_method(*self.calculation_vars, **self.calculation_kw) elif self.expand_kw: self.result = self.calculation_method(self.calculation_vars, **self.calculation_kw) else: print("Impossible combination of vars and kw") self._threadRunning = False raise ValueError("Impossible combination of vars and kw") except: self._threadRunning = False self.result = ("Exception",) + sys.exc_info() finally: self.calculation_vars = None self.calculation_kw = None self._threadRunning = False
[docs] def getResult(self): if hasattr(self, "result"): return self.result else: return None
if USE_MOVE_TO_THREAD: CalculationThread=NewCalculationThread else: CalculationThread=OldCalculationThread
[docs]def waitingMessageDialog(thread, message=None, parent=None, modal=True, update_callback=None, frameless=False): """ thread - The thread to be polled message - The initial message to be diplayed parent - The parent QWidget. It is used just to provide a convenient localtion modal - Default is True. The dialog will prevent user from using other widgets update_callback - The function to be called to provide progress feedback. It is expected to return a dictionnary. The recognized key words are: message: The updated message to be displayed. title: The title of the window title. progress: A number between 0 and 100 indicating the progress of the task. status: Status of the calculation thread. """ if message is None: message = "Please wait. Calculation going on." windowTitle = "Please Wait" if frameless: msg = qt.QDialog(None, qt.Qt.FramelessWindowHint) else: msg = qt.QDialog(None) #if modal: # msg.setWindowFlags(qt.Qt.Window | qt.Qt.CustomizeWindowHint | qt.Qt.WindowTitleHint) msg.setModal(modal) msg.setWindowTitle(windowTitle) layout = qt.QHBoxLayout(msg) layout.setContentsMargins(0, 0, 0, 0) layout.setSpacing(0) l1 = qt.QLabel(msg) l1.setFixedWidth(l1.fontMetrics().width('##')) l2 = qt.QLabel(msg) l2.setText("%s" % message) l3 = qt.QLabel(msg) l3.setFixedWidth(l3.fontMetrics().width('##')) layout.addWidget(l1) layout.addWidget(l2) layout.addWidget(l3) msg.show() if parent is not None: # place the dialog at appropriate point parentGeometry = parent.geometry() x = parentGeometry.x() + 0.5 * parentGeometry.width() y = parentGeometry.y() + 0.5 * parentGeometry.height() msg.move(int(x - 0.5 * msg.width()), int(y)) t0 = time.time() i = 0 ticks = ['-','\\', "|", "/","-","\\",'|','/'] qApp = qt.QApplication.instance() if update_callback is None: while (thread.isRunning()): i = (i+1) % 8 l1.setText(ticks[i]) l3.setText(" "+ticks[i]) qApp.processEvents() if QTHREAD: qt.QThread.msleep(1000) else: time.sleep(2) else: while (thread.isRunning()): updateInfo = update_callback() message = updateInfo.get('message', message) windowTitle = updateInfo.get('title', windowTitle) msg.setWindowTitle(windowTitle) i = (i+1) % 8 l1.setText(ticks[i]) l2.setText(message) l3.setText(" "+ticks[i]) qApp.processEvents() if QTHREAD: qt.QThread.msleep(1000) else: time.sleep(2) msg.close()