Source code for PyMca5.PyMcaCore.SpsDataSource

#/*##########################################################################
#
# The PyMca X-Ray Fluorescence Toolkit
#
# Copyright (c) 2004-2015 European Synchrotron Radiation Facility
#
# This file is part of the PyMca X-ray Fluorescence Toolkit developed at
# the ESRF by the Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
__author__ = "V.A. Sole - ESRF Data Analysis"
__contact__ = "sole@esrf.fr"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
import copy
import types
from . import DataObject
from PyMca5.PyMcaIO import spswrap as sps

DEBUG = 0
SOURCE_TYPE = 'SPS'


[docs]class SpsDataSource(object): def __init__(self, name): if type(name) not in types.StringTypes: raise TypeError("Constructor needs string as first argument") self.name = name self.sourceName = name self.sourceType = SOURCE_TYPE
[docs] def refresh(self): pass
[docs] def getSourceInfo(self): """ Returns information about the Spec version in self.name to give application possibility to know about it before loading. Returns a dictionary with the key "KeyList" (list of all available keys in this source). Each element in "KeyList" is an shared memory array name. """ return self.__getSourceInfo()
[docs] def getKeyInfo(self, key): if key in self.getSourceInfo()['KeyList']: return self.__getArrayInfo(key) else: return {}
[docs] def getDataObject(self, key_list, selection=None): if type(key_list) != types.ListType: nolist = True key_list = [key_list] else: output = [] nolist = False if self.name in sps.getspeclist(): sourcekeys = self.getSourceInfo()['KeyList'] for key in key_list: #a key corresponds to an array name if key not in sourcekeys: raise KeyError("Key %s not in source keys" % key) #array = key #create data object data = DataObject.DataObject() data.info = self.__getArrayInfo(key) data.info['selection'] = selection data.data = sps.getdata(self.name, key) if nolist: if selection is not None: scantest = (data.info['flag'] & sps.TAG_SCAN) == sps.TAG_SCAN if ((key in ["SCAN_D"]) or scantest) \ and 'cntlist' in selection: data.x = None data.y = None data.m = None if 'nopts' in data.info: nopts = data.info['nopts'] elif 'nopts' in data.info['envdict']: nopts = int(data.info['envdict']['nopts']) + 1 else: nopts = data.info['rows'] if not 'LabelNames' in data.info: data.info['LabelNames'] =\ selection['cntlist'] * 1 newMemoryProblem = len(data.info['LabelNames']) != len(selection['cntlist']) # check the current information is up-to-date # (new HKL handling business) actualLabelSelection = {'x':[], 'y':[], 'm':[]} for tmpKey in ['x', 'y', 'm']: if tmpKey in selection: for labelIndex in selection[tmpKey]: actualLabelSelection[tmpKey].append( \ selection['cntlist'][labelIndex]) if 'x' in selection: for labelindex in selection['x']: #label = selection['cntlist'][labelindex] label = data.info['LabelNames'][labelindex] if label not in data.info['LabelNames']: raise ValueError("Label %s not in scan labels" % label) index = data.info['LabelNames'].index(label) if data.x is None: data.x = [] data.x.append(data.data[:nopts, index]) if 'y' in selection: #for labelindex in selection['y']: for label in actualLabelSelection['y']: #label = data.info['LabelNames'][labelindex] if label not in data.info['LabelNames']: raise ValueError("Label %s not in scan labels" % label) index = data.info['LabelNames'].index(label) if data.y is None: data.y = [] data.y.append(data.data[:nopts, index]) if 'm' in selection: #for labelindex in selection['m']: for label in actualLabelSelection['m']: #label = data.info['LabelNames'][labelindex] if label not in data.info['LabelNames']: raise ValueError("Label %s not in scan labels" % label) index = data.info['LabelNames'].index(label) if data.m is None: data.m = [] data.m.append(data.data[:nopts, index]) data.info['selectiontype'] = "1D" data.info['scanselection'] = True if newMemoryProblem: newSelection = copy.deepcopy(selection) for tmpKey in ['x', 'y', 'm']: if tmpKey in selection: for i in range(len(selection[tmpKey])): if tmpKey == "x": label = data.info['LabelNames'][selection[tmpKey][i]] else: label = selection['cntlist'][selection[tmpKey][i]] newSelection[tmpKey][i] = data.info['LabelNames'].index(label) data.info['selection'] = newSelection data.info['selection']['cntlist'] = data.info['LabelNames'] selection = newSelection data.data = None return data if (key in ["XIA_DATA"]) and 'XIA' in selection: if selection["XIA"]: if 'Detectors' in data.info: for i in range(len(selection['rows']['y'])): selection['rows']['y'][i] = \ data.info['Detectors'].index(selection['rows']['y'][i]) + 1 del selection['XIA'] return data.select(selection) else: if data.data is not None: data.info['selectiontype'] = "%dD" % len(data.data.shape) if data.info['selectiontype'] == "2D": data.info["imageselection"] = True return data else: output.append(data.select(selection)) return output else: return None
def __getSourceInfo(self): arraylist = [] sourcename = self.name for array in sps.getarraylist(sourcename): arrayinfo = sps.getarrayinfo(sourcename, array) arraytype = arrayinfo[2] arrayflag = arrayinfo[3] if arraytype != sps.STRING: if (arrayflag & sps.TAG_ARRAY) == sps.TAG_ARRAY: arraylist.append(array) continue if DEBUG: print("array not added %s" % array) source_info = {} source_info["Size"] = len(arraylist) source_info["KeyList"] = arraylist return source_info def __getArrayInfo(self, array): info = {} info["SourceType"] = SOURCE_TYPE info["SourceName"] = self.name info["Key"] = array arrayinfo = sps.getarrayinfo(self.name, array) info["rows"] = arrayinfo[0] info["cols"] = arrayinfo[1] info["type"] = arrayinfo[2] info["flag"] = arrayinfo[3] counter = sps.updatecounter(self.name, array) info["updatecounter"] = counter envdict = {} keylist = sps.getkeylist(self.name, array + "_ENV") for i in keylist: val = sps.getenv(self.name, array + "_ENV", i) envdict[i] = val info["envdict"] = envdict scantest = (info['flag'] & sps.TAG_SCAN) == sps.TAG_SCAN metdata = None if array in ["SCAN_D"]: # try to get new style SCAN_D metadata metadata = sps.getmetadata(self.name, array) if metadata is not None: motors, metadata = metadata #info["LabelNames"] = metadata["allcounters"].split(";") labels = list(motors.keys()) labels.sort() if len(labels): info["LabelNames"] = [motors[x] for x in labels] if len(metadata["allmotorm"]): info["MotorNames"] = metadata["allmotorm"].split(";") info["MotorValues"] = [float(x) \ for x in metadata["allpositions"].split(";")] info["nopts"] = int(metadata["npts"]) supplied_info = sps.getinfo(self.name, array) if len(supplied_info): info["nopts"] = int(supplied_info[0]) if 'hkl' in metadata: if len(metadata["hkl"]): info['hkl'] = [float(x) \ for x in metadata["hkl"].split(";")] # current SCAN info["scanno"] = int(metadata["scanno"]) # current SPEC file info["datafile"] = metadata["datafile"] # put any missing information info["selectedcounters"] = [x \ for x in metadata["selectedcounters"].split()] # do not confuse with unhandled keys ... #for key in metadata: # if key not in info: # info[key] = metadata[key] if (metdata is None) and ((array in ["SCAN_D"]) or scantest): # old style SCAN_D metadata if 'axistitles' in info["envdict"]: info["LabelNames"] = self._buildLabelsList(info['envdict']['axistitles']) if 'H' in info["envdict"]: if 'K' in info["envdict"]: if 'L' in info["envdict"]: info['hkl'] = [envdict['H'], envdict['K'], envdict['L']] calibarray = array + "_PARAM" if calibarray in sps.getarraylist(self.name): try: data = sps.getdata(self.name, calibarray) updc = sps.updatecounter(self.name, calibarray) info["EnvKey"] = calibarray # data is an array info["McaCalib"] = data.tolist()[0] info["env_updatecounter"] = updc except: # Some of our C modules return NULL without setting # an exception ... pass if array in ["XIA_DATA", "XIA_BASELINE"]: envarray = "XIA_DET" if envarray in sps.getarraylist(self.name): try: data = sps.getdata(self.name, envarray) updc = sps.updatecounter(self.name, envarray) info["EnvKey"] = envarray info["Detectors"] = data.tolist()[0] info["env_updatecounter"] = updc except: pass return info def _buildLabelsList(self, instr): if DEBUG: print('SpsDataSource : building counter list') state = 0 llist = [''] for letter in instr: if state == 0: if letter == ' ': state = 1 elif letter == '{': state = 2 else: llist[-1] = llist[-1] + letter elif state == 1: if letter == ' ': pass elif letter == '{': state = 2 llist.append('') else: llist.append(letter) state = 0 elif state == 2: if letter == '}': state = 0 else: llist[-1] = llist[-1] + letter try: llist.remove('') except ValueError: pass return llist
[docs] def isUpdated(self, sourceName, key): if sps.specrunning(sourceName): if sps.isupdated(sourceName, key): return True #return True if its environment is updated envkey = key + "_ENV" if envkey in sps.getarraylist(sourceName): if sps.isupdated(sourceName, envkey): return True return False
source_types = {SOURCE_TYPE: SpsDataSource} # TODO object is a builtins
[docs]def DataSource(name="", object=None, copy=True, source_type=SOURCE_TYPE): try: sourceClass = source_types[source_type] except KeyError: # ERROR invalid source type raise TypeError("Invalid Source Type, source type should be one of %s" % source_types.keys()) return sourceClass(name, object, copy)
[docs]def main(): import sys try: specname = sys.argv[1] arrayname = sys.argv[2] obj = DataSource(specname) data = obj.getData(arrayname) print("info = ", data.info) except: # give usage instructions print("Usage: SpsDataSource <specversion> <arrayname>") sys.exit()
if __name__ == "__main__": main()