rewrite pointwise add function

- all vectorized: Much faster
- passing in all relevant information allows to do sanity checks and add
useful meta data in HDF5
- improved readability
This commit is contained in:
Martin Diehl 2019-09-13 06:02:42 -07:00
parent 3db3e9e762
commit 6f008c5d5f
1 changed files with 187 additions and 215 deletions

View File

@ -84,7 +84,7 @@ class DADF5():
self.mode = mode self.mode = mode
def get_candidates(self,l): def get_groups(self,l):
""" """
Get groups that contain all requested datasets. Get groups that contain all requested datasets.
@ -222,252 +222,224 @@ class DADF5():
""" """
def Cauchy(F,P): def Cauchy(F,P):
return np.einsum('i,ijk,ilk->ijl',1.0/np.linalg.det(F),F,P) return {
'data' : np.einsum('i,ijk,ilk->ijl',1.0/np.linalg.det(F['data']),F['data'],P['data']),
'label' : 'sigma',
'meta' : {
'Unit' : P['meta']['Unit'],
'Description' : 'Cauchy stress calculated from {} ({}) '.format(P['label'],P['meta']['Description'])+\
'and deformation gradient {} ({})'.format(F['label'],P['meta']['Description']),
'Creator' : 'dadf5.py:add_Cauchy vXXXXX'
}
}
args = [{'label':F,'shape':[3,3],'unit':'-'}, requested = [{'label':F,'arg':'F'},
{'label':P,'shape':[3,3],'unit':'Pa'} ] {'label':P,'arg':'P'} ]
result = {'label':'sigma',
'unit':'Pa',
'Description': 'Cauchy stress calculated from 1st Piola-Kirchhoff stress and deformation gradient'}
self.add_generic_pointwise_vectorized(Cauchy,args,None,result) self.__add_generic_pointwise(Cauchy,requested)
def add_Mises_stress(self,stress='sigma'): def add_Mises(self,x):
"""Adds equivalent von Mises stress.""" """Adds the equivalent Mises stres of a tensor."""
def Mises_stress(stress): def deviator(x):
dev = stress - np.trace(stress)/3.0*np.eye(3)
symdev = 0.5*(dev+dev.T)
return np.sqrt(np.sum(symdev*symdev.T)*3.0/2.0)
args = [{'label':stress,'shape':[3,3],'unit':'Pa'}] if x['meta']['Unit'] == 'Pa':
result = {'label':'Mises({})'.format(stress), factor = 3.0/2.0
'unit':'Pa', elif x['meta']['Unit'] == '-':
'Description': 'Equivalent Mises stress'} factor = 2.0/3.0
else:
ValueError
self.add_generic_pointwise(Mises_stress,args,result) d = x['data']
dev = d - np.einsum('ijk,i->ijk',np.broadcast_to(np.eye(3),[d.shape[0],3,3]),np.trace(d,axis1=1,axis2=2)/3.0)
dev_sym = (dev + np.einsum('ikj',dev))*0.5
return {
'data' : np.sqrt(np.einsum('ijk->i',dev_sym**2)*factor),
'label' : 'dev({})'.format(x['label']),
'meta' : {
'Unit' : x['meta']['Unit'],
'Description' : 'Mises equivalent stress of {} ({})'.format(x['label'],x['meta']['Description']),
'Creator' : 'dadf5.py:add_Mises_stress vXXXXX'
}
}
requested = [{'label':x,'arg':'x'}]
self.__add_generic_pointwise(deviator,requested)
def add_norm(self,x,ord=None): def add_norm(self,x,ord=None):
""" """
Adds norm of vector or tensor or magnitude of a scalar. Adds norm of vector or tensor or magnitude of a scalar.
See numpy.linalg.norm manual for details.
Todo
----
The output unit should be the input unit.
The ord parameter should be taken into account.
The whole thing should be vectorized. This requires to parse optional arguments to func.
""" """
args = [{'label':x,'shape':None,'unit':None}] def norm(x,ord):
result = {'label':'norm_{}({})'.format(str(ord),x),
'unit':'n/a',
'Description': 'Norm of vector or tensor or magnitude of a scalar. See numpy.linalg.norm manual for details'}
self.add_generic_pointwise(np.linalg.norm,args,result) if len(x['data'].shape) == 1:
axis = 0
t = 'scalar'
def add_determinant(self,a): elif len(x['data'].shape) == 2:
"""Adds the determinant of a tensor.""" axis = 1
# ToDo: The output unit should be the input unit t = 'vector'
args = [{'label':a,'shape':[3,3],'unit':None}] elif len(x['data'].shape) == 3:
result = {'label':'det({})'.format(a), axis = (1,2)
'unit':'n/a', t = 'tensor'
'Description': 'Determinant of a tensor'}
self.add_generic_pointwise_vectorized(np.linalg.det,args,result)
def add_spherical(self,a):
"""Adds the spherical component of a tensor."""
def spherical(m):
return (m[0,0]+m[1,1]+m[2,2])/3.0
# ToDo: The output unit should be the input unit
args = [{'label':a,'shape':[3,3],'unit':None}]
result = {'label':'sph({})'.format(a),
'unit':'n/a',
'Description': 'Spherical component of a tensor'}
self.add_generic_pointwise(spherical,args,result)
def add_deviator(self,a):
"""Adds the deviator of a tensor."""
def deviator(m):
return m - np.eye(3)*(m[0,0]+m[1,1]+m[2,2])/3.0
# ToDo: The output unit should be the input unit
args = [{'label':a,'shape':[3,3],'unit':'Pa'}]
result = {'label':'dev({})'.format(a),
'unit':'n/a',
'Description': 'Deviatoric component of a tensor'}
self.add_generic_pointwise(deviator,args,result)
def add_strain_tensors(self,defgrad='F'):
"""Adds a strain definition."""
def strain(defgrad):
(U,S,Vh) = np.linalg.svd(defgrad) # singular value decomposition
R_inv = np.dot(U,Vh).T # inverse rotation of polar decomposition
U = np.dot(R_inv,defgrad) # F = RU
U = np.where(abs(U) < 1e-12, 0, U) # kill nasty noisy data
(D,V) = np.linalg.eig(U) # eigen decomposition (of symmetric matrix)
neg = np.where(D < 0.0) # find negative eigenvalues ...
D[neg] *= -1. # ... flip value ...
V[:,neg] *= -1. # ... and vector
for i,eigval in enumerate(D):
if np.dot(V[:,i],V[:,(i+1)%3]) != 0.0: # check each vector for orthogonality
V[:,(i+1)%3] = np.cross(V[:,(i+2)%3],V[:,i]) # correct next vector
V[:,(i+1)%3] /= np.sqrt(np.dot(V[:,(i+1)%3],V[:,(i+1)%3].conj())) # and renormalize (hyperphobic?)
d = np.log(D) # operate on eigenvalues of U o r V
return np.dot(V,np.dot(np.diag(d),V.T)).real # build tensor back from eigenvalue/vector basis
# ToDo: The output unit should be the input unit
args = [{'label':defgrad,'shape':[3,3],'unit':None}]
result = {'label':'strain({})'.format(defgrad),
'unit':'-',
'Description': 'strain (ln(V)) of a deformation gradient'}
self.add_generic_pointwise(strain,args,result)
def get_fitting(self,data):
groups = []
if type(data) is not list:
print('mist')
with h5py.File(self.filename,'r') as f:
for g in self.get_candidates([l['label'] for l in data]):
print(g)
fits = True
for d in data: # ToDo: check for unit
if d['shape'] is not None:
fits = fits and np.all(np.array(f[g+'/'+d['label']].shape[1:]) == np.array(d['shape']))
if fits: groups.append(g)
return groups
def add_generic_pointwise(self,func,args,result):
"""
General function to add pointwise data.
function 'func' first needs to have data arguments before other arguments
Works for functions that are pointwise defined.
"""
groups = self.get_fitting(args)
def job(args):
out = args['out']
datasets_in = args['dat']
func = args['fun']
for i in range(out.shape[0]):
arg = tuple([d[i,] for d in datasets_in])
out[i,] = func(*arg)
args['results'].put({'out':out,'group':args['group']})
Nthreads = 4 # ToDo: should be a parameter
results = Queue(Nthreads+1)
todo = []
for g in groups:
with h5py.File(self.filename,'r') as f:
datasets_in = [f[g+'/'+u['label']][()] for u in args]
# figure out dimension of results
testArg = tuple([d[0,] for d in datasets_in]) # to call function with first point
out = np.empty([datasets_in[0].shape[0]] + list(func(*testArg).shape)) # shape is Npoints x shape of the results for one point
todo.append({'dat':datasets_in,'fun':func,'out':out,'group':g,'results':results})
# Instantiate a thread pool with worker threads
pool = util.ThreadPool(Nthreads)
missingResults = len(todo)
# Add the jobs in bulk to the thread pool. Alternatively you could use
# `pool.add_task` to add single jobs. The code will block here, which
# makes it possible to cancel the thread pool with an exception when
# the currently running batch of workers is finished
pool.map(job, todo[:Nthreads+1])
i = 0
while missingResults > 0:
r=results.get() # noqa
print(r['group'])
with h5py.File(self.filename,'r+') as f:
dataset_out = f[r['group']].create_dataset(result['label'],data=r['out'])
dataset_out.attrs['Unit'] = result['unit']
dataset_out.attrs['Description'] = result['Description']
dataset_out.attrs['Creator'] = 'dadf5.py v{}'.format('n/a')
missingResults-=1
try:
pool.add_task(job,todo[Nthreads+1+i])
except IndexError:
pass
i+=1
pool.wait_completion()
def add_generic_pointwise_vectorized(self,func,args,args2=None,result=None):
"""
General function to add pointwise data.
function 'func' first needs to have data arguments before other arguments
Works for vectorized functions.
"""
groups = self.get_fitting(args)
def job(args):
"""
A job. It has different args!
"""
print('args for job',args)
out = args['out']
datasets_in = args['dat']
func = args['fun']
# try:
# out = func(*datasets_in,*args['fun_args'])
# except:
out = func(*datasets_in)
args['results'].put({'out':out,'group':args['group']})
Nthreads = 4 # ToDo: should be a parameter
results = Queue(Nthreads+1)
todo = []
for g in groups:
with h5py.File(self.filename,'r') as f:
datasets_in = [f[g+'/'+u['label']][()] for u in args]
if args2 is not None:
todo.append({'dat':datasets_in,'fun':func,'group':g,'results':results,'func_args':args,'out':None})
else: else:
todo.append({'dat':datasets_in,'fun':func,'group':g,'results':results,'out':None}) raise ValueError
# Instantiate a thread pool with worker threads return {
pool = util.ThreadPool(Nthreads) 'data' : np.linalg.norm(x['data'],ord=ord,axis=axis,keepdims=True),
missingResults = len(todo) 'label' : 'norm({})'.format(x['label']),
'meta' : {
'Unit' : x['meta']['Unit'],
'Description' : 'Norm of {} {} ({})'.format(t,x['label'],x['meta']['Description']),
'Creator' : 'dadf5.py:add_norm vXXXXX'
}
}
requested = [{'label':x,'arg':'x'}]
self.__add_generic_pointwise(norm,requested,{'ord':ord})
pool.map(job, todo[:Nthreads+1]) def add_determinant(self,x):
i = 0 """Adds the determinant component of a tensor."""
while missingResults > 0: def determinant(x):
r=results.get() # noqa
print(r['group']) return {
with h5py.File(self.filename,'r+') as f: 'data' : np.linalg.det(x['data']),
dataset_out = f[r['group']].create_dataset(result['label'],data=r['out']) 'label' : 'det({})'.format(x['label']),
dataset_out.attrs['Unit'] = result['unit'] 'meta' : {
dataset_out.attrs['Description'] = result['Description'] 'Unit' : x['meta']['Unit'],
dataset_out.attrs['Creator'] = 'dadf5.py v{}'.format('n/a') 'Description' : 'Determinant of tensor {} ({})'.format(x['label'],x['meta']['Description']),
missingResults-=1 'Creator' : 'dadf5.py:add_determinant vXXXXX'
try: }
pool.add_task(job,todo[Nthreads+1+i]) }
except IndexError:
pass requested = [{'label':x,'arg':'x'}]
i+=1
self.__add_generic_pointwise(determinant,requested)
def add_spherical(self,x):
"""Adds the spherical component of a tensor."""
def spherical(x):
if not np.all(np.array(x['data'].shape[1:]) == np.array([3,3])):
raise ValueError
return {
'data' : np.trace(x['data'],axis1=1,axis2=2)/3.0,
'label' : 'sph({})'.format(x['label']),
'meta' : {
'Unit' : x['meta']['Unit'],
'Description' : 'Spherical component of tensor {} ({})'.format(x['label'],x['meta']['Description']),
'Creator' : 'dadf5.py:add_spherical vXXXXX'
}
}
requested = [{'label':x,'arg':'x'}]
self.__add_generic_pointwise(spherical,requested)
def add_deviator(self,x):
"""Adds the deviator of a tensor."""
def deviator(x):
d = x['data']
return {
'data' : d - np.einsum('ijk,i->ijk',np.broadcast_to(np.eye(3),[d.shape[0],3,3]),np.trace(d,axis1=1,axis2=2)/3.0),
'label' : 'dev({})'.format(x['label']),
'meta' : {
'Unit' : x['meta']['Unit'],
'Description' : 'Deviator of tensor {} ({})'.format(x['label'],x['meta']['Description']),
'Creator' : 'dadf5.py:add_deviator vXXXXX'
}
}
requested = [{'label':x,'arg':'x'}]
self.__add_generic_pointwise(deviator,requested)
def add_strain_tensor(self,t,ord,defgrad='F'):
"""Adds the a strain tensor."""
def strain_tensor(defgrad,t,ord):
(U,S,Vh) = np.linalg.svd(defgrad['data']) # singular value decomposition
R_inv = np.einsum('ijk->ikj',np.matmul(U,Vh)) # inverse rotation of polar decomposition
U = np.matmul(R_inv,defgrad['data']) # F = RU
(D,V) = np.linalg.eigh((U+np.einsum('ikj',U))*.5) # eigen decomposition (of symmetric(ed) matrix)
neg = np.where(D < 0.0) # find negative eigenvalues ...
D[neg[0],neg[1]] = D[neg[0],neg[1]]* -1 # ... flip value ...
V[neg[0],:,neg[1]] = V[neg[0],:,neg[1]]* -1 # ... and vector
d = np.log(D)
a = np.matmul(V,np.einsum('ij,ikj->ijk',d,V)) # this is wrong ...
for j in range(V.shape[0]): # but this is slow ...
a[j,:,:] = np.dot(V[j,:,:],np.dot(np.diag(d[j,:]),V[j,:,:].T))
print(np.max(a))
return {
'data' : a,
'label' : 'lnV({})'.format(defgrad['label']),
'meta' : {
'Unit' : defgrad['meta']['Unit'],
'Description' : 'Strain tensor {} ({})'.format(defgrad['label'],defgrad['meta']['Description']),
'Creator' : 'dadf5.py:add_deviator vXXXXX'
}
}
requested = [{'label':defgrad,'arg':'defgrad'}]
self.__add_generic_pointwise(strain_tensor,requested,{'t':t,'ord':ord})
def __add_generic_pointwise(self,func,datasets_requested,extra_args={}):
"""
General function to add pointwise data.
"""
def job(args):
"""
Call function with input data + extra arguments, returns results + group.
"""
args['results'].put({**args['func'](**args['in']),'group':args['group']})
N_threads = 1 # ToDo: should be a parameter
results = Queue(N_threads)
pool = util.ThreadPool(N_threads)
N_added = N_threads + 1
todo = []
# ToDo: It would be more memory efficient to read only from file when required, i.e. do to it in pool.add_task
for group in self.get_groups([d['label'] for d in datasets_requested]):
with h5py.File(self.filename,'r') as f:
datasets_in = {}
for d in datasets_requested:
loc = f[group+'/'+d['label']]
data = loc[()]
meta = {k:loc.attrs[k] for k in loc.attrs.keys()}
datasets_in[d['arg']] = {'data': data, 'meta' : meta, 'label' : d['label']}
todo.append({'in':{**datasets_in,**extra_args},'func':func,'group':group,'results':results})
pool.map(job, todo[:N_added]) # initialize
N_not_calculated = len(todo)
while N_not_calculated > 0:
result = results.get()
with h5py.File(self.filename,self.mode) as f: # write to file
dataset_out = f[result['group']].create_dataset(result['label'],data=result['data'])
for k in result['meta'].keys():
dataset_out.attrs[k] = result['meta'][k]
N_not_calculated-=1
if N_added < len(todo): # add more jobs
pool.add_task(job,todo[N_added])
N_added +=1
pool.wait_completion() pool.wait_completion()