Source code for PyMca5.PyMcaIO.PilatusCBF

#/*##########################################################################
#
# The PyMca X-Ray Fluorescence Toolkit
#
# Copyright (c) 2004-2014 European Synchrotron Radiation Facility
#
# This file is part of the PyMca X-ray Fluorescence Toolkit developed at
# the ESRF by the Software group.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
#############################################################################*/
"""
Authors: Jerome Kieffer, ESRF
         email:jerome.kieffer@esrf.fr

Cif Binary Files images are 2D images written by the Pilatus detector and others.
They use a modified (simplified) byte-offset algorithm.
"""

__author__    = "Jerome Kieffer"
__contact__   = "sole@esrf.fr"
__copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
__license__   = "MIT"

import sys
import os
import numpy as np
import logging
if sys.version < '3':
    _fileClass = file
else:
    import io
    _fileClass = io.IOBase

DEBUG = False

DATA_TYPES = { "signed 8-bit integer"   : np.int8,
               "signed 16-bit integer"  : np.int16,
               "signed 32-bit integer"  : np.int32
                }

MINIMUM_KEYS = ["X-Binary-Size-Fastest-Dimension",
                'ByteOrder',
                'Data type',
                'X dimension',
                'Y dimension',
                'Number of readouts']

DEFAULT_VALUES = {
                  "Data type": "signed 32-bit integer",
                  "X-Binary-Size-Fastest-Dimension": 2463,
                  "X-Binary-Element-Byte-Order": "LITTLE_ENDIAN"
                  }


[docs]class PilatusCBF(object): def __init__(self, filename): if isinstance(filename, _fileClass): fd = filename else: #the b is needed for windows fd = open(filename, 'rb') self.cif = CIF() self.__data = np.array([]) self.__info = {} #read the file if isinstance(filename, _fileClass): self.read(filename.name) else: self.read(filename)
[docs] def getData(self, *var, **kw): return self.__data
[docs] def getInfo(self, *var, **kw): return self.__info
def _readheader(self, inStream): """ Read in a header in some CBF format from a string representing the binary stuff @param inStream: the binary image (without any CIF decorators) @type inStream: python string. """ if sys.platform < '3.0' or\ isinstance(inStream, str): sep = "\r\n" iSepPos = inStream.find(sep) if iSepPos < 0 or iSepPos > 80: sep = "\n" #switch back to unix representation lines = inStream.split(sep) for oneLine in lines[1:]: if len(oneLine) < 10: break try: key, val = oneLine.split(':' , 1) except ValueError: key, val = oneLine.split('=' , 1) key = key.strip() self.__header[key] = val.strip(" \"\n\r\t") else: sep = "\r\n".encode('utf-8') iSepPos = inStream.find(sep) if iSepPos < 0 or iSepPos > 80: sep = "\n".encode('utf-8') #switch back to unix representation lines = inStream.split(sep) for oneLine in lines[1:]: if len(oneLine) < 10: break try: key, val = oneLine.split(':'.encode('utf-8') , 1) except ValueError: key, val = oneLine.split('='.encode('utf-8') , 1) key = key.strip() self.__header[key.decode('utf-8')] = val.decode('utf-8').\ strip(" \"\n\r\t") missing = [] for item in MINIMUM_KEYS: if item not in self.__header.keys(): missing.append(item) if len(missing) > 0: if DEBUG: print("CBF file misses the keys " + " ".join(missing)) def _readbinary_byte_offset(self, inStream): """ Read in a binary part of an x-CBF_BYTE_OFFSET compressed image @param inStream: the binary image (without any CIF decorators) @type inStream: python string. @return: a linear numpy array without shape and dtype set @rtype: numpy array """ def analyse(stream): """ Analyze a stream of char with any length of exception (2,4, or 8 bytes integers) @return list of NParrays """ listnpa = [] if sys.version < '3.0' or\ isinstance(stream, str): key16 = "\x80" key32 = "\x00\x80" key64 = "\x00\x00\x00\x80" else: # I avoid the b"..." syntax to try to keep python 2.5 compatibility # encoding with utf-8 does not work key16 = "\x80".encode('latin-1') key32 = "\x00\x80".encode('latin-1') key64 = "\x00\x040\x00\x80".encode('latin-1') # idx = 0 shift = 1 # position = 0 while True: # lns = len(stream) idx = stream.find(key16) if idx == -1: listnpa.append(np.fromstring(stream, dtype="int8")) break listnpa.append(np.fromstring(stream[:idx], dtype="int8")) # position += listnpa[-1].size if stream[idx + 1:idx + 3] == key32: if stream[idx + 3:idx + 7] == key64: listnpa.append(np.fromstring(stream[idx + 7:idx + 15], dtype="int64")) # position += 1 # print "loop64 x=%4i y=%4i in idx %4i lns %4i value=%s" % ((position % 2463), (position // 2463), idx, lns, listnpa[-1]) shift = 15 else: #32 bit int listnpa.append(np.fromstring(stream[idx + 3:idx + 7], dtype="int32")) # position += 1 # print "loop32 x=%4i y=%4i in idx %4i lns %4i value=%s" % ((position % 2463), (position // 2463), idx, lns, listnpa[-1]) shift = 7 else: #int16 listnpa.append(np.fromstring(stream[idx + 1:idx + 3], dtype="int16")) # position += 1 # print "loop16 x=%4i y=%4i in idx %4i lns %4i value=%s" % ((position % 2463), (position // 2463), idx, lns, listnpa[-1]) shift = 3 stream = stream[idx + shift:] return listnpa if sys.version < '3.0' or\ isinstance(inStream, str): starter = "\x0c\x1a\x04\xd5" else: # I avoid the b"..." syntax to try to keep python 2.5 compatibility # encoding with utf-8 does not work starter = "\x0c\x1a\x04\xd5".encode('latin-1') startPos = inStream.find(starter) + 4 data = inStream[ startPos: startPos + int(self.__header["X-Binary-Size"])] myData = np.hstack(analyse(data)).cumsum() assert len(myData) == self.dim1 * self.dim2 return myData
[docs] def read(self, fname): self.__header = {} self.cif.loadCIF(fname, _bKeepComment=True) # backport contents of the CIF data to the headers for key in self.cif: if key != "_array_data.data": if isinstance(self.cif[key], str): self.__header[key] = self.cif[key].strip(" \"\n\r\t") else: self.__header[key] = self.cif[key].decode('utf-8').\ strip(" \"\n\r\t") if not "_array_data.data" in self.cif: raise IOError("CBF file %s is corrupt, cannot find data block with '_array_data.data' key" % fname) self._readheader(self.cif["_array_data.data"]) # Compute image size try: self.dim1 = int(self.__header['X-Binary-Size-Fastest-Dimension']) self.dim2 = int(self.__header['X-Binary-Size-Second-Dimension']) except: raise Exception(IOError, "CBF file %s is corrupt, no dimensions in it" % fname) try: bytecode = DATA_TYPES[self.__header['X-Binary-Element-Type']] self.bpp = len(np.array(0, bytecode).tostring()) except KeyError: bytecode = np.int32 self.bpp = 32 logging.warning("Defaulting type to int32") if self.__header["conversions"] == "x-CBF_BYTE_OFFSET": self.__data = self._readbinary_byte_offset(self.cif["_array_data.data"]).astype(bytecode).reshape((self.dim2, self.dim1)) else: raise Exception(IOError, "Compression scheme not yet supported, please contact FABIO development team") self.__info = self.__header
[docs]class CIF(dict): """ This is the CIF class, it represents the CIF dictionnary as a a python dictionnary thus inherits from the dict built in class. """ if sys.version < '3.0': EOL = ["\r", "\n", "\r\n", "\n\r"] BLANK = [" ", "\t"] + EOL START_COMMENT = ["\"", "\'"] BINARY_MARKER = "--CIF-BINARY-FORMAT-SECTION--" else: EOL = ["\r", "\n", "\r\n", "\n\r", "\r".encode('utf-8'), "\n".encode('utf-8'), "\r\n".encode('utf-8'), "\n\r".encode('utf-8')] BLANK = [" ", "\t", " ".encode('utf-8'), "\t".encode('utf-8')] + EOL START_COMMENT = ["\"", "\'", "\"".encode('utf-8'), "\'".encode('utf-8')] bBINARY_MARKER = "--CIF-BINARY-FORMAT-SECTION--".encode('utf-8') def __init__(self, _strFilename=None): """ Constructor of the class. @param _strFilename: the name of the file to open @type _strFilename: string """ dict.__init__(self) if _strFilename is not None: #load the file) self.loadCIF(_strFilename)
[docs] def readCIF(self, _strFilename): """ Just call loadCIF: Load the CIF file and sets the CIF dictionnary into the object @param _strFilename: the name of the file to open @type _strFilename: string """ self.loadCIF(_strFilename)
[docs] def loadCIF(self, _strFilename, _bKeepComment=False): """Load the CIF file and returns the CIF dictionnary into the object @param _strFilename: the name of the file to open @type _strFilename: string @param _strFilename: the name of the file to open @type _strFilename: string @return the """ if not os.path.isfile(_strFilename): print("I cannot find the file %s" % _strFilename) raise IOError("I cannot find the file %s" % _strFilename) if _bKeepComment: self._parseCIF(open(_strFilename, "rb").read()) else: self._parseCIF(CIF._readCIF(_strFilename))
@staticmethod
[docs] def isAscii(_strIn): """ Check if all characters in a string are ascii, @param _strIn: input string @type _strIn: python string @return: boolean @rtype: boolean """ bIsAcii = True for i in _strIn: if ord(i) > 127: bIsAcii = False break return bIsAcii
@staticmethod def _readCIF(_strFilename): """ -Check if the filename containing the CIF data exists -read the cif file -removes the comments @param _strFilename: the name of the CIF file @type _strFilename: string @return: a string containing the raw data @rtype: string """ if not os.path.isfile(_strFilename): print("I cannot find the file %s" % _strFilename) raise IOError("I cannot find the file %s" % _strFilename) lLinesRead = open(_strFilename, "r").readlines() sText = "" for sLine in lLinesRead: iPos = sLine.find("#") if iPos >= 0: if CIF.isAscii(sLine): sText += sLine[:iPos] + "\n" if iPos > 80 : print("Warning, this line is too long and could cause problems in PreQuest\n", sLine) else : sText += sLine if len(sLine.strip()) > 80 : print("Warning, this line is too long and could cause problems in PreQuest\n", sLine) return sText def _parseCIF(self, sText): """ -Parses the text of a CIF file -Cut it in fields -Find all the loops and process -Find all the keys and values @param sText: the content of the CIF-file @type sText: string @return: Nothing, the data are incorporated at the CIF object dictionnary @rtype: dictionnary """ loopidx = [] looplen = [] loop = [] #first of all : separate the cif file in fields lFields = CIF._splitCIF(sText.strip()) #Then : look for loops for i in range(len(lFields)): if lFields[i].lower() in ["loop_", "loop_".encode('utf-8')]: loopidx.append(i) if len(loopidx) > 0: for i in loopidx: loopone, length, keys = CIF._analyseOneLoop(lFields, i) loop.append([keys, loopone]) looplen.append(length) for i in range(len(loopidx) - 1, -1, -1): f1 = lFields[:loopidx[i]] + lFields[loopidx[i] + looplen[i]:] lFields = f1 self["loop_"] = loop for i in range(len(lFields) - 1): if len(lFields[i + 1]) == 0 : lFields[i + 1] = "?" if lFields[i][0:1] in ["_", "_".encode('utf-8')] and \ lFields[i + 1][0:1] not in ["_", "_".encode('utf-8')]: if sys.version < '3.0' or\ isinstance(lFields[i], str): self[lFields[i]] = lFields[i + 1] else: self[lFields[i].decode('utf-8')] = lFields[i + 1] @staticmethod def _splitCIF(sText): """ Separate the text in fields as defined in the CIF @param sText: the content of the CIF-file @type sText: string @return: list of all the fields of the CIF @rtype: list """ lFields = [] while True: if len(sText) == 0: break elif sText[0:1] in ["'", "'".encode('utf-8')]: toTest = sText[0:1] idx = 0 bFinished = False while not bFinished: idx += 1 + sText[idx + 1:].find(sText[0]) ##########debuging in case we arrive at the end of the text if idx >= len(sText) - 1: lFields.append(sText[1:-1].strip()) sText = "" bFinished = True break if sText[idx + 1:idx + 2] in CIF.BLANK: lFields.append(sText[1:idx].strip()) sText1 = sText[idx + 1:] sText = sText1.strip() bFinished = True elif sText[0:1] in ['"', '"'.encode('utf-8')]: toTest = sText[0:1] idx = 0 bFinished = False while not bFinished: idx += 1 + sText[idx + 1:].find(toTest) ##########debuging in case we arrive at the end of the text if idx >= len(sText) - 1: # print sText,idx,len(sText) lFields.append(sText[1:-1].strip()) # print lFields[-1] sText = "" bFinished = True break if sText[idx + 1:idx + 2] in CIF.BLANK: lFields.append(sText[1:idx].strip()) #print lFields[-1] sText1 = sText[idx + 1:] sText = sText1.strip() bFinished = True elif sText[0:1] in [';', ';'.encode('utf-8')]: toTest = sText[0:1] if isinstance(sText, str): CIF_BINARY_MARKER = CIF.BINARY_MARKER else: CIF_BINARY_MARKER = CIF.bBINARY_MARKER if sText[1:].strip().find(CIF_BINARY_MARKER) == 0: idx = sText[32:].find(CIF_BINARY_MARKER) if idx == -1: idx = 0 else: idx += 32 + len(CIF_BINARY_MARKER) else: idx = 0 bFinished = False while not bFinished: idx += 1 + sText[idx + 1:].find(toTest) if sText[idx - 1:idx] in CIF.EOL: lFields.append(sText[1:idx - 1].strip()) sText1 = sText[idx + 1:] sText = sText1.strip() bFinished = True else: f = sText.split(None, 1)[0] lFields.append(f) #print lFields[-1] sText1 = sText[len(f):].strip() sText = sText1 return lFields @staticmethod def _analyseOneLoop(lFields, iStart): """Processes one loop in the data extraction of the CIF file @param lFields: list of all the words contained in the cif file @type lFields: list @param iStart: the starting index corresponding to the "loop_" key @type iStart: integer @return: the list of loop dictionnaries, the length of the data extracted from the lFields and the list of all the keys of the loop. @rtype: tupple """ # in earch loop we first search the length of the loop # print lFields # curloop = {} loop = [] keys = [] i = iStart + 1 bFinished = False while not bFinished: if lFields[i][0] == "_": keys.append(lFields[i])#.lower()) i += 1 else: bFinished = True data = [] while True: if i >= len(lFields): break elif len(lFields[i]) == 0: break elif lFields[i][0] == "_": break elif lFields[i] in ["loop_", "stop_", "global_", "data_", "save_"]: break else: data.append(lFields[i]) i += 1 #print len(keys), len(data) k = 0 if len(data) < len(keys): element = {} for j in keys: if k < len(data): element[j] = data[k] else : element[j] = "?" k += 1 #print element loop.append(element) else: #print data #print keys for i in range(len(data) / len(keys)): element = {} for j in keys: element[j] = data[k] k += 1 # print element loop.append(element) # print loop return loop, 1 + len(keys) + len(data), keys ############################################################################################# ######## everything needed to write a cif file ######################################### #############################################################################################
[docs] def saveCIF(self, _strFilename="test.cif"): """Transforms the CIF object in string then write it into the given file @param _strFilename: the of the file to be written @type param: string """ #TODO We should definitly handle exception here try: fFile = open(_strFilename, "w") except IOError: print("Error during the opening of file for write : %s" % _strFilename) return fFile.write(self._cif2str(_strFilename)) try: fFile.close() except IOError: print("Error during the closing of file for write : %s" % _strFilename) raise
def _cif2str(self, _strFilename): """converts a cif dictionnary to a string according to the CIF syntax @param _strFilename: the name of the filename to be apppended in the header of the CIF file @type _strFilename: string @return : a sting that corresponds to the content of the CIF-file. @rtype: string """ sCifText = "" for i in __version__: sCifText += "# " + i + "\n" if self.exists("_chemical_name_common"): t = self["_chemical_name_common"].split()[0] else: t = os.path.splitext(os.path.split(_strFilename.strip())[1])[0] sCifText += "data_%s\n" % t #first of all get all the keys : lKeys = self.keys() lKeys.sort() for sKey in lKeys: if sKey == "loop_": continue sValue = str(self[sKey]) if sValue.find("\n") > -1: #should add value between ;; sLine = "%s \n;\n %s \n;\n" % (sKey, sValue) elif len(sValue.split()) > 1: #should add value between '' sLine = "%s '%s' \n" % (sKey, sValue) if len(sLine) > 80: sLine = "%s\n '%s' \n" % (sKey, sValue) else: sLine = "%s %s \n" % (sKey, sValue) if len(sLine) > 80: sLine = "%s\n %s \n" % (sKey, sValue) sCifText += sLine if "loop_" in self: for loop in self["loop_"]: sCifText += "loop_ \n" lKeys = loop[0] llData = loop[1] for sKey in lKeys: sCifText += " %s \n" % sKey for lData in llData: sLine = "" for key in lKeys: sRawValue = lData[key] if sRawValue.find("\n") > -1: #should add value between ;; sLine += "\n; %s \n;\n" % (sRawValue) sCifText += sLine sLine = "" else: if len(sRawValue.split()) > 1: #should add value between '' value = "'%s'" % (sRawValue) else: value = sRawValue if len(sLine) + len(value) > 78: sCifText += sLine + " \n" sLine = " " + value else: sLine += " " + value sCifText += sLine + " \n" sCifText += "\n" #print sCifText return sCifText
[docs] def exists(self, sKey): """ Check if the key exists in the CIF and is non empty. @param sKey: CIF key @type sKey: string @param cif: CIF dictionnary @return: True if the key exists in the CIF dictionnary and is non empty @rtype: boolean """ bExists = False if sKey in self: if len(self[sKey]) >= 1: if self[sKey][0] not in ["?", "."]: bExists = True return bExists
[docs] def existsInLoop(self, sKey): """ Check if the key exists in the CIF dictionnary. @param sKey: CIF key @type sKey: string @param cif: CIF dictionnary @return: True if the key exists in the CIF dictionnary and is non empty @rtype: boolean """ if not self.exists("loop_"): return False bExists = False if not bExists: for i in self["loop_"]: for j in i[0]: if j == sKey: bExists = True return bExists
[docs] def loadCHIPLOT(self, _strFilename): """Load the powder diffraction CHIPLOT file and returns the pd_CIF dictionnary in the object @param _strFilename: the name of the file to open @type _strFilename: string @return: the CIF object corresponding to the powder diffraction @rtype: dictionnary """ if not os.path.isfile(_strFilename): print("I cannot find the file %s" % _strFilename) raise IOError("I cannot find the file %s" % _strFilename) lInFile = open(_strFilename, "r").readlines() self["_audit_creation_method"] = 'From 2-D detector using FIT2D and CIFfile' self["_pd_meas_scan_method"] = "fixed" self["_pd_spec_description"] = lInFile[0].strip() try: iLenData = int(lInFile[3]) except ValueError: iLenData = None lOneLoop = [] try: f2ThetaMin = float(lInFile[4].split()[0]) last = "" for sLine in lInFile[-20:]: if sLine.strip() != "": last = sLine.strip() f2ThetaMax = float(last.split()[0]) limitsOK = True except (ValueError, IndexError): limitsOK = False f2ThetaMin = 180.0 f2ThetaMax = 0 # print "limitsOK:", limitsOK for sLine in lInFile[4:]: sCleaned = sLine.split("#")[0].strip() data = sCleaned.split() if len(data) == 2 : if not limitsOK: f2Theta = float(data[0]) if f2Theta < f2ThetaMin : f2ThetaMin = f2Theta if f2Theta > f2ThetaMax : f2ThetaMax = f2Theta lOneLoop.append({ "_pd_meas_intensity_total": data[1] }) if not iLenData: iLenData = len(lOneLoop) assert (iLenData == len(lOneLoop)) self[ "_pd_meas_2theta_range_inc" ] = "%.4f" % ((f2ThetaMax - f2ThetaMin) / (iLenData - 1)) if self[ "_pd_meas_2theta_range_inc" ] < 0: self[ "_pd_meas_2theta_range_inc" ] = abs (self[ "_pd_meas_2theta_range_inc" ]) tmp = f2ThetaMax f2ThetaMax = f2ThetaMin f2ThetaMin = tmp self[ "_pd_meas_2theta_range_max" ] = "%.4f" % f2ThetaMax self[ "_pd_meas_2theta_range_min" ] = "%.4f" % f2ThetaMin self[ "_pd_meas_number_of_points" ] = str(iLenData) self["loop_"] = [ [ ["_pd_meas_intensity_total" ], lOneLoop ] ]
@staticmethod
[docs] def LoopHasKey(loop, key): "Returns True if the key (string) existe in the array called loop""" try: loop.index(key) return True except ValueError: return False
if __name__ == "__main__": import os import sys from PyMca5 import EdfFile #fd = open('Cu_ZnO_20289.mccd', 'rb') filename = sys.argv[1] cbf = PilatusCBF(filename) print(cbf.getInfo()) edfFile = filename+".edf" if os.path.exists(edfFile): os.remove(edfFile) edf = EdfFile.EdfFile(edfFile) edf.WriteImage(cbf.getInfo(), cbf.getData()) edf = None