import sys
#import warnings
import numpy as np
from difflib import SequenceMatcher
import functools
import logging
import math
from MJOLNIR.Marray import *
import os
import inspect
import matplotlib
import regex as re
# E = hbar^2k^2/(2m)
m = 1.67492749804e-27 # neutron mass [kg]
hbar = 1.0545718e-34 # reduced Planck constant [m^2 kg/s]
eV = 1.60218e-19 # electron volt [J]
# sqrt(meV) to 1/Å
factorsqrtEK = 1/np.sqrt(hbar**2*(1e20)/(2*m*eV/1000)) # conversion factor: k [1/AA] = sqrt(E [meV]) * factorsqrtEK
# Keyword arguments accepted by matplotlib artists/Line2D; used to filter kwargs forwarded to plotting calls
MPLKwargs = ['agg_filter','alpha','animated','antialiased','aa','clip_box','clip_on','clip_path','color','c','contains','dash_capstyle','dash_joinstyle','dashes','drawstyle','figure','fillstyle','gid','label','linestyle','ls','linewidth','lw','marker','markeredgecolor','mec','markeredgewidth','mew','markerfacecolor','mfc','markerfacecoloralt','mfcalt','markersize','ms','markevery','path_effects','picker','pickradius','rasterized','sketch_params','snap','solid_capstyle','solid_joinstyle','transform','url','visible','xdata','ydata','zorder','linewidth','rasterized']
#Unused
def cutObject(func): # pragma: no cover
    """Decorator (currently unused) wrapping a cut function so it returns a CutObject.

    Fitting-related key words (fitFunction, p0, costFunction, lock) are stripped
    from the call before *func* runs and attached to the resulting CutObject.
    Passing internal=True bypasses the wrapping and returns the raw result.
    """
    import MJOLNIR.Statistics.CutObject
    @functools.wraps(func)
    def newFunction(*args,**kwargs):
        if 'internal' in kwargs:
            if kwargs['internal'] is True:
                # Internal call: return the raw result of the cut function
                del kwargs['internal']
                return func(*args,**kwargs)
        fittingKwargs = ['fitFunction','p0','costFunction','lock']
        temp = {}
        # Remove fitting key words before calling func; reattached to the CutObject below
        for fitKwarg in fittingKwargs:
            if fitKwarg in kwargs:
                temp[fitKwarg] = kwargs[fitKwarg]
                del kwargs[fitKwarg]
            else:
                temp[fitKwarg] = None
        ds = args[0]  # first positional argument is assumed to be the DataSet -- TODO confirm against callers
        if len(args)>1:
            otherArgs = args[1:]
        else:
            otherArgs = None
        co = MJOLNIR.Statistics.CutObject.CutObject(*func(*args,**kwargs),dataSet = ds,cutFunction = func,kwargs = kwargs,args = otherArgs)
        for fitKwarg in fittingKwargs:
            setattr(co,fitKwarg,temp[fitKwarg])
        return co
    return newFunction
def KwargChecker(function=None,include=None):
    """Decorator factory validating keyword arguments of the decorated function.

    Accepted key words are the decorated function's own arguments, optionally
    extended with the arguments of *function* (a function or list of functions)
    and any extra names given in *include*. Unknown key words raise an
    AttributeError with a closest-match suggestion.

    Kwargs:

        - function (function or list): Additional function(s) whose arguments are accepted (default None)

        - include (str or list): Additional accepted key-word names (default None)

    Raises:

        - AttributeError
    """
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args,**kwargs):
            accepted = extractArgsList(func,wrapper,function,include)
            checkArgumentList(accepted,kwargs)
            return func(*args,**kwargs)
        # Remember the raw function and settings so the decoration can be redone (see logClass)
        wrapper._original = func
        wrapper._include = include
        wrapper._function = function
        return wrapper
    return decorate
def extractArgsList(func,newFunc,function,include):
    """Collect the sorted list of accepted argument names for a decorated function.

    Args:

        - func (function): Function whose positional-argument count is used.

        - newFunc (function): Wrapper carrying the original function as ``_original``.

        - function (function, list or None): Extra function(s) whose arguments are accepted.

        - include (str, list or None): Extra accepted names.

    Returns:

        - argList (list): Alphabetically sorted list of unique argument names.
    """
    count = func.__code__.co_argcount  # number of declared positional arguments
    names = list(newFunc._original.__code__.co_varnames[:count])
    if function is not None:
        # Accept both a single function and a collection of functions
        extraFunctions = function if isinstance(function,(list,np.ndarray)) else [function]
        for f in extraFunctions:
            names.extend(str(arg) for arg in f.__code__.co_varnames[:f.__code__.co_argcount])
    if include is not None:
        if isinstance(include,(list,np.ndarray)):
            names.extend(str(arg) for arg in include)
        else:
            names.append(str(include))
    # Duplicates removed through the set; sorted for deterministic error messages
    return sorted(set(names))
def checkArgumentList(argList,kwargs):
    """Validate that every key word in *kwargs* appears in *argList*.

    Args:

        - argList (list): Accepted key-word argument names.

        - kwargs (dict): Key-word arguments to be validated.

    Raises:

        - AttributeError: For every unknown key word, with a closest-match suggestion.
    """
    notFound = []
    for key in kwargs:
        if key in argList:
            continue
        # Suggest the accepted argument most similar to the unknown key word
        similarity = np.array([SequenceMatcher(None, key.lower(), x.lower()).ratio() for x in argList])
        maxId = np.argmax(similarity)  # unused maxVal removed
        notFound.append('Key-word argument "{}" not understood. Did you mean "{}"?'.format(key,argList[maxId]))
    if len(notFound)>0:
        if len(notFound)>1:
            errorMsg = 'The following key-word arguments are not understood:\n'
            errorMsg+='\n'.join(notFound)
        else:
            errorMsg = notFound[0]
        raise AttributeError(errorMsg)
def my_timer_N(N=0): # pragma: no cover
    """Decorator factory measuring the run time of a function.

    Kwargs:

        - N (int): Number of iterations to perform; 0 disables timing (default 0).

    Raises:

        - AttributeError: If N is negative.
    """
    if N<0:
        raise AttributeError('Number of runs need to be bigger or equal to 1 or equal to 0 for no timing, but {} given.'.format(N))
    def my_timer(func):
        import time
        @functools.wraps(func)  # fix: preserve the wrapped function's name/docstring
        def newFunc(*args,**kwargs):
            if N == 0:
                # Timing disabled: call straight through
                return func(*args,**kwargs)
            Time = []
            for _ in range(N):
                startT = time.time()
                returnval = func(*args,**kwargs)
                Time.append(time.time()-startT)
            if N>1:
                # Report mean and standard error over the N runs
                print('Function "{}" took: {}s (\\pm{}s)'.format(func.__name__,np.mean(Time),np.std(Time)/np.sqrt(N)))
            else:
                print('Function "{}" took: {}s'.format(func.__name__,Time[0]))
            return returnval
        return newFunc
    return my_timer
def beautifyArgs(args=(),kwargs=None): # pragma: no cover
    """Beautify arguments and keyword arguments.

    Returns a formatted string with arguments and keyword arguments separated
    by commas, as they would appear in a function call. Strings are quoted.

    Kwargs:

        - args (tuple): Positional arguments (default ()).

        - kwargs (dict): Keyword arguments (default None, treated as empty).
    """
    if kwargs is None:  # fix: avoid a mutable default argument
        kwargs = {}
    parts = []
    if args != ():
        parts.append(', '.join('"{}"'.format(x) if isinstance(x,str) else str(x) for x in args))
    if kwargs != {}:
        parts.append(', '.join('{}="{}"'.format(key,value) if isinstance(value,str) else '{}={}'.format(key,value)
                               for key,value in kwargs.items()))
    return ', '.join(parts)
def createLogger(self,name,stream=sys.stdout,level=logging.ERROR): # pragma: no cover
    """Attach a configured logger to *self* as ``self._log``.

    Args:

        - name (str): Name of the logger.

    Kwargs:

        - stream: Stream the log handler writes to (default sys.stdout).

        - level (int): Logging level (default logging.ERROR).
    """
    self._log = logging.getLogger(name)
    self._log.setLevel(level)
    ch = logging.StreamHandler(stream)  # fix: honour the stream argument (was hard-coded to sys.stdout)
    ch.setLevel(level)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    self._log.addHandler(ch)
def logMethod(self,original_function): # pragma: no cover
    """Wrap *original_function* so every call is logged on ``self._log``.

    The call signature is logged at info level; if the call raises, an error
    marker is logged and the original exception is re-raised.
    """
    @functools.wraps(original_function)
    def new_function(*args,**kwargs):
        signature = beautifyArgs(args,kwargs)
        self._log.info('Calling {}({})'.format(new_function.original_function.__name__,signature))
        try:
            result = original_function(*args,**kwargs)
        except Exception as WrapperError:
            self._log.error('***** FAILED *****')
            raise WrapperError
        return result
    new_function.original_function = original_function
    return new_function
def logAttribute(self,original_attribute): # pragma: no cover
    """Write an info-level log entry noting that *original_attribute* was accessed."""
    message = 'Calling attribute {}'.format(str(original_attribute))
    self._log.info(message)
def logClass(parent=None,log=__name__,stream=sys.stdout,level=logging.CRITICAL): # pragma: no cover
    """Class decorator logging instantiation, method calls and attribute access.

    Kwargs:

        - parent (class): Base class of the generated wrapper class (default object).

        - log (str): Logger name (default module name).

        - stream: Stream handed to createLogger (default sys.stdout).

        - level (int): Logging level (default logging.CRITICAL).
    """
    if parent is None:
        parent = object
    def track_all_class_methods(Cls):
        class NewCls(parent):
            def __init__(self,*args,**kwargs):
                # Set up the logger, log the construction call and hold the wrapped instance
                createLogger(self,name=log,stream=stream,level=level)
                self._log.info('Calling {}({})'.format(Cls.__name__,beautifyArgs(args,kwargs)))
                self._Instance = Cls(*args,**kwargs)
            def __getattribute__(self,s):
                """
                this is called whenever any attribute of a NewCls object is accessed. This function first tries to
                get the attribute off NewCls. If it fails then it tries to fetch the attribute from self.oInstance (an
                instance of the decorated class). If it manages to fetch the attribute from self.oInstance, and
                the attribute is an instance method then `time_this` is applied.
                """
                try:
                    x = super(NewCls,self).__getattribute__(s)
                    skip = True   # attribute found on the wrapper itself
                except AttributeError:
                    x = self._Instance.__getattribute__(s)
                    skip = False  # attribute came from the wrapped instance
                else:
                    pass
                finally:
                    # Public methods found on the wrapper are decorated with logMethod;
                    # everything else is only logged as attribute access
                    if skip and s[0]!='_':
                        if type(x) == type(self.__init__):# and x != self.__init__: # it is an instance method
                            return logMethod(self,x) # this is equivalent of just decorating the method with logMethod
                        pass
                    else:
                        logAttribute(self,s)
                    return x
            def _printLog(self):
                pass
                #print('\n'.join([x for x in self._log]))
        d = Cls.__dict__
        keys = d.keys()
        # Copy over members of the original class that the wrapper does not define itself
        copy = [x for x in list(Cls.__dict__) if not x in list(NewCls.__dict__)]
        for key in copy+['__doc__']:
            if key=='__dict__' or key in ['__getattribute__','__init__']: # Check if the value in dict is a function
                continue
            if type(d[key])==type(Cls.__init__):
                if hasattr(d[key],'_original'):
                    # Re-apply KwargChecker to the undecorated function so the wrapper sees it too
                    setattr(NewCls, key, KwargChecker(d[key]._function,d[key]._include)(d[key]._original))
                else:
                    setattr(NewCls, key, d[key])
                    # NOTE(review): re-assigns each function attribute to its current value;
                    # looks like a no-op -- confirm intended behaviour
                    for funckey in d[key].__dict__:
                        setattr(d[key],funckey,d[key].__dict__[funckey])
            else:
                setattr(NewCls, key, d[key])
        return NewCls
    return track_all_class_methods
def binEdges(values,tolerance,startPoint=None,endPoint=None):
    """Generate binning of values array with minimum bin size of tolerance. Binning starts at values[0]-tolerance/2.0 and ends at values[-1]+tolerance/2.0.

    Args:

        - values (array): 1D array to be binned.

        - tolerance (float): Minimum length of bin sizes.

    Kwargs:

        - startPoint (float): Minimum position from which to start (default None)

        - endPoint (float): Maximal end bin position (default None)

    Returns:

        - bins (array)
    """
    values_array = np.array(values).ravel().flatten()
    unique_values = np.asarray(list(set(values_array)))  # remove duplicates
    unique_values.sort()  # ascending, in place
    if len(unique_values)==0:
        return []  # no data -> no bin edges
    # First edge sits slightly below the smallest value
    bin_edges = [unique_values[0] - tolerance * 0.1]
    add = 1
    current = 0
    while current<len(unique_values) - 1:
        add=1
        broken = False
        # Grow the bin until the midpoint between neighbouring values is at
        # least `tolerance` beyond the previous edge
        while (unique_values[current+add]+unique_values[current+add-1])*0.5 - bin_edges[-1] < tolerance:
            if current+add < len(unique_values)-1:
                add+=1
            else:
                broken=True  # ran out of values before reaching the tolerance
                break
        if not broken:
            bin_edges.append((unique_values[current+add-1] + unique_values[current+add]) / 2)
        current+=add
    # Close the final bin: either exactly one tolerance further, or just past the last value
    if unique_values[-1]-bin_edges[-1]< 1.1*tolerance:
        bin_edges.append(bin_edges[-1]+tolerance)
    else:
        bin_edges.append(unique_values[-1]+0.1*tolerance)
    bin_edges = np.array(bin_edges)
    # Optionally clamp the outermost edges to the requested end/start points
    if not endPoint is None:
        if endPoint-bin_edges[-1]<tolerance:
            bin_edges = np.concatenate([bin_edges[:np.sum(bin_edges<endPoint)],[endPoint]])
    if not startPoint is None:
        if bin_edges[0]-startPoint<tolerance or bin_edges[0]<startPoint:
            bin_edges = np.concatenate([[startPoint],bin_edges[np.sum(bin_edges<startPoint):]])
    return bin_edges
def without_keys(dictionary, keys):
    """Return a copy of *dictionary* with every key listed in *keys* removed."""
    excluded = set(keys)
    return {key: value for key, value in dictionary.items() if key not in excluded}
@KwargChecker()
def fileListGenerator(numberString,folder,year=2018, format = None, instrument = 'CAMEA'):
    """Function to generate list of data files.

    Args:

        - numberString (str): List of numbers separated with commas; dashes denote inclusive sequences.

        - folder (str): Folder of wanted data files.

    Kwargs:

        - year (int): Year of wanted data files (default 2018)

        - format (str): Format of data files (default None, derived from instrument)

        - instrument (str): Instrument used to determine the format string (default CAMEA)

    Returns:

        - list of strings: List containing the full file string for each number provided.

    Example:

    >>> numberString = '201-205,207-208,210,212'
    >>> files = fileListGenerator(numberString,'data/',2018)
    ['data/camea2018n000201.hdf', 'data/camea2018n000202.hdf',
    'data/camea2018n000203.hdf', 'data/camea2018n000204.hdf',
    'data/camea2018n000205.hdf', 'data/camea2018n000207.hdf',
    'data/camea2018n000208.hdf', 'data/camea2018n000210.hdf',
    'data/camea2018n000212.hdf']
    """
    if format is None:  # derive the format string from the instrument name
        if instrument == 'CAMEA':
            format = 'camea{:d}n{:06d}.hdf'
        elif instrument == 'MultiFLEXX':
            format = '{1:06d}'
        elif instrument.lower() == 'flatcone':
            format = '{1:06d}'
        else:
            raise AttributeError('Provided instrument "{}" not understood'.format(instrument))
    fileNames = []
    for part in numberString.split(','):
        if '-' in part:  # a dash denotes an inclusive range of file numbers
            limits = part.split('-')
            if len(limits) > 2:
                raise AttributeError('Sequence "{}" not understood - too many dashes.'.format(part))
            first = int(limits[0])
            last = int(limits[1])
            if first > last:  # descending range
                numbers = np.arange(first, last-1, -1)
            else:
                numbers = np.arange(first, last+1)
        else:
            numbers = [int(part)]
        fileNames.append([os.path.join(folder, format.format(year, number)) for number in numbers])
    return list(np.concatenate(fileNames))
def RoundBinning(X,deltas,Data=None):
    """ Bin points to nearest delta value in D dimensions and reorder Data.

    Args:

        - X (np.array): Input points in shape (D,N) for D dimensions.

        - deltas (float or list): binning size(s)

    Kwargs:

        - Data (list/array): List of data or data list of shape (N) to be binned like X (default None).

    Returns:

        - BinnedPoints (list): ND list of binned unique positions of points

        - indices (list): Inverse indices from which points-array can be created from unique points

        - count (list): Number of points going into each bin

        (- returnData: Rebinned data as according to points being binned)

    Algorithm takes the data points and rebins them into points closest to delta in N dimensions. If deltas=[0.05,0.05] and 2D points are given, points will be binned to closest 0.05 (...-0.1,-0.05,0.0,0.05,0.1...) in both directions. Data lists are also reshuffled to match.
    """
    points = np.array(X)
    deltas = np.array([deltas]).squeeze()
    # Broadcast a scalar delta to every dimension; otherwise expect one delta per dimension
    if len(deltas.shape)==0 and points.shape[0]!=1:
        binning = np.ones([points.shape[0],1])*deltas
    else:
        binning = np.array([deltas]).reshape(-1,1)
    if not points.shape[0]==binning.shape[0]:
        raise AttributeError('Shape mismatch between X and deltas. Expected {} deltas for X with shape {}, recieved {}'.format(points.shape[0],points.shape,deltas))
    # Work in a power-of-ten rescaled space so that rounding to integers
    # corresponds to rounding to the requested delta in each dimension
    binningLog = np.log10(binning)
    binningOrder = np.floor(binningLog)
    binningScale = np.power(10,(binningOrder-binningLog)).reshape(-1,1)
    Scale = np.multiply(points,binningScale)
    # Unique integer bins along the point axis, with inverse mapping and occupation count
    binPoints,indices,count = np.unique(np.round(Scale*np.power(10,-binningOrder)),axis=1,return_inverse=True,return_counts=True)
    BinnedPoints = binPoints*np.power(10,binningOrder)/binningScale  # transform back to the original scale
    if not Data is None:
        # Sum the data contributions per bin, using the inverse indices as bin positions
        if isinstance(Data,list):
            returnData = [np.histogram(indices,bins=BinnedPoints.shape[1],weights=x)[0] for x in Data]
        else:
            returnData = np.histogram(indices,bins=BinnedPoints.shape[1],weights=Data)[0]
        return BinnedPoints,indices,count,returnData
    return BinnedPoints,indices,count
def invert(M):
    """Invert non-square matrices as described on https://en.wikipedia.org/wiki/Generalized_inverse.

    Args:

        - M (matrix): Matrix in question.

    Returns:

        - Left or right inverse matrix depending on shape of provided matrix.
    """
    shape = M.shape
    if shape[0] > shape[1]:
        # Tall matrix -> left inverse: (M^T M)^-1 M^T
        return np.dot(np.linalg.inv(np.dot(M.T, M)), M.T)
    # Wide or square matrix -> right inverse: M^T (M M^T)^-1
    return np.dot(M.T, np.linalg.inv(np.dot(M, M.T)))
def overWritingFunctionDecorator(overWritingFunction):
    """Return a decorator that discards the decorated function and substitutes *overWritingFunction*."""
    def overWriter(_decorated):
        return overWritingFunction
    return overWriter
def calRefVector(points): # pragma: no cover
    """Calculate reference vector pointing from the point nearest the geometric center towards the center.

    For a half-moon shaped point cloud this vector points anti-radially.

    Args:

        - points (list): Points for which the reference vector is calculated, shape is 2xN.

    Returns:

        - vector: Reference vector (2x1)

        - center: Geometric center of the points (2x1)
    """
    center = np.mean(points, axis=1).reshape(2, 1)
    distances = np.linalg.norm(center - points, axis=0)
    closest = np.argmin(distances)  # point nearest to the center
    return center - points[:, closest].reshape(2, 1), center
def minMax(x, axis=None):
    """Return minimal and maximal value of list.

    Args:

        - x (list): Object from which min and max are to be found.

    Kwargs:

        - axis (int): Axis or axes along which to operate (default None, i.e. over all elements)

    Returns:

        - min: Minimal value

        - max: Maximal value
    """
    minimum = np.min(x, axis=axis)
    maximum = np.max(x, axis=axis)
    return minimum, maximum
def unitVector(v):
    """Return *v* scaled to unit length."""
    length = np.linalg.norm(v)
    return v / length
def rotate2X(v):
    """Return the 3x3 rotation matrix rotating the 3D vector *v* onto the x axis.

    The vector is first rotated into the x-y plane and then about z onto x.
    A vector along z is handled as a special case by a single 90 degree
    rotation about y.
    """
    if np.isclose(v[2]/np.linalg.norm(v),1): # v is along z
        return rotMatrix([0,1,0],np.pi/2,deg=False)
    # Find axis perp to v and proj v into x-y plane -> rotate 2 plane and then to x
    vRotInPlane = np.array([-v[1],v[0],0])
    vPlan = np.array([v[0],v[1],0])
    ## TODO: Check this!
    #if np.isclose(np.dot(v,vPlan)/(np.linalg.norm(v)*np.linalg.norm(vPlan)),1.0):
    #    return rotMatrix([1,0,0],0.0)
    # Angle between v and its projection onto the x-y plane
    theta = np.arccos(np.dot(v,vPlan)/(np.linalg.norm(v)*np.linalg.norm(vPlan)))
    R = rotMatrix(vRotInPlane,theta,deg=False)
    v2 = np.dot(R,v)
    # Angle between the now in-plane vector and the x axis
    theta2 = np.arccos(np.dot(v2,np.array([1,0,0]))/np.linalg.norm(v2))
    R2 = rotMatrix(np.array([0,0,1.0]),-theta2,deg=False)
    Rotation = np.dot(R2,R)
    return Rotation
def LengthOrder(v):
    """Rescale 3D vector *v* to the shortest parallel vector with commensurate components.

    Args:

        - v (array): 3D vector to be rescaled.

    Returns:

        - Rv (array): Vector parallel to *v*, rescaled through Norm2D.

    Raises:

        - AttributeError: If *v* is the zero vector, or the rescaled vector is not parallel to *v*.
    """
    nonZeroPos = np.logical_not(np.isclose(v,0.0))
    if np.sum(nonZeroPos)==1:
        # Single non-zero component: plain normalisation suffices
        Rv = v/np.linalg.norm(v)
        return Rv
    if np.sum(nonZeroPos)==0:
        raise AttributeError('Provided vector is zero vector!')
    if np.sum(nonZeroPos)==3:
        # All components non-zero: normalise the x-y pair, then fold in z
        v1 = Norm2D(v[:2])
        ratio = v1[0]/v[0]
        v2 = Norm2D(np.array([v1[0],v[2]*ratio]))
        ratio2 = v2[0]/v1[0]
        Rv = np.array([v2[0],v1[1]*ratio2,v2[1]])
    else:
        # Exactly two non-zero components: normalise those in 2D
        Rv = np.zeros(3)
        nonZeros = v[nonZeroPos]
        Rv[nonZeroPos] = Norm2D(nonZeros)
    # Sanity check: the result must remain parallel to the input.
    # Fix: was "'...{}, {}',format(Rv,v)" (comma instead of dot), which called the
    # builtin format() and raised TypeError instead of the intended AttributeError.
    if not np.isclose(np.dot(Rv,v)/(np.linalg.norm(Rv)*np.linalg.norm(v)),1.0):
        raise AttributeError('The found vector is not parallel to original vector: {}, {}'.format(Rv,v))
    return Rv
def Norm2D(v):
    """Rescale a 2D vector to the smallest parallel vector with commensurate components, if possible.

    If both component reciprocals agree, or their ratio is (close to) an
    integer, the vector is rescaled accordingly; otherwise *v* is returned
    unchanged.
    """
    reciprocal = np.abs(1/v)
    if np.isclose(reciprocal[0], reciprocal[1]):
        # Equal magnitudes: scale both components to (+-)1
        return v*reciprocal[0]
    ratio = np.max(reciprocal)/np.min(reciprocal)
    fractional = np.mod(ratio, 1)
    if np.isclose(fractional, 0.0) or np.isclose(fractional, 1.0):
        # Integer ratio: rescale to the smallest commensurate vector
        return v*np.min(reciprocal)*ratio
    return v
def rotMatrix(v,theta,deg=True):
    """ Generalized rotation matrix.

    Args:

        - v (list): Rotation axis around which matrix rotates

        - theta (float): Rotation angle (by default in degrees)

    Kwargs:

        - deg (bool): Whether or not angle is in degrees or radians (Default True)

    Returns:

        - 3x3 matrix rotating points around vector v by amount theta.
    """
    # Fix: accept plain lists/ints and do not mutate the caller's array.
    # The previous code used theta.copy() (crashes for Python scalars) and
    # v /= norm (crashes for lists and modified the caller's array in place).
    v = np.asarray(v, dtype=float)
    if deg:
        theta = np.deg2rad(theta)  # deg2rad returns a new value; no copy needed
    v = v/np.linalg.norm(v)
    sin = np.sin(theta)
    cos = np.cos(theta)
    oneMinusCos = 1 - cos
    # Rodrigues' rotation formula, written out component-wise
    m11 = cos + v[0]**2*oneMinusCos
    m12 = v[0]*v[1]*oneMinusCos - v[2]*sin
    m13 = v[0]*v[2]*oneMinusCos + v[1]*sin
    m21 = v[0]*v[1]*oneMinusCos + v[2]*sin
    m22 = cos + v[1]**2*oneMinusCos
    m23 = v[1]*v[2]*oneMinusCos - v[0]*sin
    m31 = v[0]*v[2]*oneMinusCos - v[1]*sin
    m32 = v[1]*v[2]*oneMinusCos + v[0]*sin
    m33 = cos + v[2]**2*oneMinusCos
    return np.array([[m11,m12,m13],[m21,m22,m23],[m31,m32,m33]])
def Rot(theta,deg=True):
    """Create 2D rotation matrix

    Args:

        - theta (float): Rotation angle (by default in degrees)

    Kwargs:

        - deg (bool): Whether or not number provided is degree or radian (default True)

    Returns:

        - 2x2 rotation matrix defined with -sin in row 0 column 1
    """
    angle = np.deg2rad(theta) if deg == True else theta
    cos = np.cos(angle)
    sin = np.sin(angle)
    return np.array([[cos, -sin], [sin, cos]])
def clockwiseangle_and_distance(point,origin=[0,0],refvec = [0,1]): # pragma: no cover
    """Sort key for ordering points clockwise. Taken from https://stackoverflow.com/questions/41855695/sorting-list-of-two-dimensional-coordinates-by-clockwise-angle-using-python

    Args:

        - point (list): 2D point to compute the key for.

    Kwargs:

        - origin (list): Location of origin from which the points are to be sorted (default [0,0])

        - refvec (list): Vector direction for definition of zero point (default [0,1])

    Returns:

        - angle (float): Clockwise angle from refvec, mapped to [0, 2*pi); -pi for the origin itself.

        - distance (float): Distance from origin (secondary sorting criterion).
    """
    dx = point[0] - origin[0]
    dy = point[1] - origin[1]
    distance = math.hypot(dx, dy)
    if distance == 0:
        # The origin itself has no defined angle
        return -math.pi, 0
    ux = dx/distance
    uy = dy/distance
    dotprod = ux*refvec[0] + uy*refvec[1]    # x1*x2 + y1*y2
    diffprod = refvec[1]*ux - refvec[0]*uy   # x1*y2 - y1*x2
    angle = math.atan2(diffprod, dotprod)
    if angle < 0:
        # Map counter-clockwise (negative) angles onto [0, 2*pi)
        return 2*math.pi + angle, distance
    return angle, distance
@KwargChecker()
def rotationMatrix(alpha,beta,gamma,format='deg'):
    """Create a 3D rotation matrix from angles alpha, beta, gamma (rotations about x, y, z applied in that order).

    Kwargs:

        - format (str): If 'deg' the angles are interpreted as degrees, otherwise radians (default 'deg').
    """
    if format == 'deg':
        alpha, beta, gamma = (np.deg2rad(angle) for angle in (alpha, beta, gamma))
    cosA, sinA = np.cos(alpha), np.sin(alpha)
    cosB, sinB = np.cos(beta), np.sin(beta)
    cosG, sinG = np.cos(gamma), np.sin(gamma)
    Rx = np.array([[1, 0, 0], [0, cosA, -sinA], [0, sinA, cosA]])
    Ry = np.array([[cosB, 0, sinB], [0, 1, 0], [-sinB, 0, cosB]])
    Rz = np.array([[cosG, -sinG, 0], [sinG, cosG, 0], [0, 0, 1]])
    return np.dot(Rz, np.dot(Ry, Rx))
@KwargChecker()
def vectorAngle(V1,V2):
    """calculate angle between V1 and V2.

    Args:

        - V1 (list): List or array of numbers

        - V2 (array): Array of numbers

    Return:

        - theta (float): Angle between the two vectors, in radians (np.arccos returns radians).
    """
    cosTheta = np.dot(V1, V2.T)/(np.linalg.norm(V1)*np.linalg.norm(V2))
    return np.arccos(cosTheta)
# TODO: Write a test/update this function
def writeToSpinWFile(file,position,spinWaveEnergy,spinWaveWidth,spinWaveAmplitude,EMin,EMax,spinWaveEnergyErr=None): # pragma: no cover
    """Write fitted values for spin wave(s) into a SpinW readable format.

    Args:

        - file (string): File into which the spin waves are to be saved

        - position (3D vector, 3 x m): HKL position of spin wave(s)

        - spinWaveEnergy (array, n x m): Array with energy position of spin wave. For multiple spin waves fill with 0 if wave not found

        - spinWaveWidth (array, n x m): Standard deviation of spin wave(s). For multiple spin waves fill with 0 if wave not found

        - spinWaveAmplitude (array, n x m): Amplitude of spin wave(s). For multiple spin waves fill with 0 if wave not found

        - EMin (float): Lowest energy measured in data [meV]

        - EMax (float): Highest energy measured in data [meV]

    Kwargs:

        - spinWaveEnergyErr (array, n x m): Uncertainty of the energy positions (default None)

    n is the number of spin waves, m is the number of data points measured.
    """
    spinWaveEnergy = np.asarray(spinWaveEnergy)
    if not spinWaveEnergyErr is None:
        spinWaveEnergyErr = np.asarray(spinWaveEnergyErr)
        if not spinWaveEnergy.shape == spinWaveEnergyErr.shape:
            raise AttributeError('Arrays for spinWaveEnergy(shape: {}), spinWaveEnergyErr(shape: {}) have to have same shape.'.format(spinWaveEnergy.shape,spinWaveEnergyErr.shape))
    spinWaveWidth = np.asarray(spinWaveWidth)
    spinWaveAmplitude = np.asarray(spinWaveAmplitude)
    position = np.asarray(position)
    if not np.all([spinWaveEnergy.shape == spinWaveWidth.shape,spinWaveEnergy.shape == spinWaveAmplitude.shape]):
        raise AttributeError('Arrays for spinWaveEnergy(shape: {}), spinWaveWidth(shape: {}), and spinWaveAmplitude(shape: {}) have to have same shape.'.format(spinWaveEnergy.shape,spinWaveWidth.shape,spinWaveAmplitude.shape))
    if len(spinWaveEnergy.shape) == 1:
        # Promote 1D input to a single spin wave of shape (1, m)
        spinWaveEnergy.shape = (1,-1)
        spinWaveWidth.shape = (1,-1)
        spinWaveAmplitude.shape = (1,-1)
    spinWaves,dataPoints = spinWaveEnergy.shape
    if not position.shape == (3,dataPoints):
        raise AttributeError('With provided spin wave parameters, expected position of shape (3,{}) but recieved {}.'.format(dataPoints,position.shape))
    # Three columns (amplitude, energy, width) per spin wave; four when errors are included
    if not spinWaveEnergyErr is None:
        columns = 4
    else:
        columns = 3
    # First five rows: QH, QK, QL and the measured energy limits
    dataMatrix = np.zeros((5+columns*spinWaves,dataPoints))
    dataMatrix[:3,:] = position
    dataMatrix[3,:] = EMin
    dataMatrix[4,:] = EMax
    # Row indices per spin wave in the output: amplitude, energy, width (, error)
    energyIndices = np.array([6+x*columns for x in range(spinWaves)],dtype=int)
    widthIndices = np.array([7+x*columns for x in range(spinWaves)],dtype=int)
    amplitudeIndices = np.array([5+x*columns for x in range(spinWaves)],dtype=int)
    if not spinWaveEnergyErr is None:
        errorIndices = np.array([8+x*columns for x in range(spinWaves)],dtype=int)
        dataMatrix[errorIndices,:] = spinWaveEnergyErr
    dataMatrix[energyIndices,:] = spinWaveEnergy
    dataMatrix[widthIndices,:] = spinWaveWidth
    dataMatrix[amplitudeIndices,:] = spinWaveAmplitude
    headers = ['QH','QK','QL','ENlim1','ENlim2']
    if not spinWaveEnergyErr is None:
        headers+=list(np.concatenate([['I{}'.format(x+1),'EN{}'.format(x+1),'sigma{}'.format(x+1),'EErr{}'.format(x+1)] for x in range(spinWaves)]))
    else:
        headers+=list(np.concatenate([['I{}'.format(x+1),'EN{}'.format(x+1),'sigma{}'.format(x+1)] for x in range(spinWaves)]))
    # Right-aligned, fixed-width column titles; data written transposed (one row per data point)
    titles = ''.join(['{:>14s} '.format(x) for x in headers])
    np.savetxt(file,dataMatrix.T,fmt='%+.7e',delimiter = ' ', header=titles,comments='')
def generateLabel(vec,labels=['H','K','L']):
    """Format a scattering vector with individual letters.

    Args:

        - vec (array): Vector to be formatted.

    Kwargs:

        - labels (list): Letters to use for formatting (default ['H','K','L'])
    """
    # Classify every component once, with a small tolerance on integerness
    isInteger = np.isclose(np.abs(vec)-np.floor(np.abs(vec)),0.0,atol=1e-4)
    signs = np.sign(vec)
    magnitudes = np.abs(vec)
    isZero = np.isclose(magnitudes,0.0,atol=1e-4)
    isOne = np.isclose(magnitudes,1.0,atol=1e-4)
    parts = []
    for letter,integer,s,zero,one,value in zip(labels,isInteger,signs,isZero,isOne,magnitudes):
        prefix = '-' if s == -1 else ''
        if not integer:        # fractional component, e.g. "0.5H"
            parts.append('{}{}{}'.format(prefix,value,letter))
        elif zero:             # zero stays a bare "0"
            parts.append('0')
        elif one:              # +-1 shows only the letter, e.g. "-L"
            parts.append('{}{}'.format(prefix,letter))
        else:                  # other integers, e.g. "2K"
            parts.append('{}{}{}'.format(prefix,int(value),letter))
    return '({})'.format(', '.join(parts))
def generateLabelDirection(vec,labels=['H','K','L']):
    """Format a scattering vector with letters according to the first non-zero direction.

    Args:

        - vec (array): Vector to be formatted.

    Kwargs:

        - labels (list): Letters to use for formatting (default ['H','K','L'])
    """
    # Classify every component once, with a small tolerance on integerness
    isInteger = np.isclose(np.abs(vec)-np.floor(np.abs(vec)),0.0,atol=1e-4)
    signs = np.sign(vec)
    magnitudes = np.abs(vec)
    isZero = np.isclose(magnitudes,0.0,atol=1e-4)
    isOne = np.isclose(magnitudes,1.0,atol=1e-4)
    parts = []
    chosen = ''  # letter of the first non-zero component; reused for all later non-zero components
    for letter,integer,s,zero,one,value in zip(labels,isInteger,signs,isZero,isOne,magnitudes):
        prefix = '-' if s == -1 else ''
        if not integer:        # fractional component, e.g. "0.5H"
            if chosen == '':
                chosen = letter
            parts.append('{}{}{}'.format(prefix,value,chosen))
        elif zero:             # zero stays a bare "0" and does not claim a letter
            parts.append('0')
        elif one:              # +-1 shows only the letter
            if chosen == '':
                chosen = letter
            parts.append('{}{}'.format(prefix,chosen))
        else:                  # other integers, e.g. "2H"
            if chosen == '':
                chosen = letter
            parts.append('{}{}{}'.format(prefix,int(value),chosen))
    return '({})'.format(', '.join(parts))
# Chemical element symbols, indexed in step with _relative_atomic_masses below
# (note the extra 'D' for deuterium between H and He)
symbols = (
    'H', 'D', 'He', 'Li', 'Be', 'B', 'C', 'N', 'O', 'F', 'Ne', 'Na', 'Mg', 'Al',
    'Si', 'P', 'S', 'Cl', 'Ar', 'K', 'Ca', 'Sc', 'Ti', 'V', 'Cr', 'Mn', 'Fe',
    'Co', 'Ni', 'Cu', 'Zn', 'Ga', 'Ge', 'As', 'Se', 'Br', 'Kr', 'Rb', 'Sr',
    'Y', 'Zr', 'Nb', 'Mo', 'Tc', 'Ru', 'Rh', 'Pd', 'Ag', 'Cd', 'In', 'Sn',
    'Sb', 'Te', 'I', 'Xe', 'Cs', 'Ba', 'La', 'Ce', 'Pr', 'Nd', 'Pm', 'Sm',
    'Eu', 'Gd', 'Tb', 'Dy', 'Ho', 'Er', 'Tm', 'Yb', 'Lu', 'Hf', 'Ta', 'W',
    'Re', 'Os', 'Ir', 'Pt', 'Au', 'Hg', 'Tl', 'Pb', 'Bi', 'Po', 'At', 'Rn',
    'Fr', 'Ra', 'Ac', 'Th', 'Pa', 'U', 'Np', 'Pu', 'Am', 'Cm', 'Bk', 'Cf',
    'Es', 'Fm', 'Md', 'No', 'Lr', 'Rf', 'Db', 'Sg', 'Bh', 'Hs', 'Mt', 'Ds',
    'Rg', 'Cn', 'Uut', 'Fl', 'Uup', 'Lv', 'Uus', 'Uuo'
)
# Molar mass of natural compound [g/mol]; "x(u)" carries an uncertainty,
# "[x]" denotes an estimate for elements without stable isotopes.
# Parsed by calculateMolarMass._get_relative_atomic_masses.
_relative_atomic_masses = (
    "1.008 2.0(1) 4.002602(2) 6.94 9.0121831(5) 10.81 12.011 14.007 15.999"
    " 18.998403163(6) 20.1797(6) 22.98976928(2) 24.305 26.9815385(7) 28.085"
    " 30.973761998(5) 32.06 35.45 39.948(1) 39.0983(1) 40.078(4)"
    " 44.955908(5) 47.867(1) 50.9415(1) 51.9961(6) 54.938044(3) 55.845(2)"
    " 58.933194(4) 58.6934(4) 63.546(3) 65.38(2) 69.723(1) 72.630(8)"
    " 74.921595(6) 78.971(8) 79.904 83.798(2) 85.4678(3) 87.62(1)"
    " 88.90584(2) 91.224(2) 92.90637(2) 95.95(1) [98] 101.07(2) 102.90550(2)"
    " 106.42(1) 107.8682(2) 112.414(4) 114.818(1) 118.710(7) 121.760(1)"
    " 127.60(3) 126.90447(3) 131.293(6) 132.90545196(6) 137.327(7)"
    " 138.90547(7) 140.116(1) 140.90766(2) 144.242(3) [145] 150.36(2)"
    " 151.964(1) 157.25(3) 158.92535(2) 162.500(1) 164.93033(2) 167.259(3)"
    " 168.93422(2) 173.045(10) 174.9668(1) 178.49(2) 180.94788(2) 183.84(1)"
    " 186.207(1) 190.23(3) 192.217(3) 195.084(9) 196.966569(5) 200.592(3)"
    " 204.38 207.2(1) 208.98040(1) [209] [210] [222] [223] [226] [227]"
    " 232.0377(4) 231.03588(2) 238.02891(3) [237] [244] [243] [247] [247]"
    " [251] [252] [257] [258] [259] [266] [267] [268] [269] [270] [269]"
    " [278] [281] [282] [285] [286] [289] [289] [293] [294] [294]"
)
def calculateMolarMass(sampleChemicalFormula,formulaUnitsPerUnitCell=1,returnElements=False):
    """Calculate Molar mass given chemical formula and number of formula units per cell

    Args:

        - sampleChemicalFormula (string): Chemical formula

    Kwargs:

        - formulaUnitsPerUnitCell (int): Number of units per unitcell (default 1)

        - returnElements (bool): If true, also return a dictionary of elements and their counts

    Returns:

        - MolarMass (float)

    Raises:

        - AttributeError

    Based on https://stackoverflow.com/questions/18517779/make-outer-tokens-change-inner-tokens-in-a-chemical-formula-using-pyparsing/18555142#18555142 with the added feature of floats being allowed
    """
    # Check if number of starting and ending parentheses match
    start = sampleChemicalFormula.count('(')
    stop = sampleChemicalFormula.count(')')
    if not start==stop:
        raise AttributeError('The number of starting and closing parenthesis do not match. Got {} "(" and {} ")"'.format(start,stop))
    # Combination of https://stackoverflow.com/questions/26385984/recursive-pattern-in-regex
    # and (\d*\.\d+|\d+) made to find integers or floats.
    # Finds outer most parentheses. NOTE: relies on the third-party 'regex'
    # module for the recursive (?R) construct; the stdlib re module does not support it.
    recursiveParenthesis = re.compile(r'\(((?>[^\(\)]+|(?R))*)\)(\d*\.\d+|\d+)?')
    def splitParentheses(string,multiplyer):
        # Return (content, multiplier) for every outermost parenthesised group,
        # or the unchanged string when no parentheses are present
        results = [(x[0],float(x[1])*multiplyer) if x[1]!='' else (x[0],multiplyer) for x in recursiveParenthesis.findall(string)]
        if len(results) == 0:
            return string
        # Cast whole-number multipliers back to int so replace() reconstructs the exact text
        results = [(r[0],int(r[1])) if np.isclose(math.modf(r[1])[0],0.0) else r for r in results]
        return results
    # Remove a result from chemical string
    def replace(string,value,multi):
        if np.isclose(multi,1.0):
            replacing = "("+value+")"
        else:
            replacing = "("+value+")"+str(multi)
        string = string.replace(replacing,'')
        return string
    def recursiveFinder(string,m=1.0):
        # Recursively peel off parenthesised groups, accumulating the multipliers
        returnValues = []
        split = splitParentheses(string,m)
        if isinstance(split,str):
            return [(split,m)]
        for inner in splitParentheses(string,m):
            string = replace(string,*inner)
            if '(' in inner[0]:
                # print('inner = ',inner)
                [returnValues.append((x[0],x[1]*inner[1])) for x in recursiveFinder(inner[0],1)]
            else:
                returnValues.append(inner)
        returnValues.append((string,m))
        return returnValues
    # Split chemical formula at parentheses and update multiplyer
    splitted = recursiveFinder(sampleChemicalFormula)
    # Find elements in all strings and create dictionary to hold total number
    elementPattern = r"([A-Z][a-z]*)([-+]?\d*\.\d+|\d+)?"
    element = re.compile(elementPattern)
    elements = {} # Dictionary to hold sample composition
    for part,mult in splitted:
        for elem in element.findall(part):
            # Count = group multiplier times the element's own count (1 if omitted)
            m = float(mult)*(float(elem[1]) if elem[1]!='' else 1)
            if not elem[0] in elements:
                elements[elem[0]]=m
            else:
                elements[elem[0]]+=m
    # Convert chemical symbol into mass
    def _get_relative_atomic_masses():
        # Parse _relative_atomic_masses: "[x]" entries are estimates,
        # "x(u)" entries carry an uncertainty that is stripped off
        for mass in _relative_atomic_masses.split():
            if mass.startswith('[') and mass.endswith(']'):
                yield float(mass[1:-1])
            elif '(' in mass:
                yield float(mass.split('(')[0])
            else:
                yield(float(mass))
    relative_atomic_masses = tuple(_get_relative_atomic_masses())
    sampleMolarMass = 0.0
    error = False
    for element in elements.items():
        try:
            idx = symbols.index(element[0])
        except ValueError:
            error = True
            break
        sampleMolarMass += relative_atomic_masses[idx]*element[1]
    if error:
        raise AttributeError('Element "{}" not recognized...'.format(element[0]))
    sampleMolarMass*=formulaUnitsPerUnitCell
    # Scale the per-element counts by the number of formula units as well
    for key in elements.keys():
        elements[key] *= formulaUnitsPerUnitCell
    if returnElements:
        return sampleMolarMass,elements
    return sampleMolarMass
def calculateAbsoluteNormalization(sampleMass=None,sampleChemicalFormula=None,sampleMolarMass=None,formulaUnitsPerUnitCell=1,
                                   sampleGFactor=2,correctVanadium=False,vanadiumChemicalFormula = 'V', vanadiumMass=15.25,vanadiumMolarMass=None,
                                   vanadiumMonitor=100000,vanadiumSigmaIncoherent=5.08,vanadiumGFactor=2.0,vanadiumUnitsPerUnitCell=1.0):
    """Calculate absolute normalization relative to Vanadium

    Args:

        - sampleMass (float): Mass of sample in grams

    Kwargs:

        - sampleChemicalFormula (string): Chemical formula, used to compute the molar mass if not given

        - sampleMolarMass (float): Molar mass of sample in g/mol

        - formulaUnitsPerUnitCell (float): Number of units per unit cell (default 1)

        - sampleGFactor (float): Magnetic G factor for sample (default 2.0)

        - correctVanadium (bool): Whether to scale normalization with Vanadium or if this has been performed in normalization tables (default False)

        - vanadiumChemicalFormula (string): Chemical formula of the Vanadium sample (default 'V')

        - vanadiumMass (float): Mass of vanadium used in normalization in gram (default 15.25)

        - vanadiumMolarMass (float): Molar mass of vanadium (default None)

        - vanadiumMonitor (int): Monitor count used in normalization scan (default 100000)

        - vanadiumSigmaIncoherent (float): Incoherent scattering strength of Vanadium (default 5.08)

    Returns:

        - normalizationFactor (float): Relative normalization of sample to Vanadium scan
    """
    if sampleMass is None:
        # No sample normalization requested: use neutral factors
        sampleMass = 1.0
        sampleMolarMass = 1.0
        sampleGFactor = 2.0
    elif sampleMolarMass is None:
        # Derive the molar mass from the chemical formula
        sampleMolarMass = calculateMolarMass(sampleChemicalFormula=sampleChemicalFormula,
                                             formulaUnitsPerUnitCell=formulaUnitsPerUnitCell)
    if correctVanadium:
        if vanadiumMolarMass is None:
            vanadiumMolarMass = calculateMolarMass(sampleChemicalFormula=vanadiumChemicalFormula,
                                                   formulaUnitsPerUnitCell=vanadiumUnitsPerUnitCell)
        vanadiumFactor = vanadiumMolarMass/(vanadiumGFactor*vanadiumMass*vanadiumSigmaIncoherent*vanadiumMonitor)
    else:
        vanadiumFactor = 1.0
    ##########################
    # Calculation
    ##########################
    # 13.77: constant inherited from the original implementation -- presumably
    # related to the magnetic scattering cross-section; confirm against MJOLNIR docs
    normalizationFactor = 4*np.pi*sampleMass*sampleGFactor*vanadiumFactor/(sampleMolarMass*13.77)
    return normalizationFactor
def KWavelength(wavelength):
    """Calculate wave vector magnitude k [1/AA] from wavelength [AA] via k = 2*pi/lambda."""
    inverseWavelength = np.reciprocal(wavelength)
    return 2.0*np.pi*inverseWavelength
def WavelengthK(k):
    """Calculate wavelength [AA] from wave vector magnitude k [1/AA] via lambda = 2*pi/k."""
    inverseK = np.reciprocal(k)
    return 2.0*np.pi*inverseK
# Calculate energy to k and reverse
def KEnergy(energy):
    """Calculate length of k [1/AA] from energy [meV] via k = sqrt(E)*factorsqrtEK.

    NOTE(review): the original docstring had the conversion direction reversed
    (it described EnergyK); the code clearly maps energy -> k, matching the
    KWavelength naming convention where the leading word is the returned quantity.
    """
    return np.sqrt(energy)*factorsqrtEK
def EnergyK(k):
    """Calculate energy [meV] from length of k [1/AA] via E = (k/factorsqrtEK)^2.

    NOTE(review): the original docstring had the conversion direction reversed
    (it described KEnergy); the code clearly maps k -> energy.
    """
    return np.power(np.divide(k,factorsqrtEK),2.0)
# Calculate energy to wavelength and reverse
def WavelengthEnergy(energy):
    """Calculate wavelength [AA] from energy [meV].

    NOTE(review): the original docstring had the conversion direction reversed;
    the code converts energy -> k -> wavelength.
    """
    return WavelengthK(KEnergy(energy))
def EnergyWavelength(wavelength):
    """Calculate energy [meV] from wavelength [AA].

    NOTE(review): the original docstring had the conversion direction reversed;
    the code converts wavelength -> k -> energy.
    """
    return EnergyK(KWavelength(wavelength))
# Calculate scattering angle from d
def ScatteringAngle(d,Energy=None,Wavelength=None,K=None,degrees = True):
    """Calculate scattering angle [deg or rad] from d [AA] and exactly one of Energy [meV], Wavelength [AA], K [1/AA].

    Args:

        - d (float): Lattice spacing in AA

    Kwargs:

        - Energy (float): Neutron energy in meV (default None)

        - Wavelength (float): Neutron wavelength in AA (default None)

        - K (float): Length of neutron wave vector in 1/AA (default None)

        - degrees (bool): If True return the angle in degrees, otherwise radians (default True)

    Raises:

        - AttributeError: If none, or more than one, of Energy, Wavelength, K is provided
    """
    pars = np.array([x is not None for x in [Energy,Wavelength,K]],dtype=bool)
    parsNames = np.array(['Energy','Wavelength','K'])
    if not np.any(pars): # None provided
        raise AttributeError('None of the energy, wavelength, or k were provided...')
    elif np.sum(pars)>1: # more than one was provided..
        raise AttributeError('More than one parameter was provided. Got {}'.format(', '.join(parsNames[pars])))

    available = parsNames[pars][0]
    # Convert the provided quantity into the two remaining representations
    if available == 'Energy':
        Wavelength = WavelengthEnergy(Energy)
        K = KEnergy(Energy)
    elif available == 'Wavelength':
        Energy = EnergyWavelength(Wavelength)
        K = KWavelength(Wavelength)
    elif available == 'K':
        Energy = EnergyK(K)
        Wavelength = WavelengthK(K)

    # Bragg's law: lambda = 2 d sin(theta)  =>  2theta = 2 asin(lambda / (2 d))
    scatAngle = 2.0*np.arcsin(Wavelength/(2.0*d))
    if degrees: # truthiness check replaces 'degrees == True', consistent with DSpacing
        scatAngle = np.rad2deg(scatAngle)
    return scatAngle
def DSpacing(TwoTheta,Energy=None,Wavelength=None,K=None,degrees = True):
    """Calculate d spacing [AA] from scattering angle [deg or rad] and exactly one of Energy [meV], Wavelength [AA], K [1/AA].

    Args:

        - TwoTheta (float): Scattering angle in degrees or radians (see degrees kwarg)

    Kwargs:

        - Energy (float): Neutron energy in meV (default None)

        - Wavelength (float): Neutron wavelength in AA (default None)

        - K (float): Length of neutron wave vector in 1/AA (default None)

        - degrees (bool): If True TwoTheta is interpreted as degrees, otherwise radians (default True)

    Raises:

        - AttributeError: If none, or more than one, of Energy, Wavelength, K is provided
    """
    pars = np.array([x is not None for x in [Energy,Wavelength,K]],dtype=bool)
    parsNames = np.array(['Energy','Wavelength','K'])
    if not np.any(pars): # None provided
        raise AttributeError('None of the energy, wavelength, or k were provided...')
    elif np.sum(pars)>1: # more than one was provided..
        raise AttributeError('More than one parameter was provided. Got {}'.format(', '.join(parsNames[pars])))

    available = parsNames[pars][0]
    # Convert the provided quantity into the two remaining representations
    if available == 'Energy':
        Wavelength = WavelengthEnergy(Energy)
        K = KEnergy(Energy)
    elif available == 'Wavelength':
        Energy = EnergyWavelength(Wavelength)
        K = KWavelength(Wavelength)
    elif available == 'K':
        Energy = EnergyK(K)
        Wavelength = WavelengthK(K)

    if degrees: # truthiness check replaces 'degrees is True', consistent with ScatteringAngle
        TwoTheta = np.deg2rad(TwoTheta)
    # Bragg's law: d = lambda / (2 sin(theta))
    d = Wavelength/(2.0*np.sin(TwoTheta/2.0))
    return d
# Scaling functions needed for adaptive binning of energies
def rescale(x,f=0.5):
    """Sign-preserving quadratic stretch x -> f*x*|x| (inverse of scale)."""
    return np.sign(x)*f*np.square(x)
def scale(x,f=0.5):
    """Sign-preserving square-root compression x -> sign(x)*sqrt(|x|/f) (inverse of rescale)."""
    magnitude = np.sqrt(np.abs(x)/f)
    return magnitude*np.sign(x)
def getNext(l,delete=True):
    """Yield the elements of a list, optionally emptying it while iterating.

    Args:

        - l (iterable): List or list-like to loop over

    Kwargs:

        - delete (bool): If True, iteratively delete elements from the list as they are yielded, else keep them (default True)
    """
    if not delete:
        for entry in l:
            yield entry
    else:
        # Consume from the front so the list shrinks as the caller iterates
        while len(l)>0:
            yield l[0]
            del l[0]
def histogramdd(sample, bins, weights, returnCounts = False):
    """
    Restricted version of numpy's multidimensional histogram function.

    Args:

        - sample (n x m array): Position in m-dimensional space

        - bins (m list): List of bin-edge arrays, one per dimension

        - weights (list): List of weight arrays, each of length n

    Kwargs:

        - returnCounts (bool): If True also append the number of entries per bin (default False)

    Returns:

        - histograms (list): One histogram per weight array (plus a counts histogram when returnCounts is True)
    """
    try:
        # Sample is already an N x D array
        nPoints, nDims = sample.shape
    except (AttributeError, ValueError):
        # Sample is a sequence of 1D arrays; stack into N x D
        sample = np.atleast_2d(sample).T
        nPoints, nDims = sample.shape

    try:
        if len(bins) != nDims:
            raise ValueError(
                'The dimension of bins must be equal to the dimension of the '
                ' sample x.')
    except TypeError:
        # bins is an integer: replicate it for every dimension
        bins = nDims*[bins]

    edges = [np.asarray(binEdges) for binEdges in bins]
    # Two extra bins per dimension hold the below-/above-range outliers
    nbin = np.array([len(edge)+1 for edge in edges], dtype=int)

    # Bin index per dimension for every sample point.
    # searchsorted is used instead of np.digitize to work around gh-11022.
    binIndex = tuple(
        np.searchsorted(edges[dim], sample[:, dim], side='right')
        for dim in range(nDims)
    )

    # Values exactly on the rightmost edge belong in the last real bin,
    # not in the outlier bin: shift them one bin to the left.
    for dim in range(nDims):
        onRightEdge = (sample[:, dim] == edges[dim][-1])
        binIndex[dim][onRightEdge] -= 1

    # Flattened index into the (outlier-padded) histogram matrix.
    # Raises if the resulting array would be too large.
    flatIndex = np.ravel_multi_index(binIndex, nbin)

    interior = nDims*(slice(1, -1),)

    def _shapeHistogram(flatCounts, dtype):
        # Reshape the flat bincount result, cast (preserving the (bad)
        # behavior observed in gh-7845), and strip the outlier bins.
        return flatCounts.reshape(nbin).astype(dtype)[interior]

    histograms = [_shapeHistogram(np.bincount(flatIndex, w, minlength=nbin.prod()), w.dtype)
                  for w in weights]

    if returnCounts:
        counts = np.bincount(flatIndex, minlength=nbin.prod())
        histograms.append(_shapeHistogram(counts, float))

    return histograms
class PointerArray():
    """Array-like view of one named attribute across a list of data files.

    Facilitates data acquisition from a list of differently sized data files
    that all expose the same attributes.

    Args:

        - attribute (str): Name of wanted attribute existing on the data files

        - datafiles (list): List of pointers to the data files
    """
    def __init__(self,attribute,datafiles):
        self._attribute = attribute
        self._datafiles = datafiles
        self._shape = None
        self._multiD = None  # lazily determined: is the attribute multidimensional?

    def _get(self,df):
        # Fetch the wrapped attribute from a single data file
        return getattr(df,self._attribute)

    def __getitem__(self,index):
        selected = self._datafiles[index]
        if isinstance(selected,type(self._datafiles[0])):
            # Single data file selected
            return self._get(selected)
        # A slice returns a list of attribute values
        return [self._get(df) for df in selected]

    def __iter__(self):
        self._index = 0
        return self

    def __next__(self):
        if self._index >= len(self):
            raise StopIteration
        value = self._get(self._datafiles[self._index])
        self._index += 1
        return value

    def next(self):
        # Python 2 style alias
        return self.__next__()

    def __len__(self):
        return len(self._datafiles)

    @property
    def shape(self):
        return [self._get(df).shape for df in self._datafiles]

    @property
    def mask(self):
        return [df.mask for df in self._datafiles]

    @mask.setter
    def mask(self,mask):
        for newMask,df in zip(mask,self._datafiles):
            df.mask = newMask

    @property
    def size(self):
        return np.sum([self._get(df).size for df in self._datafiles])

    def extractData(self):
        if self._multiD is None: # State is unknown
            self._multiD = len(self._get(self._datafiles[0]).shape)>1
        if self._multiD:
            # Element-wise masking for multidimensional attributes
            return np.concatenate([self._get(df)[np.logical_not(df.mask)] for df in self._datafiles])
        # NOTE(review): for 1D attributes the whole file is kept unless its
        # entire mask is True — np.all collapses the mask to a scalar
        return np.concatenate([self._get(df)[np.logical_not(np.all(df.mask))] for df in self._datafiles])

    @property
    def data(self):
        return np.concatenate([self._get(df) for df in self._datafiles])

    def min(self):
        return np.min(self.extractData())

    def max(self):
        return np.max(self.extractData())
def findFlattenedIndex(sample,bins):
    """Rewriting of numpy's binning algorithm: flattened bin index per sample point.

    Args:

        - sample (list): Positions to be binned, of size N x D, where D is the dimensionality

        - bins (list): Bin-edge arrays, of length D, where D is the dimensionality

    Returns:

        - xy (array): Flattened index of each point in the (outlier-padded) histogram matrix
    """
    _, nDims = sample.shape
    edges = [np.asarray(binEdges) for binEdges in bins]
    # Two extra bins per dimension for the below-/above-range outliers
    nbin = np.array([len(edge)+1 for edge in edges], dtype=int)

    # Bin index per dimension for every sample point.
    # searchsorted is used instead of np.digitize to work around gh-11022.
    binIndex = tuple(
        np.searchsorted(edges[dim], sample[:, dim], side='right')
        for dim in range(nDims)
    )

    # Points exactly on the rightmost edge belong in the last real bin,
    # not in the outlier bin: shift them one bin to the left.
    for dim in range(nDims):
        onRightEdge = (sample[:, dim] == edges[dim][-1])
        binIndex[dim][onRightEdge] -= 1

    # Flattened index into the histogram matrix; raises if the array is too large.
    return np.ravel_multi_index(binIndex, nbin)