updated commits of original branch to suit current state of development

This commit is contained in:
Daniel Otto de Mentock 2022-04-27 16:26:33 +02:00
parent 831cd42842
commit b06a272586
2 changed files with 199 additions and 108 deletions

View File

@ -23,18 +23,22 @@ from . import mechanics
from . import tensor
from . import util
from typing import Union, Optional, Callable, Any, Sequence, Literal, Dict, List, Tuple
from multiprocessing.synchronize import Lock as LockBase
h5py3 = h5py.__version__[0] == '3'
chunk_size = 1024**2//8 # for compression in HDF5
prefix_inc = 'increment_'
def _read(dataset):
def _read(dataset: h5py._hl.dataset.Dataset):
"""Read a dataset and its metadata into a numpy.ndarray."""
metadata = {k:(v.decode() if not h5py3 and type(v) is bytes else v) for k,v in dataset.attrs.items()}
dtype = np.dtype(dataset.dtype,metadata=metadata)
dtype = np.dtype(dataset.dtype,metadata=metadata) # type: ignore
return np.array(dataset,dtype=dtype)
def _match(requested,existing):
def _match(requested: Union[None, str, Sequence[Any], np.ndarray],
existing: h5py._hl.base.KeysViewHDF5) -> List[Any]:
"""Find matches among two sets of labels."""
def flatten_list(list_of_lists):
return [e for e_ in list_of_lists for e in e_]
@ -50,7 +54,10 @@ def _match(requested,existing):
return sorted(set(flatten_list([fnmatch.filter(existing,r) for r in requested_])),
key=util.natural_sort)
def _empty_like(dataset,N_materialpoints,fill_float,fill_int):
def _empty_like(dataset: np.ma.core.MaskedArray,
N_materialpoints: int,
fill_float: float,
fill_int: int) -> np.ma.core.MaskedArray:
"""Create empty numpy.ma.MaskedArray."""
return ma.array(np.empty((N_materialpoints,)+dataset.shape[1:],dataset.dtype),
fill_value = fill_float if dataset.dtype in np.sctypes['float'] else fill_int,
@ -84,7 +91,7 @@ class Result:
"""
def __init__(self,fname):
def __init__(self, fname: Union[str, Path]):
"""
New result view bound to a HDF5 file.
@ -102,7 +109,7 @@ class Result:
if self.version_major != 0 or not 12 <= self.version_minor <= 14:
raise TypeError(f'unsupported DADF5 version "{self.version_major}.{self.version_minor}"')
if self.version_major == 0 and self.version_minor < 14:
self.export_setup = None
self.export_setup = None # type: ignore
self.structured = 'cells' in f['geometry'].attrs.keys()
@ -111,7 +118,7 @@ class Result:
self.size = f['geometry'].attrs['size']
self.origin = f['geometry'].attrs['origin']
else:
self.add_curl = self.add_divergence = self.add_gradient = None
self.add_curl = self.add_divergence = self.add_gradient = None # type: ignore
r = re.compile(rf'{prefix_inc}([0-9]+)')
self.increments = sorted([i for i in f.keys() if r.match(i)],key=util.natural_sort)
@ -126,7 +133,7 @@ class Result:
self.phase = f['cell_to/phase']['label'].astype('str')
self.phases = sorted(np.unique(self.phase),key=util.natural_sort)
self.fields = []
self.fields: List[str] = []
for c in self.phases:
self.fields += f['/'.join([self.increments[0],'phase',c])].keys()
for m in self.homogenizations:
@ -144,14 +151,14 @@ class Result:
self._protected = True
def __copy__(self):
def __copy__(self) -> "Result":
"""Create deep copy."""
return copy.deepcopy(self)
copy = __copy__
def __repr__(self):
def __repr__(self) -> str:
"""Give short human-readable summary."""
with h5py.File(self.fname,'r') as f:
header = [f'Created by {f.attrs["creator"]}',
@ -171,12 +178,12 @@ class Result:
def _manage_view(self,
action,
action: Literal['set', 'add', 'del'],
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None):
fields=None) -> "Result":
"""
Manages the visibility of the groups.
@ -223,8 +230,8 @@ class Result:
if idx >= len(self.times): continue
if np.isclose(c,self.times[idx]):
choice.append(self.increments[idx])
elif np.isclose(c,self.times[idx+1]):
choice.append(self.increments[idx+1])
elif np.isclose(c,self.times[idx+1]): #type: ignore
choice.append(self.increments[idx+1]) #type: ignore
valid = _match(choice,getattr(self,what))
existing = set(self.visible[what])
@ -241,7 +248,9 @@ class Result:
return dup
def increments_in_range(self,start=None,end=None):
def increments_in_range(self,
start: Union[str, int] = None,
end: Union[str, int] = None) -> Sequence[int]:
"""
Get all increments within a given range.
@ -263,7 +272,9 @@ class Result:
self.incs[-1] if end is None else end))
return [i for i in self.incs if s <= i <= e]
def times_in_range(self,start=None,end=None):
def times_in_range(self,
start: float = None,
end: float = None) -> Sequence[int]:
"""
Get all increments within a given time range.
@ -286,12 +297,12 @@ class Result:
def view(self,*,
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None,
protected=None):
increments: Union[int, Sequence[int], str, Sequence[str], bool] = None,
times: Union[float, Sequence[float], str, Sequence[str], bool] = None,
phases: Union[str, Sequence[str], bool] = None,
homogenizations: Union[str, Sequence[str], bool] = None,
fields: Union[str, Sequence[str], bool] = None,
protected: bool = None) -> "Result":
"""
Set view.
@ -343,11 +354,11 @@ class Result:
def view_more(self,*,
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None):
increments: Union[int, Sequence[int], str, Sequence[str], bool] = None,
times: Union[float, Sequence[float], str, Sequence[str], bool] = None,
phases: Union[str, Sequence[str], bool] = None,
homogenizations: Union[str, Sequence[str], bool] = None,
fields: Union[str, Sequence[str], bool] = None) -> "Result":
"""
Add to view.
@ -386,11 +397,11 @@ class Result:
def view_less(self,*,
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None):
increments: Union[int, Sequence[int], str, Sequence[str], bool] = None,
times: Union[float, Sequence[float], str, Sequence[str], bool] = None,
phases: Union[str, Sequence[str], bool] = None,
homogenizations: Union[str, Sequence[str], bool] = None,
fields: Union[str, Sequence[str], bool] = None) -> "Result":
"""
Remove from view.
@ -427,7 +438,9 @@ class Result:
return self._manage_view('del',increments,times,phases,homogenizations,fields)
def rename(self,name_src,name_dst):
def rename(self,
name_src: str,
name_dst: str):
"""
Rename/move datasets (within the same group/folder).
@ -468,7 +481,7 @@ class Result:
del f[path_src]
def remove(self,name):
def remove(self, name: str):
"""
Remove/delete datasets.
@ -502,7 +515,7 @@ class Result:
if path in f.keys(): del f[path]
def list_data(self):
def list_data(self) -> List[str]:
"""
Collect information on all active datasets in the file.
@ -533,7 +546,8 @@ class Result:
return msg
def enable_user_function(self,func):
def enable_user_function(self,
func: Callable):
globals()[func.__name__]=func
print(f'Function {func.__name__} enabled in add_calculation.')
@ -544,7 +558,7 @@ class Result:
@property
def coordinates0_point(self):
def coordinates0_point(self) -> np.ndarray:
"""Initial/undeformed cell center coordinates."""
if self.structured:
return grid_filters.coordinates0_point(self.cells,self.size,self.origin).reshape(-1,3,order='F')
@ -553,7 +567,7 @@ class Result:
return f['geometry/x_p'][()]
@property
def coordinates0_node(self):
def coordinates0_node(self) -> np.ndarray:
"""Initial/undeformed nodal coordinates."""
if self.structured:
return grid_filters.coordinates0_node(self.cells,self.size,self.origin).reshape(-1,3,order='F')
@ -562,7 +576,7 @@ class Result:
return f['geometry/x_n'][()]
@property
def geometry0(self):
def geometry0(self) -> VTK:
"""Initial/undeformed geometry."""
if self.structured:
return VTK.from_image_data(self.cells,self.size,self.origin)
@ -575,7 +589,7 @@ class Result:
@staticmethod
def _add_absolute(x):
def _add_absolute(x: Dict[str, Any]) -> Dict[str, Union[Dict[str, Union[str, int, slice]], str, np.ndarray]]:
return {
'data': np.abs(x['data']),
'label': f'|{x["label"]}|',
@ -585,7 +599,7 @@ class Result:
'creator': 'add_absolute'
}
}
def add_absolute(self,x):
def add_absolute(self, x: str):
"""
Add absolute value.
@ -599,7 +613,7 @@ class Result:
@staticmethod
def _add_calculation(**kwargs):
def _add_calculation(**kwargs) -> Dict[str, Union[Dict[str, str], str]]:
formula = kwargs['formula']
for d in re.findall(r'#(.*?)#',formula):
formula = formula.replace(f'#{d}#',f"kwargs['{d}']['data']")
@ -617,7 +631,11 @@ class Result:
'creator': 'add_calculation'
}
}
def add_calculation(self,formula,name,unit='n/a',description=None):
def add_calculation(self,
formula: str,
name: str,
unit: str = 'n/a',
description: str = None):
"""
Add result of a general formula.
@ -661,13 +679,14 @@ class Result:
... 'Mises equivalent of the Cauchy stress')
"""
dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula
dataset_mapping: Dict[str, str] = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula
args = {'formula':formula,'label':name,'unit':unit,'description':description}
self._add_generic_pointwise(self._add_calculation,dataset_mapping,args)
@staticmethod
def _add_stress_Cauchy(P,F):
def _add_stress_Cauchy(P: Dict[str, np.ndarray],
F: Dict[str, np.ndarray]) -> Dict[str, Any]:
return {
'data': mechanics.stress_Cauchy(P['data'],F['data']),
'label': 'sigma',
@ -679,7 +698,9 @@ class Result:
'creator': 'add_stress_Cauchy'
}
}
def add_stress_Cauchy(self,P='P',F='F'):
def add_stress_Cauchy(self,
P: str = 'P',
F: str = 'F'):
"""
Add Cauchy stress calculated from first Piola-Kirchhoff stress and deformation gradient.
@ -695,7 +716,7 @@ class Result:
@staticmethod
def _add_determinant(T):
def _add_determinant(T: Dict[str, np.ndarray]) -> Dict[str, Any]:
return {
'data': np.linalg.det(T['data']),
'label': f"det({T['label']})",
@ -705,7 +726,7 @@ class Result:
'creator': 'add_determinant'
}
}
def add_determinant(self,T):
def add_determinant(self, T: str):
"""
Add the determinant of a tensor.
@ -727,7 +748,7 @@ class Result:
@staticmethod
def _add_deviator(T):
def _add_deviator(T: Dict[str, np.ndarray]) -> Dict[str, Any]:
return {
'data': tensor.deviatoric(T['data']),
'label': f"s_{T['label']}",
@ -737,7 +758,7 @@ class Result:
'creator': 'add_deviator'
}
}
def add_deviator(self,T):
def add_deviator(self, T: str):
"""
Add the deviatoric part of a tensor.
@ -759,13 +780,16 @@ class Result:
@staticmethod
def _add_eigenvalue(T_sym,eigenvalue):
def _add_eigenvalue(T_sym: Dict[str, np.ndarray],
eigenvalue: Literal['max, mid, minimum']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
label,p = 'intermediate',1
elif eigenvalue == 'min':
label,p = 'minimum',0
else:
raise TypeError
return {
'data': tensor.eigenvalues(T_sym['data'])[:,p],
@ -776,7 +800,9 @@ class Result:
'creator': 'add_eigenvalue'
}
}
def add_eigenvalue(self,T_sym,eigenvalue='max'):
def add_eigenvalue(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'minimum'] = 'max'):
"""
Add eigenvalues of symmetric tensor.
@ -800,7 +826,8 @@ class Result:
@staticmethod
def _add_eigenvector(T_sym,eigenvalue):
def _add_eigenvector(T_sym: Dict[str, np.ndarray],
eigenvalue: Literal['max', 'mid', 'minimum']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
@ -817,7 +844,9 @@ class Result:
'creator': 'add_eigenvector'
}
}
def add_eigenvector(self,T_sym,eigenvalue='max'):
def add_eigenvector(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'minimum'] = 'max'):
"""
Add eigenvector of symmetric tensor.
@ -834,7 +863,8 @@ class Result:
@staticmethod
def _add_IPF_color(l,q):
def _add_IPF_color(l: Sequence[float],
q: Dict[str, Any]) -> Dict[str, Any]:
m = util.scale_to_coprime(np.array(l))
lattice = q['meta']['lattice']
o = Orientation(rotation = q['data'],lattice=lattice)
@ -849,7 +879,9 @@ class Result:
'creator': 'add_IPF_color'
}
}
def add_IPF_color(self,l,q='O'):
def add_IPF_color(self,
l: np.ndarray,
q: str = 'O'):
"""
Add RGB color tuple of inverse pole figure (IPF) color.
@ -874,7 +906,7 @@ class Result:
@staticmethod
def _add_maximum_shear(T_sym):
def _add_maximum_shear(T_sym: dict) -> dict:
return {
'data': mechanics.maximum_shear(T_sym['data']),
'label': f"max_shear({T_sym['label']})",
@ -884,7 +916,7 @@ class Result:
'creator': 'add_maximum_shear'
}
}
def add_maximum_shear(self,T_sym):
def add_maximum_shear(self, T_sym: str):
"""
Add maximum shear components of symmetric tensor.
@ -898,7 +930,8 @@ class Result:
@staticmethod
def _add_equivalent_Mises(T_sym,kind):
def _add_equivalent_Mises(T_sym: Dict[str, np.ndarray],
kind: str) -> Dict[str, Any]:
k = kind
if k is None:
if T_sym['meta']['unit'] == '1':
@ -918,7 +951,9 @@ class Result:
'creator': 'add_Mises'
}
}
def add_equivalent_Mises(self,T_sym,kind=None):
def add_equivalent_Mises(self,
T_sym: str,
kind: str = None):
"""
Add the equivalent Mises stress or strain of a symmetric tensor.
@ -949,10 +984,11 @@ class Result:
@staticmethod
def _add_norm(x,ord):
def _add_norm(x: Dict[str, np.ndarray],
ord: int) -> Dict[str, Any]:
o = ord
if len(x['data'].shape) == 2:
axis = 1
axis: Union[int, Tuple[int, int]] = 1
t = 'vector'
if o is None: o = 2
elif len(x['data'].shape) == 3:
@ -971,7 +1007,9 @@ class Result:
'creator': 'add_norm'
}
}
def add_norm(self,x,ord=None):
def add_norm(self,
x: str,
ord: Union[int, Literal['fro', 'nuc']] = None):
"""
Add the norm of vector or tensor.
@ -987,7 +1025,8 @@ class Result:
@staticmethod
def _add_stress_second_Piola_Kirchhoff(P,F):
def _add_stress_second_Piola_Kirchhoff(P: Dict[str, np.ndarray],
F: Dict[str, np.ndarray]) -> Dict[str, Any]:
return {
'data': mechanics.stress_second_Piola_Kirchhoff(P['data'],F['data']),
'label': 'S',
@ -999,7 +1038,9 @@ class Result:
'creator': 'add_stress_second_Piola_Kirchhoff'
}
}
def add_stress_second_Piola_Kirchhoff(self,P='P',F='F'):
def add_stress_second_Piola_Kirchhoff(self,
P: str = 'P',
F: str = 'F'):
"""
Add second Piola-Kirchhoff stress calculated from first Piola-Kirchhoff stress and deformation gradient.
@ -1023,7 +1064,10 @@ class Result:
@staticmethod
def _add_pole(q,uvw,hkl,with_symmetry):
def _add_pole(q: Dict[str, Any],
uvw: np.ndarray,
hkl: np.ndarray,
with_symmetry: bool) -> Dict[str, Any]:
c = q['meta']['c/a'] if 'c/a' in q['meta'] else 1
pole = Orientation(q['data'],lattice=q['meta']['lattice'],a=1,c=c).to_pole(uvw=uvw,hkl=hkl,with_symmetry=with_symmetry)
@ -1038,7 +1082,12 @@ class Result:
'creator': 'add_pole'
}
}
def add_pole(self,q='O',*,uvw=None,hkl=None,with_symmetry=False):
def add_pole(self,
q: str = 'O',
*,
uvw: np.ndarray = None,
hkl: np.ndarray = None,
with_symmetry: bool = False):
"""
Add lab frame vector along lattice direction [uvw] or plane normal (hkl).
@ -1057,7 +1106,7 @@ class Result:
@staticmethod
def _add_rotation(F):
def _add_rotation(F: Dict[str, np.ndarray]) -> Dict[str, Any]:
return {
'data': mechanics.rotation(F['data']).as_matrix(),
'label': f"R({F['label']})",
@ -1067,7 +1116,7 @@ class Result:
'creator': 'add_rotation'
}
}
def add_rotation(self,F):
def add_rotation(self, F: str):
"""
Add rotational part of a deformation gradient.
@ -1089,7 +1138,7 @@ class Result:
@staticmethod
def _add_spherical(T):
def _add_spherical(T: Dict[str, np.ndarray]) -> Dict[str, Any]:
return {
'data': tensor.spherical(T['data'],False),
'label': f"p_{T['label']}",
@ -1099,7 +1148,7 @@ class Result:
'creator': 'add_spherical'
}
}
def add_spherical(self,T):
def add_spherical(self, T: str):
"""
Add the spherical (hydrostatic) part of a tensor.
@ -1121,7 +1170,8 @@ class Result:
@staticmethod
def _add_strain(F,t,m):
def _add_strain(F: Dict[str, np.ndarray],
t: str, m: float) -> Dict[str, Any]:
return {
'data': mechanics.strain(F['data'],t,m),
'label': f"epsilon_{t}^{m}({F['label']})",
@ -1131,7 +1181,10 @@ class Result:
'creator': 'add_strain'
}
}
def add_strain(self,F='F',t='V',m=0.0):
def add_strain(self,
F: str = 'F',
t: Literal['V', 'U'] = 'V',
m: float = 0.0):
"""
Add strain tensor of a deformation gradient.
@ -1167,7 +1220,8 @@ class Result:
@staticmethod
def _add_stretch_tensor(F,t):
def _add_stretch_tensor(F: Dict[str, np.ndarray],
t: str) -> Dict[str, Any]:
return {
'data': (mechanics.stretch_left if t.upper() == 'V' else mechanics.stretch_right)(F['data']),
'label': f"{t}({F['label']})",
@ -1178,7 +1232,9 @@ class Result:
'creator': 'add_stretch_tensor'
}
}
def add_stretch_tensor(self,F='F',t='V'):
def add_stretch_tensor(self,
F: str = 'F',
t: Literal['V', 'U'] = 'V'):
"""
Add stretch tensor of a deformation gradient.
@ -1195,7 +1251,8 @@ class Result:
@staticmethod
def _add_curl(f,size):
def _add_curl(f: Dict[str, np.ndarray],
size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.curl(size,f['data']),
'label': f"curl({f['label']})",
@ -1205,7 +1262,7 @@ class Result:
'creator': 'add_curl'
}
}
def add_curl(self,f):
def add_curl(self, f: str):
"""
Add curl of a field.
@ -1224,7 +1281,8 @@ class Result:
@staticmethod
def _add_divergence(f,size):
def _add_divergence(f: Dict[str, np.ndarray],
size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.divergence(size,f['data']),
'label': f"divergence({f['label']})",
@ -1234,7 +1292,7 @@ class Result:
'creator': 'add_divergence'
}
}
def add_divergence(self,f):
def add_divergence(self, f: str):
"""
Add divergence of a field.
@ -1253,7 +1311,8 @@ class Result:
@staticmethod
def _add_gradient(f,size):
def _add_gradient(f: Dict[str, np.ndarray],
size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.gradient(size,f['data'] if len(f['data'].shape) == 4 else \
f['data'].reshape(f['data'].shape+(1,))),
@ -1264,7 +1323,7 @@ class Result:
'creator': 'add_gradient'
}
}
def add_gradient(self,f):
def add_gradient(self, f: str):
"""
Add gradient of a field.
@ -1282,7 +1341,11 @@ class Result:
self._add_generic_grid(self._add_gradient,{'f':f},{'size':self.size})
def _add_generic_grid(self,func,datasets,args={},constituents=None):
def _add_generic_grid(self,
func: Callable,
datasets: Dict[str, str],
args: Dict[str, str] = {},
constituents=None):
"""
General function to add data on a regular grid.
@ -1303,11 +1366,13 @@ class Result:
at_cell_ph,in_data_ph,at_cell_ho,in_data_ho = self._mappings()
increments = self.place(list(datasets.values()),False)
if not increments: raise RuntimeError("received invalid dataset")
with h5py.File(self.fname, 'a') as f:
for increment in self.place(datasets.values(),False).items():
for increment in increments.items():
for ty in increment[1].items():
for field in ty[1].items():
d = list(field[1].values())[0]
d: np.ma.MaskedArray = list(field[1].values())[0]
if np.any(d.mask): continue
dataset = {'f':{'data':np.reshape(d.data,tuple(self.cells)+d.data.shape[1:]),
'label':list(datasets.values())[0],
@ -1321,21 +1386,26 @@ class Result:
result1 = result[at_cell_ho[x]]
path = '/'.join(['/',increment[0],ty[0],x,field[0]])
dataset = f[path].create_dataset(r['label'],data=result1)
h5_dataset : h5py._hl.dataset.Dataset = f[path].create_dataset(r['label'],data=result1)
now = datetime.datetime.now().astimezone()
dataset.attrs['created'] = now.strftime('%Y-%m-%d %H:%M:%S%z') if h5py3 else \
h5_dataset.attrs['created'] = now.strftime('%Y-%m-%d %H:%M:%S%z') if h5py3 else \
now.strftime('%Y-%m-%d %H:%M:%S%z').encode()
for l,v in r['meta'].items():
dataset.attrs[l.lower()]=v if h5py3 else v.encode()
creator = dataset.attrs['creator'] if h5py3 else \
dataset.attrs['creator'].decode()
dataset.attrs['creator'] = f'damask.Result.{creator} v{damask.version}' if h5py3 else \
h5_dataset.attrs[l.lower()]=v if h5py3 else v.encode()
creator = h5_dataset.attrs['creator'] if h5py3 else \
h5_dataset.attrs['creator'].decode()
h5_dataset.attrs['creator'] = f'damask.Result.{creator} v{damask.version}' if h5py3 else \
f'damask.Result.{creator} v{damask.version}'.encode()
def _job_pointwise(self,group,func,datasets,args,lock):
def _job_pointwise(self,
group: str,
func: Callable,
datasets: Dict[str, str],
args: Dict[str, str],
lock: LockBase) -> List[Union[None, Any]]:
"""Execute job for _add_generic_pointwise."""
try:
datasets_in = {}
@ -1354,7 +1424,10 @@ class Result:
print(f'Error during calculation: {err}.')
return [None,None]
def _add_generic_pointwise(self,func,datasets,args={}):
def _add_generic_pointwise(self,
func: Callable,
datasets: Dict[str, Any],
args: Dict[str, Any] = {}):
"""
General function to add pointwise data.
@ -1388,7 +1461,7 @@ class Result:
default_arg = partial(self._job_pointwise,func=func,datasets=datasets,args=args,lock=lock)
for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)):
for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)): #type: ignore
if not result:
continue
lock.acquire()
@ -1402,7 +1475,7 @@ class Result:
shape = result['data'].shape
if result['data'].size >= chunk_size*2:
chunks = (chunk_size//np.prod(shape[1:]),)+shape[1:]
compression = ('gzip',6)
compression: Tuple[Optional[str], Optional[int]] = ('gzip',6)
else:
chunks = shape
compression = (None,None)
@ -1430,7 +1503,8 @@ class Result:
pool.join()
def export_XDMF(self,output='*'):
def export_XDMF(self,
output: Union[str, List[str]] = '*'):
"""
Write XDMF file to directly visualize data from DADF5 file.
@ -1544,7 +1618,7 @@ class Result:
f.write(xml.dom.minidom.parseString(ET.tostring(xdmf).decode()).toprettyxml())
def _mappings(self):
def _mappings(self) -> Tuple[List[Dict[Any, np.ndarray]], List[Dict[Any, Any]], Dict[Any, np.ndarray], Dict[Any, Any]]:
"""Mappings to place data spatially."""
with h5py.File(self.fname,'r') as f:
@ -1564,7 +1638,13 @@ class Result:
return at_cell_ph,in_data_ph,at_cell_ho,in_data_ho
def export_VTK(self,output='*',mode='cell',constituents=None,fill_float=np.nan,fill_int=0,parallel=True):
def export_VTK(self,
output: Union[str,list] = '*',
mode: str = 'cell',
constituents: Optional[Union[int, list]] = None,
fill_float: float = np.nan,
fill_int: int = 0,
parallel: bool = True):
"""
Export to VTK cell/point data.
@ -1627,7 +1707,7 @@ class Result:
for ty in ['phase','homogenization']:
for field in self.visible['fields']:
outs = {}
outs: Dict[str, np.ma.core.MaskedArray] = {}
for label in self.visible[ty+'s']:
if field not in f['/'.join([inc,ty,label])].keys(): continue
@ -1655,7 +1735,10 @@ class Result:
v.save(f'{self.fname.stem}_inc{inc[10:].zfill(N_digits)}',parallel=parallel)
def get(self,output='*',flatten=True,prune=True):
def get(self,
output: Union[str, List[str]] = '*',
flatten: bool = True,
prune: bool = True) -> Optional[Dict[str, Dict[str, Any]]]:
"""
Collect data per phase/homogenization reflecting the group/folder structure in the DADF5 file.
@ -1677,7 +1760,7 @@ class Result:
Datasets structured by phase/homogenization and according to selected view.
"""
r = {}
r: Dict[str, Dict[str, Any]] = {}
with h5py.File(self.fname,'r') as f:
for inc in util.show_progress(self.visible['increments']):
@ -1700,7 +1783,13 @@ class Result:
return None if (type(r) == dict and r == {}) else r
def place(self,output='*',flatten=True,prune=True,constituents=None,fill_float=np.nan,fill_int=0):
def place(self,
output: Union[str, list] = '*',
flatten: bool = True,
prune: bool = True,
constituents: Union[None, int, List[int]] = None,
fill_float: float = np.nan,
fill_int: int = 0) -> Optional[Dict[str, Dict[str, Dict[str, Dict[str, Union[np.ma.MaskedArray]]]]]]:
"""
Merge data into spatial order that is compatible with the damask.VTK geometry representation.
@ -1738,9 +1827,9 @@ class Result:
Datasets structured by spatial position and according to selected view.
"""
r = {}
r: Dict[str, Dict[str, Dict[str, Dict[str, Union[np.ma.MaskedArray]]]]] = {}
constituents_ = constituents if isinstance(constituents,Iterable) else \
constituents_: Sequence[int] = constituents if isinstance(constituents,Iterable) else \
(range(self.N_constituents) if constituents is None else [constituents])
suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \
@ -1787,7 +1876,9 @@ class Result:
return None if (type(r) == dict and r == {}) else r
def export_setup(self,output='*',overwrite=False):
def export_setup(self,
output: Union[str, List[str]] = '*',
overwrite: bool = False):
"""
Export configuration files.
@ -1801,7 +1892,7 @@ class Result:
Defaults to False.
"""
def export(name,obj,output,overwrite):
def export(name: str,obj: Union[h5py.Dataset,h5py.Group],output: Union[str,list],overwrite: bool):
if type(obj) == h5py.Dataset and _match(output,[name]):
d = obj.attrs['description'] if h5py3 else obj.attrs['description'].decode()
if not Path(name).exists() or overwrite:

View File

@ -1,7 +1,7 @@
import os
import multiprocessing as mp
from pathlib import Path
from typing import Union, Literal, List, Sequence
from typing import Union, Literal
import numpy as np
import vtk
@ -86,7 +86,7 @@ class VTK:
@property
def comments(self) -> List[str]:
def comments(self):
"""Return the comments."""
field_data = self.vtk_data.GetFieldData()
for a in range(field_data.GetNumberOfArrays()):
@ -97,7 +97,7 @@ class VTK:
@comments.setter
def comments(self,
comments: Union[str, Sequence[str]]):
comments):
"""
Set comments.