polishing

This commit is contained in:
Martin Diehl 2022-05-09 00:21:53 +02:00
parent 57177303b3
commit 13f4d77791
1 changed files with 50 additions and 61 deletions

View File

@ -1,4 +1,5 @@
import multiprocessing as mp
from multiprocessing.synchronize import Lock
import re
import fnmatch
import os
@ -11,7 +12,6 @@ from functools import partial
from collections import defaultdict
from collections.abc import Iterable
from typing import Union, Optional, Callable, Any, Sequence, Literal, Dict, List, Tuple
from multiprocessing.synchronize import Lock as LockBase
import h5py
import numpy as np
@ -24,21 +24,20 @@ from . import grid_filters
from . import mechanics
from . import tensor
from . import util
from ._typehints import FloatSequence
from ._typehints import FloatSequence, IntSequence
h5py3 = h5py.__version__[0] == '3'
chunk_size = 1024**2//8 # for compression in HDF5
prefix_inc = 'increment_'
def _read(dataset: h5py._hl.dataset.Dataset):
def _read(dataset: h5py._hl.dataset.Dataset) -> np.ndarray:
"""Read a dataset and its metadata into a numpy.ndarray."""
metadata = {k:(v.decode() if not h5py3 and type(v) is bytes else v) for k,v in dataset.attrs.items()}
dtype = np.dtype(dataset.dtype,metadata=metadata) # type: ignore
dtype = np.dtype(dataset.dtype,metadata=metadata) # type: ignore
return np.array(dataset,dtype=dtype)
def _match(requested: Union[None, str, Sequence[Any], np.ndarray],
def _match(requested,
existing: h5py._hl.base.KeysViewHDF5) -> List[Any]:
"""Find matches among two sets of labels."""
def flatten_list(list_of_lists):
@ -110,7 +109,7 @@ class Result:
if self.version_major != 0 or not 12 <= self.version_minor <= 14:
raise TypeError(f'unsupported DADF5 version "{self.version_major}.{self.version_minor}"')
if self.version_major == 0 and self.version_minor < 14:
self.export_setup = None # type: ignore
self.export_setup = None # type: ignore
self.structured = 'cells' in f['geometry'].attrs.keys()
@ -119,7 +118,7 @@ class Result:
self.size = f['geometry'].attrs['size']
self.origin = f['geometry'].attrs['origin']
else:
self.add_curl = self.add_divergence = self.add_gradient = None # type: ignore
self.add_curl = self.add_divergence = self.add_gradient = None # type: ignore
r = re.compile(rf'{prefix_inc}([0-9]+)')
self.increments = sorted([i for i in f.keys() if r.match(i)],key=util.natural_sort)
@ -180,11 +179,11 @@ class Result:
def _manage_view(self,
action: Literal['set', 'add', 'del'],
increments=None,
times=None,
phases=None,
homogenizations=None,
fields=None) -> "Result":
increments: Union[int, Sequence[int], str, Sequence[str], bool] = None,
times: Union[float, Sequence[float], str, Sequence[str], bool] = None,
phases: Union[str, Sequence[str], bool] = None,
homogenizations: Union[str, Sequence[str], bool] = None,
fields: Union[str, Sequence[str], bool] = None) -> "Result":
"""
Manages the visibility of the groups.
@ -212,8 +211,7 @@ class Result:
datasets = '*'
elif datasets is False:
datasets = []
choice = list(datasets).copy() if hasattr(datasets,'__iter__') and not isinstance(datasets,str) else \
[datasets]
choice = [datasets] if not hasattr(datasets,'__iter__') or isinstance(datasets,str) else list(datasets) # type: ignore
if what == 'increments':
choice = [c if isinstance(c,str) and c.startswith(prefix_inc) else
@ -224,15 +222,15 @@ class Result:
if choice == ['*']:
choice = self.increments
else:
iterator = map(float,choice)
iterator = map(float,choice) # type: ignore
choice = []
for c in iterator:
idx = np.searchsorted(self.times,c)
if idx >= len(self.times): continue
if np.isclose(c,self.times[idx]):
choice.append(self.increments[idx])
elif np.isclose(c,self.times[idx+1]): #type: ignore
choice.append(self.increments[idx+1]) #type: ignore
elif np.isclose(c,self.times[idx+1]): # type: ignore
choice.append(self.increments[idx+1]) # type: ignore
valid = _match(choice,getattr(self,what))
existing = set(self.visible[what])
@ -686,8 +684,7 @@ class Result:
@staticmethod
def _add_stress_Cauchy(P: Dict[str, np.ndarray],
F: Dict[str, np.ndarray]) -> Dict[str, Any]:
def _add_stress_Cauchy(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.stress_Cauchy(P['data'],F['data']),
'label': 'sigma',
@ -717,7 +714,7 @@ class Result:
@staticmethod
def _add_determinant(T: Dict[str, np.ndarray]) -> Dict[str, Any]:
def _add_determinant(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': np.linalg.det(T['data']),
'label': f"det({T['label']})",
@ -749,7 +746,7 @@ class Result:
@staticmethod
def _add_deviator(T: Dict[str, np.ndarray]) -> Dict[str, Any]:
def _add_deviator(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': tensor.deviatoric(T['data']),
'label': f"s_{T['label']}",
@ -781,8 +778,7 @@ class Result:
@staticmethod
def _add_eigenvalue(T_sym: Dict[str, np.ndarray],
eigenvalue: Literal['max, mid, minimum']) -> Dict[str, Any]:
def _add_eigenvalue(T_sym: Dict[str, Any], eigenvalue: Literal['max, mid, min']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
@ -790,7 +786,7 @@ class Result:
elif eigenvalue == 'min':
label,p = 'minimum',0
else:
raise TypeError(f'invalid eigenvalue passed to function: {eigenvalue}')
raise ValueError(f'invalid value for "eigenvalue": {eigenvalue}')
return {
'data': tensor.eigenvalues(T_sym['data'])[:,p],
@ -803,7 +799,7 @@ class Result:
}
def add_eigenvalue(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'minimum'] = 'max'):
eigenvalue: Literal['max', 'mid', 'min'] = 'max'):
"""
Add eigenvalues of symmetric tensor.
@ -827,14 +823,16 @@ class Result:
@staticmethod
def _add_eigenvector(T_sym: Dict[str, np.ndarray],
eigenvalue: Literal['max', 'mid', 'minimum']) -> Dict[str, Any]:
def _add_eigenvector(T_sym: Dict[str, Any], eigenvalue: Literal['max', 'mid', 'min']) -> Dict[str, Any]:
if eigenvalue == 'max':
label,p = 'maximum',2
elif eigenvalue == 'mid':
label,p = 'intermediate',1
elif eigenvalue == 'min':
label,p = 'minimum',0
else:
raise ValueError(f'invalid value for "eigenvalue": {eigenvalue}')
return {
'data': tensor.eigenvectors(T_sym['data'])[:,p],
'label': f"v_{eigenvalue}({T_sym['label']})",
@ -847,7 +845,7 @@ class Result:
}
def add_eigenvector(self,
T_sym: str,
eigenvalue: Literal['max', 'mid', 'minimum'] = 'max'):
eigenvalue: Literal['max', 'mid', 'min'] = 'max'):
"""
Add eigenvector of symmetric tensor.
@ -864,8 +862,7 @@ class Result:
@staticmethod
def _add_IPF_color(l: FloatSequence,
q: Dict[str, Any]) -> Dict[str, Any]:
def _add_IPF_color(l: FloatSequence, q: Dict[str, Any]) -> Dict[str, Any]:
m = util.scale_to_coprime(np.array(l))
lattice = q['meta']['lattice']
o = Orientation(rotation = q['data'],lattice=lattice)
@ -881,7 +878,7 @@ class Result:
}
}
def add_IPF_color(self,
l: np.ndarray,
l: FloatSequence,
q: str = 'O'):
"""
Add RGB color tuple of inverse pole figure (IPF) color.
@ -931,8 +928,7 @@ class Result:
@staticmethod
def _add_equivalent_Mises(T_sym: Dict[str, np.ndarray],
kind: str) -> Dict[str, Any]:
def _add_equivalent_Mises(T_sym: Dict[str, Any], kind: str) -> Dict[str, Any]:
k = kind
if k is None:
if T_sym['meta']['unit'] == '1':
@ -985,8 +981,7 @@ class Result:
@staticmethod
def _add_norm(x: Dict[str, np.ndarray],
ord: int) -> Dict[str, Any]:
def _add_norm(x: Dict[str, Any], ord: Union[int, Literal['fro', 'nuc']]) -> Dict[str, Any]:
o = ord
if len(x['data'].shape) == 2:
axis: Union[int, Tuple[int, int]] = 1
@ -1026,8 +1021,7 @@ class Result:
@staticmethod
def _add_stress_second_Piola_Kirchhoff(P: Dict[str, np.ndarray],
F: Dict[str, np.ndarray]) -> Dict[str, Any]:
def _add_stress_second_Piola_Kirchhoff(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.stress_second_Piola_Kirchhoff(P['data'],F['data']),
'label': 'S',
@ -1086,8 +1080,8 @@ class Result:
def add_pole(self,
q: str = 'O',
*,
uvw: np.ndarray = None,
hkl: np.ndarray = None,
uvw: FloatSequence = None,
hkl: FloatSequence = None,
with_symmetry: bool = False):
"""
Add lab frame vector along lattice direction [uvw] or plane normal (hkl).
@ -1097,7 +1091,7 @@ class Result:
q : str
Name of the dataset containing the crystallographic orientation as quaternions.
Defaults to 'O'.
uvw|hkl : numpy.ndarray of shape (...,3)
uvw|hkl : numpy.ndarray of shape (3)
Miller indices of crystallographic direction or plane normal.
with_symmetry : bool, optional
Calculate all N symmetrically equivalent vectors.
@ -1107,7 +1101,7 @@ class Result:
@staticmethod
def _add_rotation(F: Dict[str, np.ndarray]) -> Dict[str, Any]:
def _add_rotation(F: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': mechanics.rotation(F['data']).as_matrix(),
'label': f"R({F['label']})",
@ -1139,7 +1133,7 @@ class Result:
@staticmethod
def _add_spherical(T: Dict[str, np.ndarray]) -> Dict[str, Any]:
def _add_spherical(T: Dict[str, Any]) -> Dict[str, Any]:
return {
'data': tensor.spherical(T['data'],False),
'label': f"p_{T['label']}",
@ -1171,8 +1165,7 @@ class Result:
@staticmethod
def _add_strain(F: Dict[str, np.ndarray],
t: str, m: float) -> Dict[str, Any]:
def _add_strain(F: Dict[str, Any], t: Literal['V', 'U'], m: float) -> Dict[str, Any]:
return {
'data': mechanics.strain(F['data'],t,m),
'label': f"epsilon_{t}^{m}({F['label']})",
@ -1221,8 +1214,7 @@ class Result:
@staticmethod
def _add_stretch_tensor(F: Dict[str, np.ndarray],
t: str) -> Dict[str, Any]:
def _add_stretch_tensor(F: Dict[str, Any], t: str) -> Dict[str, Any]:
return {
'data': (mechanics.stretch_left if t.upper() == 'V' else mechanics.stretch_right)(F['data']),
'label': f"{t}({F['label']})",
@ -1252,8 +1244,7 @@ class Result:
@staticmethod
def _add_curl(f: Dict[str, np.ndarray],
size: np.ndarray) -> Dict[str, Any]:
def _add_curl(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.curl(size,f['data']),
'label': f"curl({f['label']})",
@ -1282,8 +1273,7 @@ class Result:
@staticmethod
def _add_divergence(f: Dict[str, np.ndarray],
size: np.ndarray) -> Dict[str, Any]:
def _add_divergence(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.divergence(size,f['data']),
'label': f"divergence({f['label']})",
@ -1312,8 +1302,7 @@ class Result:
@staticmethod
def _add_gradient(f: Dict[str, np.ndarray],
size: np.ndarray) -> Dict[str, Any]:
def _add_gradient(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]:
return {
'data': grid_filters.gradient(size,f['data'] if len(f['data'].shape) == 4 else \
f['data'].reshape(f['data'].shape+(1,))),
@ -1406,7 +1395,7 @@ class Result:
callback: Callable,
datasets: Dict[str, str],
args: Dict[str, str],
lock: LockBase) -> List[Union[None, Any]]:
lock: Lock) -> List[Union[None, Any]]:
"""Execute job for _add_generic_pointwise."""
try:
datasets_in = {}
@ -1619,7 +1608,7 @@ class Result:
f.write(xml.dom.minidom.parseString(ET.tostring(xdmf).decode()).toprettyxml())
def _mappings(self) -> Tuple[List[Dict[Any, np.ndarray]], List[Dict[Any, Any]], Dict[Any, np.ndarray], Dict[Any, Any]]:
def _mappings(self):
"""Mappings to place data spatially."""
with h5py.File(self.fname,'r') as f:
@ -1642,7 +1631,7 @@ class Result:
def export_VTK(self,
output: Union[str,list] = '*',
mode: str = 'cell',
constituents: Optional[Union[int, list]] = None,
constituents: IntSequence = None,
fill_float: float = np.nan,
fill_int: int = 0,
parallel: bool = True):
@ -1688,7 +1677,7 @@ class Result:
N_digits = int(np.floor(np.log10(max(1,self.incs[-1]))))+1
constituents_ = constituents if isinstance(constituents,Iterable) else \
(range(self.N_constituents) if constituents is None else [constituents])
(range(self.N_constituents) if constituents is None else [constituents]) # type: ignore
suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \
[f'#{c}' for c in constituents_]
@ -1739,7 +1728,7 @@ class Result:
def get(self,
output: Union[str, List[str]] = '*',
flatten: bool = True,
prune: bool = True) -> Optional[Dict[str, Dict[str, Any]]]:
prune: bool = True):
"""
Collect data per phase/homogenization reflecting the group/folder structure in the DADF5 file.
@ -1788,7 +1777,7 @@ class Result:
output: Union[str, list] = '*',
flatten: bool = True,
prune: bool = True,
constituents: Union[None, int, List[int]] = None,
constituents: IntSequence = None,
fill_float: float = np.nan,
fill_int: int = 0) -> Optional[Dict[str, Dict[str, Dict[str, Dict[str, Union[np.ma.MaskedArray]]]]]]:
"""
@ -1830,7 +1819,7 @@ class Result:
"""
r: Dict[str, Dict[str, Dict[str, Dict[str, Union[np.ma.MaskedArray]]]]] = {}
constituents_: Sequence[int] = constituents if isinstance(constituents,Iterable) else \
constituents_ = list(map(int,constituents)) if isinstance(constituents,Iterable) else \
(range(self.N_constituents) if constituents is None else [constituents])
suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \
@ -1893,7 +1882,7 @@ class Result:
Defaults to False.
"""
def export(name: str,obj: Union[h5py.Dataset,h5py.Group],output: Union[str,list],overwrite: bool):
def export(name: str, obj: Union[h5py.Dataset,h5py.Group], output: Union[str,list], overwrite: bool):
if type(obj) == h5py.Dataset and _match(output,[name]):
d = obj.attrs['description'] if h5py3 else obj.attrs['description'].decode()
if not Path(name).exists() or overwrite: