Merge branch 'DADF5-multiprocessing' into 'development'

Dadf5 multiprocessing

See merge request damask/DAMASK!134
This commit is contained in:
Francisco Jose Gallardo Basile 2020-02-26 10:52:52 +01:00
commit fdd356fae6
2 changed files with 324 additions and 388 deletions

View File

@ -1,7 +1,8 @@
from queue import Queue import multiprocessing
import re import re
import glob import glob
import os import os
from functools import partial
import vtk import vtk
from vtk.util import numpy_support from vtk.util import numpy_support
@ -43,17 +44,16 @@ class DADF5():
self.version_minor = f.attrs['DADF5-minor'] self.version_minor = f.attrs['DADF5-minor']
if self.version_major != 0 or not 2 <= self.version_minor <= 6: if self.version_major != 0 or not 2 <= self.version_minor <= 6:
raise TypeError('Unsupported DADF5 version {}.{} '.format(f.attrs['DADF5_version_major'], raise TypeError('Unsupported DADF5 version {}.{} '.format(self.version_major,
f.attrs['DADF5_version_minor'])) self.version_minor))
self.structured = 'grid' in f['geometry'].attrs.keys() self.structured = 'grid' in f['geometry'].attrs.keys()
if self.structured: if self.structured:
self.grid = f['geometry'].attrs['grid'] self.grid = f['geometry'].attrs['grid']
self.size = f['geometry'].attrs['size'] self.size = f['geometry'].attrs['size']
if self.version_major == 0 and self.version_minor >= 5: self.origin = f['geometry'].attrs['origin'] if self.version_major == 0 and self.version_minor >= 5 else \
self.origin = f['geometry'].attrs['origin'] np.zeros(3)
r=re.compile('inc[0-9]+') r=re.compile('inc[0-9]+')
increments_unsorted = {int(i[3:]):i for i in f.keys() if r.match(i)} increments_unsorted = {int(i[3:]):i for i in f.keys() if r.match(i)}
@ -446,6 +446,17 @@ class DADF5():
return f['geometry/x_c'][()] return f['geometry/x_c'][()]
@staticmethod
def _add_absolute(x):
return {
'data': np.abs(x['data']),
'label': '|{}|'.format(x['label']),
'meta': {
'Unit': x['meta']['Unit'],
'Description': 'Absolute value of {} ({})'.format(x['label'],x['meta']['Description']),
'Creator': 'dadf5.py:add_abs v{}'.format(version)
}
}
def add_absolute(self,x): def add_absolute(self,x):
""" """
Add absolute value. Add absolute value.
@ -456,21 +467,24 @@ class DADF5():
Label of scalar, vector, or tensor dataset to take absolute value of. Label of scalar, vector, or tensor dataset to take absolute value of.
""" """
def _add_absolute(x): self._add_generic_pointwise(self._add_absolute,{'x':x})
@staticmethod
def _add_calculation(**kwargs):
formula = kwargs['formula']
for d in re.findall(r'#(.*?)#',formula):
formula = formula.replace('#{}#'.format(d),"kwargs['{}']['data']".format(d))
return { return {
'data': np.abs(x['data']), 'data': eval(formula),
'label': '|{}|'.format(x['label']), 'label': kwargs['label'],
'meta': { 'meta': {
'Unit': x['meta']['Unit'], 'Unit': kwargs['unit'],
'Description': 'Absolute value of {} ({})'.format(x['label'],x['meta']['Description']), 'Description': '{} (formula: {})'.format(kwargs['description'],kwargs['formula']),
'Creator': 'dadf5.py:add_abs v{}'.format(version) 'Creator': 'dadf5.py:add_calculation v{}'.format(version)
} }
} }
self.__add_generic_pointwise(_add_absolute,{'x':x})
def add_calculation(self,label,formula,unit='n/a',description=None,vectorized=True): def add_calculation(self,label,formula,unit='n/a',description=None,vectorized=True):
""" """
Add result of a general formula. Add result of a general formula.
@ -492,28 +506,24 @@ class DADF5():
if not vectorized: if not vectorized:
raise NotImplementedError raise NotImplementedError
def _add_calculation(**kwargs):
formula = kwargs['formula']
for d in re.findall(r'#(.*?)#',formula):
formula = formula.replace('#{}#'.format(d),"kwargs['{}']['data']".format(d))
return {
'data': eval(formula),
'label': kwargs['label'],
'meta': {
'Unit': kwargs['unit'],
'Description': '{} (formula: {})'.format(kwargs['description'],kwargs['formula']),
'Creator': 'dadf5.py:add_calculation v{}'.format(version)
}
}
dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula
args = {'formula':formula,'label':label,'unit':unit,'description':description} args = {'formula':formula,'label':label,'unit':unit,'description':description}
self._add_generic_pointwise(self._add_calculation,dataset_mapping,args)
self.__add_generic_pointwise(_add_calculation,dataset_mapping,args)
@staticmethod
def _add_Cauchy(P,F):
return {
'data': mechanics.Cauchy(P['data'],F['data']),
'label': 'sigma',
'meta': {
'Unit': P['meta']['Unit'],
'Description': 'Cauchy stress calculated from {} ({}) '.format(P['label'],
P['meta']['Description'])+\
'and {} ({})'.format(F['label'],F['meta']['Description']),
'Creator': 'dadf5.py:add_Cauchy v{}'.format(version)
}
}
def add_Cauchy(self,P='P',F='F'): def add_Cauchy(self,P='P',F='F'):
""" """
Add Cauchy stress calculated from first Piola-Kirchhoff stress and deformation gradient. Add Cauchy stress calculated from first Piola-Kirchhoff stress and deformation gradient.
@ -526,23 +536,20 @@ class DADF5():
Label of the dataset containing the deformation gradient. Defaults to F. Label of the dataset containing the deformation gradient. Defaults to F.
""" """
def _add_Cauchy(P,F): self._add_generic_pointwise(self._add_Cauchy,{'P':P,'F':F})
@staticmethod
def _add_determinant(T):
return { return {
'data': mechanics.Cauchy(P['data'],F['data']), 'data': np.linalg.det(T['data']),
'label': 'sigma', 'label': 'det({})'.format(T['label']),
'meta': { 'meta': {
'Unit': P['meta']['Unit'], 'Unit': T['meta']['Unit'],
'Description': 'Cauchy stress calculated from {} ({}) '.format(P['label'], 'Description': 'Determinant of tensor {} ({})'.format(T['label'],T['meta']['Description']),
P['meta']['Description'])+\ 'Creator': 'dadf5.py:add_determinant v{}'.format(version)
'and {} ({})'.format(F['label'],F['meta']['Description']),
'Creator': 'dadf5.py:add_Cauchy v{}'.format(version)
} }
} }
self.__add_generic_pointwise(_add_Cauchy,{'P':P,'F':F})
def add_determinant(self,T): def add_determinant(self,T):
""" """
Add the determinant of a tensor. Add the determinant of a tensor.
@ -553,34 +560,12 @@ class DADF5():
Label of tensor dataset. Label of tensor dataset.
""" """
def _add_determinant(T): self._add_generic_pointwise(self._add_determinant,{'T':T})
return {
'data': np.linalg.det(T['data']),
'label': 'det({})'.format(T['label']),
'meta': {
'Unit': T['meta']['Unit'],
'Description': 'Determinant of tensor {} ({})'.format(T['label'],T['meta']['Description']),
'Creator': 'dadf5.py:add_determinant v{}'.format(version)
}
}
self.__add_generic_pointwise(_add_determinant,{'T':T})
def add_deviator(self,T): @staticmethod
"""
Add the deviatoric part of a tensor.
Parameters
----------
T : str
Label of tensor dataset.
"""
def _add_deviator(T): def _add_deviator(T):
if not T['data'].shape[1:] == (3,3):
if not np.all(np.array(T['data'].shape[1:]) == np.array([3,3])):
raise ValueError raise ValueError
return { return {
@ -592,10 +577,30 @@ class DADF5():
'Creator': 'dadf5.py:add_deviator v{}'.format(version) 'Creator': 'dadf5.py:add_deviator v{}'.format(version)
} }
} }
def add_deviator(self,T):
"""
Add the deviatoric part of a tensor.
self.__add_generic_pointwise(_add_deviator,{'T':T}) Parameters
----------
T : str
Label of tensor dataset.
"""
self._add_generic_pointwise(self._add_deviator,{'T':T})
@staticmethod
def _add_eigenvalue(S):
return {
'data': mechanics.eigenvalues(S['data']),
'label': 'lambda({})'.format(S['label']),
'meta' : {
'Unit': S['meta']['Unit'],
'Description': 'Eigenvalues of {} ({})'.format(S['label'],S['meta']['Description']),
'Creator': 'dadf5.py:add_eigenvalues v{}'.format(version)
}
}
def add_eigenvalues(self,S): def add_eigenvalues(self,S):
""" """
Add eigenvalues of symmetric tensor. Add eigenvalues of symmetric tensor.
@ -606,21 +611,20 @@ class DADF5():
Label of symmetric tensor dataset. Label of symmetric tensor dataset.
""" """
def _add_eigenvalue(S): self._add_generic_pointwise(self._add_eigenvalue,{'S':S})
@staticmethod
def _add_eigenvector(S):
return { return {
'data': mechanics.eigenvalues(S['data']), 'data': mechanics.eigenvectors(S['data']),
'label': 'lambda({})'.format(S['label']), 'label': 'v({})'.format(S['label']),
'meta' : { 'meta' : {
'Unit': S['meta']['Unit'], 'Unit': '1',
'Description': 'Eigenvalues of {} ({})'.format(S['label'],S['meta']['Description']), 'Description': 'Eigenvectors of {} ({})'.format(S['label'],S['meta']['Description']),
'Creator': 'dadf5.py:add_eigenvalues v{}'.format(version) 'Creator': 'dadf5.py:add_eigenvectors v{}'.format(version)
} }
} }
self.__add_generic_pointwise(_add_eigenvalue,{'S':S})
def add_eigenvectors(self,S): def add_eigenvectors(self,S):
""" """
Add eigenvectors of symmetric tensor. Add eigenvectors of symmetric tensor.
@ -631,35 +635,11 @@ class DADF5():
Label of symmetric tensor dataset. Label of symmetric tensor dataset.
""" """
def _add_eigenvector(S): self._add_generic_pointwise(self._add_eigenvector,{'S':S})
return {
'data': mechanics.eigenvectors(S['data']),
'label': 'v({})'.format(S['label']),
'meta' : {
'Unit': '1',
'Description': 'Eigenvectors of {} ({})'.format(S['label'],S['meta']['Description']),
'Creator': 'dadf5.py:add_eigenvectors v{}'.format(version)
}
}
self.__add_generic_pointwise(_add_eigenvector,{'S':S})
def add_IPFcolor(self,q,l): @staticmethod
"""
Add RGB color tuple of inverse pole figure (IPF) color.
Parameters
----------
q : str
Label of the dataset containing the crystallographic orientation as quaternions.
l : numpy.array of shape (3)
Lab frame direction for inverse pole figure.
"""
def _add_IPFcolor(q,l): def _add_IPFcolor(q,l):
d = np.array(l) d = np.array(l)
d_unit = d/np.linalg.norm(d) d_unit = d/np.linalg.norm(d)
m = util.scale_to_coprime(d) m = util.scale_to_coprime(d)
@ -681,10 +661,32 @@ class DADF5():
'Creator': 'dadf5.py:add_IPFcolor v{}'.format(version) 'Creator': 'dadf5.py:add_IPFcolor v{}'.format(version)
} }
} }
def add_IPFcolor(self,q,l):
"""
Add RGB color tuple of inverse pole figure (IPF) color.
self.__add_generic_pointwise(_add_IPFcolor,{'q':q},{'l':l}) Parameters
----------
q : str
Label of the dataset containing the crystallographic orientation as quaternions.
l : numpy.array of shape (3)
Lab frame direction for inverse pole figure.
"""
self._add_generic_pointwise(self._add_IPFcolor,{'q':q},{'l':l})
@staticmethod
def _add_maximum_shear(S):
return {
'data': mechanics.maximum_shear(S['data']),
'label': 'max_shear({})'.format(S['label']),
'meta': {
'Unit': S['meta']['Unit'],
'Description': 'Maximum shear component of {} ({})'.format(S['label'],S['meta']['Description']),
'Creator': 'dadf5.py:add_maximum_shear v{}'.format(version)
}
}
def add_maximum_shear(self,S): def add_maximum_shear(self,S):
""" """
Add maximum shear components of symmetric tensor. Add maximum shear components of symmetric tensor.
@ -695,21 +697,23 @@ class DADF5():
Label of symmetric tensor dataset. Label of symmetric tensor dataset.
""" """
def _add_maximum_shear(S): self._add_generic_pointwise(self._add_maximum_shear,{'S':S})
@staticmethod
def _add_Mises(S):
t = 'strain' if S['meta']['Unit'] == '1' else \
'stress'
return { return {
'data': mechanics.maximum_shear(S['data']), 'data': mechanics.Mises_strain(S['data']) if t=='strain' else mechanics.Mises_stress(S['data']),
'label': 'max_shear({})'.format(S['label']), 'label': '{}_vM'.format(S['label']),
'meta': { 'meta': {
'Unit': S['meta']['Unit'], 'Unit': S['meta']['Unit'],
'Description': 'Maximum shear component of {} ({})'.format(S['label'],S['meta']['Description']), 'Description': 'Mises equivalent {} of {} ({})'.format(t,S['label'],S['meta']['Description']),
'Creator': 'dadf5.py:add_maximum_shear v{}'.format(version) 'Creator': 'dadf5.py:add_Mises v{}'.format(version)
} }
} }
self.__add_generic_pointwise(_add_maximum_shear,{'S':S})
def add_Mises(self,S): def add_Mises(self,S):
""" """
Add the equivalent Mises stress or strain of a symmetric tensor. Add the equivalent Mises stress or strain of a symmetric tensor.
@ -720,37 +724,11 @@ class DADF5():
Label of symmetric tensorial stress or strain dataset. Label of symmetric tensorial stress or strain dataset.
""" """
def _add_Mises(S): self._add_generic_pointwise(self._add_Mises,{'S':S})
t = 'strain' if S['meta']['Unit'] == '1' else \
'stress'
return {
'data': mechanics.Mises_strain(S['data']) if t=='strain' else mechanics.Mises_stress(S['data']),
'label': '{}_vM'.format(S['label']),
'meta': {
'Unit': S['meta']['Unit'],
'Description': 'Mises equivalent {} of {} ({})'.format(t,S['label'],S['meta']['Description']),
'Creator': 'dadf5.py:add_Mises v{}'.format(version)
}
}
self.__add_generic_pointwise(_add_Mises,{'S':S})
def add_norm(self,x,ord=None): @staticmethod
"""
Add the norm of vector or tensor.
Parameters
----------
x : str
Label of vector or tensor dataset.
ord : {non-zero int, inf, -inf, fro, nuc}, optional
Order of the norm. inf means NumPys inf object. For details refer to numpy.linalg.norm.
"""
def _add_norm(x,ord): def _add_norm(x,ord):
o = ord o = ord
if len(x['data'].shape) == 2: if len(x['data'].shape) == 2:
axis = 1 axis = 1
@ -768,28 +746,27 @@ class DADF5():
'label': '|{}|_{}'.format(x['label'],o), 'label': '|{}|_{}'.format(x['label'],o),
'meta': { 'meta': {
'Unit': x['meta']['Unit'], 'Unit': x['meta']['Unit'],
'Description': '{}-norm of {} {} ({})'.format(ord,t,x['label'],x['meta']['Description']), 'Description': '{}-norm of {} {} ({})'.format(o,t,x['label'],x['meta']['Description']),
'Creator': 'dadf5.py:add_norm v{}'.format(version) 'Creator': 'dadf5.py:add_norm v{}'.format(version)
} }
} }
def add_norm(self,x,ord=None):
self.__add_generic_pointwise(_add_norm,{'x':x},{'ord':ord})
def add_PK2(self,P='P',F='F'):
""" """
Add 2. Piola-Kirchhoff calculated from first Piola-Kirchhoff stress and deformation gradient. Add the norm of vector or tensor.
Parameters Parameters
---------- ----------
P : str, optional x : str
Label first Piola-Kirchhoff stress dataset. Defaults to P. Label of vector or tensor dataset.
F : str, optional ord : {non-zero int, inf, -inf, fro, nuc}, optional
Label of deformation gradient dataset. Defaults to F. Order of the norm. inf means NumPys inf object. For details refer to numpy.linalg.norm.
""" """
def _add_PK2(P,F): self._add_generic_pointwise(self._add_norm,{'x':x},{'ord':ord})
@staticmethod
def _add_PK2(P,F):
return { return {
'data': mechanics.PK2(P['data'],F['data']), 'data': mechanics.PK2(P['data'],F['data']),
'label': 'S', 'label': 'S',
@ -801,26 +778,23 @@ class DADF5():
'Creator': 'dadf5.py:add_PK2 v{}'.format(version) 'Creator': 'dadf5.py:add_PK2 v{}'.format(version)
} }
} }
def add_PK2(self,P='P',F='F'):
self.__add_generic_pointwise(_add_PK2,{'P':P,'F':F})
def add_pole(self,q,p,polar=False):
""" """
Add coordinates of stereographic projection of given pole in crystal frame. Add second Piola-Kirchhoff calculated from first Piola-Kirchhoff stress and deformation gradient.
Parameters Parameters
---------- ----------
q : str P : str, optional
Label of the dataset containing the crystallographic orientation as quaternions. Label first Piola-Kirchhoff stress dataset. Defaults to P.
p : numpy.array of shape (3) F : str, optional
Crystallographic direction or plane. Label of deformation gradient dataset. Defaults to F.
polar : bool, optional
Give pole in polar coordinates. Defaults to False.
""" """
def _add_pole(q,p,polar): self._add_generic_pointwise(self._add_PK2,{'P':P,'F':F})
@staticmethod
def _add_pole(q,p,polar):
pole = np.array(p) pole = np.array(p)
unit_pole = pole/np.linalg.norm(pole) unit_pole = pole/np.linalg.norm(pole)
m = util.scale_to_coprime(pole) m = util.scale_to_coprime(pole)
@ -842,10 +816,36 @@ class DADF5():
'Creator' : 'dadf5.py:add_pole v{}'.format(version) 'Creator' : 'dadf5.py:add_pole v{}'.format(version)
} }
} }
def add_pole(self,q,p,polar=False):
"""
Add coordinates of stereographic projection of given pole in crystal frame.
self.__add_generic_pointwise(_add_pole,{'q':q},{'p':p,'polar':polar}) Parameters
----------
q : str
Label of the dataset containing the crystallographic orientation as quaternions.
p : numpy.array of shape (3)
Crystallographic direction or plane.
polar : bool, optional
Give pole in polar coordinates. Defaults to False.
"""
self._add_generic_pointwise(self._add_pole,{'q':q},{'p':p,'polar':polar})
@staticmethod
def _add_rotational_part(F):
if not F['data'].shape[1:] == (3,3):
raise ValueError
return {
'data': mechanics.rotational_part(F['data']),
'label': 'R({})'.format(F['label']),
'meta': {
'Unit': F['meta']['Unit'],
'Description': 'Rotational part of {} ({})'.format(F['label'],F['meta']['Description']),
'Creator': 'dadf5.py:add_rotational_part v{}'.format(version)
}
}
def add_rotational_part(self,F): def add_rotational_part(self,F):
""" """
Add rotational part of a deformation gradient. Add rotational part of a deformation gradient.
@ -856,34 +856,12 @@ class DADF5():
Label of deformation gradient dataset. Label of deformation gradient dataset.
""" """
def _add_rotational_part(F): self._add_generic_pointwise(self._add_rotational_part,{'F':F})
return {
'data': mechanics.rotational_part(F['data']),
'label': 'R({})'.format(F['label']),
'meta': {
'Unit': F['meta']['Unit'],
'Description': 'Rotational part of {} ({})'.format(F['label'],F['meta']['Description']),
'Creator': 'dadf5.py:add_rotational_part v{}'.format(version)
}
}
self.__add_generic_pointwise(_add_rotational_part,{'F':F})
def add_spherical(self,T): @staticmethod
"""
Add the spherical (hydrostatic) part of a tensor.
Parameters
----------
T : str
Label of tensor dataset.
"""
def _add_spherical(T): def _add_spherical(T):
if not T['data'].shape[1:] == (3,3):
if not np.all(np.array(T['data'].shape[1:]) == np.array([3,3])):
raise ValueError raise ValueError
return { return {
@ -895,10 +873,33 @@ class DADF5():
'Creator': 'dadf5.py:add_spherical v{}'.format(version) 'Creator': 'dadf5.py:add_spherical v{}'.format(version)
} }
} }
def add_spherical(self,T):
"""
Add the spherical (hydrostatic) part of a tensor.
self.__add_generic_pointwise(_add_spherical,{'T':T}) Parameters
----------
T : str
Label of tensor dataset.
"""
self._add_generic_pointwise(self._add_spherical,{'T':T})
@staticmethod
def _add_strain_tensor(F,t,m):
if not F['data'].shape[1:] == (3,3):
raise ValueError
return {
'data': mechanics.strain_tensor(F['data'],t,m),
'label': 'epsilon_{}^{}({})'.format(t,m,F['label']),
'meta': {
'Unit': F['meta']['Unit'],
'Description': 'Strain tensor of {} ({})'.format(F['label'],F['meta']['Description']),
'Creator': 'dadf5.py:add_strain_tensor v{}'.format(version)
}
}
def add_strain_tensor(self,F='F',t='V',m=0.0): def add_strain_tensor(self,F='F',t='V',m=0.0):
""" """
Add strain tensor of a deformation gradient. Add strain tensor of a deformation gradient.
@ -916,21 +917,24 @@ class DADF5():
Order of the strain calculation. Defaults to 0.0. Order of the strain calculation. Defaults to 0.0.
""" """
def _add_strain_tensor(F,t,m): self._add_generic_pointwise(self._add_strain_tensor,{'F':F},{'t':t,'m':m})
@staticmethod
def _add_stretch_tensor(F,t):
if not F['data'].shape[1:] == (3,3):
raise ValueError
return { return {
'data': mechanics.strain_tensor(F['data'],t,m), 'data': mechanics.left_stretch(F['data']) if t == 'V' else mechanics.right_stretch(F['data']),
'label': 'epsilon_{}^{}({})'.format(t,m,F['label']), 'label': '{}({})'.format(t,F['label']),
'meta': { 'meta': {
'Unit': F['meta']['Unit'], 'Unit': F['meta']['Unit'],
'Description': 'Strain tensor of {} ({})'.format(F['label'],F['meta']['Description']), 'Description': '{} stretch tensor of {} ({})'.format('Left' if t == 'V' else 'Right',
'Creator': 'dadf5.py:add_strain_tensor v{}'.format(version) F['label'],F['meta']['Description']),
'Creator': 'dadf5.py:add_stretch_tensor v{}'.format(version)
} }
} }
self.__add_generic_pointwise(_add_strain_tensor,{'F':F},{'t':t,'m':m})
def add_stretch_tensor(self,F='F',t='V'): def add_stretch_tensor(self,F='F',t='V'):
""" """
Add stretch tensor of a deformation gradient. Add stretch tensor of a deformation gradient.
@ -944,77 +948,65 @@ class DADF5():
Defaults to V. Defaults to V.
""" """
def _add_stretch_tensor(F,t): self._add_generic_pointwise(self._add_stretch_tensor,{'F':F},{'t':t})
return {
'data': mechanics.left_stretch(F['data']) if t == 'V' else mechanics.right_stretch(F['data']),
'label': '{}({})'.format(t,F['label']),
'meta': {
'Unit': F['meta']['Unit'],
'Description': '{} stretch tensor of {} ({})'.format('Left' if t == 'V' else 'Right',
F['label'],F['meta']['Description']),
'Creator': 'dadf5.py:add_stretch_tensor v{}'.format(version)
}
}
self.__add_generic_pointwise(_add_stretch_tensor,{'F':F},{'t':t})
def __add_generic_pointwise(self,func,dataset_mapping,args={}): def _job(self,group,func,datasets,args,lock):
"""Execute job for _add_generic_pointwise."""
try:
datasets_in = {}
lock.acquire()
with h5py.File(self.fname,'r') as f:
for arg,label in datasets.items():
loc = f[group+'/'+label]
datasets_in[arg]={'data' :loc[()],
'label':label,
'meta': {k:v.decode() for k,v in loc.attrs.items()}}
lock.release()
r = func(**datasets_in,**args)
return [group,r]
except Exception as err:
print('Error during calculation: {}.'.format(err))
return None
def _add_generic_pointwise(self,func,datasets,args={}):
""" """
General function to add pointwise data. General function to add pointwise data.
Parameters Parameters
---------- ----------
func : function func : function
Function that calculates a new dataset from one or more datasets per HDF5 group. Callback function that calculates a new dataset from one or more datasets per HDF5 group.
dataset_mapping : dictionary datasets : dictionary
Mapping HDF5 data label to callback function argument Details of the datasets to be used: label (in HDF5 file) and arg (argument to which the data is parsed in func).
extra_args : dictionary, optional args : dictionary, optional
Any extra arguments parsed to func. Arguments parsed to func.
""" """
def job(args): N_threads = int(Environment().options['DAMASK_NUM_THREADS'])
"""Call function with input data + extra arguments, returns results + group.""" pool = multiprocessing.Pool(N_threads)
args['results'].put({**args['func'](**args['in']),'group':args['group']}) lock = multiprocessing.Manager().Lock()
env = Environment() groups = self.groups_with_datasets(datasets.values())
N_threads = int(env.options['DAMASK_NUM_THREADS']) default_arg = partial(self._job,func=func,datasets=datasets,args=args,lock=lock)
N_threads //=N_threads # disable for the moment
results = Queue(N_threads) util.progressBar(iteration=0,total=len(groups))
pool = util.ThreadPool(N_threads) for i,result in enumerate(pool.imap_unordered(default_arg,groups)):
N_added = N_threads + 1 util.progressBar(iteration=i+1,total=len(groups))
if not result: continue
lock.acquire()
with h5py.File(self.fname, 'a') as f:
try:
dataset = f[result[0]].create_dataset(result[1]['label'],data=result[1]['data'])
for l,v in result[1]['meta'].items():
dataset.attrs[l]=v.encode()
except OSError as err:
print('Could not add dataset: {}.'.format(err))
lock.release()
todo = [] pool.close()
# ToDo: It would be more memory efficient to read only from file when required, i.e. do to it in pool.add_task pool.join()
for group in self.groups_with_datasets(dataset_mapping.values()):
with h5py.File(self.fname,'r') as f:
datasets_in = {}
for arg,label in dataset_mapping.items():
loc = f[group+'/'+label]
data = loc[()]
meta = {k:loc.attrs[k].decode() for k in loc.attrs.keys()}
datasets_in[arg] = {'data': data, 'meta': meta, 'label': label}
todo.append({'in':{**datasets_in,**args},'func':func,'group':group,'results':results})
pool.map(job, todo[:N_added]) # initialize
N_not_calculated = len(todo)
while N_not_calculated > 0:
result = results.get()
with h5py.File(self.fname,'a') as f: # write to file
dataset_out = f[result['group']].create_dataset(result['label'],data=result['data'])
for k in result['meta'].keys():
dataset_out.attrs[k] = result['meta'][k].encode()
N_not_calculated-=1
if N_added < len(todo): # add more jobs
pool.add_task(job,todo[N_added])
N_added +=1
pool.wait_completion()
def to_vtk(self,labels,mode='cell'): def to_vtk(self,labels,mode='cell'):

View File

@ -6,8 +6,6 @@ import shlex
from fractions import Fraction from fractions import Fraction
from functools import reduce from functools import reduce
from optparse import Option from optparse import Option
from queue import Queue
from threading import Thread
import numpy as np import numpy as np
@ -201,57 +199,3 @@ class return_message():
def __repr__(self): def __repr__(self):
"""Return message suitable for interactive shells.""" """Return message suitable for interactive shells."""
return srepr(self.message) return srepr(self.message)
class ThreadPool:
"""Pool of threads consuming tasks from a queue."""
class Worker(Thread):
"""Thread executing tasks from a given tasks queue."""
def __init__(self, tasks):
"""Worker for tasks."""
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception as e:
# An exception happened in this thread
print(e)
finally:
# Mark this task as done, whether an exception happened or not
self.tasks.task_done()
def __init__(self, num_threads):
"""
Thread pool.
Parameters
----------
num_threads : int
number of threads
"""
self.tasks = Queue(num_threads)
for _ in range(num_threads):
self.Worker(self.tasks)
def add_task(self, func, *args, **kargs):
"""Add a task to the queue."""
self.tasks.put((func, args, kargs))
def map(self, func, args_list):
"""Add a list of tasks to the queue."""
for args in args_list:
self.add_task(func, args)
def wait_completion(self):
"""Wait for completion of all the tasks in the queue."""
self.tasks.join()