import sys, os
from typing import DefaultDict
sys.path.append('.')
sys.path.append('..')
sys.path.append('../..')
import scipy
import matplotlib.pyplot as plt
import numpy as np
import h5py as hdf
import warnings
from MJOLNIR import _tools
import MJOLNIR
import datetime
import math
#import shapely
# from shapely.geometry import Polygon as PolygonS, Point as PointS
from MJOLNIR import TasUBlibDEG as TasUBlib
from MJOLNIR._tools import PointerArray
from MJOLNIR.Data import Mask
import MJOLNIR.Data.Sample
import re
import copy
import platform
from collections import defaultdict
multiFLEXXDetectors = 31*5
reFloat = r'-?\d*\.\d*'
reInt = r'-?\d'
factorsqrtEK = 0.694692
supportedRawFormats = ['hdf','','dat']
supportedInstruments = ['CAMEA','MultiFLEXX','FlatCone','Bambus']
supportedConvertedFormats = ['nxs']
def cosd(x):
return np.cos(np.deg2rad(x))
def sind(x):
return np.sin(np.deg2rad(x))
## Dictionary for holding hdf position of attributes. HDFTranslation['a3'] gives hdf position of 'a3'
HDFTranslation = {'sample':'/entry/sample',
'sampleName':'/entry/sample/name',
'unitCell':'/entry/sample/unit_cell',
'intensity':'entry/data/intensity',
'qx':'entry/data/qx',
'qy':'entry/data/qy',
'QH':'entry/data/h',
'QK':'entry/data/k',
'QL':'entry/data/l',
'energy':'entry/data/en',
'normalization':'entry/data/normalization',
'mode':'entry/control/mode',
'preset':'entry/control/preset',
'startTime':'entry/start_time',
'hdfMonitor':'entry/control/data',
'monitor':'entry/monitor_2/data',
'time':'entry/control/time',
'endTime':'entry/end_time',
'experimentalIdentifier':'entry/experiment_identifier',
'comment':'entry/comment',
'proposal':'entry/proposal_id',
'proposalTitle':'entry/proposal_title',
'localContact':'entry/local_contact/name',
'proposalUser':'entry/proposal_user/name',
'proposalEmail':'entry/proposal_user/email',
'user':'entry/user/name',
'email':'entry/user/email',
'address':'entry/user/address',
'affiliation':'entry/user/affiliation',
'A3':'entry/sample/rotation_angle',
'temperature':'entry/sample/temperature',
'magneticField':'entry/sample/magnetic_field',
'electricField':'entry/sample/electric_field',
'scanCommand':'entry/scancommand',
'title':'entry/title',
'absoluteTime':'entry/control/absolute_time',
'protonBeam':'entry/proton_beam/data',
'singleDetector1':'entry/CAMEA/segment_1/data',
'singleDetector8':'entry/CAMEA/segment_8/data'
}
HDFTranslationNICOSAlternative = {
'temperature':['entry/sample/Ts','entry/sample/Ts/value','entry/sample/se_ts','entry/sample/T','entry/sample/temperature'],
'temperature_log':['entry/sample/temperature_log/value','entry/sample/T_log/value'],
'temperature_time_log':['entry/sample/temperature_log/time','entry/sample/temperature_log/time','entry/sample/T_log/time'],
'magneticField':['entry/sample/B','entry/sample/B/value','entry/sample/magnetic_field'],
'ei':'entry/CAMEA/monochromator/energy',
'hdfMonitor':['entry/monitor_2/data','entry/control/data']
}
## Default dictionary to perform on loaded data, i.e. take the zeroth element, swap axes, etc
HDFTranslationFunctions = defaultdict(lambda : [])
HDFTranslationFunctions['mode'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['sampleName'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['startTime'] = [['__getitem__',[0]]]
HDFTranslationFunctions['endTime'] = [['__getitem__',[0]]]
HDFTranslationFunctions['experimentalIdentifier'] = [['__getitem__',[0]]]
HDFTranslationFunctions['comment'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['proposal'] = [['__getitem__',[0]]]
HDFTranslationFunctions['proposalTitle'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['localContact'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['proposalUser'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['proposalEmail'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['user'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['email'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['address'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['affiliation'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['scanCommand'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFTranslationFunctions['title'] = [['__getitem__',[0]],['decode',['utf8']]]
HDFInstrumentTranslation = {
'A4':'analyzer/polar_angle',
'ei':'monochromator/energy',
'A4Offset':'analyzer/polar_angle_offset',
'counts':'detector/counts'
}
HDFInstrumentTranslationNICOSAlternative = {
'counts':'detector/data',
'A4':'analyzer/polar_angle'#_raw'
}
HDFInstrumentTranslationFunctions = DefaultDict(lambda : [])
HDFInstrumentTranslationFunctions['counts'] = [['swapaxes',[1,2]]]
HDFInstrumentTranslationFunctions['A4'] = [['reshape',[-1]]]
extraAttributes = ['name','fileLocation','twoTheta']
possibleAttributes = list(HDFTranslation.keys())+list(HDFInstrumentTranslation.keys())+extraAttributes
possibleAttributes.sort(key=lambda v: v.lower())
def getHDFEntry(f,prop,fromNICOS=False):
if fromNICOS:
if prop in HDFTranslationNICOSAlternative:
proplist = HDFTranslationNICOSAlternative[prop]
if isinstance(proplist,list):
for subprop in proplist:
value = f.get(subprop)
if not value is None:
break
else:
value = f.get(HDFTranslationNICOSAlternative[prop])
return value
return f.get(HDFTranslation[prop])
def getHDFInstrumentEntry(instr,prop,fromNICOS=False):
if fromNICOS:
if prop in HDFInstrumentTranslationNICOSAlternative:
value = instr.get(HDFInstrumentTranslationNICOSAlternative[prop])
return value
return instr.get(HDFInstrumentTranslation[prop])
analyzerLimits = {'CAMEA':7,
'Bambus': 4,
'MultiFLEXX': 4,
'FlatCone':1}
detectorLimits = {'CAMEA':103,
'Bambus':19,
'MultiFLEXX':31,
'FlatCone':32}
[docs]class DataFile(object):
"""Object to load and keep track of HdF files and their conversions"""
def __init__(self,fileLocation=None,McStas=False):
# Check if file exists
if isinstance(fileLocation,DataFile): # Copy everything in provided file
self.updateProperty(fileLocation.__dict__)
elif isinstance(fileLocation,str) :
if not os.path.isfile(fileLocation):
raise AttributeError('File location does not exist({}).'.format(fileLocation))
if fileLocation.split('.')[-1]=='nxs':
self.type='nxs'
elif fileLocation.split('.')[-1]=='hdf':
self.type='hdf'
elif os.path.splitext(fileLocation)[1]=='': # No extension
self.type = 'MultiFLEXX'
elif fileLocation.split('.')[-1]=='dat':
self.type = 'Bambus'
else:
raise AttributeError('File is not of type nxs or hdf.')
self.name = os.path.basename(fileLocation)
self.fileLocation = os.path.abspath(fileLocation)
self._binning = 1
self._mask = False
self.absolutNormalized = False
if self.type in ['hdf','nxs']:
with hdf.File(fileLocation,mode='r') as f:
# Find out if file is a NICOS file
# NICOS has data saved in /entry/data/data while six has /entry/data/counts
if McStas:
self.fromNICOS = 0
else:
self.fromNICOS = checkNICOS(f)
instr = getInstrument(f)
if self.fromNICOS is None:
raise AttributeError('Data File {} has no data in {}/detector/counts. The file might be empty.'.format(self.name,instr.name))
self.sample = MJOLNIR.Data.Sample.Sample(sample=getHDFEntry(f,'sample'),recalculateUB=self.fromNICOS)
if self.type == 'hdf':
if np.shape(np.array(getHDFInstrumentEntry(instr,'counts',fromNICOS=self.fromNICOS))) == ():
raise AttributeError('Data File {} has no data in {}/detector/counts. The file might be empty.'.format(self.name,instr.name))
self.I = np.array(getHDFInstrumentEntry(instr,'counts',fromNICOS=self.fromNICOS)).swapaxes(1,2)
else:
self.I=np.array(getHDFEntry(f,'intensity'))
self.counts = np.array(getHDFInstrumentEntry(instr,'counts')).swapaxes(1,2)
self.qx=np.array(getHDFEntry(f,'qx'))
self.qy=np.array(getHDFEntry(f,'qy'))
self.h=np.array(getHDFEntry(f,'QH'))
self.k=np.array(getHDFEntry(f,'QK'))
self.l=np.array(getHDFEntry(f,'QL'))
self.energy=np.array(getHDFEntry(f,'energy'))
self.Norm=np.array(getHDFEntry(f,'normalization'))
self.MonitorMode = np.array(getHDFEntry(f,'mode',fromNICOS=self.fromNICOS))[0].decode()
self.MonitorPreset=np.array(getHDFEntry(f,'preset',fromNICOS=self.fromNICOS))
if len(self.MonitorPreset)>1:
self.MonitorPreset = self.MonitorPreset[0]
self.startTime = np.array(getHDFEntry(f,'startTime',fromNICOS=self.fromNICOS))[0].decode()
self.totalCounts = self.I.sum(axis=(1,2))
if self.type == 'hdf':
self.Monitor = np.array(getHDFEntry(f,'hdfMonitor',fromNICOS=self.fromNICOS))
if not self.MonitorMode == 't' and len(self.Monitor)>1: # If not counting on time and more than one point saved
if self.Monitor.flatten()[0]!=self.MonitorPreset and self.startTime[:4]=='2018': # For all data in 2018 with wrong monitor saved
self.Monitor = np.ones_like(self.Monitor)*self.MonitorPreset ### TODO: Make Mark save the correct monitor!!
if len(self.Monitor)==len(self.I)+1: # Monitor is exactly one longer than I when opening a running file under NICOS
self.Monitor = self.Monitor[:-1]
else:
self.Monitor=np.array(getHDFEntry(f,'monitor',fromNICOS=self.fromNICOS))
self.Time = np.array(getHDFEntry(f,'time',fromNICOS=self.fromNICOS))
self.endTime = np.array(getHDFEntry(f,'endTime',fromNICOS=self.fromNICOS))[0]
expIdx =getHDFEntry(f,'experimentalIdentifier',fromNICOS=self.fromNICOS)
if not expIdx is None:
self.experimentIdentifier = np.array(expIdx)[0]
else:
self.experimentIdentifier = 'UNKNOWN'
self.comment = np.array(getHDFEntry(f,'comment',fromNICOS=self.fromNICOS))[0]
self.proposalId = np.array(getHDFEntry(f,'proposal',fromNICOS=self.fromNICOS))[0]
self.proposalTitle = np.array(getHDFEntry(f,'proposalTitle',fromNICOS=self.fromNICOS))[0]
localContact = getHDFEntry(f,'localContact',fromNICOS=self.fromNICOS)
if not localContact is None:
self.localContactName = np.array(localContact)[0]
else:
self.localContactName = 'UNKNOWN'
proposalUserName = getHDFEntry(f,'proposalUser',fromNICOS=self.fromNICOS)
if not proposalUserName is None:
self.proposalUserName = np.array(proposalUserName)[0]
self.proposalUserEmail = np.array(getHDFEntry(f,'proposalEmail'))[0]
else:
self.proposalUserName = 'UNKNOWN'
self.proposalUserEmail = 'UNKNOWN'
self.userName = np.array(getHDFEntry(f,'user',fromNICOS=self.fromNICOS))[0]
self.userEmail = np.array(getHDFEntry(f,'email',fromNICOS=self.fromNICOS))[0]
userAddress = getHDFEntry(f,'address',fromNICOS=self.fromNICOS)
if not userAddress is None:
self.userAddress = np.array(userAddress)[0]
else:
self.userAddress = 'UNKNOWN'
userAffiliation = getHDFEntry(f,'affiliation',fromNICOS=self.fromNICOS)
if not userAffiliation is None:
self.userAffiliation = np.array(userAffiliation)[0]
else:
self.userAffiliation = 'UNKNOWN'
try:
self.singleDetector1 = np.array(getHDFEntry(f,'singleDetector1',fromNICOS=self.fromNICOS))[0]
except:
pass
try:
self.singleDetector8 = np.array(getHDFEntry(f,'singleDetector8',fromNICOS=self.fromNICOS))[0]
except:
pass
# Monochromator
attributes = ['type','d_spacing','horizontal_curvature','vertical_curvature',
'horizontal_curvature_zero','vertical_curvature_zero',
'gm','gm_zero','tlm','tlm_zero','tum','tum_zero','rotation_angle','rotation_angle_zero']
values = ['monochromator'+x for x in ['Type','DSpacing','HorizontalCurvature',
'VerticalCurvature','HorizontalCurvatureZero','VerticalCurvatureZero',
'GM','GMZero','TLM','TLMZero','TUM','TUMZero','RotationAngle','RotationAngleZero']]
for att,value in zip(attributes,values):
loadedValue = f.get('entry/CAMEA/monochromator/{}'.format(att))
if not loadedValue is None: # if it does exist
setattr(self,value,np.array(loadedValue))
# MonochromatorSlit
attributes = [x+zero for x in ['bottom','left','right','top'] for zero in ['','_zero']]+\
['x_gap','y_gap']
values = ['monochromatorSlit'+x+zero for x in ['Bottom','Left','Right','Top'] for zero in ['','Zero']]+\
['monochromatorSlitXGap','monochromatorSlitYGap']
for att,value in zip(attributes,values):
setattr(self,value,np.array(f.get('entry/CAMEA/monochromator_slit/{}'.format(att))))
# Analyzer
# analyzer_selection
attributes = ['d_spacing','nominal_energy','polar_angle','polar_angle_offset','polar_angle_raw']
values = ['analyzer'+x.replace('_',' ').title().replace(' ','') for x in attributes]
for att,value in zip(attributes,values):
setattr(self,value,np.array(f.get('entry/CAMEA/analyzer/{}'.format(att))))
self.analyzerType = np.array(f.get('entry/CAMEA/analyzer/type'))[0]
self.analyzerSelection = int(np.array(f.get('entry/CAMEA/analyzer/analyzer_selection'))[0])
self.detectorSelection = int(np.array(f.get('entry/CAMEA/detector/detector_selection'))[0])
instr = getInstrument(f)
self.instrument = instr.name.split('/')[-1]
self.possibleBinnings = np.array([int(x[-1]) for x in np.array(instr) if x[:5]=='calib'])
self.Ei = np.array(getHDFInstrumentEntry(instr,'ei',fromNICOS=self.fromNICOS))
self.A3 = np.array(getHDFEntry(f,'A3',fromNICOS=self.fromNICOS))
self.A4 = np.array(getHDFInstrumentEntry(instr,'A4',fromNICOS=self.fromNICOS)).reshape(-1)
self.A4Off = np.array(getHDFInstrumentEntry(instr,'A4Offset',fromNICOS=self.fromNICOS))
if self.fromNICOS:
self.twotheta = copy.deepcopy(self.A4) - self.A4Off
else:
self.twotheta = self.A4-self.A4Off
# As of 2022 and implementation of NICOS temperature and magnetic field is saved
# in another subfolder in the data file (e.g. entry/sample/B/value)
self.temperature = np.array(getHDFEntry(f,'temperature',fromNICOS=self.fromNICOS))
self.magneticField = np.array(getHDFEntry(f,'magneticField',fromNICOS=self.fromNICOS))
self.electricField = np.array(getHDFEntry(f,'electricField',fromNICOS=self.fromNICOS))
if self.fromNICOS and self.temperature is None: # Only use interpolated temperature if no temperature was found
self.temperature_log = np.array(getHDFEntry(f,'temperature_log',fromNICOS=self.fromNICOS))
self.temperature_time_log = np.array(getHDFEntry(f,'temperature_time_log',fromNICOS=self.fromNICOS))
if not self.temperature_log is None:
timeSteps = self.absoluteTime-self.absoluteTime[0]
try:
self.temperature_log = self.temperature_log[:len(self.temperature_time_log)]
self.temperature =[np.mean(self.temperature_log[np.logical_and(self.temperature_time_log>tStart,self.temperature_time_log<tStop)]) for tStart,tStop in zip(timeSteps,timeSteps[1:])]
## Above adds all but last temperature interval
self.temperature.append(np.mean(self.temperature_log[self.temperature_time_log>timeSteps[-1]]))
self.temperature = np.array(self.temperature)
except TypeError: # no length of temperature_time_log, i.e. the log is not present
self.temperature = np.asarray(len(self.absoluteTime)*[None])
self.scanCommand = np.array(getHDFEntry(f,'scanCommand',fromNICOS=self.fromNICOS))[0]
try:
self.scanParameters,self.scanValues,self.scanUnits,self.scanDataPosition = getScanParameter(self,f)
except KeyError:
warnings.warn("Couldn't load scan parameters")
self.scanParameters = []
self.scanValues= np.array([[None]])
self.scanUnits =''
self.scanDataPosition = ''
if len(self.scanParameters) == 1 and self.scanParameters[0] == 'rotation_angle' and len(self.A4)>1 and np.all(np.isclose(self.A4,self.A4[0],atol=0.01)):
# If all A4 values are the same, out 2t has been written on instrument computer
# and because of this, six saves 2t and not a4. Solution: set A4Offset to 0 and
# use only the first value for A4 when an A3 (=rotation_angle) scan is made and
self.A4 = np.array([self.A4[0]])
self.analyzerPolarAngle = self.A4
self.A4Off = np.array([0.0])
self.analyzerPolarAngleOffset = self.A4Off
else:
self.A4Off = np.array(getHDFInstrumentEntry(instr,'A4Offset',fromNICOS=self.fromNICOS))
try:
self.A3Off = self.sample.A3Off#np.array(f.get('entry/sample/rotation_angle_zero'))
except:
self.A3Off = [0.0]
if self.type == 'hdf':
if len(self.possibleBinnings) == 0:
self.possibleBinnings = [None]
self.binning = None
else:
self.binning=np.max(self.possibleBinnings).astype(int) # Choose standard binning max
else:
self.binning = int(np.array(f.get('entry/reduction/MJOLNIR_algorithm_convert/binning'))[0])
calibrations = []
for binning in self.possibleBinnings:
Ef = np.array(instr.get('calib{}/final_energy'.format(str(binning))))
width = np.array(instr.get('calib{}/width'.format(str(binning))))
bg = np.array(instr.get('calib{}/background'.format(str(binning))))
amp = np.array(instr.get('calib{}/amplitude'.format(str(binning))))
EfTable = np.array([amp,Ef,width,bg]).T
A4 = np.array(instr.get('calib{}/a4offset'.format(str(binning))))
bound = np.array(instr.get('calib{}/boundaries'.format(str(binning))))
calibrations.append([EfTable,A4,bound])
self.instrumentCalibrations = np.array(calibrations,dtype=object)
self.loadBinning(self.binning)
if self.type == 'nxs':
self.original_fileLocation = np.array(f.get('entry/reduction/MJOLNIR_algorithm_convert/rawdata'))[0].decode()
self.title = np.array(getHDFEntry(f,'title'))[0]
self.absoluteTime = np.array(getHDFEntry(f,'absoluteTime',fromNICOS=self.fromNICOS))
self.protonBeam = np.array(getHDFEntry(f,'protonBeam',fromNICOS=self.fromNICOS))
if self.type == 'hdf':
###################
#self.I[:,:,:150]=0#
###################
pass
self.mask = np.zeros_like(self.I,dtype=bool)
if self.binning == 8:
self.mask[:,:,:2] = True
elif self.type == 'MultiFLEXX': # type is multiFLEXX
self.loadMultiFLEXXData(fileLocation)
elif self.type == 'Bambus':
self.loadBambusData(fileLocation)
try:
self.scanSteps = self.scanValues.shape[1]
except:
pass
if True:
for attr in dir(self):
# If attribute is a function or property, skip it
if hasattr(getattr(self,attr),'__call__') or isinstance(getattr(self,attr),property):
continue
value = getattr(self,attr)
if hasattr(value,'shape'): # Does it have a shape method -> Numpy array
if value.shape == (): # It is 0 D but either bytes or simple list
if isinstance(value,np.bytes_):
setattr(self,attr,value.decode()) # decode into text
else:
value = np.array([value]) # recast into 1D-array
setattr(self,attr,value)
for key in ['magneticField','temperature','electricField']:
if hasattr(getattr(self,key),'dtype'):
if self.__dict__[key].dtype ==object: # Is np nan object
self.__dict__[key] = None
else:
setattr(self,key,None)
else: # Create an empty data set
self.instrument = 'CAMEA'
self.type = 'hdf'
self.I = np.array([])
self._binning = 1
self._mask = False
self.absolutNormalized = False
self.scanParameters = ''
self.scanUnits = ''
self.scanValues = np.array([])
self.Monitor = np.array([])
self._A3Off = 0.0
self._A3 = np.array([])
self._A4Off = 0.0
self._A4 = np.array([])
self.Ei = np.array([])
self.possibleBinnings = [1,3,5,8]
calibrations = []
for binning in self.possibleBinnings:
fileName = getattr(MJOLNIR,'__CAMEANormalizationBinning{}__'.format(binning))
calib = np.loadtxt(fileName,delimiter=',',skiprows=3)
calibrations.append([calib[:,3:7],calib[:,-1],calib[:,7:9]])
self.instrumentCalibrations = np.array(calibrations,dtype=object)
self.loadBinning(self.binning)
if self.instrument == 'CAMEA':
self.EPrDetector = 8
elif self.type in ['MultiFLEXX','FlatCone','Bambus']:
self.EPrDetector = 1
else:
pass
@property
def A3Off(self):
return self._A3Off
@A3Off.getter
def A3Off(self):
return self._A3Off
@A3Off.setter
def A3Off(self,A3Off):
if A3Off is None:
self._A3Off = np.array([0.0])
else:
self._A3Off = A3Off
@property
def A4Off(self):
return self._A4Off
@A4Off.getter
def A4Off(self):
return self._A4Off
@A4Off.setter
def A4Off(self,A4Off):
if A4Off is None:
self._A4Off = np.array([0.0])
else:
self._A4Off = A4Off
@property
def A3(self):
return self._A3
@A3.getter
def A3(self):
return self._A3
@A3.setter
def A3(self,A3):
if not hasattr(A3,'shape'):
self._A3 = np.array(A3)
else:
if A3.shape == ():
self._A3 = np.array([0.0])
else:
if A3[0] is None:
self._A3 = np.array([0.0])
else:
self._A3 = A3
@property
def A4(self):
return self._A4
@A4.getter
def A4(self):
return self._A4
@A4.setter
def A4(self,A4):
if not hasattr(A4,'shape'):
self._A4 = np.array(A4)
else:
if A4.shape == ():
self._A4 = [0.0]
else:
self._A4 = A4
@property
def dasel(self):
if hasattr(self,'detectorSelection') and hasattr(self,'analyzerSelection'):
return self.detectorSelection,self.analyzerSelection
else:
return None,None
@dasel.getter
def dasel(self):
if hasattr(self,'detectorSelection') and hasattr(self,'analyzerSelection'):
return self.detectorSelection,self.analyzerSelection
else:
return None,None
@property
def binning(self):
return self._binning
@binning.getter
def binning(self):
try:
len(self._binning)
except TypeError:
return self._binning
else:
return self._binning[0]
@binning.setter
def binning(self,value):
try:
len(value)
except TypeError:
pass
else:
value = value[0]
if hasattr(self,'possibleBinnings'):
if not value in self.possibleBinnings:
raise AttributeError('Wanted binning ({}) not allowed in {}. Possible binnings are: {}'.format(value,self.name,self.possibleBinnings))
self._binning = value
self.loadBinning(value)
else:
self._binning = value
@property
def mask(self):
return self._mask
@mask.getter
def mask(self):
return self._mask
@mask.setter
def mask(self,mask):
if hasattr(self,'I') and len(self.I.shape) > 1:
if not np.all(self.I.shape == mask.shape):
raise AttributeError('Shape of provided mask {} does not match shape of data {}.'.format(mask.shape,self.I.shape))
self._mask = mask
@property
def shape(self):
return self.I.shape
@shape.getter
def shape(self):
return self.I.shape
@shape.setter
def shape(self,shape):
raise AttributeError('Shape of a DataFile cannot be set.')
@property
def size(self):
return self.I.size
@size.getter
def size(self):
return self.I.size
@size.setter
def size(self,size):
raise AttributeError('size of a DataFile cannot be set.')
def updateProperty(self,dictionary):
if isinstance(dictionary,dict):
for key in dictionary.keys():
self.__setattr__(key,copy.deepcopy(dictionary[key]))
def __eq__(self,other):
return len(self.difference(other))==0
[docs] def difference(self,other,keys = set(['sample','instrument','Ei','I','_A3','_A4','_binning','scanParameters','Monitor'])):
"""Return the difference between two data files by keys"""
dif = []
if not set(self.__dict__.keys()) == set(other.__dict__.keys()): # Check if same generation and type (hdf or nxs)
return list(set(self.__dict__.keys())-set(other.__dict__.keys()))
comparisonKeys = keys
for key in comparisonKeys:
skey = self.__dict__[key]
okey = other.__dict__[key]
if isinstance(skey,np.ndarray):
try:
if not np.all(np.isclose(skey,okey)):
if not np.all(np.isnan(skey),np.isnan(okey)):
dif.append(key)
except (TypeError, AttributeError,ValueError):
if np.all(skey!=okey):
dif.append(key)
elif not np.all(self.__dict__[key]==other.__dict__[key]):
dif.append(key)
return dif
def __str__(self):
returnStr = 'Data file {} from the MJOLNIR software package of type {}\n'.format(self.name,self.type)
returnStr+= 'Ei: '+ str(self.Ei) + '\nA3: ' + ','.join([str(x) for x in self.A3])
returnStr+= '\nA4: ' + ','.join([str(x) for x in self.A4]) + '\nSample: '+str(self.sample)
return returnStr
def __add__(self,other):
raise NotImplementedError('Adding two data files is not yet supported.')
def __hasattr__(self,s):
return s in self.__dict__.keys()
[docs] @_tools.KwargChecker()
def loadMultiFLEXXData(self,fileLocation,calibrationFile=None):
""""Dedicated loader for MultiFLEXX data.
Args:
- fileLocation (str): Location of file
Kwargs:
- calibrationFile (str): Location of calibration file (default None: uses shipped calibration)
"""
#if calibrationFile is None: # Use shipped calibration
# this_dir, _ = os.path.split(__file__)
# calibrationFile = os.path.realpath(os.path.join(this_dir,"..", "CalibrationMultiFLEXX.csv"))
self.fileLocation = fileLocation
self.possibleBinnings = [1] # Standard value (1 energy/detector)
self.binning = 1
with open(fileLocation) as f:
dataString = f.readlines()
dataString = ''.join(dataString)
## Load header data:
headerStart = dataString.find('VVVVVVVVVVVVVVVVVVVV')
headerEnd = dataString.find('DATA_:')
headerString = dataString[headerStart:headerEnd].split('\n')[1:-1]
dataPart = dataString[headerEnd:].split('\n')[1:-1]
# Markers of multiple lines of parameters
multiLines = ['param','varia','zeros','posqe']
stringParameters = ['file_']
parameters = {}
for line in headerString:
splitLine = line.split(': ')
if isinstance(splitLine,list):
try:
description,value = splitLine
except ValueError:
description = splitLine[0].replace(':','').strip()
value = 'NoValue'
else:
continue
description = description.lower()
if not description in multiLines:
## Single value
if not description in stringParameters:
try:
value = float(value)
except ValueError:
pass
description=description.replace('_','')
parameters[description] = value
#print(description,value)
elif description in ['varia','param','posqe']:
keyValuePattern = re.compile(r'\w*\s*=\s*'+reFloat)
KVPairs = keyValuePattern.findall(value)
keyValuePatternINT = re.compile(r'\w*\s*=\s*'+reInt)
KVPairsINT = keyValuePatternINT.findall(value)
for pair in KVPairs+KVPairsINT:
key,val = pair.split('=')
key = key.strip()
try:
val = np.array(val.strip(),dtype=float)
except ValueError:
continue
if not hasattr(self,key):
setattr(self,key,val)
elif description == 'zeros':
keyValuePattern = re.compile(r'\w+\s+=\s*'+reFloat)
KVPairs = keyValuePattern.findall(value)
for pair in KVPairs:
key,val = pair.split('=')
key = key.strip()
try:
val = np.array(val.strip(),dtype=float)
except ValueError:
continue
key+='Off'
setattr(self,key,val)
self.__dict__.update(parameters)
## Format scanCommand
scanParameters = [x.split('=')[0].strip() for x in self.steps.split(',')]
self.scanParameters = [p.capitalize() for p in scanParameters]
self.scanUnits = []
for param in self.scanParameters:
if param.lower() in ['a3','a4','sgu','sgl']:
unit= 'degree'
elif param.lower() == 'ei':
unit = 'meV'
else:
unit = 'unknown'
self.scanUnits.append(unit)
scanSteps = [float(x.split('=')[1]) for x in self.steps.split(',')]
scanCommandNPMN = re.compile(r'\w+\s+\d+')
NPMN = scanCommandNPMN.findall(self.comnd)[-2:]
for paramVal in NPMN:
param,val = paramVal.split()
if param.lower() == 'np':
self.steps = int(val)
elif param.lower() == 'mn':
self.MonitorPreset = int(val)
self.MonitorMode = 'm'
elif param.lower() == 'ti':
self.MonitorPreset = int(val)
self.MonitorMode = 't'
def extractSample(obj,sample=None):
if sample is None:
sample = dict()
nameConversion = [['AA','alpha'],
['BB','beta'],
['CC','gamma'],
['AS','a'],
['BS','b'],
['CS','c'],
]
for oldName,newName in nameConversion:
sample[newName] = getattr(obj,oldName)
delattr(obj,oldName)
Ei = 10.0 # TODO: What is the good solution here? Dummy incoming energy needed to calcualte UB
k = np.sqrt(Ei)*factorsqrtEK
q1 = [getattr(obj,x) for x in ['AX','AY','AZ']]
q2 = [getattr(obj,x) for x in ['BX','BY','BZ']]
cell = TasUBlib.calcCell([sample[x] for x in ['a','b','c','alpha','beta','gamma']])
B = TasUBlib.calculateBMatrix(cell)
A41 = TasUBlib.calTwoTheta(B,[*q1,Ei,Ei],-1)
A31 = TasUBlib.calcTheta(k,k,A41)
A42 = TasUBlib.calTwoTheta(B,[*q2,Ei,Ei],-1)
A32 = TasUBlib.calcTheta(k,k,A42)
planeVector1 = q1
planeVector1.append(A31) # A3
planeVector1.append(A41) # A4
[planeVector1.append(0.0) for _ in range(2)]# append values for gonios set to zero
planeVector1.append(Ei)
planeVector1.append(Ei)
planeVector2 = q2
planeVector2.append(A32) # A3
planeVector2.append(A42) # A4
[planeVector2.append(0.0) for _ in range(2)]# append values for gonios set to zero
planeVector2.append(Ei)
planeVector2.append(Ei)
# add correct angle in theta between the two reflections
between = TasUBlib.tasAngleBetweenReflections(B,np.array(planeVector1),np.array(planeVector2))
planeVector2[3]+=between
sample['projectionVector1']=np.array(planeVector1)
sample['projectionVector2']=np.array(planeVector2)
return sample
def updateKeyName(obj,key,newName):
if not hasattr(obj,key):
return
setattr(obj,newName,getattr(obj,key))
delattr(obj,key)
## Find correct Ei
KIPattern = re.compile(r'PARAM:\s*(FX=\s*\d*.?\d*),\s*(KFIX=\s*'+reFloat+')')
EI = np.power(self.KFIX/0.694692,2)
if np.isclose(self.FX,1.0): # If FX == 1 subtract, otherwise add
self.EI = EI-self.EN
else:
self.EI = EI+self.EN
sampleDict = extractSample(self)
## Find labels for data
labels = dataPart[0].split()
stepData = np.zeros((self.steps,len(labels)))
singleDetectorData = np.zeros(self.steps,dtype = int)
## Extract the intensity data
if dataPart[3].strip() == 'endflat': # Data is mix of steps and data in each line
self.pntData = np.array([np.array(x.split(),dtype=float) for x in dataPart[1::3] if x[:8]!='Finished'])
self.multiData = np.array([np.array(x.split()[1:],dtype=int) for x in dataPart[2::3]])
self.multiData = self.multiData[:,:155] # Remove additional 5 zeros in data file
self.type = 'MultiFLEXX'
self.instrument = 'MultiFLEXX'
if 'Finished' in dataPart[-1]:
self.endTime = dataPart[-1].replace('Finished at ','')
else:
self.endTime = 'N/A'
else: # Assume flatcone, data is split in steps and then flatcone data
# Split is marked with 'MULTI:'
splitMarker = np.array([x=='MULTI:' for x in dataPart])
split = splitMarker.argmax()
if split !=0: # We have flatCone data!
self.pntData = np.array([np.array(x.split(),dtype=float) for x in dataPart[1:split]])
self.multiData = np.array([np.array(x.split(),dtype=int) for x in dataPart[split+1:]])
self.type = 'FlatCone'
self.instrument = 'FlatCone'
else:
self.pntData = np.array([np.array(x.split(),dtype=float) for x in dataPart[1:] if x[:8]!='Finished'])
self.type='1D'
self.instrument = 'UNKNOWN'
## Extract the data from above
for parameter,value in zip(labels[1:],self.pntData[:,1:].T):
setattr(self,parameter,np.array(value))
dataLen = self.pntData.shape[0]
if not dataLen == self.steps: # Scan stopped prematurely
self.steps = dataLen
if hasattr(self,'multiData'):
updateKeyName(self,'multiData','I')
else:
updateKeyName(self,'CNTS','I')
nameSwaps = [['file','name'],
['MAG','magneticField'],
['CNTS','ISingleDetector'],
['M1','Monitor'],
['TIME','Time'],
['TT','temperature'],
['comnd','scanCommand'],
['instr','instrument'],
['EI','Ei'],
['local','localContact'],
['user','userName'],
['DA','analyzerDSpacing'],
['expno','experimentIdentifier'],
['localContact','localContactName'],
['date','startTime']
]
for pair in nameSwaps:
updateKeyName(self,pair[0],pair[1])
self.scanValues = np.array([getattr(self,param) for param in self.scanParameters])
## Create sample from sample dictionary
self.sample = MJOLNIR.Data.Sample.Sample(**sampleDict)
self.sample.name = self.title
self.comment = 'N/A'
## Create calibration data from file
if calibrationFile is None: # Use shipped calibration
if self.type in ['MultiFLEXX','FlatCone']:
if self.type =='MultiFLEXX':
calibrationFile = MJOLNIR.__multiFLEXXNormalization__
detectors = 155
mask = np.zeros_like(self.I,dtype=bool)
self._mask = mask
else:
calibrationFile = MJOLNIR.__flatConeNormalization__
detectors = 31
mask = np.zeros_like(self.I,dtype=bool)
self._mask = mask
calibrationData = np.genfromtxt(calibrationFile,skip_header=1,delimiter=',')
amplitude = calibrationData[:,3]
background = calibrationData[:,6]
bound = calibrationData[:,7:9]
final_energy = calibrationData[:,4]
width = calibrationData[:,5]
A4 = -calibrationData[:,9]
EfTable = np.array([amplitude,final_energy,width,background]).T
calibrations=[[EfTable,A4,bound]]
bound = np.array(detectors*[0,1],dtype=int).reshape(-1,2)
self.instrumentCalibrationEdges = bound
self.instrumentCalibrationEf = EfTable
self.instrumentCalibrationA4 = A4
self.instrumentCalibrations = calibrations
if self.type == 'MultiFLEXX':
mask = np.zeros_like(self.I.data,dtype=bool)
mask[:,np.isnan(self.instrumentCalibrationEf[:,0])] = True
self.mask=mask
elif self.type == '1D':
pass
else:
import warnings
warnings.warn('Instrument of type "{}" is not supported yet....'.format(self.type))
#raise NotImplementedError('Instrument of type "{}" is not supported yet....'.format(self.type))
# Create Dasel
self.analyzerSelection = 0
self.detectorSelection = 0
### Weird additional changes:
if self.type == 'MultiFLEXX':
self.A3Off = np.array([90.0])
self.A4Off = np.array([0.0])
if not hasattr(self,'electricField'):
self.electricField = None
if not hasattr(self,'magneticField'):
self.magneticField = None
self.twotheta = self.A4
def loadBambusData(self,fileLocation,calibrationFile=None):
self.fileLocation = fileLocation
self.possibleBinnings = [1] # Standard value (1 energy/detector)
self.binning = 1
self.instrument = 'Bambus'
self.type = self.instrument
## No dasel
self.analyzerSelection = 0
self.detectorSelection = 0
with open(fileLocation) as f:
dataString = f.readlines()
# Format for starting time is 2021-11-26 10:45:05
searchString = r'([0-9]+)-([0-9]+)-([0-9]+)\s([0-9]+):([0-9]+):([0-9]+)'
matches = re.findall(searchString,dataString[0])
if len(matches) == 0:
self.startTime = 'UNKNONW'
else:
self.startTime = '-'.join(matches[0][:3])+' '+':'.join(matches[0][3:])
self.endTime = 'UNKNOWN'
if calibrationFile is None:
calibrationFile = MJOLNIR.__bambusNormalization__
detectors = 20
self.mask = False
calibrationData = np.genfromtxt(calibrationFile,skip_header=1,delimiter=',')
amplitude = calibrationData[:,3]
background = calibrationData[:,6]
bound = calibrationData[:,7:9]
final_energy = calibrationData[:,4]
width = calibrationData[:,5]
A4 = -calibrationData[:,9]
EfTable = np.array([amplitude,final_energy,width,background]).T
calibrations=[[EfTable,A4,bound]]
bound = np.array(detectors*[0,1],dtype=int).reshape(-1,2)
self.instrumentCalibrationEdges = bound
self.instrumentCalibrationEf = EfTable
self.instrumentCalibrationA4 = A4
self.instrumentCalibrations = calibrations
## Load header data:
## Format is "# param : value"
parameters = {}
for I,line in enumerate(dataString):
if line[:2] == '# ':
param,*value = [x.strip() for x in line.split(':')]
param = param.replace('#','').strip()
if len(value)>1:
value = ':'.join(value)
parameters[param] = value[0]
else:
if line.find('### Scan data') >-1:
dataline = I
break
# Convert from ch number in data file to detector,energy (A1,A2,...,20D,20E)
detectorMap = np.array([ 0, 1, 2, 27, 3, 50, 51, 52, 77, 53, 4, 5, 6, 28, 7, 54, 55,
56, 78, 57, 8, 9, 10, 29, 11, 58, 59, 60, 79, 61, 12, 13, 14, 30,
15, 62, 63, 64, 80, 65, 16, 17, 18, 31, 19, 66, 67, 68, 81, 69, 20,
21, 22, 32, 23, 70, 71, 72, 82, 73, 24, 25, 26, 33, 37, 74, 75, 76,
83, 87, 38, 39, 40, 34, 41, 88, 89, 90, 84, 91, 42, 43, 44, 35, 45,
92, 93, 94, 85, 95, 46, 47, 48, 36, 49, 96, 97, 98, 86, 99])
# Load data
titles = dataString[dataline+1].strip().split('\t')
units = dataString[dataline+2].strip().split('\t')
# remove the '# ' part of the first title and unit
titles[0] = titles[0].replace('#','').strip()
units[0] = units[0].replace('#','').strip()
# Find splitter in data set ';'
splitter = titles.index(';')
# Shape becomes [scan points, timer + mon1 + mon2 + chsum + 100 detectors + events]
data = [[float(x) for x in line.split('\t')[splitter+1:]] for line in dataString[dataline+3:-1]]
scanData = [[float(x) for x in line.split('\t')[:splitter]] for line in dataString[dataline+3:-1]]
if len(data[-1]) == 0: # empty last line
data = np.array(data[:-1])
scanData = np.array(scanData[:-1])
else:
data = np.array(data)
scanData = np.array(scanData)
#reshape into [scan points, 20 wedges, 5 energies]
self.I = np.array([d[detectorMap] for d in data[:,4:-1]]).reshape(-1,20*5,1)
self.timer = data[:,0].astype(float)
self.mon1,self.mon2,self.ctot,self.events = data[:,[1,2,3,-1]].astype(int).T
self.scanParameters = np.asarray(titles[:splitter])
self.scanUnits = np.asarray(units[:splitter])
self.scanValues = np.asarray(scanData[:,:splitter]).T.astype(float)
self.scanParameters[0] = self.scanParameters[0].replace('#','').strip()
self.scanUnits[0] = self.scanUnits[0].replace('#','').strip()
### Move last scan parameter to first position in lists
self.scanParameters = self.scanParameters[[-1,*range(len(self.scanParameters)-1)]]
self.scanUnits = self.scanUnits[[-1,*range(len(self.scanParameters)-1)]]
self.scanValues = self.scanValues[[-1,*range(len(self.scanParameters)-1)]]
self.__dict__.update(parameters)
#### Import sample correctly
sample = {}
def extractFromParenthesis(string):
return list(map(float,re.findall(r'(?<=\().*?(?=\))',string)[0].split(',')))
cell = np.concatenate([extractFromParenthesis(dat) for dat in [self.Sample_lattice,self.Sample_angles]])
for param,value in zip(['a','b','c','alpha','beta','gamma'],cell):
sample[param] = value
q1 = np.array(extractFromParenthesis(self.Sample_orient1))
q2 = np.array(extractFromParenthesis(self.Sample_orient2))
Ei = 10.0 # TODO: What is the good solution here? Dummy incoming energy needed to calcualte UB
k = np.sqrt(Ei)*factorsqrtEK
Cell = TasUBlib.calcCell(cell)
B = TasUBlib.calculateBMatrix(Cell)
A3offset = float(self.Sample_psi0.split(' ')[0])
A41 = TasUBlib.calTwoTheta(B,[*q1,Ei,Ei],-1)
A31 = TasUBlib.calcTheta(k,k,A41)+A3offset
A42 = TasUBlib.calTwoTheta(B,[*q2,Ei,Ei],-1)
A32 = TasUBlib.calcTheta(k,k,A42)
planeVector1 = list(q1)
planeVector1.append(A31) # A3
planeVector1.append(A41) # A4
[planeVector1.append(0.0) for _ in range(2)]# append values for gonios set to zero
planeVector1.append(Ei)
planeVector1.append(Ei)
planeVector2 = list(q2)
planeVector2.append(A32) # A3
planeVector2.append(A42) # A4
[planeVector2.append(0.0) for _ in range(2)]# append values for gonios set to zero
planeVector2.append(Ei)
planeVector2.append(Ei)
# add correct angle in theta between the two reflections
between = TasUBlib.tasAngleBetweenReflections(B,np.array(planeVector1),np.array(planeVector2))
planeVector2[3]+=between
sample['projectionVector1']=np.array(planeVector1)
sample['projectionVector2']=np.array(planeVector2)
sample['name'] = self.Sample_samplename
self.sample = MJOLNIR.Data.Sample.Sample(**sample)
for sP,sV in zip(self.scanParameters,self.scanValues):
setattr(self,sP,sV)
monoQ = np.array(float(self.mono_value.split(' ')[0]))
self.Ei = np.array(float(self.Ei_value.split(' ')[0]))
self.A4 = np.array([float(self.stt_value.split(' ')[0])])
if isinstance(self.sth_value,str):
self._A3 = np.array(float(self.sth_value.split(' ')[0]))
else:
self._A3 = self.sth_value
self.A4Off = 0.0
self.A3Off = None
self.countingTime = np.array([self.etime[0],*np.diff(self.etime)])
nameSwaps = [['filename','name'],
['info','scanCommand'],
['Exp_remark','comment'],
#['MAG','magneticField'],
['mon1','Monitor'],
['timer','Time'],
#['TT','temperature'],
#['comnd','scanCommand'],
#['instr','instrument'],
#['EI','Ei'],
['Exp_localcontact','localContact'],
['Exp_proposal','proposalID'],
['Exp_title','title'],
['Exp_users','userName'],
#['DA','analyzerDSpacing'],
#['expno','experimentIdentifier'],
['localContact','localContactName'],
#['date','startTime']
]
if not hasattr(self,'electricField'):
self.electricField = None
if not hasattr(self,'magneticField'):
self.magneticField = None
if not hasattr(self,'temperature'):
self.temperature = None
self.twotheta = self.A4
## Format scanCommand
def updateKeyName(obj,key,newName):
if not hasattr(obj,key):
return
setattr(obj,newName,getattr(obj,key))
delattr(obj,key)
for pair in nameSwaps:
updateKeyName(self,pair[0],pair[1])
self._mask = np.zeros_like(self.I)
@_tools.KwargChecker()
def calcualteDataIndexFromDasel(self,detectorSelection=None,analyzerSelection=None):
if detectorSelection is None:
detectorSelection = self.detectorSelection
if analyzerSelection is None:
analyzerSelection = self.analyzerSelection
if self.instrument == 'CAMEA':
if detectorSelection > detectorLimits['CAMEA']:
raise AttributeError('Provided detectorSelection is out of range. Recieved {} and instrument has {} detectors enumerated [0-{}]'.format(detectorSelection,detectorLimits['CAMEA']+1,detectorLimits['CAMEA']))
if analyzerSelection > analyzerLimits['CAMEA']:
raise AttributeError('Provided analyzerSelection is out of range. Recieved {} and instrument has {} analyzers enumerated [0-{}]'.format(analyzerSelection,analyzerLimits['CAMEA']+1,analyzerLimits['CAMEA']))
analyzerPixels = self.instrumentCalibrationEdges[analyzerSelection+detectorSelection*8]
return detectorSelection,range(int(analyzerPixels[0]),int(analyzerPixels[1]))
else:
if detectorSelection > detectorLimits[self.instrument]:
raise AttributeError('Provided detectorSelection is out of range. Recieved {} and instrument has {} detectors enumerated [0-{}]'.format(detectorSelection,detectorLimits[self.instrument]+1,detectorLimits[self.instrument]))
if analyzerSelection > analyzerLimits['Bambus']:
raise AttributeError('Provided analyzerSelection is out of range. Recieved {} and instrument has {} analyzers enumerated [0-{}]'.format(analyzerSelection,analyzerLimits[self.instrument]+1,analyzerLimits[self.instrument]))
# Calcualte detector number from detector and analyzer. As there are 5 energies(analyzers) per wedge
return detectorSelection*(analyzerLimits[self.instrument]+1)+analyzerSelection,[0] # Last index is to be able to sum over
@_tools.KwargChecker()
def convert(self,binning=None,printFunction=None):
if self.instrument == 'CAMEA' or self.type in ['MultiFLEXX','FlatCone','Bambus']:
if binning is None:
binning = self.binning
else:
raise AttributeError('Instrument type of data file not understood. {} was given.'.format(self.instrument))
self.loadBinning(binning)
if printFunction is None:
printFunction = warnings.warn
EfNormalization = self.instrumentCalibrationEf.copy()
A4Normalization = self.instrumentCalibrationA4.copy()#np.array(instrument.get('calib{}/a4offset'.format(str(binning))))
EdgesNormalization = self.instrumentCalibrationEdges.copy()#np.array(instrument.get('calib{}/boundaries'.format(str(binning))))
self.counts = self.I.copy()
Data = self.I.copy()#np.array(instrument.get('detector/data'))
detectors = Data.shape[1]
steps = Data.shape[0]
if self.type in ['MultiFLEXX','FlatCone','Bambus']:
Data.shape = (Data.shape[0],Data.shape[1],-1)
A4Zero = self.A4Off#file.get('entry/sample/polar_angle_zero')
if A4Zero is None:
A4Zero=0.0
else:
A4Zero = np.array(A4Zero)
A3Zero = self.A3Off
if A3Zero is None:
A3Zero=0.0
else:
A3Zero = np.deg2rad(np.array(A3Zero))
A4 = np.deg2rad(A4Normalization)
A4=A4.reshape(detectors,binning*self.EPrDetector,order='C')
PixelEdge = EdgesNormalization.reshape(detectors,self.EPrDetector,binning,2).astype(int)
A4File = self.A4.copy()
A4File = A4File.reshape((-1,1,1))
A4Mean = (A4.reshape((1,detectors,binning*self.EPrDetector))+np.deg2rad(A4File-A4Zero))
Intensity=np.zeros((Data.shape[0],Data.shape[1],self.EPrDetector*binning),dtype=int)
for i in range(detectors): # for each detector
for j in range(self.EPrDetector):
for k in range(binning):
Intensity[:,i,j*binning+k] = np.sum(Data[:,i,PixelEdge[i,j,k,0]:PixelEdge[i,j,k,1]],axis=1)
EfMean = EfNormalization[:,1].reshape(1,A4.shape[0],self.EPrDetector*binning)
EfNormalization = EfNormalization[:,0]#.reshape(1,A4.shape[0],EPrDetector*binning)#
#EfNormalization = EfNormalization[:,0]*(np.sqrt(2*np.pi)*EfNormalization[:,2])
EfNormalization.shape = (1,A4.shape[0],self.EPrDetector*binning)
A3 = np.deg2rad(np.array(self.A3).copy())+A3Zero #file.get('/entry/sample/rotation_angle/')
if A3.shape[0]==1:
A3 = A3*np.ones((steps))
A3.resize((steps,1,1))
Ei = self.Ei.copy().reshape(-1,1,1)#np.array(instrument.get('monochromator/energy'))
if False:
kf = factorsqrtEK*np.sqrt(EfMean)#.reshape(1,detectors,binning*EPrDetector)
ki = factorsqrtEK*np.sqrt(Ei).reshape(-1,1,1)
# Shape everything into shape (steps,detectors,bins) (if external parameter
# is changed, this is assured by A3 reshape)
Qx = ki-kf*np.cos(A4Mean)
Qy = -kf*np.sin(A4Mean)
QX = Qx*np.cos(A3)-Qy*np.sin(A3)
QY = Qx*np.sin(A3)+Qy*np.cos(A3)
else:
UB = self.sample.orientationMatrix
UBINV = np.linalg.inv(UB)
HKL,QX,QY = TasUBlib.calcTasQH(UBINV,[np.rad2deg(A3),
np.rad2deg(A4Mean)],Ei,EfMean)
H,K,L = np.swapaxes(np.swapaxes(HKL,1,2),0,3)
self.sample.B = TasUBlib.calculateBMatrix(self.sample.cell)
DeltaE = Ei-EfMean
if DeltaE.shape[0]==1:
DeltaE = DeltaE*np.ones((steps,1,1))
Monitor = self.Monitor.copy().reshape((steps,1,1))
Monitor = Monitor*np.ones((1,detectors,self.EPrDetector*binning))
Normalization = EfNormalization*np.ones((steps,1,1))
mask = np.zeros_like(Intensity) # TODO: Redo???
###########################
#Monitor[:,:,:binning] = 0 #
###########################
convFile = DataFile(self) # Copy everything from old file
del convFile.counts
updateDict = {'I':Intensity,'Monitor':Monitor,'qx':QX,'qy':QY,'energy':DeltaE,'binning':binning,'Norm':Normalization,
'h':H,'k':K,'l':L,'type':'nxs','fileLocation':None,'original_fileLocation':self.fileLocation,'name':self.name.replace('.hdf','.nxs'),'mask':mask}
convFile.updateProperty(updateDict)
if convFile.type == 'nxs' and convFile.binning == 8:
convFile.mask = np.zeros_like(convFile.I,dtype=bool)
convFile.mask[:,:,:2] = True
if convFile.instrument == 'CAMEA':
defectTubes = np.arange(104)[np.any(np.isnan(convFile.instrumentCalibrationA4.reshape(104,-1)),axis=1)]
if len(defectTubes)>0: # if any tubes are defect
if len(defectTubes)>1:
printFunction('Detector tubes {} masked'.format(', '.join([str(x) for x in defectTubes])))
else:
printFunction('Detector tube {} masked'.format(defectTubes[0]))
newMask = np.repeat(np.isnan(convFile.instrumentCalibrationA4.reshape(104,-1))[np.newaxis],len(convFile.I),axis=0)
if np.all(convFile.mask.shape==newMask.shape):
newMask = np.logical_or(convFile.mask,newMask)
convFile.mask = newMask
return convFile
[docs] @_tools.KwargChecker()
def plotA4(self,binning=None):
"""Method to plot the fitted A4 values of the normalization table
Kwargs:
- binning (int): Binning for the corresponding normalization table (default self.binning or 8)
returns:
- fig (matplotlib figure): Figure into which the A4 values are plotted
"""
self.loadBinning(binning)
binning = self.binning
Norm = (self.instrumentCalibrationEf[:,0]*self.instrumentCalibrationEf[:,2]*np.sqrt(2*np.pi)).reshape((104,8*binning))
A4 = np.reshape(self.instrumentCalibrationA4,(104,8*binning))
fig = plt.figure()
for a4,N in zip(A4,Norm):
plt.scatter(-a4,np.arange(len(a4)),c=N)
plt.title('Instrument calibration')
plt.ylabel('Pixel')
plt.xlabel('-A4 [deg]')
return fig
[docs] @_tools.KwargChecker()
def plotEf(self,binning=None):
"""Method to plot the fitted Ef values of the normalization table
Kwargs:
- binning (int): Binning for the corresponding normalization table (default self.binning or 8)
returns:
- fig (matplotlib figure): Figure into which the Ef values are plotted
"""
self.loadBinning(binning)
binning = self.binning
Ef = self.instrumentCalibrationEf[:,1].reshape(104,8*binning)
fig = plt.figure()
for i in range(104):
plt.scatter(i*np.ones_like(Ef[i]),Ef[i],zorder=10)
plt.xlabel('Detector number')
plt.ylabel('Ef [meV]')
plt.title('Final Energy Individual')
plt.grid(True)
return fig
[docs] @_tools.KwargChecker()
def plotEfOverview(self,binning=None):
"""Method to plot the fitted Ef values of the normalization table
Kwargs:
- binning (int): Binning for the corresponding normalization table (default self.binning or 8)
returns:
- fig (matplotlib figure): Figure into which the Ef values are plotted
"""
self.loadBinning(binning)
binning = self.binning
Ef = self.instrumentCalibrationEf[:,1].reshape(104,8*binning)
fig = plt.figure()
plt.imshow(Ef.T,origin='lower')
plt.xlabel('Detector number')
plt.ylabel('Pixel number')
plt.colorbar()
plt.title('Final Energy Overview')
return fig
[docs] @_tools.KwargChecker()
def plotNormalization(self,binning=None):
"""Method to plot the fitted integrated intensities of the normalization table
Kwargs:
- binning (int): Binning for the corresponding normalization table (default self.binning or 8)
returns:
- fig (matplotlib figure): Figure into which the Ef values are plotted
"""
self.loadBinning(binning)
binning = self.binning
Norm = (self.instrumentCalibrationEf[:,0]*self.instrumentCalibrationEf[:,2]*np.sqrt(2*np.pi)).reshape((104,8*binning))
fig = plt.figure()
plt.imshow(Norm.T,origin='lower')
plt.xlabel('Detector number')
plt.ylabel('Pixel number')
plt.title('Normalization')
plt.colorbar()
return fig
[docs] @_tools.KwargChecker()
def loadBinning(self,binning):
"""Small function to check if current binning is equal to wanted binning and if not reloads to binning wanted"""
if binning is None or not hasattr(self,'instrumentCalibrations'):
return
if not binning in self.possibleBinnings:
raise AttributeError('Wanted binning not in possible binnings!')
binID = list(self.possibleBinnings).index(binning)
self.instrumentCalibrationEf,self.instrumentCalibrationA4,self.instrumentCalibrationEdges = self.instrumentCalibrations[binID]
try:
len(binning)
except TypeError:
pass
else:
binning = binning[0]
self._binning = binning
self.instrumentCalibrationEf.shape = (-1,4)
self.instrumentCalibrationA4.shape = (-1)
self.instrumentCalibrationEdges.shape = (-1,2)
[docs] def saveNXsqom(self,saveFileName):
"""Save converted file into an NXsqom.
Args:
- saveFileName (string): File name to be saved into.
"""
if not self.__hasattr__('original_fileLocation'):
raise AttributeError('Data file does not have link to the original file. This is needed to make a complete copy when creating nxs-files')
if not self.type =='nxs':
raise AttributeError('Only nxs typed files can be saved as nxs-files.')
datafile = self.original_fileLocation
Intensity = self.I # Dont swap axis as they are correct!
Monitor = self.Monitor
QX = self.qx
QY = self.qy
DeltaE = self.energy
binning = self.binning
Normalization = self.Norm
H = self.h
K = self.k
L = self.l
if os.path.exists(saveFileName):
warnings.warn('The file {} exists alread. Old file will be renamed to {}.'.format(saveFileName,saveFileName+'_old'))
if os.path.exists(saveFileName+'_old'):
os.remove(saveFileName+'_old')
os.rename(saveFileName,saveFileName+'_old')
with hdf.File(saveFileName,'w') as fd:
with hdf.File(datafile,'r') as fs:
group_path = fs['/entry'].parent.name
group_id = fd.require_group(group_path)
fs.copy('/entry', group_id, name="/entry")
definition = fd.create_dataset('entry/definition',(1,),dtype='S70',data=np.string_('NXsqom'))
definition.attrs['NX_class'] = 'NX_CHAR'
process = fd.create_group('entry/reduction')
process.attrs['NX_class']=b'NXprocess'
proc = process.create_group('MJOLNIR_algorithm_convert')
proc.attrs['NX_class']=b'NXprocess'
author= proc.create_dataset('author',shape=(1,),dtype='S70',data=np.string_('Jakob Lass'))
author.attrs['NX_class']=b'NX_CHAR'
date= proc.create_dataset('date',shape=(1,),dtype='S70',data=np.string_(datetime.datetime.now()))
date.attrs['NX_class']=b'NX_CHAR'
description = proc.create_dataset('description',shape=(1,),dtype='S70',data=np.string_('Conversion from pixel to Qx,Qy,E in reference system of instrument.'))
description.attrs['NX_class']=b'NX_CHAR'
rawdata = proc.create_dataset('rawdata',shape=(1,),dtype='S200',data=np.string_(os.path.realpath(datafile)))
rawdata.attrs['NX_class']=b'NX_CHAR'
normalizationString = proc.create_dataset('binning',shape=(1,),dtype='int32',data=binning)
normalizationString.attrs['NX_class']=b'NX_INT'
data = fd.get('entry/data')
fileLength = Intensity.shape
Int = data.create_dataset('intensity',shape=(fileLength),dtype='int32',data=Intensity)
Int.attrs['NX_class']='NX_INT'
if self.fromNICOS:
counts = np.array(fd.get('entry/data/data'))
Int = data.create_dataset('counts',dtype='int32',data=counts)
Int.attrs['NX_class']='NX_INT'
instr = getInstrument(fd)
Int = instr.create_dataset('detector/counts',dtype='int32',data=counts)
Int.attrs['NX_class']='NX_INT'
monitor = data.create_dataset('monitor',shape=(fileLength),dtype='int32',data=Monitor)
monitor.attrs['NX_class']=b'NX_INT'
if fd.get('entry/monitor_2') is None:
mon = fd.create_group('entry/monitor_2')
monitor = mon.create_dataset('data',shape=(fileLength),dtype='int32',data=Monitor)
monitor.attrs['NX_class']=b'NX_INT'
else:
pass
normalization = data.create_dataset('normalization',shape=(fileLength),dtype='float32',data=Normalization)
normalization.attrs['NX_class']=b'NX_FLOAT'
qx = data.create_dataset('qx',shape=(fileLength),dtype='float32',data=QX)
qx.attrs['NX_class']=b'NX_FLOAT'
qx.attrs['units']=b'1/angstrom'
qy = data.create_dataset('qy',shape=(fileLength),dtype='float32',data=QY)
qy.attrs['NX_class']=b'NX_FLOAT'
qy.attrs['units']=b'1/angstrom'
en = data.create_dataset('en',shape=(fileLength),dtype='float32',data=DeltaE)
en.attrs['NX_class']=b'NX_FLOAT'
en.attrs['units']=b'mev'
h = data.create_dataset('h',shape=(fileLength),dtype='float32',data=H)
k = data.create_dataset('k',shape=(fileLength),dtype='float32',data=K)
l = data.create_dataset('l',shape=(fileLength),dtype='float32',data=L)
for x in [h,k,l]:
x.attrs['NX_class']=b'NX_FLOAT'
x.attrs['units']=b'rlu'
#fd.close()
[docs] def updateCalibration(self,calibrationFile,overwrite=False):
"""Update calibrations for the data file. Does not save the changes.
Args:
- calibrationFile (string or list): calibration file, as generated from MJOLNIR.Geometry.Instrument or list of these.
Kwargs:
- overwrite (bool): If true, previous binnings will be overwritten if new files contain same binning (default False)
.. note::
Changes performed by this method is not saved to disk. If this is wanted, use the saveNXsqom method.
"""
try:
len(calibrationFile)
except TypeError:
calibrationFile = [calibrationFile]
currentBinnings = self.possibleBinnings
calibrations = {}
newBinnings = []
for binning in self.possibleBinnings:
if binning is None:
continue
self.loadBinning(binning)
calibrations[binning] = [self.instrumentCalibrationEf,self.instrumentCalibrationA4,self.instrumentCalibrationEdges]
express = re.compile(r'\w*\spixel')
for f in calibrationFile:
with open(f) as file:
line = file.readline()
pixel = int(express.findall(line)[0].split(' ')[0])
if overwrite == False and pixel in currentBinnings: # Do not overwrote current values
warnings.warn('Binning {} from file "{}" is skipped as overwrite is set to False.'.format(pixel,f))
continue
newBinnings.append(pixel)
data = np.loadtxt(f,skiprows=3,delimiter=',')
EfTable = data[:,[3,4,5,6]]
A4 = data[:,-1]
bound = data[:,[7,8]]
calibrations[pixel] = [EfTable,A4,bound]
self.instrumentCalibrations = np.array([c for c in calibrations.values()],dtype=object)
self.possibleBinnings = np.array(list(calibrations.keys()))
[docs] def updateSampleParameters(self,unitCell):
"""Update unit cell parameters and corresponding UB matrix
Args:
- unitCell (list): List of cell parameters (a,b,c,alpha,beta,gamma)
"""
self.sample.updateSampleParameters(unitCell=unitCell)
[docs] def calculateCurratAxeMask(self,BraggPeaks,dqx=None,dqy=None,dH=None,dK=None,dL=None,spurionType='both',maskInside=True):
"""Generate an elliptical mask centered on the Currat-Axe spurion.
Args:
- BraggPeaks (list): List of Bragg peaks to be used for mask. Shape is given by [[H1,K1,L1], [H2,K2,L2], ... [HN,KN,LN]]
Kwargs:
- dqx (float): Radius used for masking along qx (default None)
- dqy (float): Radius used for masking along qy (default None)
- dH (float): Radius used for masking along H (default None)
- dK (float): Radius used for masking along K (default None)
- dL (float): Radius used for masking along L (default None)
- spurionType (str): Either monochromator, analyser or both (default 'both')
- maskInside (bool): If true, points inside is masked otherwise outside (default True)
Returns:
- mask (list): Boolean numpy array with shape equal to self.I.shape
Note:
If either dqx or dqy is None, utilizes the dH, dK, dL instead.
"""
if np.any([dqx is None,dqy is None]):
if np.any([x is None for x in [dH,dK,dL]]):
raise AttributeError('Provided masking radius not understood. Either dqx and dqy are to be given or dH, dK, and dL.\nRecieved: \n{}'.format('\n'.join([' {}\t= {}'.format(x,v) for x,v in zip(['dqx','dqy','dH','dK','dL'],[dqx,dqy,dH,dK,dL])])))
rlu = True
factor = np.array([1.0,dH/dK,dH/dL]).reshape(3,1,1,1)
else:
rlu = False
factor = dqx/dqy
if not spurionType.lower() in ['monochromator','analyser','both']:
raise AttributeError('Provided spurion type not understood. Received '+spurionType+' but expected '+', '.join(['monochromator','analyser','both']))
s = self.sample
Ei = self.Ei[0]
Ef = self.instrumentCalibrationEf[:,1].reshape(-1,self.EPrDetector*self.binning).mean(axis=0)
if spurionType.lower() in ['monochromator','both']:
monoQx,monoQy = s.CurratAxe(Ei=Ei,Ef=Ef,Bragg=BraggPeaks,HKL=False)[:,0,:,:2].transpose(2,0,1)
if spurionType.lower() in ['analyser','both']:
anaQx,anaQy = s.CurratAxe(Ei=Ei,Ef=Ef,Bragg=BraggPeaks,HKL=False,spurionType='Analyser')[:,0,:,:2].transpose(2,0,1) # Into shape (2,len(Bragg),len(Ef))
if not rlu:
qx = self.qx[:,:,:]
qy = self.qy[:,:,:]
monoInside = None
anaInside = None
# Reshape to fit qx,qy from data file
if spurionType.lower() in ['monochromator','both']:
monoQx = monoQx.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
monoQy = monoQy.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
for localMonoQx,localMonoQy in zip(monoQx,monoQy):
if monoInside is None:
monoInside = np.linalg.norm([qx-localMonoQx,(qy-localMonoQy)*factor],axis=0)<dqx
monoInside.dtype = bool
else:
monoInside += np.linalg.norm([qx-localMonoQx,(qy-localMonoQy)*factor],axis=0)<dqx
if spurionType.lower() in ['analyser','both']:
anaQx = anaQx.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
anaQy = anaQy.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
for localAnaQx,localAnaQy in zip(anaQx,anaQy):
if anaInside is None:
anaInside = np.linalg.norm([qx-localAnaQx,(qy-localAnaQy)*factor],axis=0)<dqx
anaInside.dtype = bool
else:
anaInside += np.linalg.norm([qx-localAnaQx,(qy-localAnaQy)*factor],axis=0)<dqx
else:
H = self.h[:,:,:]
K = self.k[:,:,:]
L = self.l[:,:,:]
monoInside = None
anaInside = None
if spurionType.lower() in ['monochromator','both']:
monoH,monoK,monoL = s.calculateQxQyToHKL(monoQx,monoQy)
monoH = monoH.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
monoK = monoK.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
monoL = monoL.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
for localMonoH,localMonoK,localMonoL in zip(monoH,monoK,monoL):
if monoInside is None:
monoInside = np.linalg.norm(np.array([H-localMonoH,K-localMonoK,L-localMonoL])*factor,axis=0)<dH
monoInside.dtype = bool
else:
monoInside += np.linalg.norm(np.array([H-localMonoH,K-localMonoK,L-localMonoL])*factor,axis=0)<dH
if spurionType.lower() in ['analyser','both']:
anaH,anaK,anaL = s.calculateQxQyToHKL(anaQx,anaQy)
anaH = anaH.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
anaK = anaK.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
anaL = anaL.transpose(1,0).reshape(1,1,len(Ef),-1).transpose(3,0,1,2)
for localAnaH,localAnaK,localAnaL in zip(anaH,anaK,anaL):
if anaInside is None:
anaInside = np.linalg.norm(np.asarray([H-localAnaH,K-localAnaK,L-localAnaL])*factor,axis=0)<dH
anaInside.dtype = bool
else:
anaInside += np.linalg.norm(np.asarray([H-localAnaH,K-localAnaK,L-localAnaL])*factor,axis=0)<dH
if spurionType.lower() == 'both':
mask = np.logical_or(monoInside,anaInside)
elif spurionType.lower() == 'monochromator':
mask = monoInside
else:
mask = anaInside
if not maskInside:
mask = np.logical_not(mask)
return mask
[docs] def saveHDF(self,saveFileName):
"""Save current HDF file object into an HDF file.
Args:
- saveFileName (string): File name to be saved into.
"""
def addMetaData(self,entry):
dset = entry.create_dataset('start_time',(1,),dtype='<S70')
dset[0] = np.string_(self.startTime)
dset = entry.create_dataset('end_time',(1,),dtype='<S70')
dset[0] = np.string_(self.endTime)
dset = entry.create_dataset('experiment_identifier',(1,),dtype='<S70')
dset[0] = self.experimentIdentifier.encode('utf8')
dset = entry.create_dataset('instrument',(1,),dtype='<S70')
dset[0] = self.instrument.title().upper().encode('utf8')
dset = entry.create_dataset('comment',(1,),data=np.string_(self.comment))
dset = entry.create_dataset('title',(1,),data=np.string_(self.title))
dset = entry.create_dataset('proposal_id',(1,),data=np.string_(self.proposalId))
dset = entry.create_dataset('proposal_title',(1,),data=np.string_(self.proposalTitle))
cont = entry.create_group('local_contact')
cont.attrs['NX_class'] = np.string_('NXuser')
dset = cont.create_dataset('name',(1,),data=np.string_(self.localContactName))
us = entry.create_group('proposal_user')
us.attrs['NX_class'] = np.string_('NXuser')
dset = us.create_dataset('name',(1,),data=np.string_(self.proposalUserName))
dset = us.create_dataset('email',(1,),data=np.string_(self.proposalUserEmail))
pus = entry.create_group('user')
pus.attrs['NX_class'] = np.string_('NXuser')
dset = pus.create_dataset('name',(1,),data=np.string_(self.userName))
dset = pus.create_dataset('email',(1,),data=np.string_(self.userEmail))
dset = pus.create_dataset('address',(1,),data=np.string_(self.userAddress))
dset = pus.create_dataset('affiliation',(1,),data=np.string_(self.userAffiliation))
def addMono(self,inst):
mono = inst.create_group('monochromator')
mono.attrs['NX_class'] = np.string_('NXmonochromator')
dset = mono.create_dataset('type',(1,),dtype='S70')
dset[0] = getattr(self,'monochromatorType')
attributes = ['d_spacing','horizontal_curvature','vertical_curvature',
'horizontal_curvature_zero','vertical_curvature_zero',
'gm','gm_zero','tlm','tlm_zero','tum','tum_zero']
units = ['angstrom']+['meter']*4+['degree']*6
values = ['monochromator'+x for x in ['DSpacing','HorizontalCurvature',
'VerticalCurvature','HorizontalCurvatureZero','VerticalCurvatureZero',
'GM','GMZero','TLM','TLMZero','TUM','TUMZero']]
for att,val,unit in zip(attributes,values,units):
if val in self.__dict__:
dset = mono.create_dataset(att,(1,),'float32')
dset[0] = getattr(self,val)
dset.attrs['units'] = unit
monoSlit = inst.create_group('monochromator_slit')
monoSlit.attrs['NX_class'] = np.string_('NXmonochromatorslit')
attributes = [x+zero for x in ['bottom','left','right','top'] for zero in ['','_zero']]
values = ['monochromatorSlit'+x+zero for x in ['Bottom','Left','Right','Top'] for zero in ['','Zero']]
if self.fromNICOS:
attributes += ['x_gap','y_gap']
values += ['monochromatorSlit'+x+'Gap' for x in ['X','Y']]
for att,value in zip(attributes,values):
val = getattr(self,value)
if not val.dtype == 'O':
dset = monoSlit.create_dataset(att,(1,),'float32')
dset[0] = val
dset.attrs['units'] = np.string_('mm')
def addAna(self,inst):
ana = inst.create_group('analyzer')
ana.attrs['NX_class'] = np.string_('NXcrystal')
attributes = ['d_spacing','nominal_energy','polar_angle','polar_angle_offset']+self.fromNICOS*['polar_angle_raw']
values = ['analyzer'+x.replace('_',' ').title().replace(' ','') for x in attributes]
units = ['anstrom','mev','degree','degree']+self.fromNICOS*['degree']
for att,value,unit in zip(attributes,values,units):
data = getattr(self,value)
dset = ana.create_dataset(att,(len(data),),'float32')
dset[:len(data)] = data
if not unit is None:
dset.attrs['units'] = np.string_(unit)
dset = ana.create_dataset('type',data = np.array([np.string_(self.analyzerType)]))
dset = ana.create_dataset('analyzer_selection',(1,),'int32',data=self.analyzerSelection)
def addDetector(inst):
det = inst.create_group('detector')
det.attrs['NX_class'] = np.string_('NXdetector')
def addSample(self,entry):
sam = entry.create_group('sample')
sam.attrs['NX_class'] = np.string_('NXsample')
dset = sam.create_dataset('name',(1,),data=np.string_(self.sample.name))
ub = self.sample.orientationMatrix/(2*np.pi) # 2pi is for change in convention
dset = sam.create_dataset('orientation_matrix',data=ub)
dset = sam.create_dataset('plane_vector_1',data=self.sample.plane_vector1)
dset = sam.create_dataset('plane_vector_2',data=self.sample.plane_vector2)
normal = self.sample.planeNormal
dset = sam.create_dataset('plane_normal',data=normal)
cell = np.array(self.sample.unitCell,dtype='float32')
dset = sam.create_dataset('unit_cell',data=cell)
dset = sam.create_dataset('azimuthal_angle',data=self.sample.azimuthalAngle)
dset.attrs['units']=np.string_('degree')
dset = sam.create_dataset('x',data=self.sample.x)
dset.attrs['units']=np.string_('degree')
dset = sam.create_dataset('y',data=self.sample.y)
dset.attrs['units']=np.string_('degree')
if hasattr(self,'temperature'):
if not self.temperature is None:
dset = sam.create_dataset('temperature',data=self.temperature,dtype='float32')
dset.attrs['units'] = np.string_('K')
if hasattr(self,'magneticField'):
if not self.magneticField is None:
dset = sam.create_dataset('magnetic_field',data=self.magneticField,dtype='float32')
dset.attrs['units'] = np.string_('T')
if hasattr(self,'electricField'):
if not self.electricField is None:
dset = sam.create_dataset('electric_field',data=self.electricField,dtype='float32')
dset.attrs['units'] = np.string_('V') # TODO: Check if this unit is correct.
for attr,value in zip(['sgu','sgl'],['sgu','sgl']):
dset = sam.create_dataset(attr,(1,),data=getattr(self.sample,value))
dset.attrs['units']=np.string_('degree')
dset = sam.create_dataset(attr+'_zero',(1,),data=getattr(self.sample,value+'Zero'))
dset.attrs['units']=np.string_('degree')
def makeTheta(self):
k = np.sqrt(self.Ei/2.072)
fd = np.pi/(k*self.monochromatorDSpacing[0])
theta = np.degrees(np.arcsin(fd))
return theta,2*theta
def storeScanData(self,entry):
nxdata = entry.create_group('data')
nxdata.attrs['NX_class'] = np.string_('NXdata')
det = entry['CAMEA/detector']
dset = det.create_dataset('counts',data=self.I.swapaxes(1,2), compression="gzip", compression_opts=6)
dset.attrs['target'] = np.string_('/entry/CAMEA/detector/counts')
nxdata['counts'] = dset
dset = det.create_dataset('detector_selection',(1,),'int32',data=self.detectorSelection)
dset = det.create_dataset('summed_counts',data=np.sum(self.I,axis=(1,2)))
dset.attrs['target'] = np.string_('/entry/CAMEA/detector/summed_counts')
nxdata['summed_counts'] = dset
sam = entry['sample']
dset = sam.create_dataset('rotation_angle',data=self.A3,dtype='float32')
dset_zero = sam.create_dataset('rotation_angle_zero',data=self.A3Off,dtype='float32')
dset.attrs['units'] = np.string_('degree')
dset_zero.attrs['units'] = np.string_('degree')
dset = sam.create_dataset('polar_angle',data=self.A4,dtype='float32')
dset_zero = sam.create_dataset('polar_angle_zero',data=self.A4Off,dtype='float32')
dset.attrs['units'] = np.string_('degree')
dset_zero.attrs['units'] = np.string_('degree')
dset.attrs['units'] = np.string_('degree')
dset_zero.attrs['units'] = np.string_('degree')
mono = entry['CAMEA/monochromator']
dset = mono.create_dataset('energy',data=self.Ei,dtype='float32')
dset.attrs['units'] = np.string_('mev')
dset = mono.create_dataset('rotation_angle',data=self.monochromatorRotationAngle,dtype='float32')
dset.attrs['units'] = np.string_('degree')
if hasattr(self,'monochromatorRotationAngleZero'):
v = self.monochromatorRotationAngleZero
else:
v = 0.0
dset = mono.create_dataset('rotation_angle_zero',data=v,dtype='float32')
dset.attrs['units'] = np.string_('degree')
entry.create_dataset('scancommand',(1,),data=np.string_(self.scanCommand))
entry.create_dataset('scanvars',data=np.string_([x.encode('utf8') for x in self.scanParameters]))
# save the correct scan variables
for variable,pos in zip(self.scanParameters,self.scanDataPosition):
positionRelativeEntry = '/'.join([x for x in pos.split('/')[2:]])
original = entry.get(positionRelativeEntry)
nxdata[variable] = original
nxdata[variable].attrs['target'] = np.string_('/entry/'+positionRelativeEntry)
control = entry.create_group('control')
control.attrs['NX_class'] = np.string_('NXmonitor')
mons = self.Monitor
control.create_dataset('data',data=mons,dtype='int32')
dset = control.create_dataset('preset',(1,),dtype='int32')
dset[0] = self.MonitorPreset
dset = control.create_dataset('mode',(1,),data=np.string_(self.MonitorMode))
time = self.Time
dset = control.create_dataset('time',data=time,dtype='float32')
dset.attrs['units'] = np.string_('seconds')
time = self.absoluteTime
if time[0] == np.array(None):
time = [0.0]
dset = control.create_dataset('absolute_time',data=time,dtype='float32')
dset.attrs['units'] = np.string_('seconds')
pb = entry.create_group('proton_beam')
pb.attrs['NX_class'] = np.string_('NXmonitor')
vals = self.protonBeam
dset = pb.create_dataset('data',data=vals,dtype='int32')
with hdf.File(saveFileName,'w') as f:
f.attrs['file_name'] = np.string_(saveFileName)
import datetime,time
cT = datetime.datetime.now()
f.attrs['file_time'] = np.string_('{}-{}-{}T{}:{}:{}{:+02.0f}:00'.format(cT.year,cT.month,cT.day,cT.hour,cT.minute,cT.second,-time.timezone/(60*60)))
entry = f.create_group('entry')
entry.attrs['NX_class'] = np.string_('NXentry')
#------------ Instrument
inst = entry.create_group(b'CAMEA')
inst.attrs['NX_class'] = np.string_('NXinstrument')
if hasattr(self,'singleDetector1'): # If the single detectors have been loaded
for idx in ['1','8']:
segment = inst.create_group('segment_'+idx)
dset = segment.create_dataset('data',data=getattr(self,'singleDetector'+idx),dtype='int32')
dset.attrs['units']=np.string_('counts')
attribute = ['a4offset','amplitude','background','boundaries','final_energy','width']
for calibration,binning in zip(self.instrumentCalibrations,self.possibleBinnings):
if binning is None: continue
pixelCalib = inst.create_group('calib{}'.format(binning))
Etable, A4, bound = calibration
amp,Ef,width,bg = Etable.T
values = [A4,amp,bg,bound,Ef,width]
dtypes = ['float32','float32','float32','int','float32','float32']
units = ['degree',None,None,None,'mev','mev']
for att,value,dtype,unit in zip(attribute,values,dtypes,units):
dset = pixelCalib.create_dataset(att,data=value,dtype=dtype)
if not unit is None:
dset.attrs['units']=np.string_(unit)
addMetaData(self,entry)
addMono(self,inst)
addAna(self,inst)
addDetector(inst)
addSample(self,entry)
storeScanData(self,entry)
def decodeStr(string):
#try:
if hasattr(string,'decode'):
return string.decode('utf8')
else:
return string
#except:
# return string
@_tools.KwargChecker()
def getScanParameter(self,f):
"""Extract scan parameter from hdf file.
Args:
- f (hdf): Open HDF5 file object from which parameters are extracted.
"""
if f.get('/entry/data') is None:
return [],[],[]
scanParameters = []
scanValues = []
scanUnits = []
scanDataPosition = []
scanItems = f.get('/entry/data/')
for item in scanItems:
if item in ['scanvar']: # only present in some files, but has to be ignored
continue
if not item in ['counts','summed_counts','en','h','intensity','k','l','monitor',
'normalization','qx','qy','data'] and item[-4:]!='zero':
scanParameters.append(item)
fItem = f.get('/entry/data/{}'.format(item))
scanUnits.append(decodeStr(fItem.attrs['units']))
scanValues.append(np.array(fItem))
try:
scanDataPosition.append(decodeStr(fItem.attrs['target']))
except:
pass
if len(scanParameters) == 0: # no data was stored... NICOS again, but due to manual scan
scanItems = [x for x in np.array(f.get('/entry/scanvars'))[0].decode().split(',') if len(x)>0 ]
## In a manual scan, no scanvars are saved.... guess that it is either A3, A4 or Ei
# Check first temperature and magnetic field
if len(scanItems) == 0:
if np.abs(np.diff(self.temperature)).mean()>0.01:
scanParameters = ['T']
scanValues = np.array([self.temperature])
scanUnits = ['K']
scanDataPosition = ['/entry/sample/temperature']
elif np.abs(np.diff(self.magneticField)).mean()>0.01:
scanParameters = ['B']
scanValues = np.array([self.magneticField])
scanUnits = ['T']
scanDataPosition = ['/entry/sample/B']
elif np.abs(np.diff(self.Ei)).mean()>0.001: # it is an energy scan!
scanParameters = ['Ei']
scanValues = np.array([self.Ei])
scanUnits = ['meV']
scanDataPosition = ['/entry/CAMEA/monochromator/energy']
elif np.abs(np.diff(self.A3)).mean()>0.001:
scanParameters = ['A3']
scanValues = np.array([self.A3])
scanUnits = ['deg']
scanDataPosition = ['/entry/sample/rotation_angle']
elif np.abs(np.diff(self.twotheta)).mean()>0.001:
scanParameters = ['A4']
scanValues = np.array([self.twotheta])
scanUnits = ['deg']
scanDataPosition = ['/entry/analyzer/polar_angle_raw']
else:
pass
#raise AttributeError('Scan values from Datafile ("{}") cannot be determined'.format(self.name))
else:
for item in scanItems:
if item in ['a3']:
item = item.upper()
if item == 's2t':
item = 'A4'
if item == 'A4':
fItem = getHDFInstrumentEntry(getInstrument(f),item,self.fromNICOS)
elif item != 'CAMEA':
fItem = getHDFEntry(f,item,self.fromNICOS)
#fItem = f.get(position)
if item == 'CAMEA':
# We are doing a QScan
scanCommand = self.scanCommand.decode()
t = scanCommand.split('(')[0]
if not t.lower() == 'qcscan': continue
start = scanCommand.split('(')[2].split(')')[0]
step = scanCommand.split('(')[3].split(')')[0]
_,steps,monitor = scanCommand.split(')')[2].split(',')
centre = np.asarray([float(x) for x in start.split(',')])
step = np.asarray([float(x) for x in step.split(',')])
dist =np.linalg.norm(step)
step = _tools.LengthOrder(step)
dist*=1.0/np.linalg.norm(step)
steps = int(steps)
monitor = int(monitor.split('=')[1])
#constant = np.isclose(step,0)
textCentre = '['+', '.join([str(x) for x in centre])+'] '
textStep = _tools.generateLabelDirection(step,labels=['H','K','L','E'])
scanParameters.append(textCentre+' + '+textStep)
scanUnits.append('RLU, meV')
scanValues.append(np.linspace(-dist*steps,dist*steps,steps*2+1))
else:
scanParameters.append(item)
scanUnits.append(decodeStr(fItem.attrs['units']))
scanValues.append(np.array(fItem))
try:
scanDataPosition.append(decodeStr(fItem.attrs['target']))
except:
pass
return scanParameters,np.array(scanValues),scanUnits,scanDataPosition
[docs]@_tools.KwargChecker()
def createEmptyDataFile(A3,A4,Ei,sample,Monitor=50000, A3Off = 0.0, A4Off = 0.0,
title='EmptyDataFileTitle', name='EmptyDataFile',
temperature = None, electricField = None, magneticField = None,
detectors = 104, pixels = 1024, normalizationFiles = None):
"""Create an empty data file with a given sample and measurement parameters.
Args:
- A3 (list or float): Value(s) of measurement A3.
- A4 (list or float): Value(s) of measurement A4.
- Ei (list or float): Value(s) of measurement Ei.
- sample (MJOLNIR Sample): Sample measured in data file.
Kwargs:
- Monitor (int): Monitor count for datafile (default 50 000)
- A3Off (float): Offset in A3 used in datafile (default 0.0)
- A4Off (float): Offset in A4 used in datafile (default 0.0)
- title (string): Title of datafile (default EmptyDataFileTitle)
- name (string): name of datafile (default EmptyDataFile)
- temperature (double): Sample temperature (default None)
- electricField (double): Sample electricField (default None)
- magneticField (double): Sample magneticField (default None)
- detectors (int): Number of detectors in spectrometer (default 104)
- pixels (int): Number of pixels/detector in spectrometer (default 1024)
- normalizationFiles (list or string): List or string to normalization file (default None)
.. warning::
If no normalization file(s) is/are provided, the resulting data file cannot be converted to HKL!
"""
df = DataFile()
A3 = np.asarray([A3]).flatten()
A4 = np.asarray([A4]).flatten()
Ei = np.asarray([Ei]).flatten()
isChanging = np.array([len(A3)>1,len(A4)>1,len(Ei)>1])
isChangingData = np.array([A3,A4,Ei],dtype=object)[isChanging]
if np.sum(isChanging)>1:
# Check if all arrays then have same shape
if not np.all([x.shape == isChangingData[0].shape for x in isChangingData[1:]]):
names = np.array(['A3','A4','Ei'])
raise AttributeError('More than one parameter is changing but they do not have the same shape! Changing: {}'.format(', '.join(str(x) for x in names[isChanging])))
elif np.sum(isChanging)==0:
raise AttributeError('No parameter is changing. At least one parameter must be changing through the scan.')
steps = len(isChangingData[0])
Monitor = np.asarray([Monitor]*steps)
df.A3Off = np.array([A3Off])
df.A4Off = np.array([A4Off])
df.Monitor = np.array(Monitor)
df.MonitorPreset = Monitor[0]
df.MonitorMode = 'm'
df.sample = sample
df.title = title
df.name = name
df.fileLocation = 'Unknown'
df.temperature = temperature
df.electricField = electricField
df.magneticField = magneticField
units = np.array(['degree','degree','meV'])
params= np.array(['a3','a4','ei'])
df.scanUnits = units[isChanging]
df.scanParameters = params[isChanging]
df.type = 'hdf'
df.scanCommand = 'Unknown'
df.scanValues = isChangingData
df.instrument = 'CAMEA'
df.Time = df.Monitor*0.0013011099243 # Typical measurement time RITA2 in 2018
df.Ei = np.array(Ei)
df.A3 = np.array(A3)
df.A4 = np.array(A4)
df.I = np.array(np.zeros((steps,detectors,pixels)))
df.binning = 1
if not normalizationFiles is None:
calib = []
binning = []
for f in normalizationFiles:
data = np.loadtxt(f,skiprows=3,delimiter=',')
EfTable = data[:,[3,4,5,6]]
A4 = data[:,-1]
bound = data[:,[7,8]]
calib.append([EfTable,A4,bound])
binning.append(len(A4)/(104*8))
df.instrumentCalibrations = np.array(calib,dtype=object)
df.possibleBinnings = binning
df.loadBinning(1)
df.mask = np.zeros_like(df.I,dtype=bool)
return df
def shallowRead(files,parameters,fromNICOS=None):
"""Read a list of paramters from hdf file with minimal overhead
Args:
- files (list): List of files
- parameters (list): List of parameters
Returns:
- list: Parameters in list after file, with asked properties in a sublist for each file
"""
parameters = np.array(parameters)
values = []
possibleAttributes.sort(key=lambda v: v.lower())
possible = []
for p in parameters:
possible.append(p in possibleAttributes)
if not np.all(possible):
if np.sum(np.logical_not(possible))>1:
raise AttributeError('Parameters {} not found'.format(parameters[np.logical_not(possible)]))
else:
raise AttributeError('Parameter {} not found'.format(parameters[np.logical_not(possible)]))
for file in files:
vals = []
if os.path.splitext(file)[-1] == '.hdf': # if an hdf file
with hdf.File(file,mode='r') as f:
if fromNICOS is None:
NICOS = checkNICOS(f)
else:
NICOS = fromNICOS
instr = getInstrument(f)
for p in parameters:
if p == 'name':
v = os.path.basename(file)
vals.append(v)
continue
elif p == 'fileLocation':
v = os.path.dirname(file)
vals.append(v)
continue
elif p == 'twoTheta':
A4 = np.array(getHDFInstrumentEntry(instr,'A4',fromNICOS=NICOS))
for func,args in HDFInstrumentTranslationFunctions['A4']:
A4 = getattr(A4,func)(*args)
A4Offset = np.array(getHDFInstrumentEntry(instr,'A4Offset',fromNICOS=NICOS))
for func,args in HDFInstrumentTranslationFunctions['A4Offset']:
A4Offset = getattr(A4Offset,func)(*args)
vals.append(A4-A4Offset)
continue
elif p in HDFTranslation:
v = np.array(getHDFEntry(f,p,fromNICOS=NICOS))
TrF= HDFTranslationFunctions
elif p in HDFInstrumentTranslation:
v = np.array(getHDFInstrumentEntry(instr,p,fromNICOS=NICOS))
TrF= HDFInstrumentTranslationFunctions
else:
raise AttributeError('Parameter "{}" not found'.format(p))
for func,args in TrF[p]:
try:
v = getattr(v,func)(*args)
except IndexError as e:
v = 'Not In File'+func
break
vals.append(v)
values.append(vals)
else:
df = DataFile(file)
for p in parameters:
try:
vals.append(getattr(df,p))
except:
if 'sample' in p:
try:
vals.append(getattr(df.sample,p.replace('sample','').lower()))
except:
vals.append('Not Found :S')
else:
try:
vals.append(getattr(df,p.lower()))
except:
vals.append('Not Found :S')
values.append(vals)
return values
def getNX_class(x,y,attribute):
try:
variableType = y.attrs['NX_class']
except:
variableType = ''
if variableType==attribute:
return x
def getInstrument(file):
location = file.visititems(lambda x,y: getNX_class(x,y,b'NXinstrument'))
return file.get(location)
def extractData(files):
if not isinstance(files,list):
files = [files]
I = PointerArray('I',files)
Norm = PointerArray('Norm',files)
Monitor = PointerArray('Monitor',files)
energy = PointerArray('energy',files)
a3 = []
a4 = []
a3Off = PointerArray('a3Off',files)
a4Off = PointerArray('a4Off',files)
Ei = PointerArray('Ei',files)
instrumentCalibrationEf = PointerArray('instrumentCalibrationEf',files)
instrumentCalibrationA4 = PointerArray('instrumentCalibrationA4',files)
instrumentCalibrationEdges = PointerArray('instrumentCalibrationEdges',files)
qx = PointerArray('qx',files)
qy = PointerArray('qy',files)
H = PointerArray('H',files)
K = PointerArray('K',files)
L = PointerArray('L',files)
temperature = PointerArray('temperature',files)
#mask = []
scanParameters = []
scanParamValue = []
scanParamUnit = []
#mask = []
for datafile in files:
#mask.append(datafile.mask)
scanParameters.append(datafile.scanParameters)
scanParamValue.append(datafile.scanValues)
scanParamUnit.append(datafile.scanUnits)
if np.array(datafile.A3Off).shape == ():
datafile.A3Off = 0.0
a3.append(datafile.A3-datafile.A3Off)
if np.array(datafile.A4Off).shape == ():
datafile.A4Off = [0.0]
a4.append(datafile.A4-datafile.A4Off)
if files[0].type=='nxs':
return I,qx,qy,energy,Norm,Monitor,a3,a3Off,a4,a4Off,instrumentCalibrationEf,\
instrumentCalibrationA4,instrumentCalibrationEdges,Ei,scanParameters,scanParamValue,scanParamUnit,temperature,H,K,L
else:
return I,Monitor,a3,a3Off,a4,a4Off,instrumentCalibrationEf,\
instrumentCalibrationA4,instrumentCalibrationEdges,Ei,scanParameters,scanParamValue,scanParamUnit,temperature
def assertFile(file):
"""Make sure that file exists for methods to work"""
if not os.path.isfile(file):
df = DataFile(file.replace('.nxs','.hdf'))
con = df.convert(binning=8)
con.saveNXsqom(file)
def checkNICOS(f):
"""Check if open hdf file is NICOS"""
if not f.get('/entry/data/data') is None: # we have a NICOS file
return True
elif not f.get('/entry/data/counts') is None:
return False
else:
return None # Corresponding to an error message