Source code for PyMca5.PyMcaCore.XiaCorrect

#/*##########################################################################
# Copyright (C) 2004-2014 E. Papillon, European Synchrotron Radiation Facility
#
# This file is part of the PyMca X-ray Fluorescence Toolkit developed at
# the ESRF by the Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
__author__ = "E. Papillon - ESRF Software group"
__contact__ = "sole@esrf.fr"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
from . import XiaEdf
import sys
import os
import time

__version__="$Revision: 1.11 $"

[docs]def defaultErrorCB(message): print(message)
[docs]def defaultLogCB(message, verbose_level=None, verbose_ask=None): if verbose_level is None: print(message) elif verbose_level <= verbose_ask: print(message)
[docs]def defaultDoneCB(nbdone, total): pass
[docs]def checkCB(log_cb=None, done_cb=None, error_cb=None): if log_cb is None: log_cb= defaultLogCB if done_cb is None: done_cb= defaultDoneCB if error_cb is None: error_cb= defaultErrorCB return (log_cb, done_cb, error_cb)
[docs]def parseFiles(filelist, verbose=0, keep_sum=0, log_cb=None, done_cb=None, error_cb=None): (log_cb, done_cb, error_cb)= checkCB(log_cb, done_cb, error_cb) log_cb("Checking xia files ...") xiafiles= [] for file in filelist: xf= XiaEdf.XiaFilename(file) if xf.isValid(): log_cb(" - Parsing %s (OK - %s)"%(file, xf.getType()), 1, verbose) if not keep_sum: if not xf.isSum(): xiafiles.append(xf) else: xiafiles.append(xf) else: log_cb(" - Parsing %s (Not Xia)"%file, 1, verbose) if len(xiafiles): log_cb("Sorting xia files ...") xiafiles.sort() groupfiles= [] group= None for xf in xiafiles: if group is None: group= [ xf ] else: if xf.isGroupedWith(group[0]): group.append(xf) else: groupfiles.append(group) group= [ xf ] if group is not None: groupfiles.append(group) grouperrors= [] for group in groupfiles: if group[0].isScan(): if not group[-1].isStat(): stat= group[0].findStatFile() if stat is not None: log_cb(" - Find stat file for group <%s>"%stat.get(), 1, verbose) group.append(stat) else: error_cb("XiaCorrect ERROR: no stat file in current group <%s>"%group[0].get()) grouperrors.append(group) for group in grouperrors: groupfiles.remove(group) if not len(groupfiles): error_cb("XiaCorrect ERROR: No valid XIA group files") return None return groupfiles else: error_cb("XiaCorrect ERROR: No XIA files found.") return None
[docs]def correctFiles(xiafiles, deadtime=1, livetime=0, sums=None, avgflag=0, outdir=None, outname="corr", force=0, \ verbose=0, log_cb=None, done_cb=None, error_cb=None): (log_cb, done_cb, error_cb)= checkCB(log_cb, done_cb, error_cb) processed= 0 saved= 0 total= 0 errors= 0 tps= time.time() done_cb(0, total) total= len(xiafiles) log_cb("Correcting xia files ...") for group in xiafiles: if not group[0].isScan(): file= group[0] name= file.get() log_cb("Working on %s"%name, 1, verbose) try: xia= XiaEdf.XiaEdfCountFile(name) file.setDirectory(outdir) file.appendPrefix(outname) name= file.get() if sums is not None: err= xia.sum(sums, deadtime, livetime, avgflag) file.setType("sum", -1) else: err= xia.correct(deadtime, livetime) if len(err): error_cb(" - WARNING: in %s"%name) for msg in err: error_cb(" * " + msg) log_cb(" - Saving %s"%name) xia.save(name, force) saved += 1 except XiaEdf.XiaEdfError: errors += 1 log_cb(sys.exc_info()[1]) else: groupfiles= [ file.get() for file in group ] name= groupfiles[-1] log_cb("Reading %s"%name, 1, verbose) try: xia= XiaEdf.XiaEdfScanFile(name, groupfiles[:-1]) except XiaEdf.XiaEdfError: xia= None errors += 1 error_cb(sys.exc_info()[1]) if xia is not None: for file in group: file.setDirectory(outdir) file.appendPrefix(outname) if sums is None: for file in group[:-1]: det= file.getDetector() if det is not None: log_cb("Working on detector #%02d"%det, 1, verbose) try: err= xia.correct(det, deadtime, livetime) name= file.get() if len(err): error_cb(" - WARNING: in %s"%name) for msg in err: error_cb(" * " + msg) log_cb(" - Saving %s"%name) xia.save(name, force) saved += 1 except XiaEdf.XiaEdfError: errors += 1 error_cb(sys.exc_info()[1]) else: log_cb("Working on group %s"%name, 1, verbose) file= group[-1] for isum in range(len(sums)): try: err= xia.sum(sums[isum], deadtime, livetime, avgflag) file.setType("sum", isum+1) name= file.get() if len(err): error_cb(" - WARNING: in %s"%name) for msg in err: error_cb(" * " + msg) log_cb(" - Saving %s"%name) xia.save(name, force) saved += 1 except XiaEdf.XiaEdfError: errors += 1 error_cb(sys.exc_info()[1]) processed += 1 done_cb(processed, total) done_cb(total, total) log_cb("\n* %d groups processed and %d files saved in %.2f sec"%(processed, saved, time.time()-tps)) if not errors: log_cb("* No errors found") else: log_cb("* %d errors found"%errors) log_cb("\n")
[docs]def parseArguments(): import getopt, os.path prog= os.path.basename(sys.argv[0]) long = ["help", "input=", "output=", "force", "verbose", "deadtime", "livetime", "sum=", "avg", "name=", "parsing"] short= ["h", "i:", "o:", "f", "v", "d", "l", "s:", "a", "n:", "p"] try: opts, args= getopt.getopt(sys.argv[1:], " ".join(short), long) except getopt.error: print("XiaCorrect ERROR: Cannot parse command line arguments") print("\t%s" % sys.exc_info()[1]) sys.exit(0) parsing= 0 options= {"input": [], "files": [], "output": None, "force": 0, "name": "corr", "verbose": 0, "deadtime": 0, "livetime": 0, "sums": None, "avgflag": 0, "parsing": 0} for opt, arg in opts: if opt in ("-h", "--help"): printHelp() sys.exit(0) if opt in ("-i", "--input"): options["input"].append(os.path.normpath(arg)) if opt in ("-o", "--output"): options["output"]= os.path.normpath(arg) if opt in ("-f", "--force"): options["force"]= 1 if opt in ("-v", "--verbose"): options["verbose"]= 1 if opt in ("-d", "--deadtime"): options["deadtime"]= 1 if opt in ("-l", "--livetime"): options["livetime"]= 1 if opt in ("-n", "--name"): options["name"]= str(arg) if opt in ("-s", "--sum"): if options["sums"] is None: options["sums"]= [] try: ssum= [ int(det) for det in arg.split(",") ] if ssum[0]==-1: ssum= [] options["sums"].append(ssum) except: print("XiaCorrect ERROR: Cannot parse sum detectors") print("\t%s"%arg) sys.exit(0) if opt in ("-a", "--avg"): options["avgflag"]= 1 if opt in ("-p", "--parsing"): options["parsing"]= 1 for iinput in options["input"]: if not os.path.isdir(iinput): print("XiaCorrect WARNING: Input directory <%s> is not valid"%\ iinput) files= [ os.path.join(iinput, file) for file in os.listdir(iinput) ] if not len(files): print("XiaCorrect WARNING: Input directory <%s> is empty"%\ (iinput, prog)) else: options["files"]+= files if len(args): options["files"]+= args if not len(options["files"]): print("XiaCorrect ERROR: No input datafiles") sys.exit(0) if not options["parsing"]: if not options["deadtime"] and not options["livetime"] and options["sums"] is None: print("XiaCorrect ERROR: Must have at least deadtime, livetime or sum options") sys.exit(0) if options["output"] is not None: if not os.path.isdir(options["output"]): print("XiaCorrect ERROR: output directory is not valid") sys.exit(0) return options
[docs]def printHelp(): prog= os.path.basename(sys.argv[0]) msg= """ %s [-h] [-v] [-f] [-d] [-l] [-a] [-s <detlist>] [-i <directory>] [-o <directory>] [<files ...>] Options: [-h]/[--help] Print help message [-v]/[--verbose] Switch ON verbose mode [-f]/[--force] Force writing output files if they already exists [-d]/[--deadtime] Perform deadtime correction [-l]/[--livetime] Perform livetime normalization [-s]/[--sum] <detector_list_comma_separated> Sum given detectors. if detector list is set to (-1), all detectors are used: %s -s 2,4,8 --> will sum detectors 2,4 and 8 %s -s -1 --> will sum ALL detectors Several sums can be added: -s 2,4,6,7 -s 8,9,10,11 [-a]/[--avg] Sum(s) are averaged. Need <-s> to specify list of detectors: -s 2,3,4 -a --> will average detectors 2,3 and 4 [-i]/[--input] <directory> Specify input directory: all files in this directory which appears to be xia edf files are processed. Several [-i] options can be added: %s -d -i /tmp -i /data/opidXX [-o]/[--output] Specify output directories. If not specified, output files are saved in the same place as input file. [-n]/[--name] String to be appended to prefix for output filename. Default is \"corr\". [<files ...>] Specify one or several input files. Wildcards can be used: %s -l file1.edf file2.edf /tmp/test*.edf Minimum options to work: [-l] , [-d] or [-s] [-i input] or <files ...> """%(prog, prog, prog, prog, prog) print(msg)
[docs]def mainCommandLine(): options= parseArguments() files= parseFiles(options["files"], options["verbose"]) if files is not None: if options["parsing"]: for group in files: print("FileGroup:") for file in group: print(" - ", file.get()) else: correctFiles(files, options["deadtime"], options["livetime"], options["sums"], options["avgflag"], \ options["output"], options["name"], options["force"], options["verbose"])
[docs]def mainGUI(app=None): from PyMca5.PyMcaGui import PyMcaQt as qt from PyMca5.PyMcaGui.pymca import XiaCorrectWizard if app is None: app= qt.QApplication(sys.argv) wid= XiaCorrectWizard.XiaCorrectWizard() ret= wid.exec_() if ret==qt.QDialog.Accepted: options= wid.get() files= parseFiles(options["files"], options["verbose"]) if files is not None: correctFiles(files, options["deadtime"], options["livetime"], options["sums"], options["avgflag"], \ options["output"], options["name"], options["force"], options["verbose"])
if __name__=="__main__": import sys if len(sys.argv)==1: mainGUI() else: mainCommandLine()