import multiprocessing import re import inspect import glob import os import datetime import xml.etree.ElementTree as ET import xml.dom.minidom from functools import partial import h5py import numpy as np from . import VTK from . import Table from . import Rotation from . import Orientation from . import Environment from . import grid_filters from . import mechanics from . import util from . import version class Result: """ Read and write to DADF5 files. DADF5 (DAMASK HDF5) files contain DAMASK results. """ def __init__(self,fname): """ Open an existing DADF5 file. Parameters ---------- fname : str name of the DADF5 file to be openend. """ with h5py.File(fname,'r') as f: try: self.version_major = f.attrs['DADF5_version_major'] self.version_minor = f.attrs['DADF5_version_minor'] except KeyError: self.version_major = f.attrs['DADF5-major'] self.version_minor = f.attrs['DADF5-minor'] if self.version_major != 0 or not 2 <= self.version_minor <= 6: raise TypeError('Unsupported DADF5 version {}.{} '.format(self.version_major, self.version_minor)) self.structured = 'grid' in f['geometry'].attrs.keys() if self.structured: self.grid = f['geometry'].attrs['grid'] self.size = f['geometry'].attrs['size'] self.origin = f['geometry'].attrs['origin'] if self.version_major == 0 and self.version_minor >= 5 else \ np.zeros(3) r=re.compile('inc[0-9]+') increments_unsorted = {int(i[3:]):i for i in f.keys() if r.match(i)} self.increments = [increments_unsorted[i] for i in sorted(increments_unsorted)] self.times = [round(f[i].attrs['time/s'],12) for i in self.increments] self.Nmaterialpoints, self.Nconstituents = np.shape(f['mapping/cellResults/constituent']) self.materialpoints = [m.decode() for m in np.unique(f['mapping/cellResults/materialpoint']['Name'])] self.constituents = [c.decode() for c in np.unique(f['mapping/cellResults/constituent'] ['Name'])] # faster, but does not work with (deprecated) DADF5_postResults #self.materialpoints = [m for m in f['inc0/materialpoint']] #self.constituents = [c for c in f['inc0/constituent']] self.con_physics = [] for c in self.constituents: self.con_physics += f['/'.join([self.increments[0],'constituent',c])].keys() self.con_physics = list(set(self.con_physics)) # make unique self.mat_physics = [] for m in self.materialpoints: self.mat_physics += f['/'.join([self.increments[0],'materialpoint',m])].keys() self.mat_physics = list(set(self.mat_physics)) # make unique self.selection = {'increments': self.increments, 'constituents': self.constituents,'materialpoints': self.materialpoints, 'con_physics': self.con_physics, 'mat_physics': self.mat_physics } self.fname = os.path.abspath(fname) self._allow_overwrite = False def __repr__(self): """Show selected data.""" all_selected_increments = self.selection['increments'] self.pick('increments',all_selected_increments[0:1]) first = self.list_data() self.pick('increments',all_selected_increments[-1:]) last = '' if len(all_selected_increments) < 2 else self.list_data() self.pick('increments',all_selected_increments) in_between = '' if len(all_selected_increments) < 3 else \ ''.join(['\n{}\n ...\n'.format(inc) for inc in all_selected_increments[1:-2]]) return util.srepr(first + in_between + last) def _manage_selection(self,action,what,datasets): """ Manages the visibility of the groups. Parameters ---------- action : str select from 'set', 'add', and 'del' what : str attribute to change (must be from self.selection) datasets : list of str or bool name of datasets as list, supports ? and * wildcards. True is equivalent to [*], False is equivalent to [] """ # allow True/False and string arguments if datasets is True: datasets = ['*'] elif datasets is False: datasets = [] choice = datasets if hasattr(datasets,'__iter__') and not isinstance(datasets,str) else \ [datasets] if what == 'increments': choice = [c if isinstance(c,str) and c.startswith('inc') else 'inc{}'.format(c) for c in choice] elif what == 'times': what = 'increments' if choice == ['*']: choice = self.increments else: iterator = map(float,choice) choice = [] for c in iterator: idx = np.searchsorted(self.times,c) if idx >= len(self.times): continue if np.isclose(c,self.times[idx]): choice.append(self.increments[idx]) elif np.isclose(c,self.times[idx+1]): choice.append(self.increments[idx+1]) valid = [e for e_ in [glob.fnmatch.filter(getattr(self,what),s) for s in choice] for e in e_] existing = set(self.selection[what]) if action == 'set': self.selection[what] = valid elif action == 'add': add = existing.union(valid) add_sorted = sorted(add, key=lambda x: int("".join([i for i in x if i.isdigit()]))) self.selection[what] = add_sorted elif action == 'del': diff = existing.difference(valid) diff_sorted = sorted(diff, key=lambda x: int("".join([i for i in x if i.isdigit()]))) self.selection[what] = diff_sorted def enable_overwrite(self): print(util.bcolors().WARNING,util.bcolors().BOLD, 'Warning: Enabled overwrite of existing datasets!', util.bcolors().ENDC) self._allow_overwrite = True def disable_overwrite(self): self._allow_overwrite = False def incs_in_range(self,start,end): selected = [] for i,inc in enumerate([int(i[3:]) for i in self.increments]): s,e = map(lambda x: int(x[3:] if isinstance(x,str) and x.startswith('inc') else x), (start,end)) if s <= inc <= e: selected.append(self.increments[i]) return selected def times_in_range(self,start,end): selected = [] for i,time in enumerate(self.times): if start <= time <= end: selected.append(self.times[i]) return selected def iterate(self,what): """ Iterate over selection items by setting each one selected. Parameters ---------- what : str attribute to change (must be from self.selection) """ datasets = self.selection[what] last_selection = datasets.copy() for dataset in datasets: if last_selection != self.selection[what]: self._manage_selection('set',what,datasets) raise Exception self._manage_selection('set',what,dataset) last_selection = self.selection[what] yield dataset self._manage_selection('set',what,datasets) def pick(self,what,datasets): """ Set selection. Parameters ---------- what : str attribute to change (must be from self.selection) datasets : list of str or bool name of datasets as list, supports ? and * wildcards. True is equivalent to [*], False is equivalent to [] """ self._manage_selection('set',what,datasets) def pick_more(self,what,datasets): """ Add to selection. Parameters ---------- what : str attribute to change (must be from self.selection) datasets : list of str or bool name of datasets as list, supports ? and * wildcards. True is equivalent to [*], False is equivalent to [] """ self._manage_selection('add',what,datasets) def pick_less(self,what,datasets): """ Delete from selection. Parameters ---------- what : str attribute to change (must be from self.selection) datasets : list of str or bool name of datasets as list, supports ? and * wildcards. True is equivalent to [*], False is equivalent to [] """ self._manage_selection('del',what,datasets) # def datamerger(regular expression to filter groups into one copy) def place(self,datasets,component=0,tagged=False,split=True): """ Distribute datasets onto geometry and return Table or (split) dictionary of Tables. Must not mix nodal end cell data. Only data within - inc?????/constituent/*_*/* - inc?????/materialpoint/*_*/* - inc?????/geometry/* are considered. Parameters ---------- datasets : iterable or str component : int homogenization component to consider for constituent data tagged : bool tag Table.column name with '#component' defaults to False split : bool split Table by increment and return dictionary of Tables defaults to True """ sets = datasets if hasattr(datasets,'__iter__') and not isinstance(datasets,str) \ else [datasets] tag = f'#{component}' if tagged else '' tbl = {} if split else None inGeom = {} inData = {} with h5py.File(self.fname,'r') as f: for dataset in sets: for group in self.groups_with_datasets(dataset): path = os.path.join(group,dataset) inc,prop,name,cat,item = (path.split('/') + ['']*5)[:5] key = '/'.join([prop,name+tag]) if key not in inGeom: if prop == 'geometry': inGeom[key] = inData[key] = np.arange(self.Nmaterialpoints) elif prop == 'constituent': inGeom[key] = np.where(f['mapping/cellResults/constituent'][:,component]['Name'] == str.encode(name))[0] inData[key] = f['mapping/cellResults/constituent'][inGeom[key],component]['Position'] else: inGeom[key] = np.where(f['mapping/cellResults/materialpoint']['Name'] == str.encode(name))[0] inData[key] = f['mapping/cellResults/materialpoint'][inGeom[key].tolist()]['Position'] shape = np.shape(f[path]) data = np.full((self.Nmaterialpoints,) + (shape[1:] if len(shape)>1 else (1,)), np.nan, dtype=np.dtype(f[path])) data[inGeom[key]] = (f[path] if len(shape)>1 else np.expand_dims(f[path],1))[inData[key]] path = (os.path.join(*([prop,name]+([cat] if cat else [])+([item] if item else []))) if split else path)+tag if split: try: tbl[inc].add(path,data) except KeyError: tbl[inc] = Table(data.reshape(self.Nmaterialpoints,-1),{path:data.shape[1:]}) else: try: tbl.add(path,data) except AttributeError: tbl = Table(data.reshape(self.Nmaterialpoints,-1),{path:data.shape[1:]}) return tbl def groups_with_datasets(self,datasets): """ Return groups that contain all requested datasets. Only groups within - inc*/constituent/*/* - inc*/materialpoint/*/* - inc*/geometry/* are considered as they contain user-relevant data. Single strings will be treated as list with one entry. Wild card matching is allowed, but the number of arguments need to fit. Parameters ---------- datasets : iterable or str or bool Examples -------- datasets = False matches no group datasets = True matches all groups datasets = ['F','P'] matches a group with ['F','P','sigma'] datasets = ['*','P'] matches a group with ['F','P'] datasets = ['*'] does not match a group with ['F','P','sigma'] datasets = ['*','*'] does not match a group with ['F','P','sigma'] datasets = ['*','*','*'] matches a group with ['F','P','sigma'] """ if datasets is False: return [] sets = datasets if isinstance(datasets,bool) or (hasattr(datasets,'__iter__') and not isinstance(datasets,str)) else \ [datasets] groups = [] with h5py.File(self.fname,'r') as f: for i in self.iterate('increments'): for o,p in zip(['constituents','materialpoints'],['con_physics','mat_physics']): for oo in self.iterate(o): for pp in self.iterate(p): group = '/'.join([i,o[:-1],oo,pp]) # o[:-1]: plural/singular issue if sets is True: groups.append(group) else: match = [e for e_ in [glob.fnmatch.filter(f[group].keys(),s) for s in sets] for e in e_] if len(set(match)) == len(sets): groups.append(group) return groups def list_data(self): """Return information on all active datasets in the file.""" message = '' with h5py.File(self.fname,'r') as f: for i in self.iterate('increments'): message += '\n{} ({}s)\n'.format(i,self.times[self.increments.index(i)]) for o,p in zip(['constituents','materialpoints'],['con_physics','mat_physics']): for oo in self.iterate(o): message += ' {}\n'.format(oo) for pp in self.iterate(p): message += ' {}\n'.format(pp) group = '/'.join([i,o[:-1],oo,pp]) # o[:-1]: plural/singular issue for d in f[group].keys(): try: dataset = f['/'.join([group,d])] message += ' {} / ({}): {}\n'.\ format(d,dataset.attrs['Unit'].decode(),dataset.attrs['Description'].decode()) except KeyError: pass return message def get_dataset_location(self,label): """Return the location of all active datasets with given label.""" path = [] with h5py.File(self.fname,'r') as f: for i in self.iterate('increments'): k = '/'.join([i,'geometry',label]) try: f[k] path.append(k) except KeyError: pass for o,p in zip(['constituents','materialpoints'],['con_physics','mat_physics']): for oo in self.iterate(o): for pp in self.iterate(p): k = '/'.join([i,o[:-1],oo,pp,label]) try: f[k] path.append(k) except KeyError: pass return path def get_constituent_ID(self,c=0): """Pointwise constituent ID.""" with h5py.File(self.fname,'r') as f: names = f['/mapping/cellResults/constituent']['Name'][:,c].astype('str') return np.array([int(n.split('_')[0]) for n in names.tolist()],dtype=np.int32) def get_crystal_structure(self): # ToDo: extension to multi constituents/phase """Info about the crystal structure.""" with h5py.File(self.fname,'r') as f: return f[self.get_dataset_location('orientation')[0]].attrs['Lattice'].astype('str') # np.bytes_ to string def read_dataset(self,path,c=0,plain=False): """ Dataset for all points/cells. If more than one path is given, the dataset is composed of the individual contributions. """ with h5py.File(self.fname,'r') as f: shape = (self.Nmaterialpoints,) + np.shape(f[path[0]])[1:] if len(shape) == 1: shape = shape +(1,) dataset = np.full(shape,np.nan,dtype=np.dtype(f[path[0]])) for pa in path: label = pa.split('/')[2] if pa.split('/')[1] == 'geometry': dataset = np.array(f[pa]) continue p = np.where(f['mapping/cellResults/constituent'][:,c]['Name'] == str.encode(label))[0] if len(p)>0: u = (f['mapping/cellResults/constituent']['Position'][p,c]) a = np.array(f[pa]) if len(a.shape) == 1: a=a.reshape([a.shape[0],1]) dataset[p,:] = a[u,:] p = np.where(f['mapping/cellResults/materialpoint']['Name'] == str.encode(label))[0] if len(p)>0: u = (f['mapping/cellResults/materialpoint']['Position'][p.tolist()]) a = np.array(f[pa]) if len(a.shape) == 1: a=a.reshape([a.shape[0],1]) dataset[p,:] = a[u,:] if plain and dataset.dtype.names is not None: return dataset.view(('float64',len(dataset.dtype.names))) else: return dataset def cell_coordinates(self): """Return initial coordinates of the cell centers.""" if self.structured: return grid_filters.cell_coord0(self.grid,self.size,self.origin).reshape(-1,3,order='F') else: with h5py.File(self.fname,'r') as f: return f['geometry/x_c'][()] def node_coordinates(self): """Return initial coordinates of the cell centers.""" if self.structured: return grid_filters.node_coord0(self.grid,self.size,self.origin).reshape(-1,3,order='F') else: with h5py.File(self.fname,'r') as f: return f['geometry/x_n'][()] @staticmethod def _add_absolute(x): return { 'data': np.abs(x['data']), 'label': '|{}|'.format(x['label']), 'meta': { 'Unit': x['meta']['Unit'], 'Description': 'Absolute value of {} ({})'.format(x['label'],x['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_absolute(self,x): """ Add absolute value. Parameters ---------- x : str Label of scalar, vector, or tensor dataset to take absolute value of. """ self._add_generic_pointwise(self._add_absolute,{'x':x}) @staticmethod def _add_calculation(**kwargs): formula = kwargs['formula'] for d in re.findall(r'#(.*?)#',formula): formula = formula.replace('#{}#'.format(d),"kwargs['{}']['data']".format(d)) return { 'data': eval(formula), 'label': kwargs['label'], 'meta': { 'Unit': kwargs['unit'], 'Description': '{} (formula: {})'.format(kwargs['description'],kwargs['formula']), 'Creator': inspect.stack()[0][3][1:] } } def add_calculation(self,label,formula,unit='n/a',description=None,vectorized=True): """ Add result of a general formula. Parameters ---------- label : str Label of resulting dataset. formula : str Formula to calculate resulting dataset. Existing datasets are referenced by ‘#TheirLabel#‘. unit : str, optional Physical unit of the result. description : str, optional Human-readable description of the result. vectorized : bool, optional Indicate whether the formula can be used in vectorized form. Defaults to ‘True’. """ if not vectorized: raise NotImplementedError dataset_mapping = {d:d for d in set(re.findall(r'#(.*?)#',formula))} # datasets used in the formula args = {'formula':formula,'label':label,'unit':unit,'description':description} self._add_generic_pointwise(self._add_calculation,dataset_mapping,args) @staticmethod def _add_Cauchy(P,F): return { 'data': mechanics.Cauchy(P['data'],F['data']), 'label': 'sigma', 'meta': { 'Unit': P['meta']['Unit'], 'Description': 'Cauchy stress calculated from {} ({}) '.format(P['label'], P['meta']['Description'])+\ 'and {} ({})'.format(F['label'],F['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_Cauchy(self,P='P',F='F'): """ Add Cauchy stress calculated from first Piola-Kirchhoff stress and deformation gradient. Parameters ---------- P : str, optional Label of the dataset containing the first Piola-Kirchhoff stress. Defaults to ‘P’. F : str, optional Label of the dataset containing the deformation gradient. Defaults to ‘F’. """ self._add_generic_pointwise(self._add_Cauchy,{'P':P,'F':F}) @staticmethod def _add_determinant(T): return { 'data': np.linalg.det(T['data']), 'label': 'det({})'.format(T['label']), 'meta': { 'Unit': T['meta']['Unit'], 'Description': 'Determinant of tensor {} ({})'.format(T['label'],T['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_determinant(self,T): """ Add the determinant of a tensor. Parameters ---------- T : str Label of tensor dataset. """ self._add_generic_pointwise(self._add_determinant,{'T':T}) @staticmethod def _add_deviator(T): return { 'data': mechanics.deviatoric_part(T['data']), 'label': 's_{}'.format(T['label']), 'meta': { 'Unit': T['meta']['Unit'], 'Description': 'Deviator of tensor {} ({})'.format(T['label'],T['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_deviator(self,T): """ Add the deviatoric part of a tensor. Parameters ---------- T : str Label of tensor dataset. """ self._add_generic_pointwise(self._add_deviator,{'T':T}) @staticmethod def _add_eigenvalue(T_sym): return { 'data': mechanics.eigenvalues(T_sym['data']), 'label': 'lambda({})'.format(T_sym['label']), 'meta' : { 'Unit': T_sym['meta']['Unit'], 'Description': 'Eigenvalues of {} ({})'.format(T_sym['label'],T_sym['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_eigenvalues(self,T_sym): """ Add eigenvalues of symmetric tensor. Parameters ---------- T_sym : str Label of symmetric tensor dataset. """ self._add_generic_pointwise(self._add_eigenvalue,{'T_sym':T_sym}) @staticmethod def _add_eigenvector(T_sym): return { 'data': mechanics.eigenvectors(T_sym['data']), 'label': 'v({})'.format(T_sym['label']), 'meta' : { 'Unit': '1', 'Description': 'Eigenvectors of {} ({})'.format(T_sym['label'],T_sym['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_eigenvectors(self,T_sym): """ Add eigenvectors of symmetric tensor. Parameters ---------- T_sym : str Label of symmetric tensor dataset. """ self._add_generic_pointwise(self._add_eigenvector,{'T_sym':T_sym}) @staticmethod def _add_IPFcolor(q,l): d = np.array(l) d_unit = d/np.linalg.norm(d) m = util.scale_to_coprime(d) colors = np.empty((len(q['data']),3),np.uint8) lattice = q['meta']['Lattice'] for i,qu in enumerate(q['data']): o = Orientation(np.array([qu['w'],qu['x'],qu['y'],qu['z']]),lattice).reduced() colors[i] = np.uint8(o.IPFcolor(d_unit)*255) return { 'data': colors, 'label': 'IPFcolor_[{} {} {}]'.format(*m), 'meta' : { 'Unit': 'RGB (8bit)', 'Lattice': lattice, 'Description': 'Inverse Pole Figure (IPF) colors along sample direction [{} {} {}]'.format(*m), 'Creator': inspect.stack()[0][3][1:] } } def add_IPFcolor(self,q,l): """ Add RGB color tuple of inverse pole figure (IPF) color. Parameters ---------- q : str Label of the dataset containing the crystallographic orientation as quaternions. l : numpy.array of shape (3) Lab frame direction for inverse pole figure. """ self._add_generic_pointwise(self._add_IPFcolor,{'q':q},{'l':l}) @staticmethod def _add_maximum_shear(T_sym): return { 'data': mechanics.maximum_shear(T_sym['data']), 'label': 'max_shear({})'.format(T_sym['label']), 'meta': { 'Unit': T_sym['meta']['Unit'], 'Description': 'Maximum shear component of {} ({})'.format(T_sym['label'],T_sym['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_maximum_shear(self,T_sym): """ Add maximum shear components of symmetric tensor. Parameters ---------- T_sym : str Label of symmetric tensor dataset. """ self._add_generic_pointwise(self._add_maximum_shear,{'T_sym':T_sym}) @staticmethod def _add_Mises(T_sym): t = 'strain' if T_sym['meta']['Unit'] == '1' else \ 'stress' return { 'data': mechanics.Mises_strain(T_sym['data']) if t=='strain' else mechanics.Mises_stress(T_sym['data']), 'label': '{}_vM'.format(T_sym['label']), 'meta': { 'Unit': T_sym['meta']['Unit'], 'Description': 'Mises equivalent {} of {} ({})'.format(t,T_sym['label'],T_sym['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_Mises(self,T_sym): """ Add the equivalent Mises stress or strain of a symmetric tensor. Parameters ---------- T_sym : str Label of symmetric tensorial stress or strain dataset. """ self._add_generic_pointwise(self._add_Mises,{'T_sym':T_sym}) @staticmethod def _add_norm(x,ord): o = ord if len(x['data'].shape) == 2: axis = 1 t = 'vector' if o is None: o = 2 elif len(x['data'].shape) == 3: axis = (1,2) t = 'tensor' if o is None: o = 'fro' else: raise ValueError return { 'data': np.linalg.norm(x['data'],ord=o,axis=axis,keepdims=True), 'label': '|{}|_{}'.format(x['label'],o), 'meta': { 'Unit': x['meta']['Unit'], 'Description': '{}-norm of {} {} ({})'.format(o,t,x['label'],x['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_norm(self,x,ord=None): """ Add the norm of vector or tensor. Parameters ---------- x : str Label of vector or tensor dataset. ord : {non-zero int, inf, -inf, ‘fro’, ‘nuc’}, optional Order of the norm. inf means NumPy’s inf object. For details refer to numpy.linalg.norm. """ self._add_generic_pointwise(self._add_norm,{'x':x},{'ord':ord}) @staticmethod def _add_PK2(P,F): return { 'data': mechanics.PK2(P['data'],F['data']), 'label': 'S', 'meta': { 'Unit': P['meta']['Unit'], 'Description': '2. Kirchhoff stress calculated from {} ({}) '.format(P['label'], P['meta']['Description'])+\ 'and {} ({})'.format(F['label'],F['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_PK2(self,P='P',F='F'): """ Add second Piola-Kirchhoff calculated from first Piola-Kirchhoff stress and deformation gradient. Parameters ---------- P : str, optional Label first Piola-Kirchhoff stress dataset. Defaults to ‘P’. F : str, optional Label of deformation gradient dataset. Defaults to ‘F’. """ self._add_generic_pointwise(self._add_PK2,{'P':P,'F':F}) @staticmethod def _add_pole(q,p,polar): pole = np.array(p) unit_pole = pole/np.linalg.norm(pole) m = util.scale_to_coprime(pole) rot = Rotation(q['data'].view(np.double).reshape(-1,4)) rotatedPole = rot @ np.broadcast_to(unit_pole,rot.shape+(3,)) # rotate pole according to crystal orientation xy = rotatedPole[:,0:2]/(1.+abs(unit_pole[2])) # stereographic projection coords = xy if not polar else \ np.block([np.sqrt(xy[:,0:1]*xy[:,0:1]+xy[:,1:2]*xy[:,1:2]),np.arctan2(xy[:,1:2],xy[:,0:1])]) return { 'data': coords, 'label': 'p^{}_[{} {} {})'.format(u'rφ' if polar else 'xy',*m), 'meta' : { 'Unit': '1', 'Description': '{} coordinates of stereographic projection of pole (direction/plane) in crystal frame'\ .format('Polar' if polar else 'Cartesian'), 'Creator': inspect.stack()[0][3][1:] } } def add_pole(self,q,p,polar=False): """ Add coordinates of stereographic projection of given pole in crystal frame. Parameters ---------- q : str Label of the dataset containing the crystallographic orientation as quaternions. p : numpy.array of shape (3) Crystallographic direction or plane. polar : bool, optional Give pole in polar coordinates. Defaults to False. """ self._add_generic_pointwise(self._add_pole,{'q':q},{'p':p,'polar':polar}) @staticmethod def _add_rotational_part(F): return { 'data': mechanics.rotational_part(F['data']), 'label': 'R({})'.format(F['label']), 'meta': { 'Unit': F['meta']['Unit'], 'Description': 'Rotational part of {} ({})'.format(F['label'],F['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_rotational_part(self,F): """ Add rotational part of a deformation gradient. Parameters ---------- F : str, optional Label of deformation gradient dataset. """ self._add_generic_pointwise(self._add_rotational_part,{'F':F}) @staticmethod def _add_spherical(T): return { 'data': mechanics.spherical_part(T['data']), 'label': 'p_{}'.format(T['label']), 'meta': { 'Unit': T['meta']['Unit'], 'Description': 'Spherical component of tensor {} ({})'.format(T['label'],T['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_spherical(self,T): """ Add the spherical (hydrostatic) part of a tensor. Parameters ---------- T : str Label of tensor dataset. """ self._add_generic_pointwise(self._add_spherical,{'T':T}) @staticmethod def _add_strain_tensor(F,t,m): return { 'data': mechanics.strain_tensor(F['data'],t,m), 'label': 'epsilon_{}^{}({})'.format(t,m,F['label']), 'meta': { 'Unit': F['meta']['Unit'], 'Description': 'Strain tensor of {} ({})'.format(F['label'],F['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_strain_tensor(self,F='F',t='V',m=0.0): """ Add strain tensor of a deformation gradient. For details refer to damask.mechanics.strain_tensor Parameters ---------- F : str, optional Label of deformation gradient dataset. Defaults to ‘F’. t : {‘V’, ‘U’}, optional Type of the polar decomposition, ‘V’ for left stretch tensor and ‘U’ for right stretch tensor. Defaults to ‘V’. m : float, optional Order of the strain calculation. Defaults to ‘0.0’. """ self._add_generic_pointwise(self._add_strain_tensor,{'F':F},{'t':t,'m':m}) @staticmethod def _add_stretch_tensor(F,t): return { 'data': mechanics.left_stretch(F['data']) if t == 'V' else mechanics.right_stretch(F['data']), 'label': '{}({})'.format(t,F['label']), 'meta': { 'Unit': F['meta']['Unit'], 'Description': '{} stretch tensor of {} ({})'.format('Left' if t == 'V' else 'Right', F['label'],F['meta']['Description']), 'Creator': inspect.stack()[0][3][1:] } } def add_stretch_tensor(self,F='F',t='V'): """ Add stretch tensor of a deformation gradient. Parameters ---------- F : str, optional Label of deformation gradient dataset. Defaults to ‘F’. t : {‘V’, ‘U’}, optional Type of the polar decomposition, ‘V’ for left stretch tensor and ‘U’ for right stretch tensor. Defaults to ‘V’. """ self._add_generic_pointwise(self._add_stretch_tensor,{'F':F},{'t':t}) def _job(self,group,func,datasets,args,lock): """Execute job for _add_generic_pointwise.""" try: datasets_in = {} lock.acquire() with h5py.File(self.fname,'r') as f: for arg,label in datasets.items(): loc = f[group+'/'+label] datasets_in[arg]={'data' :loc[()], 'label':label, 'meta': {k:v.decode() for k,v in loc.attrs.items()}} lock.release() r = func(**datasets_in,**args) return [group,r] except Exception as err: print('Error during calculation: {}.'.format(err)) return None def _add_generic_pointwise(self,func,datasets,args={}): """ General function to add pointwise data. Parameters ---------- func : function Callback function that calculates a new dataset from one or more datasets per HDF5 group. datasets : dictionary Details of the datasets to be used: label (in HDF5 file) and arg (argument to which the data is parsed in func). args : dictionary, optional Arguments parsed to func. """ pool = multiprocessing.Pool(int(Environment().options['DAMASK_NUM_THREADS'])) lock = multiprocessing.Manager().Lock() groups = self.groups_with_datasets(datasets.values()) default_arg = partial(self._job,func=func,datasets=datasets,args=args,lock=lock) for result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)): if not result: continue lock.acquire() with h5py.File(self.fname, 'a') as f: try: if self._allow_overwrite and result[0]+'/'+result[1]['label'] in f: dataset = f[result[0]+'/'+result[1]['label']] dataset[...] = result[1]['data'] dataset.attrs['Overwritten'] = 'Yes'.encode() else: dataset = f[result[0]].create_dataset(result[1]['label'],data=result[1]['data']) now = datetime.datetime.now().astimezone() dataset.attrs['Created'] = now.strftime('%Y-%m-%d %H:%M:%S%z').encode() for l,v in result[1]['meta'].items(): dataset.attrs[l]=v.encode() creator = 'damask.Result.{} v{}'.format(dataset.attrs['Creator'].decode(),version) dataset.attrs['Creator'] = creator.encode() except (OSError,RuntimeError) as err: print('Could not add dataset: {}.'.format(err)) lock.release() pool.close() pool.join() def write_XDMF(self): """ Write XDMF file to directly visualize data in DADF5 file. This works only for scalar, 3-vector and 3x3-tensor data. Selection is not taken into account. """ if len(self.constituents) != 1 or not self.structured: raise NotImplementedError('XDMF only available for grid results with 1 constituent.') xdmf=ET.Element('Xdmf') xdmf.attrib={'Version': '2.0', 'xmlns:xi': 'http://www.w3.org/2001/XInclude'} domain=ET.SubElement(xdmf, 'Domain') collection = ET.SubElement(domain, 'Grid') collection.attrib={'GridType': 'Collection', 'CollectionType': 'Temporal'} time = ET.SubElement(collection, 'Time') time.attrib={'TimeType': 'List'} time_data = ET.SubElement(time, 'DataItem') time_data.attrib={'Format': 'XML', 'NumberType': 'Float', 'Dimensions': '{}'.format(len(self.times))} time_data.text = ' '.join(map(str,self.times)) attributes = [] data_items = [] for inc in self.increments: grid=ET.SubElement(collection,'Grid') grid.attrib = {'GridType': 'Uniform', 'Name': inc} topology=ET.SubElement(grid, 'Topology') topology.attrib={'TopologyType': '3DCoRectMesh', 'Dimensions': '{} {} {}'.format(*self.grid+1)} geometry=ET.SubElement(grid, 'Geometry') geometry.attrib={'GeometryType':'Origin_DxDyDz'} origin=ET.SubElement(geometry, 'DataItem') origin.attrib={'Format': 'XML', 'NumberType': 'Float', 'Dimensions': '3'} origin.text="{} {} {}".format(*self.origin) delta=ET.SubElement(geometry, 'DataItem') delta.attrib={'Format': 'XML', 'NumberType': 'Float', 'Dimensions': '3'} delta.text="{} {} {}".format(*(self.size/self.grid)) with h5py.File(self.fname,'r') as f: attributes.append(ET.SubElement(grid, 'Attribute')) attributes[-1].attrib={'Name': 'u', 'Center': 'Node', 'AttributeType': 'Vector'} data_items.append(ET.SubElement(attributes[-1], 'DataItem')) data_items[-1].attrib={'Format': 'HDF', 'Precision': '8', 'Dimensions': '{} {} {} 3'.format(*(self.grid+1))} data_items[-1].text='{}:/{}/geometry/u_n'.format(os.path.split(self.fname)[1],inc) for o,p in zip(['constituents','materialpoints'],['con_physics','mat_physics']): for oo in getattr(self,o): for pp in getattr(self,p): g = '/'.join([inc,o[:-1],oo,pp]) for l in f[g]: name = '/'.join([g,l]) shape = f[name].shape[1:] dtype = f[name].dtype prec = f[name].dtype.itemsize if (shape not in [(1,), (3,), (3,3)]) or dtype != np.float64: continue attributes.append(ET.SubElement(grid, 'Attribute')) attributes[-1].attrib={'Name': '{}'.format(name.split('/',2)[2]), 'Center': 'Cell', 'AttributeType': 'Tensor'} data_items.append(ET.SubElement(attributes[-1], 'DataItem')) data_items[-1].attrib={'Format': 'HDF', 'NumberType': 'Float', 'Precision': '{}'.format(prec), 'Dimensions': '{} {} {} {}'.format(*self.grid,np.prod(shape))} data_items[-1].text='{}:{}'.format(os.path.split(self.fname)[1],name) with open(os.path.splitext(self.fname)[0]+'.xdmf','w') as f: f.write(xml.dom.minidom.parseString(ET.tostring(xdmf).decode()).toprettyxml()) def to_vtk(self,labels=[],mode='cell'): """ Export to vtk cell/point data. Parameters ---------- labels : str or list of, optional Labels of the datasets to be exported. mode : str, either 'cell' or 'point' Export in cell format or point format. Defaults to 'cell'. """ if mode.lower()=='cell': if self.structured: v = VTK.from_rectilinearGrid(self.grid,self.size,self.origin) else: with h5py.File(self.fname,'r') as f: v = VTK.from_unstructuredGrid(f['/geometry/x_n'][()], f['/geometry/T_c'][()]-1, f['/geometry/T_c'].attrs['VTK_TYPE'].decode()) elif mode.lower()=='point': v = VTK.from_polyData(self.cell_coordinates()) N_digits = int(np.floor(np.log10(int(self.increments[-1][3:]))))+1 for inc in util.show_progress(self.iterate('increments'),len(self.selection['increments'])): materialpoints_backup = self.selection['materialpoints'].copy() self.pick('materialpoints',False) for label in (labels if isinstance(labels,list) else [labels]): for p in self.iterate('con_physics'): if p != 'generic': for c in self.iterate('constituents'): x = self.get_dataset_location(label) if len(x) == 0: continue array = self.read_dataset(x,0) v.add(array,'1_'+x[0].split('/',1)[1]) #ToDo: hard coded 1! else: x = self.get_dataset_location(label) if len(x) == 0: continue array = self.read_dataset(x,0) ph_name = re.compile(r'(?<=(constituent\/))(.*?)(?=(generic))') # identify phase name dset_name = '1_' + re.sub(ph_name,r'',x[0].split('/',1)[1]) # removing phase name v.add(array,dset_name) self.pick('materialpoints',materialpoints_backup) constituents_backup = self.selection['constituents'].copy() self.pick('constituents',False) for label in (labels if isinstance(labels,list) else [labels]): for p in self.iterate('mat_physics'): if p != 'generic': for m in self.iterate('materialpoints'): x = self.get_dataset_location(label) if len(x) == 0: continue array = self.read_dataset(x,0) v.add(array,'1_'+x[0].split('/',1)[1]) #ToDo: why 1_? else: x = self.get_dataset_location(label) if len(x) == 0: continue array = self.read_dataset(x,0) v.add(array,'1_'+x[0].split('/',1)[1]) self.pick('constituents',constituents_backup) u = self.read_dataset(self.get_dataset_location('u_n' if mode.lower() == 'cell' else 'u_p')) v.add(u,'u') file_out = '{}_inc{}'.format(os.path.splitext(os.path.basename(self.fname))[0], inc[3:].zfill(N_digits)) v.write(file_out) ################################################################################################### # BEGIN DEPRECATED def set_by_time(self,start,end): """ Set active increments based on start and end time. Parameters ---------- start : float start time (included) end : float end time (included) """ self._manage_selection('set','times',self.times_in_range(start,end))