#/*##########################################################################
#
# The PyMca X-Ray Fluorescence Toolkit
#
# Copyright (c) 2004-2015 European Synchrotron Radiation Facility
#
# This file is part of the PyMca X-ray Fluorescence Toolkit developed at
# the ESRF by the Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
__author__ = "V.A. Sole - ESRF Data Analysis"
__contact__ = "sole@esrf.fr"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
import os
import numpy
from PyMca5.PyMcaMath.linalg import lstsq
from . import ClassMcaTheory
from PyMca5.PyMcaMath.fitting import Gefit
from . import ConcentrationsTool
from PyMca5.PyMcaMath.fitting import SpecfitFuns
from PyMca5.PyMcaIO import ConfigDict
import time
DEBUG = 0
[docs]class FastXRFLinearFit(object):
def __init__(self, mcafit=None):
self._config = None
if mcafit is None:
self._mcaTheory = ClassMcaTheory.McaTheory()
else:
self._mcaTheory = mcafit
[docs] def setFitConfiguration(self, configuration):
self._mcaTheory.setConfiguration(configuration)
[docs] def setFitConfigurationFile(self, ffile):
if not os.path.exists(ffile):
raise IOError("File <%s> does not exists" % ffile)
configuration = ConfigDict.ConfigDict()
configuration.read(ffile)
self.setFitConfiguration(configuration)
[docs] def fitMultipleSpectra(self, x=None, y=None, xmin=None, xmax=None,
configuration=None, concentrations=False,
ysum=None, weight=None):
if y is None:
raise RuntimeError("y keyword argument is mandatory!")
#if concentrations:
# txt = "Fast concentration calculation not implemented yet"
# raise NotImplemented(txt)
if DEBUG:
t0 = time.time()
if configuration is not None:
self._mcaTheory.setConfiguration(configuration)
# read the current configuration
config = self._mcaTheory.getConfiguration()
# background
if config['fit']['stripflag']:
if config['fit']['stripalgorithm'] == 1:
if DEBUG:
print("SNIP")
else:
raise RuntimeError("Please use the faster SNIP background")
toReconfigure = False
if weight is None:
# dictated by the file
weight = config['fit']['fitweight']
if weight:
# individual pixel weights (slow)
weightPolicy = 2
else:
# No weight
weightPolicy = 0
elif weight == 1:
# use average weight from the sum spectrum
weightPolicy = 1
if not config['fit']['fitweight']:
config['fit']['fitweight'] = 1
toReconfigure = True
elif weight == 2:
# individual pixel weights (slow)
weightPolicy = 2
if not config['fit']['fitweight']:
config['fit']['fitweight'] = 1
toReconfigure = True
weight = 1
else:
# No weight
weightPolicy = 0
if config['fit']['fitweight']:
config['fit']['fitweight'] = 0
toReconfigure = True
weight = 0
if not config['fit']['linearfitflag']:
#make sure we force a linear fit
config['fit']['linearfitflag'] = 1
toReconfigure = True
if toReconfigure:
# we must configure again the fit
self._mcaTheory.setConfiguration(config)
if hasattr(y, "info") and hasattr(y, "data"):
data = y.data
mcaIndex = y.info.get("McaIndex", -1)
else:
data = y
mcaIndex = -1
if len(data.shape) != 3:
txt = "For the time being only three dimensional arrays supported"
raise IndexError(txt)
elif mcaIndex not in [-1, 2]:
txt = "For the time being only mca arrays supported"
raise IndexError(txt)
else:
# if the cumulated spectrum is present it should be better
nRows = data.shape[0]
nColumns = data.shape[1]
nPixels = nRows * nColumns
if ysum is not None:
firstSpectrum = ysum
elif weightPolicy == 1:
# we need to calculate the sum spectrum to derive the uncertainties
totalSpectra = data.shape[0] * data.shape[1]
jStep = min(5000, data.shape[1])
ysum = numpy.zeros((data.shape[mcaIndex],), numpy.float)
for i in range(0, data.shape[0]):
if i == 0:
chunk = numpy.zeros((data.shape[0], jStep), numpy.float)
jStart = 0
while jStart < data.shape[1]:
jEnd = min(jStart + jStep, data.shape[1])
ysum += data[i, jStart:jEnd, :].sum(axis=0, dtype=numpy.float)
jStart = jEnd
firstSpectrum = ysum
elif not concentrations:
# just one spectrum is enough for the setup
firstSpectrum = data[0, 0, :]
else:
firstSpectrum = data[0, :, :].sum(axis=0, dtype=numpy.float)
# make sure we calculate the matrix of the contributions
self._mcaTheory.enableOptimizedLinearFit()
# initialize the fit
# print("xmin = ", xmin)
# print("xmax = ", xmax)
# print("firstShape = ", firstSpectrum.shape)
self._mcaTheory.setData(x=x, y=firstSpectrum, xmin=xmin, xmax=xmax)
# and initialize the derivatives
self._mcaTheory.estimate()
# now we can get the derivatives respect to the free parameters
# These are the "derivatives" respect to the peaks
# linearMatrix = self._mcaTheory.linearMatrix
# but we are still missing the derivatives from the background
nFree = 0
freeNames = []
nFreeBackgroundParameters = 0
for i, param in enumerate(self._mcaTheory.PARAMETERS):
if self._mcaTheory.codes[0][i] != ClassMcaTheory.Gefit.CFIXED:
nFree += 1
freeNames.append(param)
if i < self._mcaTheory.NGLOBAL:
nFreeBackgroundParameters += 1
#build the matrix of derivatives
derivatives = None
idx = 0
for i, param in enumerate(self._mcaTheory.PARAMETERS):
if self._mcaTheory.codes[0][i] == ClassMcaTheory.Gefit.CFIXED:
continue
deriv= self._mcaTheory.linearMcaTheoryDerivative(self._mcaTheory.parameters,
i,
self._mcaTheory.xdata)
deriv.shape = -1
if derivatives is None:
derivatives = numpy.zeros((deriv.shape[0], nFree), numpy.float)
derivatives[:, idx] = deriv
idx += 1
#loop for anchors
xdata = self._mcaTheory.xdata
if config['fit']['stripflag']:
anchorslist = []
if config['fit']['stripanchorsflag']:
if config['fit']['stripanchorslist'] is not None:
ravelled = numpy.ravel(xdata)
for channel in config['fit']['stripanchorslist']:
if channel <= ravelled[0]:continue
index = numpy.nonzero(ravelled >= channel)[0]
if len(index):
index = min(index)
if index > 0:
anchorslist.append(index)
if len(anchorslist) == 0:
anchorlist = [0, self._mcaTheory.ydata.size - 1]
anchorslist.sort()
# find the indices to be used for selecting the appropriate data
# if the original x data were nor ordered we have a problem
# TODO: check for original ordering.
if x is None:
# we have an enumerated channels axis
iXMin = xdata[0]
iXMax = xdata[-1]
else:
iXMin = numpy.nonzero(x <= xdata[0])[0][-1]
iXMax = numpy.nonzero(x >= xdata[-1])[0][0]
dummySpectrum = firstSpectrum[iXMin:iXMax+1].reshape(-1, 1)
# print("dummy = ", dummySpectrum.shape)
# allocate the output buffer
results = numpy.zeros((nFree, nRows, nColumns), numpy.float32)
uncertainties = numpy.zeros((nFree, nRows, nColumns), numpy.float32)
#perform the initial fit
if DEBUG:
print("Configuration elapsed = %f" % (time.time() - t0))
t0 = time.time()
totalSpectra = data.shape[0] * data.shape[1]
jStep = min(100, data.shape[1])
if weightPolicy == 2:
SVD = False
sigma_b = None
elif weightPolicy == 1:
# the +1 is to prevent misbehavior due to weights less than 1.0
sigma_b = 1 + numpy.sqrt(dummySpectrum)/nPixels
SVD = True
else:
SVD = True
sigma_b = None
last_svd = None
for i in range(0, data.shape[0]):
#print(i)
#chunks of nColumns spectra
if i == 0:
chunk = numpy.zeros((dummySpectrum.shape[0],
jStep),
numpy.float)
jStart = 0
while jStart < data.shape[1]:
jEnd = min(jStart + jStep, data.shape[1])
chunk[:,:(jEnd - jStart)] = data[i, jStart:jEnd, iXMin:iXMax+1].T
if config['fit']['stripflag']:
for k in range(jStep):
# obtain the smoothed spectrum
background=SpecfitFuns.SavitskyGolay(chunk[:, k],
config['fit']['stripfilterwidth'])
lastAnchor = 0
for anchor in anchorslist:
if (anchor > lastAnchor) and (anchor < background.size):
background[lastAnchor:anchor] =\
SpecfitFuns.snip1d(background[lastAnchor:anchor],
config['fit']['snipwidth'],
0)
lastAnchor = anchor
if lastAnchor < background.size:
background[lastAnchor:] =\
SpecfitFuns.snip1d(background[lastAnchor:],
config['fit']['snipwidth'],
0)
chunk[:, k] -= background
# perform the multiple fit to all the spectra in the chunk
#print("SHAPES")
#print(derivatives.shape)
#print(chunk[:,:(jEnd - jStart)].shape)
ddict=lstsq(derivatives, chunk[:,:(jEnd - jStart)],
sigma_b=sigma_b,
weight=weight,
digested_output=True,
svd=SVD,
last_svd=last_svd)
last_svd = ddict.get('svd', None)
parameters = ddict['parameters']
results[:, i, jStart:jEnd] = parameters
uncertainties[:, i, jStart:jEnd] = ddict['uncertainties']
jStart = jEnd
if DEBUG:
t = time.time() - t0
print("First fit elapsed = %f" % t)
print("Spectra per second = %f" % (data.shape[0]*data.shape[1]/float(t)))
t0 = time.time()
# cleanup zeros
# start with the parameter with the largest amount of negative values
negativePresent = True
nFits = 0
while negativePresent:
zeroList = []
for i in range(nFree):
#we have to skip the background parameters
if i >= nFreeBackgroundParameters:
t = results[i] < 0
if t.sum() > 0:
zeroList.append((t.sum(), i, t))
if len(zeroList) == 0:
negativePresent = False
continue
if nFits > (2 * (nFree - nFreeBackgroundParameters)):
# we are probably in an endless loop
# force negative pixels
for item in zeroList:
i = item[1]
badMask = item[2]
results[i][badMask] = 0.0
print("WARNING: %d pixels of parameter %s set to zero" % (item[0], freeNames[i]))
continue
zeroList.sort()
zeroList.reverse()
badParameters = []
badParameters.append(zeroList[0][1])
badMask = zeroList[0][2]
if 1:
# prevent and endless loop if two or more parameters have common pixels where they are
# negative and one of them remains negative when forcing other one to zero
for i, item in enumerate(zeroList):
if item[1] not in badParameters:
if item[0] > 0:
#check if they have common negative pixels
t = badMask * item[-1]
if t.sum() > 0:
badParameters.append(item[1])
badMask = t
if badMask.sum() < (0.0025 * nPixels):
# fit not worth
for i in badParameters:
results[i][badMask] = 0.0
uncertainties[i][badMask] = 0.0
if DEBUG:
print("WARNING: %d pixels of parameter %s set to zero" % (badMask.sum(),
freeNames[i]))
else:
if DEBUG:
print("Number of secondary fits = %d" % (nFits + 1))
nFits += 1
A = derivatives[:, [i for i in range(nFree) if i not in badParameters]]
#assume we'll not have too many spectra
try:
spectra = data[badMask, iXMin:iXMax+1]
spectra.shape = badMask.sum(), -1
except TypeError:
# in case of dynamic arrays, two dimensional indices are not
# supported by h5py
spectra = numpy.zeros((int(badMask.sum()), 1 + iXMax - iXMin),
data.dtype)
selectedIndices = numpy.nonzero(badMask > 0)
tmpData = numpy.zeros((1, 1 + iXMax - iXMin), data.dtype)
oldDataRow = -1
j = 0
for i in range(len(selectedIndices[0])):
j = selectedIndices[0][i]
if j != oldDataRow:
tmpData = data[j]
olddataRow = j
spectra[i] = tmpData[selectedIndices[1][i], iXMin:iXMax+1]
spectra = spectra.T
#
if config['fit']['stripflag']:
for k in range(spectra.shape[1]):
# obtain the smoothed spectrum
background=SpecfitFuns.SavitskyGolay(spectra[:, k],
config['fit']['stripfilterwidth'])
lastAnchor = 0
for anchor in anchorslist:
if (anchor > lastAnchor) and (anchor < background.size):
background[lastAnchor:anchor] =\
SpecfitFuns.snip1d(background[lastAnchor:anchor],
config['fit']['snipwidth'],
0)
lastAnchor = anchor
if lastAnchor < background.size:
background[lastAnchor:] =\
SpecfitFuns.snip1d(background[lastAnchor:],
config['fit']['snipwidth'],
0)
spectra[:, k] -= background
ddict = lstsq(A, spectra,
sigma_b=sigma_b,
weight=weight,
digested_output=True,
svd=SVD)
idx = 0
for i in range(nFree):
if i in badParameters:
results[i][badMask] = 0.0
uncertainties[i][badMask] = 0.0
else:
results[i][badMask] = ddict['parameters'][idx]
uncertainties[i][badMask] = ddict['uncertainties'][idx]
idx += 1
if DEBUG:
t = time.time() - t0
print("Fit of negative peaks elapsed = %f" % t)
t0 = time.time()
outputDict = {'parameters':results, 'uncertainties':uncertainties, 'names':freeNames}
if concentrations:
# check if an internal reference is used and if it is set to auto
####################################################
# CONCENTRATIONS
cTool = ConcentrationsTool.ConcentrationsTool()
cToolConf = cTool.configure()
cToolConf.update(config['concentrations'])
fitFirstSpectrum = False
if config['concentrations']['usematrix']:
if DEBUG:
print("USING MATRIX")
if config['concentrations']['reference'].upper() == "AUTO":
fitFirstSpectrum = True
fitresult = {}
if fitFirstSpectrum:
# we have to fit the "reference" spectrum just to get the reference element
mcafitresult = self._mcaTheory.startfit(digest=0, linear=True)
# if one of the elements has zero area this cannot be made directly
fitresult['result'] = self._mcaTheory.imagingDigestResult()
fitresult['result']['config'] = config
concentrationsResult, addInfo = cTool.processFitResult(config=cToolConf,
fitresult=fitresult,
elementsfrommatrix=False,
fluorates=self._mcaTheory._fluoRates,
addinfo=True)
# and we have to make sure that all the areas are positive
for group in fitresult['result']['groups']:
if fitresult['result'][group]['fitarea'] <= 0.0:
# give a tiny area
fitresult['result'][group]['fitarea'] = 1.0e-6
config['concentrations']['reference'] = addInfo['ReferenceElement']
else:
fitresult['result'] = {}
fitresult['result']['config'] = config
fitresult['result']['groups'] = []
idx = 0
for i, param in enumerate(self._mcaTheory.PARAMETERS):
if self._mcaTheory.codes[0][i] == Gefit.CFIXED:
continue
if i < self._mcaTheory.NGLOBAL:
# background
pass
else:
fitresult['result']['groups'].append(param)
fitresult['result'][param] = {}
# we are just interested on the factor to be applied to the area to get the
# concentrations
fitresult['result'][param]['fitarea'] = 1.0
fitresult['result'][param]['sigmaarea'] = 1.0
idx += 1
concentrationsResult, addInfo = cTool.processFitResult(config=cToolConf,
fitresult=fitresult,
elementsfrommatrix=False,
fluorates=self._mcaTheory._fluoRates,
addinfo=True)
nValues = 1
if len(concentrationsResult['layerlist']) > 1:
nValues += len(concentrationsResult['layerlist'])
nElements = len(list(concentrationsResult['mass fraction'].keys()))
massFractions = numpy.zeros((nValues * nElements, nRows, nColumns),
numpy.float32)
referenceElement = addInfo['ReferenceElement']
referenceTransitions = addInfo['ReferenceTransitions']
if DEBUG:
print("Reference <%s> transition <%s>" % (referenceElement, referenceTransitions))
if referenceElement in ["", None, "None"]:
if DEBUG:
print("No reference")
counter = 0
for i, group in enumerate(fitresult['result']['groups']):
if group.lower().startswith("scatter"):
if DEBUG:
print("skept %s" % group)
continue
outputDict['names'].append("C(%s)" % group)
massFractions[counter] = results[nFreeBackgroundParameters+i] *\
(concentrationsResult['mass fraction'][group]/fitresult['result'][param]['fitarea'])
if len(concentrationsResult['layerlist']) > 1:
for layer in concentrationsResult['layerlist']:
outputDict['names'].append("C(%s)-%s" % (group, layer))
massFractions[counter] = results[nFreeBackgroundParameters+i] *\
(concentrationsResult[layer]['mass fraction'][group]/fitresult['result'][param]['fitarea'])
else:
if DEBUG:
print("With reference")
idx = None
testGroup = referenceElement+ " " + referenceTransitions.split()[0]
for i, group in enumerate(fitresult['result']['groups']):
if group == testGroup:
idx = i
if idx is None:
raise ValueError("Invalid reference: <%s> <%s>" %\
(referenceElement, referenceTransitions))
group = fitresult['result']['groups'][idx]
referenceArea = fitresult['result'][group]['fitarea']
referenceConcentrations = concentrationsResult['mass fraction'][group]
goodIdx = results[nFreeBackgroundParameters+idx] > 0
massFractions[idx] = referenceConcentrations
counter = 0
for i, group in enumerate(fitresult['result']['groups']):
if group.lower().startswith("scatter"):
if DEBUG:
print("skept %s" % group)
continue
outputDict['names'].append("C(%s)" % group)
if i == idx:
continue
goodI = results[nFreeBackgroundParameters+i] > 0
tmp = results[nFreeBackgroundParameters+idx][goodI]
massFractions[i][goodI] = (results[nFreeBackgroundParameters+i][goodI]/(tmp + (tmp == 0))) *\
((referenceArea/fitresult['result'][group]['fitarea']) *\
(concentrationsResult['mass fraction'][group]))
if len(concentrationsResult['layerlist']) > 1:
for layer in concentrationsResult['layerlist']:
outputDict['names'].append("C(%s)-%s" % (group, layer))
massFractions[i][goodI] = (results[nFreeBackgroundParameters+i][goodI]/(tmp + (tmp == 0))) *\
((referenceArea/fitresult['result'][group]['fitarea']) *\
(concentrationsResult[layer]['mass fraction'][group]))
outputDict['concentrations'] = massFractions
if DEBUG:
t = time.time() - t0
print("Calculation of concentrations elapsed = %f" % t)
t0 = time.time()
####################################################
return outputDict
if __name__ == "__main__":
DEBUG = True
import glob
from PyMca.PyMcaIO import EDFStack
if 1:
#configurationFile = "G4-4720eV-NOWEIGHT-NO_Constant-batch.cfg"
configurationFile = "G4-4720eV-WEIGHT-NO_Constant-batch.cfg"
fileList = glob.glob("E:\DATA\COTTE\CH1777\G4_mca_0012_0000_*.edf")
concentrations = False
dataStack = EDFStack.EDFStack(filelist=fileList)
elif 0:
configurationFile = "D:\RIVARD\config_3-6kev_OceanIsland_batch_NO_BACKGROUND.cfg"
fileList = glob.glob("D:\RIVARD\m*.edf")
concentrations = False
dataStack = EDFStack.EDFStack(filelist=fileList)
else:
configurationFile = "E2_line.cfg"
fileList = glob.glob("E:\DATA\PyMca-Training\FDM55\AS_EDF\E2_line*.edf")
concentrations = False
dataStack = EDFStack.EDFStack(filelist=fileList)
t0 = time.time()
fastFit = FastXRFLinearFit()
fastFit.setFitConfigurationFile(configurationFile)
print("Main configuring Elapsed = % s " % (time.time() - t0))
results = fastFit.fitMultipleSpectra(y=dataStack,
concentrations=concentrations)
print("Total Elapsed = % s " % (time.time() - t0))