Merge branch '272-parallel-post-processing' into 'development'

no multiprocessing for adding datasets

Closes #272

See merge request damask/DAMASK!820
This commit is contained in:
Daniel Otto de Mentock 2023-09-25 15:31:43 +00:00
commit c3d3ea6588
3 changed files with 348 additions and 359 deletions

View File

@ -1,5 +1,3 @@
import multiprocessing as mp
from multiprocessing.synchronize import Lock
import re
import fnmatch
import os
@ -7,8 +5,8 @@ import copy
import datetime
import xml.etree.ElementTree as ET # noqa
import xml.dom.minidom
import functools
from pathlib import Path
from functools import partial
from collections import defaultdict
from collections.abc import Iterable
from typing import Optional, Union, Callable, Any, Sequence, Literal, Dict, List, Tuple
@ -601,17 +599,6 @@ class Result:
f['/geometry/T_c'].attrs['VTK_TYPE'].decode())
@staticmethod
def _add_absolute(x: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': np.abs(x['data']),
'label': f'|{x["label"]}|',
'meta': {
'unit': x['meta']['unit'],
'description': f"absolute value of {x['label']} ({x['meta']['description']})",
'creator': 'add_absolute'
}
}
def add_absolute(self, x: str):
"""
Add absolute value.
@ -622,28 +609,20 @@ class Result:
Name of scalar, vector, or tensor dataset to take absolute value of.
"""
self._add_generic_pointwise(self._add_absolute,{'x':x})
@staticmethod
def _add_calculation(**kwargs) -> Dict[str, Any]:
formula = kwargs['formula']
for d in re.findall(r'#(.*?)#',formula):
formula = formula.replace(f'#{d}#',f"kwargs['{d}']['data']")
data = eval(formula)
if not hasattr(data,'shape') or data.shape[0] != kwargs[d]['data'].shape[0]:
raise ValueError('"{}" results in invalid shape'.format(kwargs['formula']))
def absolute(x: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': data,
'label': kwargs['label'],
'data': np.abs(x['data']),
'label': f'|{x["label"]}|',
'meta': {
'unit': kwargs['unit'],
'description': f"{kwargs['description']} (formula: {kwargs['formula']})",
'creator': 'add_calculation'
'unit': x['meta']['unit'],
'description': f"absolute value of {x['label']} ({x['meta']['description']})",
'creator': 'add_absolute'
}
}
self._add_generic_pointwise(absolute,{'x':x})
def add_calculation(self,
formula: str,
name: str,
@ -692,24 +671,30 @@ class Result:
... 'Mises equivalent of the Cauchy stress')
"""
def calculation(**kwargs) -> Dict[str, Any]:
formula = kwargs['formula']
for d in re.findall(r'#(.*?)#',formula):
formula = formula.replace(f'#{d}#',f"kwargs['{d}']['data']")
data = eval(formula)
if not hasattr(data,'shape') or data.shape[0] != kwargs[d]['data'].shape[0]:
raise ValueError('"{}" results in invalid shape'.format(kwargs['formula']))
return {
'data': data,
'label': kwargs['label'],
'meta': {
'unit': kwargs['unit'],
'description': f"{kwargs['description']} (formula: {kwargs['formula']})",
'creator': 'add_calculation'
}
}
dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula
args = {'formula':formula,'label':name,'unit':unit,'description':description}
self._add_generic_pointwise(self._add_calculation,dataset_mapping,args)
self._add_generic_pointwise(calculation,dataset_mapping,args)
@staticmethod
def _add_stress_Cauchy(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.stress_Cauchy(P['data'],F['data']),
'label': 'sigma',
'meta': {
'unit': P['meta']['unit'],
'description': "Cauchy stress calculated "
f"from {P['label']} ({P['meta']['description']})"
f" and {F['label']} ({F['meta']['description']})",
'creator': 'add_stress_Cauchy'
}
}
def add_stress_Cauchy(self,
P: str = 'P',
F: str = 'F'):
@ -726,20 +711,23 @@ class Result:
Defaults to 'F'.
"""
self._add_generic_pointwise(self._add_stress_Cauchy,{'P':P,'F':F})
@staticmethod
def _add_determinant(T: Dict[str, Any]) -> Dict[str, Any]:
def stress_Cauchy(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': np.linalg.det(T['data']),
'label': f"det({T['label']})",
'data': mechanics.stress_Cauchy(P['data'],F['data']),
'label': 'sigma',
'meta': {
'unit': T['meta']['unit'],
'description': f"determinant of tensor {T['label']} ({T['meta']['description']})",
'creator': 'add_determinant'
'unit': P['meta']['unit'],
'description': "Cauchy stress calculated "
f"from {P['label']} ({P['meta']['description']})"
f" and {F['label']} ({F['meta']['description']})",
'creator': 'add_stress_Cauchy'
}
}
self._add_generic_pointwise(stress_Cauchy,{'P':P,'F':F})
def add_determinant(self, T: str):
"""
Add the determinant of a tensor.
@ -758,20 +746,21 @@ class Result:
>>> r.add_determinant('F_p')
"""
self._add_generic_pointwise(self._add_determinant,{'T':T})
@staticmethod
def _add_deviator(T: Dict[str, Any]) -> Dict[str, Any]:
def determinant(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': tensor.deviatoric(T['data']),
'label': f"s_{T['label']}",
'data': np.linalg.det(T['data']),
'label': f"det({T['label']})",
'meta': {
'unit': T['meta']['unit'],
'description': f"deviator of tensor {T['label']} ({T['meta']['description']})",
'creator': 'add_deviator'
'description': f"determinant of tensor {T['label']} ({T['meta']['description']})",
'creator': 'add_determinant'
}
}
self._add_generic_pointwise(determinant,{'T':T})
def add_deviator(self, T: str):
"""
Add the deviatoric part of a tensor.
@ -790,29 +779,21 @@ class Result:
>>> r.add_deviator('sigma')
"""
self._add_generic_pointwise(self._add_deviator,{'T':T})
@staticmethod
def _add_eigenvalue(T_sym: Dict[str, Any], eigenvalue: Literal['max, mid, min']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
label,p = 'intermediate',1
elif eigenvalue == 'min':
label,p = 'minimum',0
else:
raise ValueError(f'invalid eigenvalue: {eigenvalue}')
def deviator(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': tensor.eigenvalues(T_sym['data'])[:,p],
'label': f"lambda_{eigenvalue}({T_sym['label']})",
'data': tensor.deviatoric(T['data']),
'label': f"s_{T['label']}",
'meta': {
'unit': T_sym['meta']['unit'],
'description': f"{label} eigenvalue of {T_sym['label']} ({T_sym['meta']['description']})",
'creator': 'add_eigenvalue'
'unit': T['meta']['unit'],
'description': f"deviator of tensor {T['label']} ({T['meta']['description']})",
'creator': 'add_deviator'
}
}
self._add_generic_pointwise(deviator,{'T':T})
def add_eigenvalue(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'min'] = 'max'):
@ -835,11 +816,47 @@ class Result:
>>> r.add_eigenvalue('sigma','min')
"""
self._add_generic_pointwise(self._add_eigenvalue,{'T_sym':T_sym},{'eigenvalue':eigenvalue})
def eigenval(T_sym: Dict[str, Any], eigenvalue: Literal['max, mid, min']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
label,p = 'intermediate',1
elif eigenvalue == 'min':
label,p = 'minimum',0
else:
raise ValueError(f'invalid eigenvalue: {eigenvalue}')
return {
'data': tensor.eigenvalues(T_sym['data'])[:,p],
'label': f"lambda_{eigenvalue}({T_sym['label']})",
'meta' : {
'unit': T_sym['meta']['unit'],
'description': f"{label} eigenvalue of {T_sym['label']} ({T_sym['meta']['description']})",
'creator': 'add_eigenvalue'
}
}
self._add_generic_pointwise(eigenval,{'T_sym':T_sym},{'eigenvalue':eigenvalue})
@staticmethod
def _add_eigenvector(T_sym: Dict[str, Any], eigenvalue: Literal['max', 'mid', 'min']) -> Dict[str, Any]:
def add_eigenvector(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'min'] = 'max'):
"""
Add eigenvector of symmetric tensor.
Parameters
----------
T_sym : str
Name of symmetric tensor dataset.
eigenvalue : {'max', 'mid', 'min'}, optional
Eigenvalue to which the eigenvector corresponds.
Defaults to 'max'.
"""
def eigenvector(T_sym: Dict[str, Any], eigenvalue: Literal['max', 'mid', 'min']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
@ -859,40 +876,10 @@ class Result:
'creator': 'add_eigenvector'
}
}
def add_eigenvector(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'min'] = 'max'):
"""
Add eigenvector of symmetric tensor.
Parameters
----------
T_sym : str
Name of symmetric tensor dataset.
eigenvalue : {'max', 'mid', 'min'}, optional
Eigenvalue to which the eigenvector corresponds.
Defaults to 'max'.
"""
self._add_generic_pointwise(self._add_eigenvector,{'T_sym':T_sym},{'eigenvalue':eigenvalue})
self._add_generic_pointwise(eigenvector,{'T_sym':T_sym},{'eigenvalue':eigenvalue})
@staticmethod
def _add_IPF_color(l: FloatSequence, q: Dict[str, Any]) -> Dict[str, Any]:
m = util.scale_to_coprime(np.array(l))
lattice = q['meta']['lattice']
o = Orientation(rotation = q['data'],lattice=lattice)
return {
'data': np.uint8(o.IPF_color(l)*255),
'label': 'IPFcolor_({} {} {})'.format(*m),
'meta' : {
'unit': '8-bit RGB',
'lattice': q['meta']['lattice'],
'description': 'Inverse Pole Figure (IPF) colors along sample direction ({} {} {})'.format(*m),
'creator': 'add_IPF_color'
}
}
def add_IPF_color(self,
l: FloatSequence,
q: str = 'O'):
@ -916,20 +903,26 @@ class Result:
>>> r.add_IPF_color(np.array([0,1,1]))
"""
self._add_generic_pointwise(self._add_IPF_color,{'q':q},{'l':l})
def IPF_color(l: FloatSequence, q: Dict[str, Any]) -> Dict[str, Any]:
m = util.scale_to_coprime(np.array(l))
lattice = q['meta']['lattice']
o = Orientation(rotation = q['data'],lattice=lattice)
@staticmethod
def _add_maximum_shear(T_sym: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.maximum_shear(T_sym['data']),
'label': f"max_shear({T_sym['label']})",
'data': np.uint8(o.IPF_color(l)*255),
'label': 'IPFcolor_({} {} {})'.format(*m),
'meta' : {
'unit': T_sym['meta']['unit'],
'description': f"maximum shear component of {T_sym['label']} ({T_sym['meta']['description']})",
'creator': 'add_maximum_shear'
'unit': '8-bit RGB',
'lattice': q['meta']['lattice'],
'description': 'Inverse Pole Figure (IPF) colors along sample direction ({} {} {})'.format(*m),
'creator': 'add_IPF_color'
}
}
self._add_generic_pointwise(IPF_color,{'q':q},{'l':l})
def add_maximum_shear(self, T_sym: str):
"""
Add maximum shear components of symmetric tensor.
@ -940,30 +933,20 @@ class Result:
Name of symmetric tensor dataset.
"""
self._add_generic_pointwise(self._add_maximum_shear,{'T_sym':T_sym})
@staticmethod
def _add_equivalent_Mises(T_sym: Dict[str, Any], kind: str) -> Dict[str, Any]:
k = kind
if k is None:
if T_sym['meta']['unit'] == '1':
k = 'strain'
elif T_sym['meta']['unit'] == 'Pa':
k = 'stress'
if k not in ['stress', 'strain']:
raise ValueError(f'invalid von Mises kind "{kind}"')
def maximum_shear(T_sym: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': (mechanics.equivalent_strain_Mises if k=='strain' else \
mechanics.equivalent_stress_Mises)(T_sym['data']),
'label': f"{T_sym['label']}_vM",
'data': mechanics.maximum_shear(T_sym['data']),
'label': f"max_shear({T_sym['label']})",
'meta': {
'unit': T_sym['meta']['unit'],
'description': f"Mises equivalent {k} of {T_sym['label']} ({T_sym['meta']['description']})",
'creator': 'add_Mises'
'description': f"maximum shear component of {T_sym['label']} ({T_sym['meta']['description']})",
'creator': 'add_maximum_shear'
}
}
self._add_generic_pointwise(maximum_shear,{'T_sym':T_sym})
def add_equivalent_Mises(self,
T_sym: str,
kind: Optional[str] = None):
@ -993,11 +976,45 @@ class Result:
>>> r.add_equivalent_Mises('epsilon_V^0.0(F)')
"""
self._add_generic_pointwise(self._add_equivalent_Mises,{'T_sym':T_sym},{'kind':kind})
def equivalent_Mises(T_sym: Dict[str, Any], kind: str) -> Dict[str, Any]:
k = kind
if k is None:
if T_sym['meta']['unit'] == '1':
k = 'strain'
elif T_sym['meta']['unit'] == 'Pa':
k = 'stress'
if k not in ['stress', 'strain']:
raise ValueError(f'invalid von Mises kind "{kind}"')
return {
'data': (mechanics.equivalent_strain_Mises if k=='strain' else \
mechanics.equivalent_stress_Mises)(T_sym['data']),
'label': f"{T_sym['label']}_vM",
'meta': {
'unit': T_sym['meta']['unit'],
'description': f"Mises equivalent {k} of {T_sym['label']} ({T_sym['meta']['description']})",
'creator': 'add_Mises'
}
}
self._add_generic_pointwise(equivalent_Mises,{'T_sym':T_sym},{'kind':kind})
@staticmethod
def _add_norm(x: Dict[str, Any], ord: Union[int, float, Literal['fro', 'nuc']]) -> Dict[str, Any]:
def add_norm(self,
x: str,
ord: Union[None, int, float, Literal['fro', 'nuc']] = None):
"""
Add the norm of a vector or tensor.
Parameters
----------
x : str
Name of vector or tensor dataset.
ord : {non-zero int, inf, -inf, 'fro', 'nuc'}, optional
Order of the norm. inf means NumPy's inf object. For details refer to numpy.linalg.norm.
"""
def norm(x: Dict[str, Any], ord: Union[int, float, Literal['fro', 'nuc']]) -> Dict[str, Any]:
o = ord
if len(x['data'].shape) == 2:
axis: Union[int, Tuple[int, int]] = 1
@ -1019,36 +1036,10 @@ class Result:
'creator': 'add_norm'
}
}
def add_norm(self,
x: str,
ord: Union[None, int, float, Literal['fro', 'nuc']] = None):
"""
Add the norm of a vector or tensor.
Parameters
----------
x : str
Name of vector or tensor dataset.
ord : {non-zero int, inf, -inf, 'fro', 'nuc'}, optional
Order of the norm. inf means NumPy's inf object. For details refer to numpy.linalg.norm.
"""
self._add_generic_pointwise(self._add_norm,{'x':x},{'ord':ord})
self._add_generic_pointwise(norm,{'x':x},{'ord':ord})
@staticmethod
def _add_stress_second_Piola_Kirchhoff(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.stress_second_Piola_Kirchhoff(P['data'],F['data']),
'label': 'S',
'meta': {
'unit': P['meta']['unit'],
'description': "second Piola-Kirchhoff stress calculated "
f"from {P['label']} ({P['meta']['description']})"
f" and {F['label']} ({F['meta']['description']})",
'creator': 'add_stress_second_Piola_Kirchhoff'
}
}
def add_stress_second_Piola_Kirchhoff(self,
P: str = 'P',
F: str = 'F'):
@ -1071,34 +1062,23 @@ class Result:
is taken into account.
"""
self._add_generic_pointwise(self._add_stress_second_Piola_Kirchhoff,{'P':P,'F':F})
@staticmethod
def _add_pole(q: Dict[str, Any],
uvw: FloatSequence,
hkl: FloatSequence,
with_symmetry: bool,
normalize: bool) -> Dict[str, Any]:
c = q['meta']['c/a'] if 'c/a' in q['meta'] else 1
brackets = ['[]','()','⟨⟩','{}'][(uvw is None)*1+with_symmetry*2]
label = 'p^' + '{}{} {} {}{}'.format(brackets[0],
*(uvw if uvw else hkl),
brackets[-1],)
ori = Orientation(q['data'],lattice=q['meta']['lattice'],a=1,c=c)
def stress_second_Piola_Kirchhoff(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': ori.to_pole(uvw=uvw,hkl=hkl,with_symmetry=with_symmetry,normalize=normalize),
'label': label,
'data': mechanics.stress_second_Piola_Kirchhoff(P['data'],F['data']),
'label': 'S',
'meta': {
'unit': '1',
'description': f'{"normalized " if normalize else ""}lab frame vector along lattice ' \
+ ('direction' if uvw is not None else 'plane') \
+ ('s' if with_symmetry else ''),
'creator': 'add_pole'
'unit': P['meta']['unit'],
'description': "second Piola-Kirchhoff stress calculated "
f"from {P['label']} ({P['meta']['description']})"
f" and {F['label']} ({F['meta']['description']})",
'creator': 'add_stress_second_Piola_Kirchhoff'
}
}
self._add_generic_pointwise(stress_second_Piola_Kirchhoff,{'P':P,'F':F})
def add_pole(self,
q: str = 'O',
*,
@ -1124,22 +1104,33 @@ class Result:
Defaults to True.
"""
self._add_generic_pointwise(self._add_pole,
{'q':q},
{'uvw':uvw,'hkl':hkl,'with_symmetry':with_symmetry,'normalize':normalize})
def pole(q: Dict[str, Any],
uvw: FloatSequence,
hkl: FloatSequence,
with_symmetry: bool,
normalize: bool) -> Dict[str, Any]:
c = q['meta']['c/a'] if 'c/a' in q['meta'] else 1
brackets = ['[]','()','⟨⟩','{}'][(uvw is None)*1+with_symmetry*2]
label = 'p^' + '{}{} {} {}{}'.format(brackets[0],
*(uvw if uvw else hkl),
brackets[-1],)
ori = Orientation(q['data'],lattice=q['meta']['lattice'],a=1,c=c)
@staticmethod
def _add_rotation(F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.rotation(F['data']).as_matrix(),
'label': f"R({F['label']})",
'data': ori.to_pole(uvw=uvw,hkl=hkl,with_symmetry=with_symmetry,normalize=normalize),
'label': label,
'meta' : {
'unit': F['meta']['unit'],
'description': f"rotational part of {F['label']} ({F['meta']['description']})",
'creator': 'add_rotation'
'unit': '1',
'description': f'{"normalized " if normalize else ""}lab frame vector along lattice ' \
+ ('direction' if uvw is not None else 'plane') \
+ ('s' if with_symmetry else ''),
'creator': 'add_pole'
}
}
self._add_generic_pointwise(pole,{'q':q},{'uvw':uvw,'hkl':hkl,'with_symmetry':with_symmetry,'normalize':normalize})
def add_rotation(self, F: str):
"""
Add rotational part of a deformation gradient.
@ -1158,20 +1149,20 @@ class Result:
>>> r.add_rotation('F')
"""
self._add_generic_pointwise(self._add_rotation,{'F':F})
@staticmethod
def _add_spherical(T: Dict[str, Any]) -> Dict[str, Any]:
def rotation(F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': tensor.spherical(T['data'],False),
'label': f"p_{T['label']}",
'data': mechanics.rotation(F['data']).as_matrix(),
'label': f"R({F['label']})",
'meta': {
'unit': T['meta']['unit'],
'description': f"spherical component of tensor {T['label']} ({T['meta']['description']})",
'creator': 'add_spherical'
'unit': F['meta']['unit'],
'description': f"rotational part of {F['label']} ({F['meta']['description']})",
'creator': 'add_rotation'
}
}
self._add_generic_pointwise(rotation,{'F':F})
def add_spherical(self, T: str):
"""
Add the spherical (hydrostatic) part of a tensor.
@ -1190,22 +1181,20 @@ class Result:
>>> r.add_spherical('sigma')
"""
self._add_generic_pointwise(self._add_spherical,{'T':T})
@staticmethod
def _add_strain(F: Dict[str, Any], t: Literal['V', 'U'], m: float) -> Dict[str, Any]:
side = 'left' if t == 'V' else 'right'
def spherical(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.strain(F['data'],t,m),
'label': f"epsilon_{t}^{m}({F['label']})",
'data': tensor.spherical(T['data'],False),
'label': f"p_{T['label']}",
'meta': {
'unit': F['meta']['unit'],
'description': f'Seth-Hill strain tensor of order {m} based on {side} stretch tensor '+\
f"of {F['label']} ({F['meta']['description']})",
'creator': 'add_strain'
'unit': T['meta']['unit'],
'description': f"spherical component of tensor {T['label']} ({T['meta']['description']})",
'creator': 'add_spherical'
}
}
self._add_generic_pointwise(spherical,{'T':T})
def add_strain(self,
F: str = 'F',
t: Literal['V', 'U'] = 'V',
@ -1266,21 +1255,22 @@ class Result:
| https://de.wikipedia.org/wiki/Verzerrungstensor
"""
self._add_generic_pointwise(self._add_strain,{'F':F},{'t':t,'m':m})
@staticmethod
def _add_stretch_tensor(F: Dict[str, Any], t: str) -> Dict[str, Any]:
def strain(F: Dict[str, Any], t: Literal['V', 'U'], m: float) -> Dict[str, Any]:
side = 'left' if t == 'V' else 'right'
return {
'data': (mechanics.stretch_left if t.upper() == 'V' else mechanics.stretch_right)(F['data']),
'label': f"{t}({F['label']})",
'data': mechanics.strain(F['data'],t,m),
'label': f"epsilon_{t}^{m}({F['label']})",
'meta': {
'unit': F['meta']['unit'],
'description': f"{'left' if t.upper() == 'V' else 'right'} stretch tensor "\
+f"of {F['label']} ({F['meta']['description']})", # noqa
'creator': 'add_stretch_tensor'
'description': f'Seth-Hill strain tensor of order {m} based on {side} stretch tensor '+\
f"of {F['label']} ({F['meta']['description']})",
'creator': 'add_strain'
}
}
self._add_generic_pointwise(strain,{'F':F},{'t':t,'m':m})
def add_stretch_tensor(self,
F: str = 'F',
t: Literal['V', 'U'] = 'V'):
@ -1296,20 +1286,21 @@ class Result:
Defaults to 'V'.
"""
self._add_generic_pointwise(self._add_stretch_tensor,{'F':F},{'t':t})
@staticmethod
def _add_curl(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
def stretch_tensor(F: Dict[str, Any], t: str) -> Dict[str, Any]:
return {
'data': grid_filters.curl(size,f['data']),
'label': f"curl({f['label']})",
'data': (mechanics.stretch_left if t.upper() == 'V' else mechanics.stretch_right)(F['data']),
'label': f"{t}({F['label']})",
'meta': {
'unit': f['meta']['unit']+'/m',
'description': f"curl of {f['label']} ({f['meta']['description']})",
'creator': 'add_curl'
'unit': F['meta']['unit'],
'description': f"{'left' if t.upper() == 'V' else 'right'} stretch tensor "\
+f"of {F['label']} ({F['meta']['description']})", # noqa
'creator': 'add_stretch_tensor'
}
}
self._add_generic_pointwise(stretch_tensor,{'F':F},{'t':t})
def add_curl(self, f: str):
"""
Add curl of a field.
@ -1325,20 +1316,20 @@ class Result:
i.e. fields resulting from the grid solver.
"""
self._add_generic_grid(self._add_curl,{'f':f},{'size':self.size})
@staticmethod
def _add_divergence(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
def curl(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.divergence(size,f['data']),
'label': f"divergence({f['label']})",
'data': grid_filters.curl(size,f['data']),
'label': f"curl({f['label']})",
'meta': {
'unit': f['meta']['unit']+'/m',
'description': f"divergence of {f['label']} ({f['meta']['description']})",
'creator': 'add_divergence'
'description': f"curl of {f['label']} ({f['meta']['description']})",
'creator': 'add_curl'
}
}
self._add_generic_grid(curl,{'f':f},{'size':self.size})
def add_divergence(self, f: str):
"""
Add divergence of a field.
@ -1354,21 +1345,20 @@ class Result:
i.e. fields resulting from the grid solver.
"""
self._add_generic_grid(self._add_divergence,{'f':f},{'size':self.size})
@staticmethod
def _add_gradient(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
def divergence(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.gradient(size,f['data'] if len(f['data'].shape) == 4 else \
f['data'].reshape(f['data'].shape+(1,))),
'label': f"gradient({f['label']})",
'data': grid_filters.divergence(size,f['data']),
'label': f"divergence({f['label']})",
'meta': {
'unit': f['meta']['unit']+'/m',
'description': f"gradient of {f['label']} ({f['meta']['description']})",
'creator': 'add_gradient'
'description': f"divergence of {f['label']} ({f['meta']['description']})",
'creator': 'add_divergence'
}
}
self._add_generic_grid(divergence,{'f':f},{'size':self.size})
def add_gradient(self, f: str):
"""
Add gradient of a field.
@ -1384,7 +1374,19 @@ class Result:
i.e. fields resulting from the grid solver.
"""
self._add_generic_grid(self._add_gradient,{'f':f},{'size':self.size})
def gradient(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.gradient(size,f['data'] if len(f['data'].shape) == 4 else \
f['data'].reshape(f['data'].shape+(1,))),
'label': f"gradient({f['label']})",
'meta': {
'unit': f['meta']['unit']+'/m',
'description': f"gradient of {f['label']} ({f['meta']['description']})",
'creator': 'add_gradient'
}
}
self._add_generic_grid(gradient,{'f':f},{'size':self.size})
def _add_generic_grid(self,
@ -1446,29 +1448,6 @@ class Result:
f'damask.Result.{creator} v{damask.version}'.encode()
def _job_pointwise(self,
group: str,
callback: Callable,
datasets: Dict[str, str],
args: Dict[str, str],
lock: Lock) -> List[Union[None, Any]]:
"""Execute job for _add_generic_pointwise."""
try:
datasets_in = {}
lock.acquire()
with h5py.File(self.fname,'r') as f:
for arg,label in datasets.items():
loc = f[group+'/'+label]
datasets_in[arg]={'data' :loc[()],
'label':label,
'meta': {k:(v.decode() if not h5py3 and type(v) is bytes else v) \
for k,v in loc.attrs.items()}}
lock.release()
r = callback(**datasets_in,**args)
return [group,r]
except Exception as err:
print(f'Error during calculation: {err}.')
return [None,None]
def _add_generic_pointwise(self,
@ -1490,8 +1469,24 @@ class Result:
Arguments parsed to func.
"""
pool = mp.Pool(int(os.environ.get('OMP_NUM_THREADS',4)))
lock = mp.Manager().Lock()
def job_pointwise(group: str,
callback: Callable,
datasets: Dict[str, str],
args: Dict[str, str]) -> Union[None, Any]:
try:
datasets_in = {}
with h5py.File(self.fname,'r') as f:
for arg,label in datasets.items():
loc = f[group+'/'+label]
datasets_in[arg]={'data' :loc[()],
'label':label,
'meta': {k:(v.decode() if not h5py3 and type(v) is bytes else v) \
for k,v in loc.attrs.items()}}
return callback(**datasets_in,**args)
except Exception as err:
print(f'Error during calculation: {err}.')
return None
groups = []
with h5py.File(self.fname,'r') as f:
@ -1506,12 +1501,10 @@ class Result:
print('No matching dataset found, no data was added.')
return
default_arg = partial(self._job_pointwise,callback=func,datasets=datasets,args=args,lock=lock)
for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)):# type: ignore
if not result:
for group in util.show_progress(groups):
if not (result := job_pointwise(group, callback=func, datasets=datasets, args=args)): # type: ignore
continue
lock.acquire()
with h5py.File(self.fname, 'a') as f:
try:
if not self._protected and '/'.join([group,result['label']]) in f:
@ -1543,10 +1536,6 @@ class Result:
except (OSError,RuntimeError) as err:
print(f'Could not add dataset: {err}.')
lock.release()
pool.close()
pool.join()
def _mappings(self):
@ -2064,7 +2053,7 @@ class Result:
cfg_dir = (Path.cwd() if target_dir is None else Path(target_dir))
with h5py.File(self.fname,'r') as f_in:
f_in['setup'].visititems(partial(export,
f_in['setup'].visititems(functools.partial(export,
output=output,
cfg_dir=cfg_dir,
overwrite=overwrite))

View File

@ -326,7 +326,7 @@ class TestResult:
if shape == 'pseudo_scalar': default.add_calculation('#F#[:,0,0:1]','x','1','a pseudo scalar')
if shape == 'scalar': default.add_calculation('#F#[:,0,0]','x','1','just a scalar')
if shape == 'vector': default.add_calculation('#F#[:,:,1]','x','1','just a vector')
x = default.place('x').reshape((np.product(default.cells),-1))
x = default.place('x').reshape((np.prod(default.cells),-1))
default.add_gradient('x')
in_file = default.place('gradient(x)')
in_memory = grid_filters.gradient(default.size,x.reshape(tuple(default.cells)+x.shape[1:])).reshape(in_file.shape)

View File

@ -398,7 +398,7 @@ class TestGridFilters:
np.arange(cells[1]),
np.arange(cells[2]),indexing='ij')).reshape(tuple(cells)+(3,),order='F')
x,y,z = map(np.random.randint,cells)
assert grid_filters.ravel_index(indices)[x,y,z] == np.arange(0,np.product(cells)).reshape(cells,order='F')[x,y,z]
assert grid_filters.ravel_index(indices)[x,y,z] == np.arange(0,np.prod(cells)).reshape(cells,order='F')[x,y,z]
def test_unravel_index(self):
cells = np.random.randint(8,32,(3))