#/*##########################################################################
#
# The PyMca X-Ray Fluorescence Toolkit
#
# Copyright (c) 2004-2014 European Synchrotron Radiation Facility
#
# This file is part of the PyMca X-ray Fluorescence Toolkit developed at
# the ESRF by the Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
__author__ = "V.A. Sole - ESRF Data Analysis"
__contact__ = "sole@esrf.fr"
__license__ = "MIT"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
import sys
import numpy
import numpy.linalg
try:
# make an explicit import to warn about missing optimized libraries
import numpy.core._dotblas as dotblas
except ImportError:
print("WARNING: Not using BLAS/ATLAS, PCA calculation will be slower")
dotblas = numpy
DEBUG = 0
def getCovarianceMatrix(stack,
index=None,
binning=None,
dtype=numpy.float64,
force=True,
center=True,
weights=None,
spatial_mask=None):
"""
    Calculate the covariance matrix of the input data (stack) array. The input array is to be
    understood as a set of observables (spectra) taken at different instances (for instance,
    at different spatial coordinates).
:param stack: Array of data. Dimension greater than one.
:type stack: Numpy ndarray.
    :param index: Integer specifying the array dimension containing the "observables". Only
        the first dimension (index = 0) or the last dimension (index = -1 or index = ndimensions - 1)
        are supported.
:type index: Integer (default is -1 to indicate it is the last dimension of input array)
:param binning: Current implementation corresponds to a sampling of the spectral data and not to
an actual binning. This may change in future versions.
:type binning: Positive integer (default 1)
:param dtype: Keyword indicating the data type of the returned covariance matrix.
:type dtype: A valid numpy data type (default numpy.float64)
:param force: Indicate how to calculate the covariance matrix:
- False : Perform the product data.T * data in one call
- True : Perform the product data.T * data progressively (smaller memory footprint)
:type force: Boolean (default True)
:param center: Indicate if the mean is to be subtracted from the observables.
:type center: Boolean (default True)
    :param weights: Weight to be applied to each observable. It can therefore be used as a spectral
        mask by setting the weight to 0 on the values to ignore.
    :type weights: Numpy ndarray of the same size as the observables or None (default).
    :param spatial_mask: Array of size n, where n is the number of measurement instances. In mapping
        experiments, n would be equal to the number of pixels.
    :type spatial_mask: Numpy array of unsigned bytes (numpy.uint8) or None (default).
:returns: The covMatrix, the average spectrum and the number of used pixels.
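
    Example (a minimal sketch; the stack shape is illustrative)::

        import numpy
        stack = numpy.random.random((10, 10, 100))  #a 10x10 map of 100-channel spectra
        cov, average, n = getCovarianceMatrix(stack, index=-1)
        #cov.shape == (100, 100); average.shape == (100,); n == 100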
"""
    #the 1D mask = weights should correspond to the values, before or after
    #sampling? It could be handled as weights to be applied to the
    #spectra. That would allow two uses, as mask and as weights, at
    #the cost of a multiplication.
#the spatial_mask accounts for pixels to be considered. It allows
#to calculate the covariance matrix of a subset or to deal with
#non finite data (NaN, +inf, -inf, ...). The calling program
#should set the mask.
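    #a typical way for the calling program to build such a mask (it is also
    #the one used in the test() function below) is to flag non-finite pixels:
    #    spatial_mask = numpy.isfinite(data.sum(axis=-1)).astype(numpy.uint8)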
#recover the actual data to work with
if hasattr(stack, "info") and hasattr(stack, "data"):
#we are dealing with a PyMca data object
data = stack.data
if index is None:
index = stack.info.get("McaWindex", -1)
else:
data = stack
if index is None:
index = -1
oldShape = data.shape
if index not in [0, -1, len(oldShape) - 1]:
data = None
        raise IndexError("1D index must be one of 0, -1 or %d" % (len(oldShape) - 1))
if index < 0:
actualIndex = len(oldShape) + index
else:
actualIndex = index
#the number of spatial pixels
nPixels = 1
for i in range(len(oldShape)):
if i != actualIndex:
nPixels *= oldShape[i]
#remove inf or nan
#image_data = data.sum(axis=actualIndex)
#spatial_mask = numpy.isfinite(image_data)
#
#the starting number of channels or of images
N = oldShape[actualIndex]
# our binning (better said sampling) is spectral, in order not to
# affect the spatial resolution
if binning is None:
binning = 1
if spatial_mask is not None:
cleanMask = spatial_mask[:].reshape(nPixels)
usedPixels = cleanMask.sum()
badMask = numpy.array(spatial_mask < 1, dtype=cleanMask.dtype)
badMask.shape = nPixels
else:
cleanMask = None
usedPixels = nPixels
nChannels = int(N / binning)
if weights is None:
        weights = numpy.ones(N, numpy.float64)
if weights.size == nChannels:
# binning was taken into account
cleanWeights = weights[:]
else:
cleanWeights = weights[::binning]
#end of checking part
eigenvectorLength = nChannels
    if (not force) and isinstance(data, numpy.ndarray):
        if DEBUG:
            print("Memory consuming calculation")
        #make a direct calculation (memory consuming)
#take a view to the data
dataView = data[:]
if index in [0]:
#reshape the view to allow the matrix multiplication
dataView.shape = -1, nPixels
cleanWeights.shape = -1, 1
dataView = dataView[::binning] * cleanWeights
if cleanMask is not None:
dataView[:, badMask] = 0
sumSpectrum = dataView.sum(axis=1, dtype=numpy.float64)
#and return the standard covariance matrix as a matrix product
covMatrix = dotblas.dot(dataView, dataView.T)\
/ float(usedPixels - 1)
else:
#the last index
dataView.shape = nPixels, -1
cleanWeights.shape = 1, -1
dataView = dataView[:, ::binning] * cleanWeights
if cleanMask is not None:
cleanMask.shape = -1
                dataView[badMask] = 0
sumSpectrum = dataView.sum(axis=0, dtype=numpy.float64)
#and return the standard covariance matrix as a matrix product
            covMatrix = dotblas.dot(dataView.T, dataView)\
                        / float(usedPixels - 1)
if center:
averageMatrix = numpy.outer(sumSpectrum, sumSpectrum)\
/ (usedPixels * (usedPixels - 1))
covMatrix -= averageMatrix
averageMatrix = None
return covMatrix, sumSpectrum / usedPixels, usedPixels
#we are dealing with dynamically loaded data
if DEBUG:
print("DYNAMICALLY LOADED DATA")
#create the needed storage space for the covariance matrix
try:
covMatrix = numpy.zeros((eigenvectorLength, eigenvectorLength),
dtype=dtype)
sumSpectrum = numpy.zeros((eigenvectorLength,), numpy.float64)
except:
#make sure no reference to the original input data is kept
cleanWeights = None
covMatrix = None
averageMatrix = None
data = None
raise
#workaround a problem with h5py
try:
if actualIndex in [0]:
testException = data[0:1]
else:
if len(data.shape) == 2:
testException = data[0:1,-1]
elif len(data.shape) == 3:
testException = data[0:1,0:1,-1]
except AttributeError:
txt = "%s" % type(data)
if 'h5py' in txt:
print("Implementing h5py workaround")
import h5py
data = h5py.Dataset(data.id)
else:
raise
if actualIndex in [0]:
#divider is used to decide the fraction of images to keep in memory
#in order to limit file access on dynamically loaded data.
#Since two chunks of the same size are used, the amount of memory
#needed is twice the data size divided by the divider.
#For instance, divider = 10 implies the data to be read 5.5 times
#from disk while having a memory footprint of about one fifth of
#the dataset size.
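        #worked example: with divider = 10 the outer loop reads each of the
        #10 chunks once (10 reads) while the inner loop re-reads
        #0 + 1 + ... + 9 = 45 chunks, i.e. 55 chunk reads in total,
        #which amounts to 5.5 passes over the data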
step = 0
divider = 10
while step < 1:
step = int(oldShape[index] / divider)
divider -= 2
if divider <= 0:
step = oldShape[index]
break
if DEBUG:
print("Reading chunks of %d images" % step)
nImagesRead = 0
if (binning == 1) and oldShape[index] >= step:
chunk1 = numpy.zeros((step, nPixels), numpy.float64)
chunk2 = numpy.zeros((nPixels, step), numpy.float64)
if spatial_mask is not None:
badMask.shape = -1
cleanMask.shape = -1
i = 0
while i < N:
iToRead = min(step, N - i)
#get step images for the first chunk
chunk1[0:iToRead] = data[i:i + iToRead].reshape(iToRead, -1)
if spatial_mask is not None:
chunk1[0:iToRead, badMask] = 0
sumSpectrum[i:i + iToRead] = chunk1[0:iToRead].sum(axis=1)
if center:
average = sumSpectrum[i:i + iToRead] / usedPixels
average.shape = iToRead, 1
chunk1[0:iToRead] -= average
if spatial_mask is not None:
chunk1[0:iToRead, badMask] = 0
nImagesRead += iToRead
j = 0
while j <= i:
#get step images for the second chunk
if j == i:
jToRead = iToRead
                        chunk2[:, 0:jToRead] = chunk1[0:jToRead, :].T
else:
#get step images for the second chunk
jToRead = min(step, nChannels - j)
#with loop:
#for k in range(0, jToRead):
# chunk2[:,k] = data[(j+k):(j+k+1)].reshape(1,-1)
# if spatial_mask is not None:
# chunk2[badMask[(j+k):(j+k+1),k]] = 0
#equivalent without loop:
chunk2[:, 0:jToRead] =\
data[j:(j + jToRead)].reshape(jToRead, -1).T
if spatial_mask is not None:
chunk2[badMask, 0:jToRead] = 0
nImagesRead += jToRead
if center:
average = \
chunk2[:, 0:jToRead].sum(axis=0) / usedPixels
average.shape = 1, jToRead
chunk2[:, 0:jToRead] -= average
if spatial_mask is not None:
chunk2[badMask, 0:jToRead] = 0
#dot product
if (iToRead != step) or (jToRead != step):
covMatrix[i: (i + iToRead), j: (j + jToRead)] =\
dotblas.dot(chunk1[:iToRead, :nPixels],
chunk2[:nPixels, :jToRead])
else:
covMatrix[i: (i + iToRead), j: (j + jToRead)] =\
dotblas.dot(chunk1, chunk2)
if i != j:
covMatrix[j: (j + jToRead), i: (i + iToRead)] =\
covMatrix[i: (i + iToRead), j: (j + jToRead)].T
#increment j
j += jToRead
i += iToRead
chunk1 = None
chunk2 = None
if DEBUG:
print("totalImages Read = ", nImagesRead)
elif (binning > 1) and (oldShape[index] >= step):
chunk1 = numpy.zeros((step, nPixels), numpy.float64)
chunk2 = numpy.zeros((nPixels, step), numpy.float64)
#one by one reading till we fill the chunks
imagesToRead = numpy.arange(0, oldShape[index], binning)
i = int(imagesToRead[weights > 0][0])
spectrumIndex = 0
nImagesRead = 0
while i < N:
#fill chunk1
jj = 0
for iToRead in range(0, int(min(step * binning, N - i)),
binning):
chunk1[jj] = data[i + iToRead].reshape(1, -1) * \
weights[i + iToRead]
jj += 1
sumSpectrum[spectrumIndex:(spectrumIndex + jj)] = \
chunk1[0:jj].sum(axis=1)
if center:
average = \
sumSpectrum[spectrumIndex:(spectrumIndex + jj)] / nPixels
average.shape = jj, 1
chunk1[0:jj] -= average
nImagesRead += jj
iToRead = jj
j = 0
while j <= i:
#get step images for the second chunk
if j == i:
jToRead = iToRead
chunk2[:, 0:jToRead] = chunk1[0:jToRead, :].T
else:
#get step images for the second chunk
jj = 0
for jToRead in range(0,
int(min(step * binning, N - j)),
binning):
chunk2[:, jj] =\
data[j + jToRead].reshape(1, -1)\
* weights[j + jToRead]
jj += 1
nImagesRead += jj
if center:
average = chunk2[:, 0:jj].sum(axis=0) / nPixels
average.shape = 1, jj
chunk2 -= average
jToRead = jj
#dot product
if (iToRead != step) or (jToRead != step):
covMatrix[i:(i + iToRead), j:(j + jToRead)] =\
dotblas.dot(chunk1[:iToRead, :nPixels],
chunk2[:nPixels, :jToRead])
else:
covMatrix[i:(i + iToRead), j:(j + jToRead)] =\
dotblas.dot(chunk1, chunk2)
if i != j:
covMatrix[j:(j + jToRead), i:(i + iToRead)] =\
covMatrix[i:(i + iToRead), j:(j + jToRead)].T
#increment j
j += jToRead * step
i += iToRead * step
chunk1 = None
chunk2 = None
else:
raise ValueError("PCATools.getCovarianceMatrix: Unhandled case")
#should one divide by N or by N-1 ?? if we use images, we
#assume the observables are the images, not the spectra!!!
#so, covMatrix /= nChannels is wrong and one has to use:
covMatrix /= usedPixels
else:
#the data are already arranged as (nPixels, nChannels) and we
#basically have to return data.T * data to get back the covariance
#matrix as (nChannels, nChannels)
#if someone had the bad idea to store the data in HDF5 with a chunk
#size based on the pixels and not on the spectra a loop based on
#reading spectrum per spectrum can be very slow
step = 0
divider = 10
while step < 1:
step = int(nPixels / divider)
divider -= 1
if divider <= 0:
step = nPixels
break
        step = nPixels  #this overrides the chunked step computed above: all spectra are read in one go
if DEBUG:
print("Reading chunks of %d spectra" % step)
cleanWeights.shape = 1, -1
if len(data.shape) == 2:
if cleanMask is not None:
badMask.shape = -1
tmpData = numpy.zeros((step, nChannels), numpy.float64)
k = 0
while k < nPixels:
kToRead = min(step, nPixels - k)
tmpData[0:kToRead] = data[k: k + kToRead, ::binning]
if cleanMask is not None:
                    tmpData[0:kToRead][badMask[k: k + kToRead]] = 0
a = tmpData[0:kToRead] * cleanWeights
sumSpectrum += a.sum(axis=0)
covMatrix += dotblas.dot(a.T, a)
a = None
k += kToRead
tmpData = None
elif len(data.shape) == 3:
if oldShape[0] == 1:
#close to the previous case
tmpData = numpy.zeros((step, nChannels), numpy.float64)
if cleanMask is not None:
badMask.shape = data.shape[0], data.shape[1]
for i in range(oldShape[0]):
k = 0
while k < oldShape[1]:
kToRead = min(step, oldShape[1] - k)
tmpData[0:kToRead] = data[i, k:k + kToRead, ::binning]\
* cleanWeights
if cleanMask is not None:
tmpData[0:kToRead][badMask[i, k: k + kToRead]] = 0
a = tmpData[0:kToRead]
sumSpectrum += a.sum(axis=0)
covMatrix += dotblas.dot(a.T, a)
a = None
k += kToRead
tmpData = None
elif oldShape[1] == 1:
#almost identical to the previous case
tmpData = numpy.zeros((step, nChannels), numpy.float64)
if cleanMask is not None:
badMask.shape = data.shape[0], data.shape[1]
for i in range(oldShape[1]):
k = 0
while k < oldShape[0]:
kToRead = min(step, oldShape[0] - k)
tmpData[0:kToRead] = data[k: k + kToRead, i, ::binning]\
* cleanWeights
if cleanMask is not None:
tmpData[0:kToRead][badMask[k: k + kToRead, i]] = 0
a = tmpData[0:kToRead]
sumSpectrum += a.sum(axis=0)
covMatrix += dotblas.dot(a.T, a)
a = None
k += kToRead
tmpData = None
elif oldShape[0] < 21:
if step > oldShape[1]:
step = oldShape[1]
tmpData = numpy.zeros((step, nChannels), numpy.float64)
if cleanMask is not None:
badMask.shape = data.shape[0], data.shape[1]
for i in range(oldShape[0]):
k = 0
while k < oldShape[1]:
kToRead = min(step, oldShape[1] - k)
tmpData[0:kToRead] = data[i, k: k + kToRead, ::binning]\
* cleanWeights
if cleanMask is not None:
tmpData[0:kToRead][badMask[i, k: k + kToRead]] = 0
a = tmpData[0:kToRead]
sumSpectrum += a.sum(axis=0)
covMatrix += dotblas.dot(a.T, a)
a = None
k += kToRead
tmpData = None
else:
#I should choose the sizes in terms of the size
#of the dataset
if oldShape[0] < 41:
#divide by 10
deltaRow = 4
elif oldShape[0] < 101:
#divide by 10
deltaRow = 10
else:
#take pieces of one tenth
deltaRow = int(oldShape[0] / 10)
deltaCol = oldShape[1]
tmpData = numpy.zeros((deltaRow, deltaCol, nChannels),
numpy.float64)
if cleanMask is not None:
badMask.shape = data.shape[0], data.shape[1]
i = 0
while i < oldShape[0]:
iToRead = min(deltaRow, oldShape[0] - i)
kToRead = iToRead * oldShape[1]
tmpData[:iToRead] = data[i:(i + iToRead), :, ::binning]
if cleanMask is not None:
                        tmpData[0:iToRead][badMask[i:(i + iToRead), :]] = 0
a = tmpData[:iToRead]
a.shape = kToRead, nChannels
a *= cleanWeights
                    #optional per-spectrum weighting (disabled):
                    #a /= (a.sum(axis=1).reshape(-1, 1))
sumSpectrum += a.sum(axis=0)
covMatrix += dotblas.dot(a.T, a)
a = None
i += iToRead
#should one divide by N or by N-1 ??
covMatrix /= usedPixels - 1
if center:
#the n-1 appears again here
averageMatrix = numpy.outer(sumSpectrum, sumSpectrum)\
/ (usedPixels * (usedPixels - 1))
covMatrix -= averageMatrix
averageMatrix = None
return covMatrix, sumSpectrum / usedPixels, usedPixels
def numpyPCA(stack, index=-1, ncomponents=10, binning=None,
center=True, scale=True, mask=None, spectral_mask=None, **kw):
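    """
    Perform a principal component analysis of the input data (stack) using
    numpy.linalg.eigh on the covariance matrix returned by getCovarianceMatrix.

    The parameters are forwarded to getCovarianceMatrix (mask is passed as
    spatial_mask and spectral_mask as weights). The function returns the tuple
    (images, eigenvalues, eigenvectors), where images contains the projection
    of the data onto each of the first ncomponents principal components.

    A minimal usage sketch (the number of components is illustrative)::

        images, evals, evects = numpyPCA(stack, index=-1, ncomponents=3)
    """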
if DEBUG:
print("PCATools.numpyPCA")
print("index = %d" % index)
print("center = %s" % center)
print("scale = %s" % scale)
#recover the actual data to work with
if hasattr(stack, "info") and hasattr(stack, "data"):
#we are dealing with a PyMca data object
data = stack.data
else:
data = stack
force = kw.get("force", True)
oldShape = data.shape
if index not in [0, -1, len(oldShape) - 1]:
data = None
raise IndexError("1D index must be one of 0, -1 or %d, got %d" %\
(len(oldShape) - 1, index))
if index < 0:
actualIndex = len(oldShape) + index
else:
actualIndex = index
#workaround a problem with h5py
try:
if actualIndex in [0]:
testException = data[0:1]
else:
if len(data.shape) == 2:
testException = data[0:1,-1]
elif len(data.shape) == 3:
testException = data[0:1,0:1,-1]
except AttributeError:
txt = "%s" % type(data)
if 'h5py' in txt:
print("Implementing h5py workaround")
import h5py
data = h5py.Dataset(data.id)
else:
raise
#the number of spatial pixels
nPixels = 1
for i in range(len(oldShape)):
if i != actualIndex:
nPixels *= oldShape[i]
#the number of channels
nChannels = oldShape[actualIndex]
if binning is None:
binning = 1
N = int(nChannels / binning)
if ncomponents > N:
msg = "Requested %d components for a maximum of %d" % (ncomponents, N)
raise ValueError(msg)
    # avgSpectrum is used later on to center the data when calculating the projections
cov, avgSpectrum, calculatedPixels = getCovarianceMatrix(stack,
index=index,
binning=binning,
force=force,
center=center,
spatial_mask=mask,
weights=spectral_mask)
#the total variance is the sum of the elements of the diagonal
totalVariance = numpy.diag(cov)
standardDeviation = numpy.sqrt(totalVariance)
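    #replace zero standard deviations by one to avoid divisions by zero
    #when calculating the projections below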
standardDeviation = standardDeviation + (standardDeviation == 0)
print("Total Variance = ", totalVariance.sum())
    normalizeToUnitStandardDeviation = scale
    #normalization of the covariance matrix to unit standard deviation
    #is currently disabled:
    #if normalizeToUnitStandardDeviation:
    #    for i in range(cov.shape[0]):
    #        if totalVariance[i] > 0:
    #            cov[i, :] /= numpy.sqrt(totalVariance[i])
    #            cov[:, i] /= numpy.sqrt(totalVariance[i])
if DEBUG:
import time
t0 = time.time()
evalues, evectors = numpy.linalg.eigh(cov)
if DEBUG:
print("Eig elapsed = ", time.time() - t0)
cov = None
dtype = numpy.float32
images = numpy.zeros((ncomponents, nPixels), dtype)
eigenvectors = numpy.zeros((ncomponents, N), dtype)
eigenvalues = numpy.zeros((ncomponents,), dtype)
    #sort the eigenvalues in decreasing order and keep the ncomponents largest
    a = [(evalues[i], i) for i in range(len(evalues))]
    a.sort()
    a.reverse()
    totalExplainedVariance = 0.0
    for i0 in range(ncomponents):
        i = a[i0][1]
        eigenvalues[i0] = evalues[i]
        partialExplainedVariance = 100. * evalues[i] / totalVariance.sum()
        print("PC%02d Explained variance %.5f %% " %\
              (i0 + 1, partialExplainedVariance))
        totalExplainedVariance += partialExplainedVariance
        eigenvectors[i0, :] = evectors[:, i]
        #print("NORMA = ", numpy.dot(evectors[:, i].T, evectors[:, i]))
    print("Total explained variance = %.2f %% " % totalExplainedVariance)
#calculate the projections
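    #each score image j is the dot product of eigenvector j with the data
    #after centering (average subtraction) and division by the standard deviation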
if actualIndex in [0]:
for i in range(oldShape[actualIndex]):
tmpData = (data[i].reshape(1, -1) - avgSpectrum[i]) / standardDeviation[i]
for j in range(ncomponents):
images[j:j + 1, :] += tmpData * eigenvectors[j, i]
if len(oldShape) == 3:
#reshape the images
images.shape = ncomponents, oldShape[1], oldShape[2]
else:
#array of spectra
if len(oldShape) == 2:
for i in range(nPixels):
#print i
tmpData = data[i, :]
tmpData.shape = 1, nChannels
tmpData = (tmpData[:, ::binning] - avgSpectrum) / standardDeviation
for j in range(ncomponents):
images[j, i] = numpy.dot(tmpData, eigenvectors[j])
#reshape the images
images.shape = ncomponents, nPixels
elif len(oldShape) == 3:
i = 0
for r in range(oldShape[0]):
for c in range(oldShape[1]):
#print i
tmpData = data[r, c, :]
tmpData.shape = 1, nChannels
tmpData = (tmpData[:, ::binning] - avgSpectrum) / standardDeviation
for j in range(ncomponents):
images[j, i] = numpy.dot(tmpData, eigenvectors[j])
i += 1
#reshape the images
images.shape = ncomponents, oldShape[0], oldShape[1]
return images, eigenvalues, eigenvectors
def test():
x = numpy.array([[0.0, 2.0, 3.0],
[3.0, 0.0, -1.0],
[4.0, -4.0, 4.0],
[4.0, 4.0, 4.0]])
shape0 = x.shape
print("x:")
print(x)
print("Numpy covariance matrix. It uses (n-1)")
print(numpy.cov(x.T))
avg = x.sum(axis=0).reshape(-1, 1) / x.shape[0]
print("Average = ", avg)
print("OPERATION")
print(numpy.dot((x.T - avg), (x.T - avg).T) / (x.shape[0] - 1))
print("PCATools.getCovarianceMatrix(x, force=True)")
x.shape = 1, shape0[0], shape0[1]
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(x, force=True)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
print("PCATools.getCovarianceMatrix(x, force=True) using spatial_mask")
x.shape = 1, shape0[0], shape0[1]
dataSum = x.sum(axis=-1)
spatial_mask = numpy.isfinite(dataSum)
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(x, force=True,
spatial_mask=spatial_mask)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
print("PCATools.getCovarianceMatrix(x, force=False)")
x.shape = 1, shape0[0], shape0[1]
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(x, force=False)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
print("PCATools.getCovarianceMatrix(x, force=False) using spatial_mask")
x.shape = 1, shape0[0], shape0[1]
y = numpy.zeros((2, shape0[0], shape0[1]))
y[0] = x[0]
y[1, :, :] = numpy.nan
dataSum = y.sum(axis=-1)
spatial_mask = numpy.isfinite(dataSum)
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(y, force=False,
spatial_mask=spatial_mask)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
print("PCATools.getCovarianceMatrix(x, force=True) using spatial_mask")
y[1, :, :] = numpy.nan
dataSum = y.sum(axis=-1)
spatial_mask = numpy.isfinite(dataSum)
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(y, force=True,
spatial_mask=spatial_mask)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
print("PCATools.getCovarianceMatrix(x)")
x.shape = shape0[0], 1, shape0[1]
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(x)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
print("MDP")
import mdp
    pca = mdp.nodes.PCANode(dtype=numpy.float64)
x.shape = shape0
pca.train(x)
# access to a protected member to prevent
# deletion of the covariance matrix when using
# stop_training.
pca._stop_training(debug=True)
print("MDP covariance matrix. It uses (n-1)")
print(pca.cov_mtx)
print("Average = ", pca.avg)
print("TEST AS IMAGES")
    stack = numpy.zeros((shape0[-1], shape0[0], 1), numpy.float64)
for i in range(stack.shape[0]):
stack[i, :, 0] = x[:, i]
x = stack
print("PCATools.getCovarianceMatrix(x) force=True")
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(x, index=0, force=True)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
print("PCATools.getCovarianceMatrix(x) force=True) use_spatialMask")
    y = numpy.zeros((shape0[-1], shape0[0], 2), numpy.float64)
y[:, :, 0] = x[:, :, 0]
y[:, :, 1] = numpy.nan
dataSum = y.sum(axis=0)
spatial_mask = numpy.isfinite(dataSum)
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(y, index=0, force=True,
spatial_mask=spatial_mask)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
print("PCATools.getCovarianceMatrix(x), force=False")
pymcaCov, pymcaAvg, nData = getCovarianceMatrix(x, index=0, force=False)
print("PyMca covariance matrix. It uses (n-1)")
print(pymcaCov)
print("Average = ", pymcaAvg)
if __name__ == "__main__":
test()