diff --git a/python/damask/_result.py b/python/damask/_result.py index b5f08a87d..5d8f2f668 100644 --- a/python/damask/_result.py +++ b/python/damask/_result.py @@ -23,18 +23,22 @@ from . import mechanics from . import tensor from . import util +from typing import Union, Optional, Callable, Any, Sequence, Literal, Dict, List, Tuple +from multiprocessing.synchronize import Lock as LockBase + h5py3 = h5py.__version__[0] == '3' chunk_size = 1024**2//8 # for compression in HDF5 prefix_inc = 'increment_' -def _read(dataset): +def _read(dataset: h5py._hl.dataset.Dataset): """Read a dataset and its metadata into a numpy.ndarray.""" metadata = {k:(v.decode() if not h5py3 and type(v) is bytes else v) for k,v in dataset.attrs.items()} - dtype = np.dtype(dataset.dtype,metadata=metadata) + dtype = np.dtype(dataset.dtype,metadata=metadata) # type: ignore return np.array(dataset,dtype=dtype) -def _match(requested,existing): +def _match(requested: Union[None, str, Sequence[Any], np.ndarray], + existing: h5py._hl.base.KeysViewHDF5) -> List[Any]: """Find matches among two sets of labels.""" def flatten_list(list_of_lists): return [e for e_ in list_of_lists for e in e_] @@ -50,7 +54,10 @@ def _match(requested,existing): return sorted(set(flatten_list([fnmatch.filter(existing,r) for r in requested_])), key=util.natural_sort) -def _empty_like(dataset,N_materialpoints,fill_float,fill_int): +def _empty_like(dataset: np.ma.core.MaskedArray, + N_materialpoints: int, + fill_float: float, + fill_int: int) -> np.ma.core.MaskedArray: """Create empty numpy.ma.MaskedArray.""" return ma.array(np.empty((N_materialpoints,)+dataset.shape[1:],dataset.dtype), fill_value = fill_float if dataset.dtype in np.sctypes['float'] else fill_int, @@ -84,7 +91,7 @@ class Result: """ - def __init__(self,fname): + def __init__(self, fname: Union[str, Path]): """ New result view bound to a HDF5 file. @@ -102,7 +109,7 @@ class Result: if self.version_major != 0 or not 12 <= self.version_minor <= 14: raise TypeError(f'unsupported DADF5 version "{self.version_major}.{self.version_minor}"') if self.version_major == 0 and self.version_minor < 14: - self.export_setup = None + self.export_setup = None # type: ignore self.structured = 'cells' in f['geometry'].attrs.keys() @@ -111,7 +118,7 @@ class Result: self.size = f['geometry'].attrs['size'] self.origin = f['geometry'].attrs['origin'] else: - self.add_curl = self.add_divergence = self.add_gradient = None + self.add_curl = self.add_divergence = self.add_gradient = None # type: ignore r = re.compile(rf'{prefix_inc}([0-9]+)') self.increments = sorted([i for i in f.keys() if r.match(i)],key=util.natural_sort) @@ -126,7 +133,7 @@ class Result: self.phase = f['cell_to/phase']['label'].astype('str') self.phases = sorted(np.unique(self.phase),key=util.natural_sort) - self.fields = [] + self.fields: List[str] = [] for c in self.phases: self.fields += f['/'.join([self.increments[0],'phase',c])].keys() for m in self.homogenizations: @@ -144,14 +151,14 @@ class Result: self._protected = True - def __copy__(self): + def __copy__(self) -> "Result": """Create deep copy.""" return copy.deepcopy(self) copy = __copy__ - def __repr__(self): + def __repr__(self) -> str: """Give short human-readable summary.""" with h5py.File(self.fname,'r') as f: header = [f'Created by {f.attrs["creator"]}', @@ -171,12 +178,12 @@ class Result: def _manage_view(self, - action, + action: Literal['set', 'add', 'del'], increments=None, times=None, phases=None, homogenizations=None, - fields=None): + fields=None) -> "Result": """ Manages the visibility of the groups. @@ -223,8 +230,8 @@ class Result: if idx >= len(self.times): continue if np.isclose(c,self.times[idx]): choice.append(self.increments[idx]) - elif np.isclose(c,self.times[idx+1]): - choice.append(self.increments[idx+1]) + elif np.isclose(c,self.times[idx+1]): #type: ignore + choice.append(self.increments[idx+1]) #type: ignore valid = _match(choice,getattr(self,what)) existing = set(self.visible[what]) @@ -241,7 +248,9 @@ class Result: return dup - def increments_in_range(self,start=None,end=None): + def increments_in_range(self, + start: Union[str, int] = None, + end: Union[str, int] = None) -> Sequence[int]: """ Get all increments within a given range. @@ -263,7 +272,9 @@ class Result: self.incs[-1] if end is None else end)) return [i for i in self.incs if s <= i <= e] - def times_in_range(self,start=None,end=None): + def times_in_range(self, + start: float = None, + end: float = None) -> Sequence[int]: """ Get all increments within a given time range. @@ -286,12 +297,12 @@ class Result: def view(self,*, - increments=None, - times=None, - phases=None, - homogenizations=None, - fields=None, - protected=None): + increments: Union[int, Sequence[int], str, Sequence[str], bool] = None, + times: Union[float, Sequence[float], str, Sequence[str], bool] = None, + phases: Union[str, Sequence[str], bool] = None, + homogenizations: Union[str, Sequence[str], bool] = None, + fields: Union[str, Sequence[str], bool] = None, + protected: bool = None) -> "Result": """ Set view. @@ -343,11 +354,11 @@ class Result: def view_more(self,*, - increments=None, - times=None, - phases=None, - homogenizations=None, - fields=None): + increments: Union[int, Sequence[int], str, Sequence[str], bool] = None, + times: Union[float, Sequence[float], str, Sequence[str], bool] = None, + phases: Union[str, Sequence[str], bool] = None, + homogenizations: Union[str, Sequence[str], bool] = None, + fields: Union[str, Sequence[str], bool] = None) -> "Result": """ Add to view. @@ -386,11 +397,11 @@ class Result: def view_less(self,*, - increments=None, - times=None, - phases=None, - homogenizations=None, - fields=None): + increments: Union[int, Sequence[int], str, Sequence[str], bool] = None, + times: Union[float, Sequence[float], str, Sequence[str], bool] = None, + phases: Union[str, Sequence[str], bool] = None, + homogenizations: Union[str, Sequence[str], bool] = None, + fields: Union[str, Sequence[str], bool] = None) -> "Result": """ Remove from view. @@ -427,7 +438,9 @@ class Result: return self._manage_view('del',increments,times,phases,homogenizations,fields) - def rename(self,name_src,name_dst): + def rename(self, + name_src: str, + name_dst: str): """ Rename/move datasets (within the same group/folder). @@ -468,7 +481,7 @@ class Result: del f[path_src] - def remove(self,name): + def remove(self, name: str): """ Remove/delete datasets. @@ -502,7 +515,7 @@ class Result: if path in f.keys(): del f[path] - def list_data(self): + def list_data(self) -> List[str]: """ Collect information on all active datasets in the file. @@ -533,7 +546,8 @@ class Result: return msg - def enable_user_function(self,func): + def enable_user_function(self, + func: Callable): globals()[func.__name__]=func print(f'Function {func.__name__} enabled in add_calculation.') @@ -544,7 +558,7 @@ class Result: @property - def coordinates0_point(self): + def coordinates0_point(self) -> np.ndarray: """Initial/undeformed cell center coordinates.""" if self.structured: return grid_filters.coordinates0_point(self.cells,self.size,self.origin).reshape(-1,3,order='F') @@ -553,7 +567,7 @@ class Result: return f['geometry/x_p'][()] @property - def coordinates0_node(self): + def coordinates0_node(self) -> np.ndarray: """Initial/undeformed nodal coordinates.""" if self.structured: return grid_filters.coordinates0_node(self.cells,self.size,self.origin).reshape(-1,3,order='F') @@ -562,7 +576,7 @@ class Result: return f['geometry/x_n'][()] @property - def geometry0(self): + def geometry0(self) -> VTK: """Initial/undeformed geometry.""" if self.structured: return VTK.from_image_data(self.cells,self.size,self.origin) @@ -575,7 +589,7 @@ class Result: @staticmethod - def _add_absolute(x): + def _add_absolute(x: Dict[str, Any]) -> Dict[str, Union[Dict[str, Union[str, int, slice]], str, np.ndarray]]: return { 'data': np.abs(x['data']), 'label': f'|{x["label"]}|', @@ -585,7 +599,7 @@ class Result: 'creator': 'add_absolute' } } - def add_absolute(self,x): + def add_absolute(self, x: str): """ Add absolute value. @@ -599,7 +613,7 @@ class Result: @staticmethod - def _add_calculation(**kwargs): + def _add_calculation(**kwargs) -> Dict[str, Union[Dict[str, str], str]]: formula = kwargs['formula'] for d in re.findall(r'#(.*?)#',formula): formula = formula.replace(f'#{d}#',f"kwargs['{d}']['data']") @@ -617,7 +631,11 @@ class Result: 'creator': 'add_calculation' } } - def add_calculation(self,formula,name,unit='n/a',description=None): + def add_calculation(self, + formula: str, + name: str, + unit: str = 'n/a', + description: str = None): """ Add result of a general formula. @@ -661,13 +679,14 @@ class Result: ... 'Mises equivalent of the Cauchy stress') """ - dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula + dataset_mapping: Dict[str, str] = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula args = {'formula':formula,'label':name,'unit':unit,'description':description} self._add_generic_pointwise(self._add_calculation,dataset_mapping,args) @staticmethod - def _add_stress_Cauchy(P,F): + def _add_stress_Cauchy(P: Dict[str, np.ndarray], + F: Dict[str, np.ndarray]) -> Dict[str, Any]: return { 'data': mechanics.stress_Cauchy(P['data'],F['data']), 'label': 'sigma', @@ -679,7 +698,9 @@ class Result: 'creator': 'add_stress_Cauchy' } } - def add_stress_Cauchy(self,P='P',F='F'): + def add_stress_Cauchy(self, + P: str = 'P', + F: str = 'F'): """ Add Cauchy stress calculated from first Piola-Kirchhoff stress and deformation gradient. @@ -695,7 +716,7 @@ class Result: @staticmethod - def _add_determinant(T): + def _add_determinant(T: Dict[str, np.ndarray]) -> Dict[str, Any]: return { 'data': np.linalg.det(T['data']), 'label': f"det({T['label']})", @@ -705,7 +726,7 @@ class Result: 'creator': 'add_determinant' } } - def add_determinant(self,T): + def add_determinant(self, T: str): """ Add the determinant of a tensor. @@ -727,7 +748,7 @@ class Result: @staticmethod - def _add_deviator(T): + def _add_deviator(T: Dict[str, np.ndarray]) -> Dict[str, Any]: return { 'data': tensor.deviatoric(T['data']), 'label': f"s_{T['label']}", @@ -737,7 +758,7 @@ class Result: 'creator': 'add_deviator' } } - def add_deviator(self,T): + def add_deviator(self, T: str): """ Add the deviatoric part of a tensor. @@ -759,13 +780,16 @@ class Result: @staticmethod - def _add_eigenvalue(T_sym,eigenvalue): + def _add_eigenvalue(T_sym: Dict[str, np.ndarray], + eigenvalue: Literal['max, mid, minimum']) -> Dict[str, Any]: if eigenvalue == 'max': label,p = 'maximum',2 elif eigenvalue == 'mid': label,p = 'intermediate',1 elif eigenvalue == 'min': label,p = 'minimum',0 + else: + raise TypeError return { 'data': tensor.eigenvalues(T_sym['data'])[:,p], @@ -776,7 +800,9 @@ class Result: 'creator': 'add_eigenvalue' } } - def add_eigenvalue(self,T_sym,eigenvalue='max'): + def add_eigenvalue(self, + T_sym: str, + eigenvalue: Literal['max', 'mid', 'minimum'] = 'max'): """ Add eigenvalues of symmetric tensor. @@ -800,7 +826,8 @@ class Result: @staticmethod - def _add_eigenvector(T_sym,eigenvalue): + def _add_eigenvector(T_sym: Dict[str, np.ndarray], + eigenvalue: Literal['max', 'mid', 'minimum']) -> Dict[str, Any]: if eigenvalue == 'max': label,p = 'maximum',2 elif eigenvalue == 'mid': @@ -817,7 +844,9 @@ class Result: 'creator': 'add_eigenvector' } } - def add_eigenvector(self,T_sym,eigenvalue='max'): + def add_eigenvector(self, + T_sym: str, + eigenvalue: Literal['max', 'mid', 'minimum'] = 'max'): """ Add eigenvector of symmetric tensor. @@ -834,7 +863,8 @@ class Result: @staticmethod - def _add_IPF_color(l,q): + def _add_IPF_color(l: Sequence[float], + q: Dict[str, Any]) -> Dict[str, Any]: m = util.scale_to_coprime(np.array(l)) lattice = q['meta']['lattice'] o = Orientation(rotation = q['data'],lattice=lattice) @@ -849,7 +879,9 @@ class Result: 'creator': 'add_IPF_color' } } - def add_IPF_color(self,l,q='O'): + def add_IPF_color(self, + l: np.ndarray, + q: str = 'O'): """ Add RGB color tuple of inverse pole figure (IPF) color. @@ -874,7 +906,7 @@ class Result: @staticmethod - def _add_maximum_shear(T_sym): + def _add_maximum_shear(T_sym: dict) -> dict: return { 'data': mechanics.maximum_shear(T_sym['data']), 'label': f"max_shear({T_sym['label']})", @@ -884,7 +916,7 @@ class Result: 'creator': 'add_maximum_shear' } } - def add_maximum_shear(self,T_sym): + def add_maximum_shear(self, T_sym: str): """ Add maximum shear components of symmetric tensor. @@ -898,7 +930,8 @@ class Result: @staticmethod - def _add_equivalent_Mises(T_sym,kind): + def _add_equivalent_Mises(T_sym: Dict[str, np.ndarray], + kind: str) -> Dict[str, Any]: k = kind if k is None: if T_sym['meta']['unit'] == '1': @@ -918,7 +951,9 @@ class Result: 'creator': 'add_Mises' } } - def add_equivalent_Mises(self,T_sym,kind=None): + def add_equivalent_Mises(self, + T_sym: str, + kind: str = None): """ Add the equivalent Mises stress or strain of a symmetric tensor. @@ -949,10 +984,11 @@ class Result: @staticmethod - def _add_norm(x,ord): + def _add_norm(x: Dict[str, np.ndarray], + ord: int) -> Dict[str, Any]: o = ord if len(x['data'].shape) == 2: - axis = 1 + axis: Union[int, Tuple[int, int]] = 1 t = 'vector' if o is None: o = 2 elif len(x['data'].shape) == 3: @@ -971,7 +1007,9 @@ class Result: 'creator': 'add_norm' } } - def add_norm(self,x,ord=None): + def add_norm(self, + x: str, + ord: Union[int, Literal['fro', 'nuc']] = None): """ Add the norm of vector or tensor. @@ -987,7 +1025,8 @@ class Result: @staticmethod - def _add_stress_second_Piola_Kirchhoff(P,F): + def _add_stress_second_Piola_Kirchhoff(P: Dict[str, np.ndarray], + F: Dict[str, np.ndarray]) -> Dict[str, Any]: return { 'data': mechanics.stress_second_Piola_Kirchhoff(P['data'],F['data']), 'label': 'S', @@ -999,7 +1038,9 @@ class Result: 'creator': 'add_stress_second_Piola_Kirchhoff' } } - def add_stress_second_Piola_Kirchhoff(self,P='P',F='F'): + def add_stress_second_Piola_Kirchhoff(self, + P: str = 'P', + F: str = 'F'): """ Add second Piola-Kirchhoff stress calculated from first Piola-Kirchhoff stress and deformation gradient. @@ -1023,7 +1064,10 @@ class Result: @staticmethod - def _add_pole(q,uvw,hkl,with_symmetry): + def _add_pole(q: Dict[str, Any], + uvw: np.ndarray, + hkl: np.ndarray, + with_symmetry: bool) -> Dict[str, Any]: c = q['meta']['c/a'] if 'c/a' in q['meta'] else 1 pole = Orientation(q['data'],lattice=q['meta']['lattice'],a=1,c=c).to_pole(uvw=uvw,hkl=hkl,with_symmetry=with_symmetry) @@ -1038,7 +1082,12 @@ class Result: 'creator': 'add_pole' } } - def add_pole(self,q='O',*,uvw=None,hkl=None,with_symmetry=False): + def add_pole(self, + q: str = 'O', + *, + uvw: np.ndarray = None, + hkl: np.ndarray = None, + with_symmetry: bool = False): """ Add lab frame vector along lattice direction [uvw] or plane normal (hkl). @@ -1057,7 +1106,7 @@ class Result: @staticmethod - def _add_rotation(F): + def _add_rotation(F: Dict[str, np.ndarray]) -> Dict[str, Any]: return { 'data': mechanics.rotation(F['data']).as_matrix(), 'label': f"R({F['label']})", @@ -1067,7 +1116,7 @@ class Result: 'creator': 'add_rotation' } } - def add_rotation(self,F): + def add_rotation(self, F: str): """ Add rotational part of a deformation gradient. @@ -1089,7 +1138,7 @@ class Result: @staticmethod - def _add_spherical(T): + def _add_spherical(T: Dict[str, np.ndarray]) -> Dict[str, Any]: return { 'data': tensor.spherical(T['data'],False), 'label': f"p_{T['label']}", @@ -1099,7 +1148,7 @@ class Result: 'creator': 'add_spherical' } } - def add_spherical(self,T): + def add_spherical(self, T: str): """ Add the spherical (hydrostatic) part of a tensor. @@ -1121,7 +1170,8 @@ class Result: @staticmethod - def _add_strain(F,t,m): + def _add_strain(F: Dict[str, np.ndarray], + t: str, m: float) -> Dict[str, Any]: return { 'data': mechanics.strain(F['data'],t,m), 'label': f"epsilon_{t}^{m}({F['label']})", @@ -1131,7 +1181,10 @@ class Result: 'creator': 'add_strain' } } - def add_strain(self,F='F',t='V',m=0.0): + def add_strain(self, + F: str = 'F', + t: Literal['V', 'U'] = 'V', + m: float = 0.0): """ Add strain tensor of a deformation gradient. @@ -1167,7 +1220,8 @@ class Result: @staticmethod - def _add_stretch_tensor(F,t): + def _add_stretch_tensor(F: Dict[str, np.ndarray], + t: str) -> Dict[str, Any]: return { 'data': (mechanics.stretch_left if t.upper() == 'V' else mechanics.stretch_right)(F['data']), 'label': f"{t}({F['label']})", @@ -1178,7 +1232,9 @@ class Result: 'creator': 'add_stretch_tensor' } } - def add_stretch_tensor(self,F='F',t='V'): + def add_stretch_tensor(self, + F: str = 'F', + t: Literal['V', 'U'] = 'V'): """ Add stretch tensor of a deformation gradient. @@ -1195,7 +1251,8 @@ class Result: @staticmethod - def _add_curl(f,size): + def _add_curl(f: Dict[str, np.ndarray], + size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.curl(size,f['data']), 'label': f"curl({f['label']})", @@ -1205,7 +1262,7 @@ class Result: 'creator': 'add_curl' } } - def add_curl(self,f): + def add_curl(self, f: str): """ Add curl of a field. @@ -1224,7 +1281,8 @@ class Result: @staticmethod - def _add_divergence(f,size): + def _add_divergence(f: Dict[str, np.ndarray], + size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.divergence(size,f['data']), 'label': f"divergence({f['label']})", @@ -1234,7 +1292,7 @@ class Result: 'creator': 'add_divergence' } } - def add_divergence(self,f): + def add_divergence(self, f: str): """ Add divergence of a field. @@ -1253,7 +1311,8 @@ class Result: @staticmethod - def _add_gradient(f,size): + def _add_gradient(f: Dict[str, np.ndarray], + size: np.ndarray) -> Dict[str, Any]: return { 'data': grid_filters.gradient(size,f['data'] if len(f['data'].shape) == 4 else \ f['data'].reshape(f['data'].shape+(1,))), @@ -1264,7 +1323,7 @@ class Result: 'creator': 'add_gradient' } } - def add_gradient(self,f): + def add_gradient(self, f: str): """ Add gradient of a field. @@ -1282,7 +1341,11 @@ class Result: self._add_generic_grid(self._add_gradient,{'f':f},{'size':self.size}) - def _add_generic_grid(self,func,datasets,args={},constituents=None): + def _add_generic_grid(self, + func: Callable, + datasets: Dict[str, str], + args: Dict[str, str] = {}, + constituents=None): """ General function to add data on a regular grid. @@ -1303,11 +1366,13 @@ class Result: at_cell_ph,in_data_ph,at_cell_ho,in_data_ho = self._mappings() + increments = self.place(list(datasets.values()),False) + if not increments: raise RuntimeError("received invalid dataset") with h5py.File(self.fname, 'a') as f: - for increment in self.place(datasets.values(),False).items(): + for increment in increments.items(): for ty in increment[1].items(): for field in ty[1].items(): - d = list(field[1].values())[0] + d: np.ma.MaskedArray = list(field[1].values())[0] if np.any(d.mask): continue dataset = {'f':{'data':np.reshape(d.data,tuple(self.cells)+d.data.shape[1:]), 'label':list(datasets.values())[0], @@ -1321,21 +1386,26 @@ class Result: result1 = result[at_cell_ho[x]] path = '/'.join(['/',increment[0],ty[0],x,field[0]]) - dataset = f[path].create_dataset(r['label'],data=result1) + h5_dataset : h5py._hl.dataset.Dataset = f[path].create_dataset(r['label'],data=result1) now = datetime.datetime.now().astimezone() - dataset.attrs['created'] = now.strftime('%Y-%m-%d %H:%M:%S%z') if h5py3 else \ + h5_dataset.attrs['created'] = now.strftime('%Y-%m-%d %H:%M:%S%z') if h5py3 else \ now.strftime('%Y-%m-%d %H:%M:%S%z').encode() for l,v in r['meta'].items(): - dataset.attrs[l.lower()]=v if h5py3 else v.encode() - creator = dataset.attrs['creator'] if h5py3 else \ - dataset.attrs['creator'].decode() - dataset.attrs['creator'] = f'damask.Result.{creator} v{damask.version}' if h5py3 else \ + h5_dataset.attrs[l.lower()]=v if h5py3 else v.encode() + creator = h5_dataset.attrs['creator'] if h5py3 else \ + h5_dataset.attrs['creator'].decode() + h5_dataset.attrs['creator'] = f'damask.Result.{creator} v{damask.version}' if h5py3 else \ f'damask.Result.{creator} v{damask.version}'.encode() - def _job_pointwise(self,group,func,datasets,args,lock): + def _job_pointwise(self, + group: str, + func: Callable, + datasets: Dict[str, str], + args: Dict[str, str], + lock: LockBase) -> List[Union[None, Any]]: """Execute job for _add_generic_pointwise.""" try: datasets_in = {} @@ -1354,7 +1424,10 @@ class Result: print(f'Error during calculation: {err}.') return [None,None] - def _add_generic_pointwise(self,func,datasets,args={}): + def _add_generic_pointwise(self, + func: Callable, + datasets: Dict[str, Any], + args: Dict[str, Any] = {}): """ General function to add pointwise data. @@ -1388,7 +1461,7 @@ class Result: default_arg = partial(self._job_pointwise,func=func,datasets=datasets,args=args,lock=lock) - for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)): + for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)): #type: ignore if not result: continue lock.acquire() @@ -1402,7 +1475,7 @@ class Result: shape = result['data'].shape if result['data'].size >= chunk_size*2: chunks = (chunk_size//np.prod(shape[1:]),)+shape[1:] - compression = ('gzip',6) + compression: Tuple[Optional[str], Optional[int]] = ('gzip',6) else: chunks = shape compression = (None,None) @@ -1430,7 +1503,8 @@ class Result: pool.join() - def export_XDMF(self,output='*'): + def export_XDMF(self, + output: Union[str, List[str]] = '*'): """ Write XDMF file to directly visualize data from DADF5 file. @@ -1544,7 +1618,7 @@ class Result: f.write(xml.dom.minidom.parseString(ET.tostring(xdmf).decode()).toprettyxml()) - def _mappings(self): + def _mappings(self) -> Tuple[List[Dict[Any, np.ndarray]], List[Dict[Any, Any]], Dict[Any, np.ndarray], Dict[Any, Any]]: """Mappings to place data spatially.""" with h5py.File(self.fname,'r') as f: @@ -1564,7 +1638,13 @@ class Result: return at_cell_ph,in_data_ph,at_cell_ho,in_data_ho - def export_VTK(self,output='*',mode='cell',constituents=None,fill_float=np.nan,fill_int=0,parallel=True): + def export_VTK(self, + output: Union[str,list] = '*', + mode: str = 'cell', + constituents: Optional[Union[int, list]] = None, + fill_float: float = np.nan, + fill_int: int = 0, + parallel: bool = True): """ Export to VTK cell/point data. @@ -1627,7 +1707,7 @@ class Result: for ty in ['phase','homogenization']: for field in self.visible['fields']: - outs = {} + outs: Dict[str, np.ma.core.MaskedArray] = {} for label in self.visible[ty+'s']: if field not in f['/'.join([inc,ty,label])].keys(): continue @@ -1655,7 +1735,10 @@ class Result: v.save(f'{self.fname.stem}_inc{inc[10:].zfill(N_digits)}',parallel=parallel) - def get(self,output='*',flatten=True,prune=True): + def get(self, + output: Union[str, List[str]] = '*', + flatten: bool = True, + prune: bool = True) -> Optional[Dict[str, Dict[str, Any]]]: """ Collect data per phase/homogenization reflecting the group/folder structure in the DADF5 file. @@ -1677,7 +1760,7 @@ class Result: Datasets structured by phase/homogenization and according to selected view. """ - r = {} + r: Dict[str, Dict[str, Any]] = {} with h5py.File(self.fname,'r') as f: for inc in util.show_progress(self.visible['increments']): @@ -1700,7 +1783,13 @@ class Result: return None if (type(r) == dict and r == {}) else r - def place(self,output='*',flatten=True,prune=True,constituents=None,fill_float=np.nan,fill_int=0): + def place(self, + output: Union[str, list] = '*', + flatten: bool = True, + prune: bool = True, + constituents: Union[None, int, List[int]] = None, + fill_float: float = np.nan, + fill_int: int = 0) -> Optional[Dict[str, Dict[str, Dict[str, Dict[str, Union[np.ma.MaskedArray]]]]]]: """ Merge data into spatial order that is compatible with the damask.VTK geometry representation. @@ -1738,9 +1827,9 @@ class Result: Datasets structured by spatial position and according to selected view. """ - r = {} + r: Dict[str, Dict[str, Dict[str, Dict[str, Union[np.ma.MaskedArray]]]]] = {} - constituents_ = constituents if isinstance(constituents,Iterable) else \ + constituents_: Sequence[int] = constituents if isinstance(constituents,Iterable) else \ (range(self.N_constituents) if constituents is None else [constituents]) suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \ @@ -1787,7 +1876,9 @@ class Result: return None if (type(r) == dict and r == {}) else r - def export_setup(self,output='*',overwrite=False): + def export_setup(self, + output: Union[str, List[str]] = '*', + overwrite: bool = False): """ Export configuration files. @@ -1801,7 +1892,7 @@ class Result: Defaults to False. """ - def export(name,obj,output,overwrite): + def export(name: str,obj: Union[h5py.Dataset,h5py.Group],output: Union[str,list],overwrite: bool): if type(obj) == h5py.Dataset and _match(output,[name]): d = obj.attrs['description'] if h5py3 else obj.attrs['description'].decode() if not Path(name).exists() or overwrite: diff --git a/python/damask/_vtk.py b/python/damask/_vtk.py index 4f2658cd7..92ec08aee 100644 --- a/python/damask/_vtk.py +++ b/python/damask/_vtk.py @@ -1,7 +1,7 @@ import os import multiprocessing as mp from pathlib import Path -from typing import Union, Literal, List, Sequence +from typing import Union, Literal import numpy as np import vtk @@ -86,7 +86,7 @@ class VTK: @property - def comments(self) -> List[str]: + def comments(self): """Return the comments.""" field_data = self.vtk_data.GetFieldData() for a in range(field_data.GetNumberOfArrays()): @@ -97,7 +97,7 @@ class VTK: @comments.setter def comments(self, - comments: Union[str, Sequence[str]]): + comments): """ Set comments.