From 13f4d7779183ad11a0f8362f62268c7889e43755 Mon Sep 17 00:00:00 2001 From: Martin Diehl Date: Mon, 9 May 2022 00:21:53 +0200 Subject: [PATCH] polishing --- python/damask/_result.py | 111 ++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 61 deletions(-) diff --git a/python/damask/_result.py b/python/damask/_result.py index 66c8df882..c37a1fd46 100644 --- a/python/damask/_result.py +++ b/python/damask/_result.py @@ -1,4 +1,5 @@ import multiprocessing as mp +from multiprocessing.synchronize import Lock import re import fnmatch import os @@ -11,7 +12,6 @@ from functools import partial from collections import defaultdict from collections.abc import Iterable from typing import Union, Optional, Callable, Any, Sequence, Literal, Dict, List, Tuple -from multiprocessing.synchronize import Lock as LockBase import h5py import numpy as np @@ -24,21 +24,20 @@ from . import grid_filters from . import mechanics from . import tensor from . import util - -from ._typehints import FloatSequence +from ._typehints import FloatSequence, IntSequence h5py3 = h5py.__version__[0] == '3' chunk_size = 1024**2//8 # for compression in HDF5 prefix_inc = 'increment_' -def _read(dataset: h5py._hl.dataset.Dataset): +def _read(dataset: h5py._hl.dataset.Dataset) -> np.ndarray: """Read a dataset and its metadata into a numpy.ndarray.""" metadata = {k:(v.decode() if not h5py3 and type(v) is bytes else v) for k,v in dataset.attrs.items()} - dtype = np.dtype(dataset.dtype,metadata=metadata) # type: ignore + dtype = np.dtype(dataset.dtype,metadata=metadata) # type: ignore return np.array(dataset,dtype=dtype) -def _match(requested: Union[None, str, Sequence[Any], np.ndarray], +def _match(requested, existing: h5py._hl.base.KeysViewHDF5) -> List[Any]: """Find matches among two sets of labels.""" def flatten_list(list_of_lists): @@ -110,7 +109,7 @@ class Result: if self.version_major != 0 or not 12 <= self.version_minor <= 14: raise TypeError(f'unsupported DADF5 version "{self.version_major}.{self.version_minor}"') if self.version_major == 0 and self.version_minor < 14: - self.export_setup = None # type: ignore + self.export_setup = None # type: ignore self.structured = 'cells' in f['geometry'].attrs.keys() @@ -119,7 +118,7 @@ class Result: self.size = f['geometry'].attrs['size'] self.origin = f['geometry'].attrs['origin'] else: - self.add_curl = self.add_divergence = self.add_gradient = None # type: ignore + self.add_curl = self.add_divergence = self.add_gradient = None # type: ignore r = re.compile(rf'{prefix_inc}([0-9]+)') self.increments = sorted([i for i in f.keys() if r.match(i)],key=util.natural_sort) @@ -180,11 +179,11 @@ class Result: def _manage_view(self, action: Literal['set', 'add', 'del'], - increments=None, - times=None, - phases=None, - homogenizations=None, - fields=None) -> "Result": + increments: Union[int, Sequence[int], str, Sequence[str], bool] = None, + times: Union[float, Sequence[float], str, Sequence[str], bool] = None, + phases: Union[str, Sequence[str], bool] = None, + homogenizations: Union[str, Sequence[str], bool] = None, + fields: Union[str, Sequence[str], bool] = None) -> "Result": """ Manages the visibility of the groups. @@ -212,8 +211,7 @@ class Result: datasets = '*' elif datasets is False: datasets = [] - choice = list(datasets).copy() if hasattr(datasets,'__iter__') and not isinstance(datasets,str) else \ - [datasets] + choice = [datasets] if not hasattr(datasets,'__iter__') or isinstance(datasets,str) else list(datasets) # type: ignore if what == 'increments': choice = [c if isinstance(c,str) and c.startswith(prefix_inc) else @@ -224,15 +222,15 @@ class Result: if choice == ['*']: choice = self.increments else: - iterator = map(float,choice) + iterator = map(float,choice) # type: ignore choice = [] for c in iterator: idx = np.searchsorted(self.times,c) if idx >= len(self.times): continue if np.isclose(c,self.times[idx]): choice.append(self.increments[idx]) - elif np.isclose(c,self.times[idx+1]): #type: ignore - choice.append(self.increments[idx+1]) #type: ignore + elif np.isclose(c,self.times[idx+1]): # type: ignore + choice.append(self.increments[idx+1]) # type: ignore valid = _match(choice,getattr(self,what)) existing = set(self.visible[what]) @@ -686,8 +684,7 @@ class Result: @staticmethod - def _add_stress_Cauchy(P: Dict[str, np.ndarray], - F: Dict[str, np.ndarray]) -> Dict[str, Any]: + def _add_stress_Cauchy(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]: return { 'data': mechanics.stress_Cauchy(P['data'],F['data']), 'label': 'sigma', @@ -717,7 +714,7 @@ class Result: @staticmethod - def _add_determinant(T: Dict[str, np.ndarray]) -> Dict[str, Any]: + def _add_determinant(T: Dict[str, Any]) -> Dict[str, Any]: return { 'data': np.linalg.det(T['data']), 'label': f"det({T['label']})", @@ -749,7 +746,7 @@ class Result: @staticmethod - def _add_deviator(T: Dict[str, np.ndarray]) -> Dict[str, Any]: + def _add_deviator(T: Dict[str, Any]) -> Dict[str, Any]: return { 'data': tensor.deviatoric(T['data']), 'label': f"s_{T['label']}", @@ -781,8 +778,7 @@ class Result: @staticmethod - def _add_eigenvalue(T_sym: Dict[str, np.ndarray], - eigenvalue: Literal['max, mid, minimum']) -> Dict[str, Any]: + def _add_eigenvalue(T_sym: Dict[str, Any], eigenvalue: Literal['max, mid, min']) -> Dict[str, Any]: if eigenvalue == 'max': label,p = 'maximum',2 elif eigenvalue == 'mid': @@ -790,7 +786,7 @@ class Result: elif eigenvalue == 'min': label,p = 'minimum',0 else: - raise TypeError(f'invalid eigenvalue passed to function: {eigenvalue}') + raise ValueError(f'invalid value for "eigenvalue": {eigenvalue}') return { 'data': tensor.eigenvalues(T_sym['data'])[:,p], @@ -803,7 +799,7 @@ class Result: } def add_eigenvalue(self, T_sym: str, - eigenvalue: Literal['max', 'mid', 'minimum'] = 'max'): + eigenvalue: Literal['max', 'mid', 'min'] = 'max'): """ Add eigenvalues of symmetric tensor. @@ -827,14 +823,16 @@ class Result: @staticmethod - def _add_eigenvector(T_sym: Dict[str, np.ndarray], - eigenvalue: Literal['max', 'mid', 'minimum']) -> Dict[str, Any]: + def _add_eigenvector(T_sym: Dict[str, Any], eigenvalue: Literal['max', 'mid', 'min']) -> Dict[str, Any]: if eigenvalue == 'max': label,p = 'maximum',2 elif eigenvalue == 'mid': label,p = 'intermediate',1 elif eigenvalue == 'min': label,p = 'minimum',0 + else: + raise ValueError(f'invalid value for "eigenvalue": {eigenvalue}') + return { 'data': tensor.eigenvectors(T_sym['data'])[:,p], 'label': f"v_{eigenvalue}({T_sym['label']})", @@ -847,7 +845,7 @@ class Result: } def add_eigenvector(self, T_sym: str, - eigenvalue: Literal['max', 'mid', 'minimum'] = 'max'): + eigenvalue: Literal['max', 'mid', 'min'] = 'max'): """ Add eigenvector of symmetric tensor. @@ -864,8 +862,7 @@ class Result: @staticmethod - def _add_IPF_color(l: FloatSequence, - q: Dict[str, Any]) -> Dict[str, Any]: + def _add_IPF_color(l: FloatSequence, q: Dict[str, Any]) -> Dict[str, Any]: m = util.scale_to_coprime(np.array(l)) lattice = q['meta']['lattice'] o = Orientation(rotation = q['data'],lattice=lattice) @@ -881,7 +878,7 @@ class Result: } } def add_IPF_color(self, - l: np.ndarray, + l: FloatSequence, q: str = 'O'): """ Add RGB color tuple of inverse pole figure (IPF) color. @@ -931,8 +928,7 @@ class Result: @staticmethod - def _add_equivalent_Mises(T_sym: Dict[str, np.ndarray], - kind: str) -> Dict[str, Any]: + def _add_equivalent_Mises(T_sym: Dict[str, Any], kind: str) -> Dict[str, Any]: k = kind if k is None: if T_sym['meta']['unit'] == '1': @@ -985,8 +981,7 @@ class Result: @staticmethod - def _add_norm(x: Dict[str, np.ndarray], - ord: int) -> Dict[str, Any]: + def _add_norm(x: Dict[str, Any], ord: Union[int, Literal['fro', 'nuc']]) -> Dict[str, Any]: o = ord if len(x['data'].shape) == 2: axis: Union[int, Tuple[int, int]] = 1 @@ -1026,8 +1021,7 @@ class Result: @staticmethod - def _add_stress_second_Piola_Kirchhoff(P: Dict[str, np.ndarray], - F: Dict[str, np.ndarray]) -> Dict[str, Any]: + def _add_stress_second_Piola_Kirchhoff(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]: return { 'data': mechanics.stress_second_Piola_Kirchhoff(P['data'],F['data']), 'label': 'S', @@ -1086,8 +1080,8 @@ class Result: def add_pole(self, q: str = 'O', *, - uvw: np.ndarray = None, - hkl: np.ndarray = None, + uvw: FloatSequence = None, + hkl: FloatSequence = None, with_symmetry: bool = False): """ Add lab frame vector along lattice direction [uvw] or plane normal (hkl). @@ -1097,7 +1091,7 @@ class Result: q : str Name of the dataset containing the crystallographic orientation as quaternions. Defaults to 'O'. - uvw|hkl : numpy.ndarray of shape (...,3) + uvw|hkl : numpy.ndarray of shape (3) Miller indices of crystallographic direction or plane normal. with_symmetry : bool, optional Calculate all N symmetrically equivalent vectors. @@ -1107,7 +1101,7 @@ class Result: @staticmethod - def _add_rotation(F: Dict[str, np.ndarray]) -> Dict[str, Any]: + def _add_rotation(F: Dict[str, Any]) -> Dict[str, Any]: return { 'data': mechanics.rotation(F['data']).as_matrix(), 'label': f"R({F['label']})", @@ -1139,7 +1133,7 @@ class Result: @staticmethod - def _add_spherical(T: Dict[str, np.ndarray]) -> Dict[str, Any]: + def _add_spherical(T: Dict[str, Any]) -> Dict[str, Any]: return { 'data': tensor.spherical(T['data'],False), 'label': f"p_{T['label']}", @@ -1171,8 +1165,7 @@ class Result: @staticmethod - def _add_strain(F: Dict[str, np.ndarray], - t: str, m: float) -> Dict[str, Any]: + def _add_strain(F: Dict[str, Any], t: Literal['V', 'U'], m: float) -> Dict[str, Any]: return { 'data': mechanics.strain(F['data'],t,m), 'label': f"epsilon_{t}^{m}({F['label']})", @@ -1221,8 +1214,7 @@ class Result: @staticmethod - def _add_stretch_tensor(F: Dict[str, np.ndarray], - t: str) -> Dict[str, Any]: + def _add_stretch_tensor(F: Dict[str, Any], t: str) -> Dict[str, Any]: return { 'data': (mechanics.stretch_left if t.upper() == 'V' else mechanics.stretch_right)(F['data']), 'label': f"{t}({F['label']})", @@ -1252,8 +1244,7 @@ class Result: @staticmethod - def _add_curl(f: Dict[str, np.ndarray], - size: np.ndarray) -> Dict[str, Any]: + def _add_curl(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.curl(size,f['data']), 'label': f"curl({f['label']})", @@ -1282,8 +1273,7 @@ class Result: @staticmethod - def _add_divergence(f: Dict[str, np.ndarray], - size: np.ndarray) -> Dict[str, Any]: + def _add_divergence(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.divergence(size,f['data']), 'label': f"divergence({f['label']})", @@ -1312,8 +1302,7 @@ class Result: @staticmethod - def _add_gradient(f: Dict[str, np.ndarray], - size: np.ndarray) -> Dict[str, Any]: + def _add_gradient(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.gradient(size,f['data'] if len(f['data'].shape) == 4 else \ f['data'].reshape(f['data'].shape+(1,))), @@ -1406,7 +1395,7 @@ class Result: callback: Callable, datasets: Dict[str, str], args: Dict[str, str], - lock: LockBase) -> List[Union[None, Any]]: + lock: Lock) -> List[Union[None, Any]]: """Execute job for _add_generic_pointwise.""" try: datasets_in = {} @@ -1619,7 +1608,7 @@ class Result: f.write(xml.dom.minidom.parseString(ET.tostring(xdmf).decode()).toprettyxml()) - def _mappings(self) -> Tuple[List[Dict[Any, np.ndarray]], List[Dict[Any, Any]], Dict[Any, np.ndarray], Dict[Any, Any]]: + def _mappings(self): """Mappings to place data spatially.""" with h5py.File(self.fname,'r') as f: @@ -1642,7 +1631,7 @@ class Result: def export_VTK(self, output: Union[str,list] = '*', mode: str = 'cell', - constituents: Optional[Union[int, list]] = None, + constituents: IntSequence = None, fill_float: float = np.nan, fill_int: int = 0, parallel: bool = True): @@ -1688,7 +1677,7 @@ class Result: N_digits = int(np.floor(np.log10(max(1,self.incs[-1]))))+1 constituents_ = constituents if isinstance(constituents,Iterable) else \ - (range(self.N_constituents) if constituents is None else [constituents]) + (range(self.N_constituents) if constituents is None else [constituents]) # type: ignore suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \ [f'#{c}' for c in constituents_] @@ -1739,7 +1728,7 @@ class Result: def get(self, output: Union[str, List[str]] = '*', flatten: bool = True, - prune: bool = True) -> Optional[Dict[str, Dict[str, Any]]]: + prune: bool = True): """ Collect data per phase/homogenization reflecting the group/folder structure in the DADF5 file. @@ -1788,7 +1777,7 @@ class Result: output: Union[str, list] = '*', flatten: bool = True, prune: bool = True, - constituents: Union[None, int, List[int]] = None, + constituents: IntSequence = None, fill_float: float = np.nan, fill_int: int = 0) -> Optional[Dict[str, Dict[str, Dict[str, Dict[str, Union[np.ma.MaskedArray]]]]]]: """ @@ -1830,7 +1819,7 @@ class Result: """ r: Dict[str, Dict[str, Dict[str, Dict[str, Union[np.ma.MaskedArray]]]]] = {} - constituents_: Sequence[int] = constituents if isinstance(constituents,Iterable) else \ + constituents_ = list(map(int,constituents)) if isinstance(constituents,Iterable) else \ (range(self.N_constituents) if constituents is None else [constituents]) suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \ @@ -1893,7 +1882,7 @@ class Result: Defaults to False. """ - def export(name: str,obj: Union[h5py.Dataset,h5py.Group],output: Union[str,list],overwrite: bool): + def export(name: str, obj: Union[h5py.Dataset,h5py.Group], output: Union[str,list], overwrite: bool): if type(obj) == h5py.Dataset and _match(output,[name]): d = obj.attrs['description'] if h5py3 else obj.attrs['description'].decode() if not Path(name).exists() or overwrite: