Merge branch 'typehints_results' into 'development'

06 Added typehints for results module

See merge request damask/DAMASK!498
This commit is contained in:
Martin Diehl 2022-05-23 11:29:38 +00:00
commit 9696c19676
3 changed files with 206 additions and 123 deletions

View File

@ -1,4 +1,5 @@
import multiprocessing as mp
from multiprocessing.synchronize import Lock
import re
import fnmatch
import os
@ -10,6 +11,7 @@ from pathlib import Path
from functools import partial
from collections import defaultdict
from collections.abc import Iterable
from typing import Union, Callable, Any, Sequence, Literal, Dict, List, Tuple
import h5py
import numpy as np
@ -22,19 +24,21 @@ from . import grid_filters
from . import mechanics
from . import tensor
from . import util
from ._typehints import FloatSequence, IntSequence
h5py3 = h5py.__version__[0] == '3'
chunk_size = 1024**2//8 # for compression in HDF5
prefix_inc = 'increment_'
def _read(dataset):
def _read(dataset: h5py._hl.dataset.Dataset) -> np.ndarray:
"""Read a dataset and its metadata into a numpy.ndarray."""
metadata = {k:(v.decode() if not h5py3 and type(v) is bytes else v) for k,v in dataset.attrs.items()}
dtype = np.dtype(dataset.dtype,metadata=metadata)
dtype = np.dtype(dataset.dtype,metadata=metadata) # type: ignore
return np.array(dataset,dtype=dtype)
def _match(requested,existing):
def _match(requested,
existing: h5py._hl.base.KeysViewHDF5) -> List[Any]:
"""Find matches among two sets of labels."""
def flatten_list(list_of_lists):
return [e for e_ in list_of_lists for e in e_]
@ -50,7 +54,10 @@ def _match(requested,existing):
return sorted(set(flatten_list([fnmatch.filter(existing,r) for r in requested_])),
key=util.natural_sort)
def _empty_like(dataset,N_materialpoints,fill_float,fill_int):
def _empty_like(dataset: np.ma.core.MaskedArray,
N_materialpoints: int,
fill_float: float,
fill_int: int) -> np.ma.core.MaskedArray:
"""Create empty numpy.ma.MaskedArray."""
return ma.array(np.empty((N_materialpoints,)+dataset.shape[1:],dataset.dtype),
fill_value = fill_float if dataset.dtype in np.sctypes['float'] else fill_int,
@ -84,7 +91,7 @@ class Result:
"""
def __init__(self,fname):
def __init__(self, fname: Union[str, Path]):
"""
New result view bound to a HDF5 file.
@ -102,7 +109,7 @@ class Result:
if self.version_major != 0 or not 12 <= self.version_minor <= 14:
raise TypeError(f'unsupported DADF5 version "{self.version_major}.{self.version_minor}"')
if self.version_major == 0 and self.version_minor < 14:
self.export_setup = None
self.export_setup = None # type: ignore
self.structured = 'cells' in f['geometry'].attrs.keys()
@ -111,7 +118,7 @@ class Result:
self.size = f['geometry'].attrs['size']
self.origin = f['geometry'].attrs['origin']
else:
self.add_curl = self.add_divergence = self.add_gradient = None
self.add_curl = self.add_divergence = self.add_gradient = None # type: ignore
r = re.compile(rf'{prefix_inc}([0-9]+)')
self.increments = sorted([i for i in f.keys() if r.match(i)],key=util.natural_sort)
@ -126,7 +133,7 @@ class Result:
self.phase = f['cell_to/phase']['label'].astype('str')
self.phases = sorted(np.unique(self.phase),key=util.natural_sort)
self.fields = []
self.fields: List[str] = []
for c in self.phases:
self.fields += f['/'.join([self.increments[0],'phase',c])].keys()
for m in self.homogenizations:
@ -144,14 +151,14 @@ class Result:
self._protected = True
def __copy__(self):
def __copy__(self) -> "Result":
"""Create deep copy."""
return copy.deepcopy(self)
copy = __copy__
def __repr__(self):
def __repr__(self) -> str:
"""Give short human-readable summary."""
with h5py.File(self.fname,'r') as f:
header = [f'Created by {f.attrs["creator"]}',
@ -171,12 +178,12 @@ class Result:
def _manage_view(self,
action,
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None):
action: Literal['set', 'add', 'del'],
increments: Union[int, Sequence[int], str, Sequence[str], bool] = None,
times: Union[float, Sequence[float], str, Sequence[str], bool] = None,
phases: Union[str, Sequence[str], bool] = None,
homogenizations: Union[str, Sequence[str], bool] = None,
fields: Union[str, Sequence[str], bool] = None) -> "Result":
"""
Manages the visibility of the groups.
@ -204,8 +211,7 @@ class Result:
datasets = '*'
elif datasets is False:
datasets = []
choice = list(datasets).copy() if hasattr(datasets,'__iter__') and not isinstance(datasets,str) else \
[datasets]
choice = [datasets] if not hasattr(datasets,'__iter__') or isinstance(datasets,str) else list(datasets) # type: ignore
if what == 'increments':
choice = [c if isinstance(c,str) and c.startswith(prefix_inc) else
@ -216,7 +222,7 @@ class Result:
if choice == ['*']:
choice = self.increments
else:
iterator = map(float,choice)
iterator = map(float,choice) # type: ignore
choice = []
for c in iterator:
idx = np.searchsorted(self.times,c)
@ -224,7 +230,7 @@ class Result:
if np.isclose(c,self.times[idx]):
choice.append(self.increments[idx])
elif np.isclose(c,self.times[idx+1]):
choice.append(self.increments[idx+1])
choice.append(self.increments[idx+1]) # type: ignore
valid = _match(choice,getattr(self,what))
existing = set(self.visible[what])
@ -241,7 +247,9 @@ class Result:
return dup
def increments_in_range(self,start=None,end=None):
def increments_in_range(self,
start: Union[str, int] = None,
end: Union[str, int] = None) -> Sequence[int]:
"""
Get all increments within a given range.
@ -263,7 +271,9 @@ class Result:
self.incs[-1] if end is None else end))
return [i for i in self.incs if s <= i <= e]
def times_in_range(self,start=None,end=None):
def times_in_range(self,
start: float = None,
end: float = None) -> Sequence[int]:
"""
Get all increments within a given time range.
@ -286,12 +296,12 @@ class Result:
def view(self,*,
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None,
protected=None):
increments: Union[int, Sequence[int], str, Sequence[str], bool] = None,
times: Union[float, Sequence[float], str, Sequence[str], bool] = None,
phases: Union[str, Sequence[str], bool] = None,
homogenizations: Union[str, Sequence[str], bool] = None,
fields: Union[str, Sequence[str], bool] = None,
protected: bool = None) -> "Result":
"""
Set view.
@ -343,11 +353,11 @@ class Result:
def view_more(self,*,
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None):
increments: Union[int, Sequence[int], str, Sequence[str], bool] = None,
times: Union[float, Sequence[float], str, Sequence[str], bool] = None,
phases: Union[str, Sequence[str], bool] = None,
homogenizations: Union[str, Sequence[str], bool] = None,
fields: Union[str, Sequence[str], bool] = None) -> "Result":
"""
Add to view.
@ -386,11 +396,11 @@ class Result:
def view_less(self,*,
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None):
increments: Union[int, Sequence[int], str, Sequence[str], bool] = None,
times: Union[float, Sequence[float], str, Sequence[str], bool] = None,
phases: Union[str, Sequence[str], bool] = None,
homogenizations: Union[str, Sequence[str], bool] = None,
fields: Union[str, Sequence[str], bool] = None) -> "Result":
"""
Remove from view.
@ -427,7 +437,9 @@ class Result:
return self._manage_view('del',increments,times,phases,homogenizations,fields)
def rename(self,name_src,name_dst):
def rename(self,
name_src: str,
name_dst: str):
"""
Rename/move datasets (within the same group/folder).
@ -468,7 +480,7 @@ class Result:
del f[path_src]
def remove(self,name):
def remove(self, name: str):
"""
Remove/delete datasets.
@ -502,7 +514,7 @@ class Result:
if path in f.keys(): del f[path]
def list_data(self):
def list_data(self) -> List[str]:
"""
Collect information on all active datasets in the file.
@ -533,7 +545,8 @@ class Result:
return msg
def enable_user_function(self,func):
def enable_user_function(self,
func: Callable):
globals()[func.__name__]=func
print(f'Function {func.__name__} enabled in add_calculation.')
@ -544,7 +557,7 @@ class Result:
@property
def coordinates0_point(self):
def coordinates0_point(self) -> np.ndarray:
"""Initial/undeformed cell center coordinates."""
if self.structured:
return grid_filters.coordinates0_point(self.cells,self.size,self.origin).reshape(-1,3,order='F')
@ -553,7 +566,7 @@ class Result:
return f['geometry/x_p'][()]
@property
def coordinates0_node(self):
def coordinates0_node(self) -> np.ndarray:
"""Initial/undeformed nodal coordinates."""
if self.structured:
return grid_filters.coordinates0_node(self.cells,self.size,self.origin).reshape(-1,3,order='F')
@ -562,7 +575,7 @@ class Result:
return f['geometry/x_n'][()]
@property
def geometry0(self):
def geometry0(self) -> VTK:
"""Initial/undeformed geometry."""
if self.structured:
return VTK.from_image_data(self.cells,self.size,self.origin)
@ -575,7 +588,7 @@ class Result:
@staticmethod
def _add_absolute(x):
def _add_absolute(x: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': np.abs(x['data']),
'label': f'|{x["label"]}|',
@ -585,7 +598,7 @@ class Result:
'creator': 'add_absolute'
}
}
def add_absolute(self,x):
def add_absolute(self, x: str):
"""
Add absolute value.
@ -599,7 +612,7 @@ class Result:
@staticmethod
def _add_calculation(**kwargs):
def _add_calculation(**kwargs) -> Dict[str, Any]:
formula = kwargs['formula']
for d in re.findall(r'#(.*?)#',formula):
formula = formula.replace(f'#{d}#',f"kwargs['{d}']['data']")
@ -617,7 +630,11 @@ class Result:
'creator': 'add_calculation'
}
}
def add_calculation(self,formula,name,unit='n/a',description=None):
def add_calculation(self,
formula: str,
name: str,
unit: str = 'n/a',
description: str = None):
"""
Add result of a general formula.
@ -661,13 +678,13 @@ class Result:
... 'Mises equivalent of the Cauchy stress')
"""
dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula
dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula
args = {'formula':formula,'label':name,'unit':unit,'description':description}
self._add_generic_pointwise(self._add_calculation,dataset_mapping,args)
@staticmethod
def _add_stress_Cauchy(P,F):
def _add_stress_Cauchy(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.stress_Cauchy(P['data'],F['data']),
'label': 'sigma',
@ -679,7 +696,9 @@ class Result:
'creator': 'add_stress_Cauchy'
}
}
def add_stress_Cauchy(self,P='P',F='F'):
def add_stress_Cauchy(self,
P: str = 'P',
F: str = 'F'):
"""
Add Cauchy stress calculated from first Piola-Kirchhoff stress and deformation gradient.
@ -695,7 +714,7 @@ class Result:
@staticmethod
def _add_determinant(T):
def _add_determinant(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': np.linalg.det(T['data']),
'label': f"det({T['label']})",
@ -705,7 +724,7 @@ class Result:
'creator': 'add_determinant'
}
}
def add_determinant(self,T):
def add_determinant(self, T: str):
"""
Add the determinant of a tensor.
@ -727,7 +746,7 @@ class Result:
@staticmethod
def _add_deviator(T):
def _add_deviator(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': tensor.deviatoric(T['data']),
'label': f"s_{T['label']}",
@ -737,7 +756,7 @@ class Result:
'creator': 'add_deviator'
}
}
def add_deviator(self,T):
def add_deviator(self, T: str):
"""
Add the deviatoric part of a tensor.
@ -759,13 +778,15 @@ class Result:
@staticmethod
def _add_eigenvalue(T_sym,eigenvalue):
def _add_eigenvalue(T_sym: Dict[str, Any], eigenvalue: Literal['max, mid, min']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
label,p = 'intermediate',1
elif eigenvalue == 'min':
label,p = 'minimum',0
else:
raise ValueError(f'invalid eigenvalue: {eigenvalue}')
return {
'data': tensor.eigenvalues(T_sym['data'])[:,p],
@ -776,7 +797,9 @@ class Result:
'creator': 'add_eigenvalue'
}
}
def add_eigenvalue(self,T_sym,eigenvalue='max'):
def add_eigenvalue(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'min'] = 'max'):
"""
Add eigenvalues of symmetric tensor.
@ -800,13 +823,16 @@ class Result:
@staticmethod
def _add_eigenvector(T_sym,eigenvalue):
def _add_eigenvector(T_sym: Dict[str, Any], eigenvalue: Literal['max', 'mid', 'min']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
label,p = 'intermediate',1
elif eigenvalue == 'min':
label,p = 'minimum',0
else:
raise ValueError(f'invalid eigenvalue: {eigenvalue}')
return {
'data': tensor.eigenvectors(T_sym['data'])[:,p],
'label': f"v_{eigenvalue}({T_sym['label']})",
@ -817,7 +843,9 @@ class Result:
'creator': 'add_eigenvector'
}
}
def add_eigenvector(self,T_sym,eigenvalue='max'):
def add_eigenvector(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'min'] = 'max'):
"""
Add eigenvector of symmetric tensor.
@ -834,7 +862,7 @@ class Result:
@staticmethod
def _add_IPF_color(l,q):
def _add_IPF_color(l: FloatSequence, q: Dict[str, Any]) -> Dict[str, Any]:
m = util.scale_to_coprime(np.array(l))
lattice = q['meta']['lattice']
o = Orientation(rotation = q['data'],lattice=lattice)
@ -849,7 +877,9 @@ class Result:
'creator': 'add_IPF_color'
}
}
def add_IPF_color(self,l,q='O'):
def add_IPF_color(self,
l: FloatSequence,
q: str = 'O'):
"""
Add RGB color tuple of inverse pole figure (IPF) color.
@ -874,7 +904,7 @@ class Result:
@staticmethod
def _add_maximum_shear(T_sym):
def _add_maximum_shear(T_sym: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.maximum_shear(T_sym['data']),
'label': f"max_shear({T_sym['label']})",
@ -884,7 +914,7 @@ class Result:
'creator': 'add_maximum_shear'
}
}
def add_maximum_shear(self,T_sym):
def add_maximum_shear(self, T_sym: str):
"""
Add maximum shear components of symmetric tensor.
@ -898,7 +928,7 @@ class Result:
@staticmethod
def _add_equivalent_Mises(T_sym,kind):
def _add_equivalent_Mises(T_sym: Dict[str, Any], kind: str) -> Dict[str, Any]:
k = kind
if k is None:
if T_sym['meta']['unit'] == '1':
@ -918,7 +948,9 @@ class Result:
'creator': 'add_Mises'
}
}
def add_equivalent_Mises(self,T_sym,kind=None):
def add_equivalent_Mises(self,
T_sym: str,
kind: str = None):
"""
Add the equivalent Mises stress or strain of a symmetric tensor.
@ -949,10 +981,10 @@ class Result:
@staticmethod
def _add_norm(x,ord):
def _add_norm(x: Dict[str, Any], ord: Union[int, float, Literal['fro', 'nuc']]) -> Dict[str, Any]:
o = ord
if len(x['data'].shape) == 2:
axis = 1
axis: Union[int, Tuple[int, int]] = 1
t = 'vector'
if o is None: o = 2
elif len(x['data'].shape) == 3:
@ -971,7 +1003,9 @@ class Result:
'creator': 'add_norm'
}
}
def add_norm(self,x,ord=None):
def add_norm(self,
x: str,
ord: Union[int, float, Literal['fro', 'nuc']] = None):
"""
Add the norm of vector or tensor.
@ -987,7 +1021,7 @@ class Result:
@staticmethod
def _add_stress_second_Piola_Kirchhoff(P,F):
def _add_stress_second_Piola_Kirchhoff(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.stress_second_Piola_Kirchhoff(P['data'],F['data']),
'label': 'S',
@ -999,7 +1033,9 @@ class Result:
'creator': 'add_stress_second_Piola_Kirchhoff'
}
}
def add_stress_second_Piola_Kirchhoff(self,P='P',F='F'):
def add_stress_second_Piola_Kirchhoff(self,
P: str = 'P',
F: str = 'F'):
"""
Add second Piola-Kirchhoff stress calculated from first Piola-Kirchhoff stress and deformation gradient.
@ -1023,7 +1059,11 @@ class Result:
@staticmethod
def _add_pole(q,uvw,hkl,with_symmetry,normalize):
def _add_pole(q: Dict[str, Any],
uvw: FloatSequence,
hkl: FloatSequence,
with_symmetry: bool,
normalize: bool) -> Dict[str, Any]:
c = q['meta']['c/a'] if 'c/a' in q['meta'] else 1
brackets = ['[]','()','⟨⟩','{}'][(uvw is None)*1+with_symmetry*2]
label = 'p^' + '{}{} {} {}{}'.format(brackets[0],
@ -1042,7 +1082,13 @@ class Result:
'creator': 'add_pole'
}
}
def add_pole(self,q='O',*,uvw=None,hkl=None,with_symmetry=False,normalize=True):
def add_pole(self,
q: str = 'O',
*,
uvw: FloatSequence = None,
hkl: FloatSequence = None,
with_symmetry: bool = False,
normalize: bool = True):
"""
Add lab frame vector along lattice direction [uvw] or plane normal (hkl).
@ -1067,7 +1113,7 @@ class Result:
@staticmethod
def _add_rotation(F):
def _add_rotation(F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.rotation(F['data']).as_matrix(),
'label': f"R({F['label']})",
@ -1077,7 +1123,7 @@ class Result:
'creator': 'add_rotation'
}
}
def add_rotation(self,F):
def add_rotation(self, F: str):
"""
Add rotational part of a deformation gradient.
@ -1099,7 +1145,7 @@ class Result:
@staticmethod
def _add_spherical(T):
def _add_spherical(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': tensor.spherical(T['data'],False),
'label': f"p_{T['label']}",
@ -1109,7 +1155,7 @@ class Result:
'creator': 'add_spherical'
}
}
def add_spherical(self,T):
def add_spherical(self, T: str):
"""
Add the spherical (hydrostatic) part of a tensor.
@ -1131,7 +1177,7 @@ class Result:
@staticmethod
def _add_strain(F,t,m):
def _add_strain(F: Dict[str, Any], t: Literal['V', 'U'], m: float) -> Dict[str, Any]:
return {
'data': mechanics.strain(F['data'],t,m),
'label': f"epsilon_{t}^{m}({F['label']})",
@ -1141,7 +1187,10 @@ class Result:
'creator': 'add_strain'
}
}
def add_strain(self,F='F',t='V',m=0.0):
def add_strain(self,
F: str = 'F',
t: Literal['V', 'U'] = 'V',
m: float = 0.0):
"""
Add strain tensor of a deformation gradient.
@ -1177,7 +1226,7 @@ class Result:
@staticmethod
def _add_stretch_tensor(F,t):
def _add_stretch_tensor(F: Dict[str, Any], t: str) -> Dict[str, Any]:
return {
'data': (mechanics.stretch_left if t.upper() == 'V' else mechanics.stretch_right)(F['data']),
'label': f"{t}({F['label']})",
@ -1188,7 +1237,9 @@ class Result:
'creator': 'add_stretch_tensor'
}
}
def add_stretch_tensor(self,F='F',t='V'):
def add_stretch_tensor(self,
F: str = 'F',
t: Literal['V', 'U'] = 'V'):
"""
Add stretch tensor of a deformation gradient.
@ -1205,7 +1256,7 @@ class Result:
@staticmethod
def _add_curl(f,size):
def _add_curl(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.curl(size,f['data']),
'label': f"curl({f['label']})",
@ -1215,7 +1266,7 @@ class Result:
'creator': 'add_curl'
}
}
def add_curl(self,f):
def add_curl(self, f: str):
"""
Add curl of a field.
@ -1234,7 +1285,7 @@ class Result:
@staticmethod
def _add_divergence(f,size):
def _add_divergence(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.divergence(size,f['data']),
'label': f"divergence({f['label']})",
@ -1244,7 +1295,7 @@ class Result:
'creator': 'add_divergence'
}
}
def add_divergence(self,f):
def add_divergence(self, f: str):
"""
Add divergence of a field.
@ -1263,7 +1314,7 @@ class Result:
@staticmethod
def _add_gradient(f,size):
def _add_gradient(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.gradient(size,f['data'] if len(f['data'].shape) == 4 else \
f['data'].reshape(f['data'].shape+(1,))),
@ -1274,7 +1325,7 @@ class Result:
'creator': 'add_gradient'
}
}
def add_gradient(self,f):
def add_gradient(self, f: str):
"""
Add gradient of a field.
@ -1292,7 +1343,11 @@ class Result:
self._add_generic_grid(self._add_gradient,{'f':f},{'size':self.size})
def _add_generic_grid(self,func,datasets,args={},constituents=None):
def _add_generic_grid(self,
func: Callable,
datasets: Dict[str, str],
args: Dict[str, str] = {},
constituents = None):
"""
General function to add data on a regular grid.
@ -1313,11 +1368,13 @@ class Result:
at_cell_ph,in_data_ph,at_cell_ho,in_data_ho = self._mappings()
increments = self.place(list(datasets.values()),False)
if not increments: raise RuntimeError("received invalid dataset")
with h5py.File(self.fname, 'a') as f:
for increment in self.place(datasets.values(),False).items():
for increment in increments.items():
for ty in increment[1].items():
for field in ty[1].items():
d = list(field[1].values())[0]
d: np.ma.MaskedArray = list(field[1].values())[0]
if np.any(d.mask): continue
dataset = {'f':{'data':np.reshape(d.data,tuple(self.cells)+d.data.shape[1:]),
'label':list(datasets.values())[0],
@ -1331,21 +1388,26 @@ class Result:
result1 = result[at_cell_ho[x]]
path = '/'.join(['/',increment[0],ty[0],x,field[0]])
dataset = f[path].create_dataset(r['label'],data=result1)
h5_dataset = f[path].create_dataset(r['label'],data=result1)
now = datetime.datetime.now().astimezone()
dataset.attrs['created'] = now.strftime('%Y-%m-%d %H:%M:%S%z') if h5py3 else \
now.strftime('%Y-%m-%d %H:%M:%S%z').encode()
h5_dataset.attrs['created'] = now.strftime('%Y-%m-%d %H:%M:%S%z') if h5py3 else \
now.strftime('%Y-%m-%d %H:%M:%S%z').encode()
for l,v in r['meta'].items():
dataset.attrs[l.lower()]=v if h5py3 else v.encode()
creator = dataset.attrs['creator'] if h5py3 else \
dataset.attrs['creator'].decode()
dataset.attrs['creator'] = f'damask.Result.{creator} v{damask.version}' if h5py3 else \
f'damask.Result.{creator} v{damask.version}'.encode()
h5_dataset.attrs[l.lower()]=v if h5py3 else v.encode()
creator = h5_dataset.attrs['creator'] if h5py3 else \
h5_dataset.attrs['creator'].decode()
h5_dataset.attrs['creator'] = f'damask.Result.{creator} v{damask.version}' if h5py3 else \
f'damask.Result.{creator} v{damask.version}'.encode()
def _job_pointwise(self,group,func,datasets,args,lock):
def _job_pointwise(self,
group: str,
callback: Callable,
datasets: Dict[str, str],
args: Dict[str, str],
lock: Lock) -> List[Union[None, Any]]:
"""Execute job for _add_generic_pointwise."""
try:
datasets_in = {}
@ -1358,19 +1420,23 @@ class Result:
'meta': {k:(v.decode() if not h5py3 and type(v) is bytes else v) \
for k,v in loc.attrs.items()}}
lock.release()
r = func(**datasets_in,**args)
r = callback(**datasets_in,**args)
return [group,r]
except Exception as err:
print(f'Error during calculation: {err}.')
return [None,None]
def _add_generic_pointwise(self,func,datasets,args={}):
def _add_generic_pointwise(self,
func: Callable,
datasets: Dict[str, Any],
args: Dict[str, Any] = {}):
"""
General function to add pointwise data.
Parameters
----------
func : function
callback : function
Callback function that calculates a new dataset from one or
more datasets per HDF5 group.
datasets : dictionary
@ -1396,9 +1462,9 @@ class Result:
print('No matching dataset found, no data was added.')
return
default_arg = partial(self._job_pointwise,func=func,datasets=datasets,args=args,lock=lock)
default_arg = partial(self._job_pointwise,callback=func,datasets=datasets,args=args,lock=lock)
for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)):
for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)):# type: ignore
if not result:
continue
lock.acquire()
@ -1410,15 +1476,14 @@ class Result:
dataset.attrs['overwritten'] = True
else:
shape = result['data'].shape
if result['data'].size >= chunk_size*2:
if compress := (result['data'].size >= chunk_size*2):
chunks = (chunk_size//np.prod(shape[1:]),)+shape[1:]
compression = ('gzip',6)
else:
chunks = shape
compression = (None,None)
dataset = f[group].create_dataset(result['label'],data=result['data'],
maxshape=shape, chunks=chunks,
compression=compression[0], compression_opts=compression[1],
compression = 'gzip' if compress else None,
compression_opts = 6 if compress else None,
shuffle=True,fletcher32=True)
now = datetime.datetime.now().astimezone()
@ -1440,7 +1505,8 @@ class Result:
pool.join()
def export_XDMF(self,output='*'):
def export_XDMF(self,
output: Union[str, List[str]] = '*'):
"""
Write XDMF file to directly visualize data from DADF5 file.
@ -1574,7 +1640,13 @@ class Result:
return at_cell_ph,in_data_ph,at_cell_ho,in_data_ho
def export_VTK(self,output='*',mode='cell',constituents=None,fill_float=np.nan,fill_int=0,parallel=True):
def export_VTK(self,
output: Union[str,list] = '*',
mode: str = 'cell',
constituents: IntSequence = None,
fill_float: float = np.nan,
fill_int: int = 0,
parallel: bool = True):
"""
Export to VTK cell/point data.
@ -1612,12 +1684,12 @@ class Result:
else:
raise ValueError(f'invalid mode "{mode}"')
v.comments = util.execution_stamp('Result','export_VTK')
v.comments = [util.execution_stamp('Result','export_VTK')]
N_digits = int(np.floor(np.log10(max(1,self.incs[-1]))))+1
constituents_ = constituents if isinstance(constituents,Iterable) else \
(range(self.N_constituents) if constituents is None else [constituents])
(range(self.N_constituents) if constituents is None else [constituents]) # type: ignore
suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \
[f'#{c}' for c in constituents_]
@ -1637,7 +1709,7 @@ class Result:
for ty in ['phase','homogenization']:
for field in self.visible['fields']:
outs = {}
outs: Dict[str, np.ma.core.MaskedArray] = {}
for label in self.visible[ty+'s']:
if field not in f['/'.join([inc,ty,label])].keys(): continue
@ -1665,7 +1737,10 @@ class Result:
v.save(f'{self.fname.stem}_inc{inc[10:].zfill(N_digits)}',parallel=parallel)
def get(self,output='*',flatten=True,prune=True):
def get(self,
output: Union[str, List[str]] = '*',
flatten: bool = True,
prune: bool = True):
"""
Collect data per phase/homogenization reflecting the group/folder structure in the DADF5 file.
@ -1687,7 +1762,7 @@ class Result:
Datasets structured by phase/homogenization and according to selected view.
"""
r = {}
r = {} # type: ignore
with h5py.File(self.fname,'r') as f:
for inc in util.show_progress(self.visible['increments']):
@ -1710,7 +1785,13 @@ class Result:
return None if (type(r) == dict and r == {}) else r
def place(self,output='*',flatten=True,prune=True,constituents=None,fill_float=np.nan,fill_int=0):
def place(self,
output: Union[str, List[str]] = '*',
flatten: bool = True,
prune: bool = True,
constituents: IntSequence = None,
fill_float: float = np.nan,
fill_int: int = 0):
"""
Merge data into spatial order that is compatible with the damask.VTK geometry representation.
@ -1748,10 +1829,10 @@ class Result:
Datasets structured by spatial position and according to selected view.
"""
r = {}
r = {} # type: ignore
constituents_ = constituents if isinstance(constituents,Iterable) else \
(range(self.N_constituents) if constituents is None else [constituents])
constituents_ = list(map(int,constituents)) if isinstance(constituents,Iterable) else \
(range(self.N_constituents) if constituents is None else [constituents]) # type: ignore
suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \
[f'#{c}' for c in constituents_]
@ -1797,7 +1878,9 @@ class Result:
return None if (type(r) == dict and r == {}) else r
def export_setup(self,output='*',overwrite=False):
def export_setup(self,
output: Union[str, List[str]] = '*',
overwrite: bool = False):
"""
Export configuration files.
@ -1811,7 +1894,7 @@ class Result:
Defaults to False.
"""
def export(name,obj,output,overwrite):
def export(name: str, obj: Union[h5py.Dataset,h5py.Group], output: Union[str,List[str]], overwrite: bool):
if type(obj) == h5py.Dataset and _match(output,[name]):
d = obj.attrs['description'] if h5py3 else obj.attrs['description'].decode()
if not Path(name).exists() or overwrite:

View File

@ -994,7 +994,7 @@ class Rotation:
"""
rng = np.random.default_rng(rng_seed)
r = rng.random(3 if shape is None else tuple(shape)+(3,) if hasattr(shape, '__iter__') else (shape,3)) #type: ignore
r = rng.random(3 if shape is None else tuple(shape)+(3,) if hasattr(shape, '__iter__') else (shape,3)) # type: ignore
A = np.sqrt(r[...,2])
B = np.sqrt(1.0-r[...,2])

View File

@ -160,7 +160,7 @@ class Table:
'linear' ==> 1_v 2_v 3_v
"""
self.data.columns = self._label(self.shapes,how) #type: ignore
self.data.columns = self._label(self.shapes,how) # type: ignore
def _add_comment(self,