diff --git a/python/damask/_result.py b/python/damask/_result.py index bf58ec160..1ad25bdbf 100644 --- a/python/damask/_result.py +++ b/python/damask/_result.py @@ -1,4 +1,5 @@ import multiprocessing as mp +from multiprocessing.synchronize import Lock import re import fnmatch import os @@ -10,6 +11,7 @@ from pathlib import Path from functools import partial from collections import defaultdict from collections.abc import Iterable +from typing import Union, Callable, Any, Sequence, Literal, Dict, List, Tuple import h5py import numpy as np @@ -22,19 +24,21 @@ from . import grid_filters from . import mechanics from . import tensor from . import util +from ._typehints import FloatSequence, IntSequence h5py3 = h5py.__version__[0] == '3' chunk_size = 1024**2//8 # for compression in HDF5 prefix_inc = 'increment_' -def _read(dataset): +def _read(dataset: h5py._hl.dataset.Dataset) -> np.ndarray: """Read a dataset and its metadata into a numpy.ndarray.""" metadata = {k:(v.decode() if not h5py3 and type(v) is bytes else v) for k,v in dataset.attrs.items()} - dtype = np.dtype(dataset.dtype,metadata=metadata) + dtype = np.dtype(dataset.dtype,metadata=metadata) # type: ignore return np.array(dataset,dtype=dtype) -def _match(requested,existing): +def _match(requested, + existing: h5py._hl.base.KeysViewHDF5) -> List[Any]: """Find matches among two sets of labels.""" def flatten_list(list_of_lists): return [e for e_ in list_of_lists for e in e_] @@ -50,7 +54,10 @@ def _match(requested,existing): return sorted(set(flatten_list([fnmatch.filter(existing,r) for r in requested_])), key=util.natural_sort) -def _empty_like(dataset,N_materialpoints,fill_float,fill_int): +def _empty_like(dataset: np.ma.core.MaskedArray, + N_materialpoints: int, + fill_float: float, + fill_int: int) -> np.ma.core.MaskedArray: """Create empty numpy.ma.MaskedArray.""" return ma.array(np.empty((N_materialpoints,)+dataset.shape[1:],dataset.dtype), fill_value = fill_float if dataset.dtype in np.sctypes['float'] else fill_int, @@ -84,7 +91,7 @@ class Result: """ - def __init__(self,fname): + def __init__(self, fname: Union[str, Path]): """ New result view bound to a HDF5 file. @@ -102,7 +109,7 @@ class Result: if self.version_major != 0 or not 12 <= self.version_minor <= 14: raise TypeError(f'unsupported DADF5 version "{self.version_major}.{self.version_minor}"') if self.version_major == 0 and self.version_minor < 14: - self.export_setup = None + self.export_setup = None # type: ignore self.structured = 'cells' in f['geometry'].attrs.keys() @@ -111,7 +118,7 @@ class Result: self.size = f['geometry'].attrs['size'] self.origin = f['geometry'].attrs['origin'] else: - self.add_curl = self.add_divergence = self.add_gradient = None + self.add_curl = self.add_divergence = self.add_gradient = None # type: ignore r = re.compile(rf'{prefix_inc}([0-9]+)') self.increments = sorted([i for i in f.keys() if r.match(i)],key=util.natural_sort) @@ -126,7 +133,7 @@ class Result: self.phase = f['cell_to/phase']['label'].astype('str') self.phases = sorted(np.unique(self.phase),key=util.natural_sort) - self.fields = [] + self.fields: List[str] = [] for c in self.phases: self.fields += f['/'.join([self.increments[0],'phase',c])].keys() for m in self.homogenizations: @@ -144,14 +151,14 @@ class Result: self._protected = True - def __copy__(self): + def __copy__(self) -> "Result": """Create deep copy.""" return copy.deepcopy(self) copy = __copy__ - def __repr__(self): + def __repr__(self) -> str: """Give short human-readable summary.""" with h5py.File(self.fname,'r') as f: header = [f'Created by {f.attrs["creator"]}', @@ -171,12 +178,12 @@ class Result: def _manage_view(self, - action, - increments=None, - times=None, - phases=None, - homogenizations=None, - fields=None): + action: Literal['set', 'add', 'del'], + increments: Union[int, Sequence[int], str, Sequence[str], bool] = None, + times: Union[float, Sequence[float], str, Sequence[str], bool] = None, + phases: Union[str, Sequence[str], bool] = None, + homogenizations: Union[str, Sequence[str], bool] = None, + fields: Union[str, Sequence[str], bool] = None) -> "Result": """ Manages the visibility of the groups. @@ -204,8 +211,7 @@ class Result: datasets = '*' elif datasets is False: datasets = [] - choice = list(datasets).copy() if hasattr(datasets,'__iter__') and not isinstance(datasets,str) else \ - [datasets] + choice = [datasets] if not hasattr(datasets,'__iter__') or isinstance(datasets,str) else list(datasets) # type: ignore if what == 'increments': choice = [c if isinstance(c,str) and c.startswith(prefix_inc) else @@ -216,7 +222,7 @@ class Result: if choice == ['*']: choice = self.increments else: - iterator = map(float,choice) + iterator = map(float,choice) # type: ignore choice = [] for c in iterator: idx = np.searchsorted(self.times,c) @@ -224,7 +230,7 @@ class Result: if np.isclose(c,self.times[idx]): choice.append(self.increments[idx]) elif np.isclose(c,self.times[idx+1]): - choice.append(self.increments[idx+1]) + choice.append(self.increments[idx+1]) # type: ignore valid = _match(choice,getattr(self,what)) existing = set(self.visible[what]) @@ -241,7 +247,9 @@ class Result: return dup - def increments_in_range(self,start=None,end=None): + def increments_in_range(self, + start: Union[str, int] = None, + end: Union[str, int] = None) -> Sequence[int]: """ Get all increments within a given range. @@ -263,7 +271,9 @@ class Result: self.incs[-1] if end is None else end)) return [i for i in self.incs if s <= i <= e] - def times_in_range(self,start=None,end=None): + def times_in_range(self, + start: float = None, + end: float = None) -> Sequence[int]: """ Get all increments within a given time range. @@ -286,12 +296,12 @@ class Result: def view(self,*, - increments=None, - times=None, - phases=None, - homogenizations=None, - fields=None, - protected=None): + increments: Union[int, Sequence[int], str, Sequence[str], bool] = None, + times: Union[float, Sequence[float], str, Sequence[str], bool] = None, + phases: Union[str, Sequence[str], bool] = None, + homogenizations: Union[str, Sequence[str], bool] = None, + fields: Union[str, Sequence[str], bool] = None, + protected: bool = None) -> "Result": """ Set view. @@ -343,11 +353,11 @@ class Result: def view_more(self,*, - increments=None, - times=None, - phases=None, - homogenizations=None, - fields=None): + increments: Union[int, Sequence[int], str, Sequence[str], bool] = None, + times: Union[float, Sequence[float], str, Sequence[str], bool] = None, + phases: Union[str, Sequence[str], bool] = None, + homogenizations: Union[str, Sequence[str], bool] = None, + fields: Union[str, Sequence[str], bool] = None) -> "Result": """ Add to view. @@ -386,11 +396,11 @@ class Result: def view_less(self,*, - increments=None, - times=None, - phases=None, - homogenizations=None, - fields=None): + increments: Union[int, Sequence[int], str, Sequence[str], bool] = None, + times: Union[float, Sequence[float], str, Sequence[str], bool] = None, + phases: Union[str, Sequence[str], bool] = None, + homogenizations: Union[str, Sequence[str], bool] = None, + fields: Union[str, Sequence[str], bool] = None) -> "Result": """ Remove from view. @@ -427,7 +437,9 @@ class Result: return self._manage_view('del',increments,times,phases,homogenizations,fields) - def rename(self,name_src,name_dst): + def rename(self, + name_src: str, + name_dst: str): """ Rename/move datasets (within the same group/folder). @@ -468,7 +480,7 @@ class Result: del f[path_src] - def remove(self,name): + def remove(self, name: str): """ Remove/delete datasets. @@ -502,7 +514,7 @@ class Result: if path in f.keys(): del f[path] - def list_data(self): + def list_data(self) -> List[str]: """ Collect information on all active datasets in the file. @@ -533,7 +545,8 @@ class Result: return msg - def enable_user_function(self,func): + def enable_user_function(self, + func: Callable): globals()[func.__name__]=func print(f'Function {func.__name__} enabled in add_calculation.') @@ -544,7 +557,7 @@ class Result: @property - def coordinates0_point(self): + def coordinates0_point(self) -> np.ndarray: """Initial/undeformed cell center coordinates.""" if self.structured: return grid_filters.coordinates0_point(self.cells,self.size,self.origin).reshape(-1,3,order='F') @@ -553,7 +566,7 @@ class Result: return f['geometry/x_p'][()] @property - def coordinates0_node(self): + def coordinates0_node(self) -> np.ndarray: """Initial/undeformed nodal coordinates.""" if self.structured: return grid_filters.coordinates0_node(self.cells,self.size,self.origin).reshape(-1,3,order='F') @@ -562,7 +575,7 @@ class Result: return f['geometry/x_n'][()] @property - def geometry0(self): + def geometry0(self) -> VTK: """Initial/undeformed geometry.""" if self.structured: return VTK.from_image_data(self.cells,self.size,self.origin) @@ -575,7 +588,7 @@ class Result: @staticmethod - def _add_absolute(x): + def _add_absolute(x: Dict[str, Any]) -> Dict[str, Any]: return { 'data': np.abs(x['data']), 'label': f'|{x["label"]}|', @@ -585,7 +598,7 @@ class Result: 'creator': 'add_absolute' } } - def add_absolute(self,x): + def add_absolute(self, x: str): """ Add absolute value. @@ -599,7 +612,7 @@ class Result: @staticmethod - def _add_calculation(**kwargs): + def _add_calculation(**kwargs) -> Dict[str, Any]: formula = kwargs['formula'] for d in re.findall(r'#(.*?)#',formula): formula = formula.replace(f'#{d}#',f"kwargs['{d}']['data']") @@ -617,7 +630,11 @@ class Result: 'creator': 'add_calculation' } } - def add_calculation(self,formula,name,unit='n/a',description=None): + def add_calculation(self, + formula: str, + name: str, + unit: str = 'n/a', + description: str = None): """ Add result of a general formula. @@ -661,13 +678,13 @@ class Result: ... 'Mises equivalent of the Cauchy stress') """ - dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula + dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula args = {'formula':formula,'label':name,'unit':unit,'description':description} self._add_generic_pointwise(self._add_calculation,dataset_mapping,args) @staticmethod - def _add_stress_Cauchy(P,F): + def _add_stress_Cauchy(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]: return { 'data': mechanics.stress_Cauchy(P['data'],F['data']), 'label': 'sigma', @@ -679,7 +696,9 @@ class Result: 'creator': 'add_stress_Cauchy' } } - def add_stress_Cauchy(self,P='P',F='F'): + def add_stress_Cauchy(self, + P: str = 'P', + F: str = 'F'): """ Add Cauchy stress calculated from first Piola-Kirchhoff stress and deformation gradient. @@ -695,7 +714,7 @@ class Result: @staticmethod - def _add_determinant(T): + def _add_determinant(T: Dict[str, Any]) -> Dict[str, Any]: return { 'data': np.linalg.det(T['data']), 'label': f"det({T['label']})", @@ -705,7 +724,7 @@ class Result: 'creator': 'add_determinant' } } - def add_determinant(self,T): + def add_determinant(self, T: str): """ Add the determinant of a tensor. @@ -727,7 +746,7 @@ class Result: @staticmethod - def _add_deviator(T): + def _add_deviator(T: Dict[str, Any]) -> Dict[str, Any]: return { 'data': tensor.deviatoric(T['data']), 'label': f"s_{T['label']}", @@ -737,7 +756,7 @@ class Result: 'creator': 'add_deviator' } } - def add_deviator(self,T): + def add_deviator(self, T: str): """ Add the deviatoric part of a tensor. @@ -759,13 +778,15 @@ class Result: @staticmethod - def _add_eigenvalue(T_sym,eigenvalue): + def _add_eigenvalue(T_sym: Dict[str, Any], eigenvalue: Literal['max, mid, min']) -> Dict[str, Any]: if eigenvalue == 'max': label,p = 'maximum',2 elif eigenvalue == 'mid': label,p = 'intermediate',1 elif eigenvalue == 'min': label,p = 'minimum',0 + else: + raise ValueError(f'invalid eigenvalue: {eigenvalue}') return { 'data': tensor.eigenvalues(T_sym['data'])[:,p], @@ -776,7 +797,9 @@ class Result: 'creator': 'add_eigenvalue' } } - def add_eigenvalue(self,T_sym,eigenvalue='max'): + def add_eigenvalue(self, + T_sym: str, + eigenvalue: Literal['max', 'mid', 'min'] = 'max'): """ Add eigenvalues of symmetric tensor. @@ -800,13 +823,16 @@ class Result: @staticmethod - def _add_eigenvector(T_sym,eigenvalue): + def _add_eigenvector(T_sym: Dict[str, Any], eigenvalue: Literal['max', 'mid', 'min']) -> Dict[str, Any]: if eigenvalue == 'max': label,p = 'maximum',2 elif eigenvalue == 'mid': label,p = 'intermediate',1 elif eigenvalue == 'min': label,p = 'minimum',0 + else: + raise ValueError(f'invalid eigenvalue: {eigenvalue}') + return { 'data': tensor.eigenvectors(T_sym['data'])[:,p], 'label': f"v_{eigenvalue}({T_sym['label']})", @@ -817,7 +843,9 @@ class Result: 'creator': 'add_eigenvector' } } - def add_eigenvector(self,T_sym,eigenvalue='max'): + def add_eigenvector(self, + T_sym: str, + eigenvalue: Literal['max', 'mid', 'min'] = 'max'): """ Add eigenvector of symmetric tensor. @@ -834,7 +862,7 @@ class Result: @staticmethod - def _add_IPF_color(l,q): + def _add_IPF_color(l: FloatSequence, q: Dict[str, Any]) -> Dict[str, Any]: m = util.scale_to_coprime(np.array(l)) lattice = q['meta']['lattice'] o = Orientation(rotation = q['data'],lattice=lattice) @@ -849,7 +877,9 @@ class Result: 'creator': 'add_IPF_color' } } - def add_IPF_color(self,l,q='O'): + def add_IPF_color(self, + l: FloatSequence, + q: str = 'O'): """ Add RGB color tuple of inverse pole figure (IPF) color. @@ -874,7 +904,7 @@ class Result: @staticmethod - def _add_maximum_shear(T_sym): + def _add_maximum_shear(T_sym: Dict[str, Any]) -> Dict[str, Any]: return { 'data': mechanics.maximum_shear(T_sym['data']), 'label': f"max_shear({T_sym['label']})", @@ -884,7 +914,7 @@ class Result: 'creator': 'add_maximum_shear' } } - def add_maximum_shear(self,T_sym): + def add_maximum_shear(self, T_sym: str): """ Add maximum shear components of symmetric tensor. @@ -898,7 +928,7 @@ class Result: @staticmethod - def _add_equivalent_Mises(T_sym,kind): + def _add_equivalent_Mises(T_sym: Dict[str, Any], kind: str) -> Dict[str, Any]: k = kind if k is None: if T_sym['meta']['unit'] == '1': @@ -918,7 +948,9 @@ class Result: 'creator': 'add_Mises' } } - def add_equivalent_Mises(self,T_sym,kind=None): + def add_equivalent_Mises(self, + T_sym: str, + kind: str = None): """ Add the equivalent Mises stress or strain of a symmetric tensor. @@ -949,10 +981,10 @@ class Result: @staticmethod - def _add_norm(x,ord): + def _add_norm(x: Dict[str, Any], ord: Union[int, float, Literal['fro', 'nuc']]) -> Dict[str, Any]: o = ord if len(x['data'].shape) == 2: - axis = 1 + axis: Union[int, Tuple[int, int]] = 1 t = 'vector' if o is None: o = 2 elif len(x['data'].shape) == 3: @@ -971,7 +1003,9 @@ class Result: 'creator': 'add_norm' } } - def add_norm(self,x,ord=None): + def add_norm(self, + x: str, + ord: Union[int, float, Literal['fro', 'nuc']] = None): """ Add the norm of vector or tensor. @@ -987,7 +1021,7 @@ class Result: @staticmethod - def _add_stress_second_Piola_Kirchhoff(P,F): + def _add_stress_second_Piola_Kirchhoff(P: Dict[str, Any], F: Dict[str, Any]) -> Dict[str, Any]: return { 'data': mechanics.stress_second_Piola_Kirchhoff(P['data'],F['data']), 'label': 'S', @@ -999,7 +1033,9 @@ class Result: 'creator': 'add_stress_second_Piola_Kirchhoff' } } - def add_stress_second_Piola_Kirchhoff(self,P='P',F='F'): + def add_stress_second_Piola_Kirchhoff(self, + P: str = 'P', + F: str = 'F'): """ Add second Piola-Kirchhoff stress calculated from first Piola-Kirchhoff stress and deformation gradient. @@ -1023,7 +1059,11 @@ class Result: @staticmethod - def _add_pole(q,uvw,hkl,with_symmetry,normalize): + def _add_pole(q: Dict[str, Any], + uvw: FloatSequence, + hkl: FloatSequence, + with_symmetry: bool, + normalize: bool) -> Dict[str, Any]: c = q['meta']['c/a'] if 'c/a' in q['meta'] else 1 brackets = ['[]','()','⟨⟩','{}'][(uvw is None)*1+with_symmetry*2] label = 'p^' + '{}{} {} {}{}'.format(brackets[0], @@ -1042,7 +1082,13 @@ class Result: 'creator': 'add_pole' } } - def add_pole(self,q='O',*,uvw=None,hkl=None,with_symmetry=False,normalize=True): + def add_pole(self, + q: str = 'O', + *, + uvw: FloatSequence = None, + hkl: FloatSequence = None, + with_symmetry: bool = False, + normalize: bool = True): """ Add lab frame vector along lattice direction [uvw] or plane normal (hkl). @@ -1067,7 +1113,7 @@ class Result: @staticmethod - def _add_rotation(F): + def _add_rotation(F: Dict[str, Any]) -> Dict[str, Any]: return { 'data': mechanics.rotation(F['data']).as_matrix(), 'label': f"R({F['label']})", @@ -1077,7 +1123,7 @@ class Result: 'creator': 'add_rotation' } } - def add_rotation(self,F): + def add_rotation(self, F: str): """ Add rotational part of a deformation gradient. @@ -1099,7 +1145,7 @@ class Result: @staticmethod - def _add_spherical(T): + def _add_spherical(T: Dict[str, Any]) -> Dict[str, Any]: return { 'data': tensor.spherical(T['data'],False), 'label': f"p_{T['label']}", @@ -1109,7 +1155,7 @@ class Result: 'creator': 'add_spherical' } } - def add_spherical(self,T): + def add_spherical(self, T: str): """ Add the spherical (hydrostatic) part of a tensor. @@ -1131,7 +1177,7 @@ class Result: @staticmethod - def _add_strain(F,t,m): + def _add_strain(F: Dict[str, Any], t: Literal['V', 'U'], m: float) -> Dict[str, Any]: return { 'data': mechanics.strain(F['data'],t,m), 'label': f"epsilon_{t}^{m}({F['label']})", @@ -1141,7 +1187,10 @@ class Result: 'creator': 'add_strain' } } - def add_strain(self,F='F',t='V',m=0.0): + def add_strain(self, + F: str = 'F', + t: Literal['V', 'U'] = 'V', + m: float = 0.0): """ Add strain tensor of a deformation gradient. @@ -1177,7 +1226,7 @@ class Result: @staticmethod - def _add_stretch_tensor(F,t): + def _add_stretch_tensor(F: Dict[str, Any], t: str) -> Dict[str, Any]: return { 'data': (mechanics.stretch_left if t.upper() == 'V' else mechanics.stretch_right)(F['data']), 'label': f"{t}({F['label']})", @@ -1188,7 +1237,9 @@ class Result: 'creator': 'add_stretch_tensor' } } - def add_stretch_tensor(self,F='F',t='V'): + def add_stretch_tensor(self, + F: str = 'F', + t: Literal['V', 'U'] = 'V'): """ Add stretch tensor of a deformation gradient. @@ -1205,7 +1256,7 @@ class Result: @staticmethod - def _add_curl(f,size): + def _add_curl(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.curl(size,f['data']), 'label': f"curl({f['label']})", @@ -1215,7 +1266,7 @@ class Result: 'creator': 'add_curl' } } - def add_curl(self,f): + def add_curl(self, f: str): """ Add curl of a field. @@ -1234,7 +1285,7 @@ class Result: @staticmethod - def _add_divergence(f,size): + def _add_divergence(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.divergence(size,f['data']), 'label': f"divergence({f['label']})", @@ -1244,7 +1295,7 @@ class Result: 'creator': 'add_divergence' } } - def add_divergence(self,f): + def add_divergence(self, f: str): """ Add divergence of a field. @@ -1263,7 +1314,7 @@ class Result: @staticmethod - def _add_gradient(f,size): + def _add_gradient(f: Dict[str, Any], size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.gradient(size,f['data'] if len(f['data'].shape) == 4 else \ f['data'].reshape(f['data'].shape+(1,))), @@ -1274,7 +1325,7 @@ class Result: 'creator': 'add_gradient' } } - def add_gradient(self,f): + def add_gradient(self, f: str): """ Add gradient of a field. @@ -1292,7 +1343,11 @@ class Result: self._add_generic_grid(self._add_gradient,{'f':f},{'size':self.size}) - def _add_generic_grid(self,func,datasets,args={},constituents=None): + def _add_generic_grid(self, + func: Callable, + datasets: Dict[str, str], + args: Dict[str, str] = {}, + constituents = None): """ General function to add data on a regular grid. @@ -1313,11 +1368,13 @@ class Result: at_cell_ph,in_data_ph,at_cell_ho,in_data_ho = self._mappings() + increments = self.place(list(datasets.values()),False) + if not increments: raise RuntimeError("received invalid dataset") with h5py.File(self.fname, 'a') as f: - for increment in self.place(datasets.values(),False).items(): + for increment in increments.items(): for ty in increment[1].items(): for field in ty[1].items(): - d = list(field[1].values())[0] + d: np.ma.MaskedArray = list(field[1].values())[0] if np.any(d.mask): continue dataset = {'f':{'data':np.reshape(d.data,tuple(self.cells)+d.data.shape[1:]), 'label':list(datasets.values())[0], @@ -1331,21 +1388,26 @@ class Result: result1 = result[at_cell_ho[x]] path = '/'.join(['/',increment[0],ty[0],x,field[0]]) - dataset = f[path].create_dataset(r['label'],data=result1) + h5_dataset = f[path].create_dataset(r['label'],data=result1) now = datetime.datetime.now().astimezone() - dataset.attrs['created'] = now.strftime('%Y-%m-%d %H:%M:%S%z') if h5py3 else \ - now.strftime('%Y-%m-%d %H:%M:%S%z').encode() + h5_dataset.attrs['created'] = now.strftime('%Y-%m-%d %H:%M:%S%z') if h5py3 else \ + now.strftime('%Y-%m-%d %H:%M:%S%z').encode() for l,v in r['meta'].items(): - dataset.attrs[l.lower()]=v if h5py3 else v.encode() - creator = dataset.attrs['creator'] if h5py3 else \ - dataset.attrs['creator'].decode() - dataset.attrs['creator'] = f'damask.Result.{creator} v{damask.version}' if h5py3 else \ - f'damask.Result.{creator} v{damask.version}'.encode() + h5_dataset.attrs[l.lower()]=v if h5py3 else v.encode() + creator = h5_dataset.attrs['creator'] if h5py3 else \ + h5_dataset.attrs['creator'].decode() + h5_dataset.attrs['creator'] = f'damask.Result.{creator} v{damask.version}' if h5py3 else \ + f'damask.Result.{creator} v{damask.version}'.encode() - def _job_pointwise(self,group,func,datasets,args,lock): + def _job_pointwise(self, + group: str, + callback: Callable, + datasets: Dict[str, str], + args: Dict[str, str], + lock: Lock) -> List[Union[None, Any]]: """Execute job for _add_generic_pointwise.""" try: datasets_in = {} @@ -1358,19 +1420,23 @@ class Result: 'meta': {k:(v.decode() if not h5py3 and type(v) is bytes else v) \ for k,v in loc.attrs.items()}} lock.release() - r = func(**datasets_in,**args) + r = callback(**datasets_in,**args) return [group,r] except Exception as err: print(f'Error during calculation: {err}.') return [None,None] - def _add_generic_pointwise(self,func,datasets,args={}): + + def _add_generic_pointwise(self, + func: Callable, + datasets: Dict[str, Any], + args: Dict[str, Any] = {}): """ General function to add pointwise data. Parameters ---------- - func : function + callback : function Callback function that calculates a new dataset from one or more datasets per HDF5 group. datasets : dictionary @@ -1396,9 +1462,9 @@ class Result: print('No matching dataset found, no data was added.') return - default_arg = partial(self._job_pointwise,func=func,datasets=datasets,args=args,lock=lock) + default_arg = partial(self._job_pointwise,callback=func,datasets=datasets,args=args,lock=lock) - for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)): + for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)):# type: ignore if not result: continue lock.acquire() @@ -1410,15 +1476,14 @@ class Result: dataset.attrs['overwritten'] = True else: shape = result['data'].shape - if result['data'].size >= chunk_size*2: + if compress := (result['data'].size >= chunk_size*2): chunks = (chunk_size//np.prod(shape[1:]),)+shape[1:] - compression = ('gzip',6) else: chunks = shape - compression = (None,None) dataset = f[group].create_dataset(result['label'],data=result['data'], maxshape=shape, chunks=chunks, - compression=compression[0], compression_opts=compression[1], + compression = 'gzip' if compress else None, + compression_opts = 6 if compress else None, shuffle=True,fletcher32=True) now = datetime.datetime.now().astimezone() @@ -1440,7 +1505,8 @@ class Result: pool.join() - def export_XDMF(self,output='*'): + def export_XDMF(self, + output: Union[str, List[str]] = '*'): """ Write XDMF file to directly visualize data from DADF5 file. @@ -1574,7 +1640,13 @@ class Result: return at_cell_ph,in_data_ph,at_cell_ho,in_data_ho - def export_VTK(self,output='*',mode='cell',constituents=None,fill_float=np.nan,fill_int=0,parallel=True): + def export_VTK(self, + output: Union[str,list] = '*', + mode: str = 'cell', + constituents: IntSequence = None, + fill_float: float = np.nan, + fill_int: int = 0, + parallel: bool = True): """ Export to VTK cell/point data. @@ -1612,12 +1684,12 @@ class Result: else: raise ValueError(f'invalid mode "{mode}"') - v.comments = util.execution_stamp('Result','export_VTK') + v.comments = [util.execution_stamp('Result','export_VTK')] N_digits = int(np.floor(np.log10(max(1,self.incs[-1]))))+1 constituents_ = constituents if isinstance(constituents,Iterable) else \ - (range(self.N_constituents) if constituents is None else [constituents]) + (range(self.N_constituents) if constituents is None else [constituents]) # type: ignore suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \ [f'#{c}' for c in constituents_] @@ -1637,7 +1709,7 @@ class Result: for ty in ['phase','homogenization']: for field in self.visible['fields']: - outs = {} + outs: Dict[str, np.ma.core.MaskedArray] = {} for label in self.visible[ty+'s']: if field not in f['/'.join([inc,ty,label])].keys(): continue @@ -1665,7 +1737,10 @@ class Result: v.save(f'{self.fname.stem}_inc{inc[10:].zfill(N_digits)}',parallel=parallel) - def get(self,output='*',flatten=True,prune=True): + def get(self, + output: Union[str, List[str]] = '*', + flatten: bool = True, + prune: bool = True): """ Collect data per phase/homogenization reflecting the group/folder structure in the DADF5 file. @@ -1687,7 +1762,7 @@ class Result: Datasets structured by phase/homogenization and according to selected view. """ - r = {} + r = {} # type: ignore with h5py.File(self.fname,'r') as f: for inc in util.show_progress(self.visible['increments']): @@ -1710,7 +1785,13 @@ class Result: return None if (type(r) == dict and r == {}) else r - def place(self,output='*',flatten=True,prune=True,constituents=None,fill_float=np.nan,fill_int=0): + def place(self, + output: Union[str, List[str]] = '*', + flatten: bool = True, + prune: bool = True, + constituents: IntSequence = None, + fill_float: float = np.nan, + fill_int: int = 0): """ Merge data into spatial order that is compatible with the damask.VTK geometry representation. @@ -1748,10 +1829,10 @@ class Result: Datasets structured by spatial position and according to selected view. """ - r = {} + r = {} # type: ignore - constituents_ = constituents if isinstance(constituents,Iterable) else \ - (range(self.N_constituents) if constituents is None else [constituents]) + constituents_ = list(map(int,constituents)) if isinstance(constituents,Iterable) else \ + (range(self.N_constituents) if constituents is None else [constituents]) # type: ignore suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \ [f'#{c}' for c in constituents_] @@ -1797,7 +1878,9 @@ class Result: return None if (type(r) == dict and r == {}) else r - def export_setup(self,output='*',overwrite=False): + def export_setup(self, + output: Union[str, List[str]] = '*', + overwrite: bool = False): """ Export configuration files. @@ -1811,7 +1894,7 @@ class Result: Defaults to False. """ - def export(name,obj,output,overwrite): + def export(name: str, obj: Union[h5py.Dataset,h5py.Group], output: Union[str,List[str]], overwrite: bool): if type(obj) == h5py.Dataset and _match(output,[name]): d = obj.attrs['description'] if h5py3 else obj.attrs['description'].decode() if not Path(name).exists() or overwrite: diff --git a/python/damask/_rotation.py b/python/damask/_rotation.py index 0e443ee62..61529eb46 100644 --- a/python/damask/_rotation.py +++ b/python/damask/_rotation.py @@ -994,7 +994,7 @@ class Rotation: """ rng = np.random.default_rng(rng_seed) - r = rng.random(3 if shape is None else tuple(shape)+(3,) if hasattr(shape, '__iter__') else (shape,3)) #type: ignore + r = rng.random(3 if shape is None else tuple(shape)+(3,) if hasattr(shape, '__iter__') else (shape,3)) # type: ignore A = np.sqrt(r[...,2]) B = np.sqrt(1.0-r[...,2]) diff --git a/python/damask/_table.py b/python/damask/_table.py index 2d2c30c00..03e62b2b4 100644 --- a/python/damask/_table.py +++ b/python/damask/_table.py @@ -160,7 +160,7 @@ class Table: 'linear' ==> 1_v 2_v 3_v """ - self.data.columns = self._label(self.shapes,how) #type: ignore + self.data.columns = self._label(self.shapes,how) # type: ignore def _add_comment(self,