Source code for PyMca5.PyMcaCore.XiaEdf

#/*##########################################################################
#
# The PyMca X-Ray Fluorescence Toolkit
#
# Copyright (c) 2004-2014 European Synchrotron Radiation Facility
#
# This file is part of the PyMca X-ray Fluorescence Toolkit developed at
# the ESRF by the Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
__author__ = "E. Papillon - ESRF Software Group"
__contact__ = "sole@esrf.fr"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
import os, os.path
import numpy
from PyMca5.PyMcaIO import EdfFile

__version__="$Revision: 1.15 $"

XiaStatIndex= {
    "det": 0,
    "evt": 1,
    "icr": 2,
    "ocr": 3,
    "lt" : 4,
    "dt" : 5
}
XiaStatNb= len(XiaStatIndex.keys())
XiaStatLabels= ["xdet", "xevt", "xicr", "xocr", "xlt", "xdt"]

[docs]def checkEdfForRead(filename): if not os.path.isfile(filename): raise XiaEdfError("Cannot find file <%s>"%filename) else: try: if os.path.getsize(filename)==0: raise XiaEdfError("File <%s> has a null size "%filename) except: raise XiaEdfError("Cannot open file <%s>"%filename)
[docs]def openEdf(filename, read=0, write=0, force=0): if read: checkEdfForRead(filename) if write: checkEdfForWrite(filename, force) try: edf= EdfFile.EdfFile(filename) except: raise XiaEdfError("Cannot open EDF file <%s>"%filename) return edf
[docs]def checkEdfForWrite(filename, force=0): if os.path.isfile(filename): if not force: raise XiaEdfError("<%s> already exist. Abort saving."%filename) else: os.remove(filename) if os.path.isfile(filename): raise XiaEdfError("Cannot remove <%s>. Abort saving."%filename)
[docs]class XiaEdfError(Exception): def __init__(self, message): self.msg= message def __str__(self): return "XiaEdf ERROR: %s"%self.msg
[docs]class XiaEdfCountFile: def __init__(self, filename): self.filename= filename self.edf= openEdf(filename, read=1) self.reset()
[docs] def reset(self): self.data= None self.header= None try: self.__readStat() except: raise XiaEdfError("Cannot parse header in <%s>"%filename)
def __readStat(self): self.header= self.edf.GetHeader(0) self.nbDet= int(self.header.get("xnb", 0)) if not self.nbDet: self.detList= [] self.statArray= None return self.nbDet self.detList= range(self.nbDet) det= self.header.get("xdet", None) if det is not None: dets= det.split() if len(dets)==self.nbDet: self.detList= map(int, dets) self.statArray = numpy.zeros(XiaStatNb*self.nbDet, numpy.int) idx= 0 for det in self.detList: self.statArray[idx+XiaStatIndex["det"]]= int(self.header.get("xdet%02d"%det, det)) self.statArray[idx+XiaStatIndex["evt"]]= int(self.header.get("xevt%02d"%det, 0)) self.statArray[idx+XiaStatIndex["icr"]]= int(self.header.get("xicr%02d"%det, 1)) self.statArray[idx+XiaStatIndex["ocr"]]= int(self.header.get("xocr%02d"%det, 1)) self.statArray[idx+XiaStatIndex["lt"]]= int(self.header.get("xlt%02d"%det, 1)) self.statArray[idx+XiaStatIndex["dt"]]= int(self.header.get("xdt%02d"%det, 0)) idx += 6 def __getData(self): self.__readData() return self.data def __readData(self): if self.data is None: try: self.data= self.edf.GetData(0) except: raise XiaEdfError("Cannot read data in <%s>"%self.filename)
[docs] def getDetList(self): return self.detList
[docs] def getData(self, detector=-1):
# --- WARNING: first index is channels data= self.__getData() if detector==-1: return data[1:] else: if detector not in self.detList: return None idx= self.detList.index(detector) return data[idx+1]
[docs] def getStat(self, detector=-1): if detector==-1: return self.statArray else: if detector not in self.detList: return None idx= self.detList.index(detector) return self.statArray[(idx*XiaStatNb):((idx+1)*XiaStatNb)]
[docs] def correct(self, deadtime=1, livetime=0): message= [] corrflag= int(self.header.get("xcorr", 0)) if livetime and corrflag&2: raise XiaEdfError("<%s> seems already livetime corrected"%self.filename) if deadtime and corrflag&1: raise XiaEdfError("<%s> seems already deadtime corrected"%self.filename) self.__readData() self.data= self.data.astype(numpy.float) if livetime: lvt= numpy.zeros((self.nbDet,1), numpy.float) derr= [] for idx in range(len(self.detList)): lvt[idx]= self.statArray[idx*XiaStatNb + XiaStatIndex["lt"]] / 1000.0 if lvt[idx]==0.: lvt[idx]= 1. derr.append("#%02d"%self.detList[idx]) if len(derr): message.append("Null livetime on det %s"% " ".join(derr)) self.data[1:,:]= self.data[1:,:] / lvt self.header["xcorr"]= corrflag|2 if deadtime: rate= numpy.zeros((self.nbDet,1), numpy.float) for idx in range(len(self.detList)): derr= [] try: rate[idx]= float(self.statArray[idx*XiaStatNb + XiaStatIndex["icr"]]) / \ float(self.statArray[idx*XiaStatNb + XiaStatIndex["ocr"]]) except: rate[idx]= 1. derr.append("#%02d"%idx) if len(derr): message.append("Null OCR on det %s" % " ".join(derr)) self.data[1:,:]= self.data[1:,:] * rate self.header["xcorr"]= corrflag|1 return message
[docs] def sum(self, sums=[], deadtime=0, livetime=0, average=0): message= [] if not len(sums): return message self.__readData() if deadtime or livetime: message+= self.correct(deadtime, livetime) else: self.data= self.data.astype(numpy.float) sumdata= numpy.zeros((len(sums), self.data.shape[1]), numpy.float) for idx in range(len(sums)): if not len(sums[idx]): sumdata[idx,:] = numpy.sum(self.data[1:,], 0) xdet= self.detList else: mask= numpy.zeros((self.nbDet+1,1), numpy.int) xdet= [] for det in sums[idx]: if det in self.detList: detidx= self.detList.index(det) mask[detidx+1]= 1 xdet.append(det) sumdata[idx,:]= numpy.sum(self.data*mask, 0) self.header["xcorr%d"%idx] = self.header.get("xcorr", 0) self.header["xdet%d"%idx] = " ".join([str(det) for det in xdet]) if average: self.data= sumdata / len(xdet) else: self.data= sumdata for key in self.header.keys(): if key[0]=='x': try: det= int(key[-2:]) del self.header[key] except: pass dataflag= int(self.header.get("xdata", 0)) self.header["xdata"]= dataflag | (1<<2) self.header["xnb"]= len(sums) return message
[docs] def save(self, filename, force=0): edf= openEdf(filename, write=1, force=force) self.__readData() edf.WriteImage(self.header, self.data)
[docs]class XiaEdfScanFile: def __init__(self, statfile, detfiles): self.statfile= statfile self.detfiles= detfiles self.detector= None self.data= None self.header= None checkEdfForRead(self.statfile) for file in self.detfiles: checkEdfForRead(file) try: self.__readStat() except: raise XiaEdfError("Cannot parse header in <%s>"%self.statfile) def __readStat(self): edf= openEdf(self.statfile) header= edf.GetHeader(0) self.nbDet= int(header.get("xnb", 0)) if not self.nbDet: self.detList= [] self.statArray= None return self.nbDet self.detList= range(self.nbDet) det= header.get("xdet", None) if det is not None: dets= det.split() if len(dets)==self.nbDet: self.detList= map(int, dets) self.statArray= edf.GetData(0) return self.nbDet def __readData(self, detector): if detector!=self.detector: self.detector= None self.data= None self.header= None #try: if 1: if detector in self.detList: idx= self.detList.index(detector) if idx < len(self.detfiles): file= self.detfiles[idx] edf= openEdf(self.detfiles[idx]) header= edf.GetHeader(0) xdet= int(header.get("xdet", -1)) if xdet==-1 or xdet==detector: self.data= edf.GetData(0) self.header= header self.detector= xdet if self.data is None: for file in self.detfiles: edf= openEdf(file) header= edf.GetHeader(0) xdet= int(header.get("xdet", -1)) if xdet==detector: self.data= edf.GetData(0) self.header= header self.detector= xdet break else: #except: raise XiaEdfError("Cannot read data on det #%02d in <%s>"%(detector, file)) if self.data is None: raise XiaEdfError("Cannot read data on det #%02d"%detector)
[docs] def getDetList(self): return self.detList
[docs] def getData(self, detector): self.__readData(detector) return self.data
[docs] def getStat(self, detector=-1): if detector==-1: return self.statArray else: if detector not in self.detList: return None idx= self.detList.index(detector) return self.statArray[:,(idx*XiaStatNb):((idx+1)*XiaStatNb)]
[docs] def correct(self, detector, deadtime=1, livetime=0): message= [] if detector!=self.detector: self.__readData(detector) corrflag= int(self.header.get("xcorr", 0)) if livetime and corrflag&2: raise XiaEdfError("det #%02d seems already livetime corrected"%detector) if deadtime and corrflag&1: raise XiaEdfError("det #%02d seems already deadtime corrected"%detector) self.data= self.data.astype(numpy.float) idx= self.detList.index(detector) pts= self.statArray.shape[0] if livetime: lvt= numpy.zeros((pts, 1), numpy.float) lvt[:,0]= self.statArray[:,((XiaStatNb*idx)+XiaStatIndex["lt"])] / 1000.0 perr= self.__checkNullLivetime(lvt, pts) if len(perr): message.append("Null livetime on det #%02d points %s"%(detector, self.__pointRange(perr))) self.data= self.data / lvt self.header["xcorr"]= corrflag|2 if deadtime: rate= numpy.zeros((self.statArray.shape[0], 1), numpy.float) count= numpy.zeros((self.statArray.shape[0], 2), numpy.float) count[:,0]= self.statArray[:, (XiaStatNb*idx)+XiaStatIndex["ocr"]] count[:,1]= self.statArray[:, (XiaStatNb*idx)+XiaStatIndex["icr"]] perr= self.__checkNullCount(count, pts) if len(perr): message.append("Null ICR|OCR on det #%02d points %s"%(detector, self.__pointRange(perr))) perr= [] for ipt in range(pts): if count[ipt,0]>0 and count[ipt,1]>0: rate[ipt,0]= count[ipt,1]/count[ipt,0] else: rate[ipt,0]= 1. perr.append(ipt) if len(perr): message.append("No DeadTime correction perfomed on det #%02d points %s"%(detector, self.__pointRange(perr))) self.data= self.data * rate self.header["xcorr"]= corrflag|1 return message
def __checkNullCount(self, count, pts): perr= [] check= numpy.sum(numpy.greater(count,0.), 1) for ipt in range(pts): if check[ipt]!=2: perr.append(ipt) if (ipt!=0 and check[ipt-1]==2) and (ipt!=pts-1 and check[ipt+1]==2): count[ipt,:]= (count[ipt-1,:]+count[ipt+1,:])/2. elif (ipt!=0 and check[ipt-1]==2): count[ipt,:]= count[ipt-1,:] elif (ipt!=pts-1 and check[ipt+1]==2): count[ipt,:]= count[ipt+1,:] else: count[ipt,:]= -1 return perr def __checkNullLivetime(self, lvt, pts): perr= [] check= numpy.greater(lvt, 0.) for ipt in range(pts): if check[ipt]==0: perr.append(ipt) if (ipt!=0 and check[ipt-1]==1) and (ipt!=pts-1 and check[ipt+1]==1): lvt[ipt,0]= (lvt[ipt+1]+lvt[ipt-1])/2. elif (ipt!=0 and check[ipt-1]==1): lvt[ipt,0]= lvt[ipt-1,0] elif (ipt!=pts-1 and check[ipt+1]==1): lvt[ipt,0]= lvt[ipt+1,0] else: lvt[ipt,0]= 1. return perr def __pointRange(self, ptlist): nb= len(ptlist) ptdiff= [] for idx in range(nb-1): ptdiff.append(((ptlist[idx+1]-ptlist[idx])==1)) ptdiff.append(0) ptrange= [] curr= None for idx in range(nb): if ptdiff[idx]: if curr is None: curr= ptlist[idx] else: if curr is None: ptrange.append(str(ptlist[idx])) else: ptrange.append("%d-%d"%(curr,ptlist[idx])) curr= None return "["+ ",".join(ptrange)+"]"
[docs] def sum(self, detectors=[], deadtime=0, livetime=0, average=0): message= [] if not len(detectors): sumdet= self.detList else: sumdet= [ det for det in detectors if det in self.detList ] sumdata= None for det in sumdet: if deadtime or livetime: message+= self.correct(det, deadtime, livetime) else: self.__readData(det) self.data= self.data.astype(numpy.float) if sumdata is None: sumdata= self.data * 1.0 else: sumdata= sumdata + self.data if average: self.data= sumdata / len(sumdet) else: self.data= sumdata dataflag= int(self.header.get("xdata", 0)) self.header["xdata"]= dataflag | (1<<2) self.header["xnb"]= 1 self.header["xdet"]= " ".join([str(det) for det in sumdet]) return message
[docs] def save(self, filename, force=0): if self.data is None: raise XiaEdfError("Cannot save. No data loaded.") edf= openEdf(filename, write=1, force=force) edf.WriteImage(self.header, self.data)
[docs]class XiaFilename: def __init__(self, filename=None): self.__reset() if filename is not None: self.__parseFilename(filename)
[docs] def setType(self, type, det=None): self.type= type self.det= det
[docs] def getType(self): return self.type
[docs] def isValid(self): return self.type is not None
[docs] def isCount(self): return (self.type=="ct" or (self.type=="sum" and self.det==-1))
[docs] def isScan(self): return (self.type=="det" or self.type=="st" or (self.type=="sum" and self.det>0))
[docs] def isSum(self): return (self.type=="sum")
[docs] def isStat(self): return (self.type=="st")
[docs] def findStatFile(self): xf= XiaFilename(self.get()) xf.setType("st") if os.path.isfile(xf.get()): return xf else: return None
[docs] def isGroupedWith(self, other): if (self.isScan() and other.isScan()) or (self.isCount() and other.isCount()): file1= "%s/%s"%(self.dir is None and "." or self.dir, self.prefix is None and "" or self.prefix) file2= "%s/%s"%(other.dir is None and "." or other.dir, other.prefix is None and "" or other.prefix) res= cmp(file1, file2) if res!=0: return 0 res= cmp(self.index, other.index) if res!=0: return 0 file1= "%s.%s"%(self.suffix is None and "" or self.suffix, self.ext is None and "" or self.ext) file2= "%s.%s"%(other.suffix is None and "" or other.suffix, other.ext is None and "" or other.ext) res= cmp(file1, file2) if res!=0: return 0 return 1 else: return 0
[docs] def setDirectory(self, dirname): if dirname is None: self.dir= "." else: self.dir= dirname
[docs] def appendPrefix(self, name): if self.prefix is None: self.prefix= name elif name is not None: self.prefix= "%s_%s"%(self.prefix, name)
[docs] def getDetector(self): if self.type!="det": return None else: return self.det
[docs] def set(self, filename): self.__reset() if filename is not None: self.__parseFilename(filename)
[docs] def get(self): self.__createFilename() return self.file
def __reset(self): self.file= None # --- full filename self.dir = None # --- directory self.type= None # --- file type = "ct", "st", "det" self.index= [] # --- file indexes self.suffix= None # --- suffix self.det= None # --- if type=="det", detector number def __parseFilename(self, filename): self.file= os.path.basename(filename) self.dir= os.path.dirname(filename) if not len(self.dir): self.dir= "." fileext= os.path.splitext(self.file) if len(fileext[1]): self.ext= fileext[1][1:] filelist= fileext[0].split("_") xiaidx= 0 for xiaidx in range(len(filelist)): if filelist[xiaidx][0:3]=="xia": break xiaidx += 1 if xiaidx==len(filelist): self.type= None self.prefix= fileext[0] else: self.prefix= "_".join(filelist[0:xiaidx]) try: self.index= map(int, filelist[xiaidx+1:]) except: self.suffix= "_".join(filelist[xiaidx+1:]) type= filelist[xiaidx][3:] if type=="ct" or type=="st": self.type= type elif type[0]=='S': self.type= "sum" if type[1]=='N': self.det= -1 else: try: self.det= int(type[1]) except: self.type= None else: try: self.type= "det" self.det= int(type) except: self.type= None def __createFilename(self): if self.prefix is None or self.type is None: self.file= None else: self.file= "%s/%s"%(self.dir, self.prefix) xia= None if self.type=="ct": xia= "xiact" elif self.type=="st": xia= "xiast" elif self.type=="sum": if self.det==-1: xia= "xiaSN" else: xia= "xiaS%01d"%self.det elif self.type=="det": xia= "xia%02d"%self.det elif self.type is not None: xia= "xia%s"%self.type if xia is not None: self.file= "%s_%s"%(self.file, xia) for idx in self.index: self.file= "%s_%04d"%(self.file, idx) if self.suffix is not None: self.file= "%s_%s"%(self.file, self.suffix) if self.ext is not None: self.file= "%s.%s"%(self.file, self.ext) def __cmp__(self, other): file1= "%s/%s"%(self.dir is None and "." or self.dir, self.prefix is None and "" or self.prefix) file2= "%s/%s"%(other.dir is None and "." or other.dir, other.prefix is None and "" or other.prefix) res= cmp(file1, file2) if res!=0: return res res= cmp(self.ext, other.ext) if res!=0: return res res= cmp(self.index, other.index) if res!=0: return res res= cmp(self.type, other.type) if res!=0: return res if self.type=="det" or self.type=="sum": res= cmp(self.det, other.det) if res!=0: return res file1= "%s.%s"%(self.suffix is None and "" or self.suffix, self.ext is None and "" or self.ext) file2= "%s.%s"%(other.suffix is None and "" or other.suffix, other.ext is None and "" or other.ext) return cmp(file1, file2)
[docs]def testScan(): x= XiaEdfScanFile("data/test_xiast_0000_0000_0000.edf", ["data/test_xia00_0000_0000_0000.edf", "data/test_xia01_0000_0000_0000.edf", "data/test_xia02_0000_0000_0000.edf", "data/test_xia03_0000_0000_0000.edf", "data/test_xia04_0000_0000_0000.edf", "data/test_xia05_0000_0000_0000.edf", "data/test_xia06_0000_0000_0000.edf", "data/test_xia07_0000_0000_0000.edf", "data/test_xia08_0000_0000_0000.edf", "data/test_xia09_0000_0000_0000.edf", "data/test_xia10_0000_0000_0000.edf", "data/test_xia11_0000_0000_0000.edf", ]) x.sum([1,10]) print(x.header) x.save("test_sum.edf")
#for det in [1, 2, 3, 4, 5, 6]: # x.correct(det, livetime=1) # x.save(det, "scan_corr_xia%02d.edf"%det)
[docs]def testAcq(infile, outfile=None): print("Reading ", infile) x= XiaEdfCountFile(infile) #print "DeadTime Correction ..." #x.correct() x.sum([1,10]) if outfile is not None: print(x.header) print("Saving %s" % outfile) x.save(outfile)
if __name__=="__main__": import sys testScan() sys.exit(0) if len(sys.argv)<2: print("%s <ct_filename> [<output_filename>]" % sys.argv[0]) sys.exit(0) else: infile= sys.argv[1] outfile= len(sys.argv)>=3 and sys.argv[2] or None testAcq(infile, outfile)