from queue import Queue import re import h5py import numpy as np from . import util # ------------------------------------------------------------------ class DADF5(): """ Read and write to DADF5 files. DADF5 files contain DAMASK results. """ # ------------------------------------------------------------------ def __init__(self, filename, mode = 'r', ): """ Opens an existing DADF5 file. Parameters ---------- filename : str name of the DADF5 file to be openend. mode : str, optional filemode for opening, either 'r' or 'a'. """ if mode not in ['a','r']: print('Invalid file access mode') else: with h5py.File(filename,mode): pass with h5py.File(filename,'r') as f: if f.attrs['DADF5-major'] != 0 or f.attrs['DADF5-minor'] != 2: raise TypeError('Unsupported DADF5 version {} '.format(f.attrs['DADF5-version'])) self.structured = 'grid' in f['geometry'].attrs.keys() if self.structured: self.grid = f['geometry'].attrs['grid'] self.size = f['geometry'].attrs['size'] r=re.compile('inc[0-9]+') self.increments = [{'inc': int(u[3:]), 'time': round(f[u].attrs['time/s'],12), } for u in f.keys() if r.match(u)] self.constituents = np.unique(f['mapping/cellResults/constituent']['Name']).tolist() # ToDo: I am not to happy with the name self.constituents = [c.decode() for c in self.constituents] self.materialpoints = np.unique(f['mapping/cellResults/materialpoint']['Name']).tolist() # ToDo: I am not to happy with the name self.materialpoints = [m.decode() for m in self.materialpoints] self.Nconstituents = [i for i in range(np.shape(f['mapping/cellResults/constituent'])[1])] self.Nmaterialpoints = np.shape(f['mapping/cellResults/constituent'])[0] self.c_output_types = [] for c in self.constituents: for o in f['inc{:05}/constituent/{}'.format(self.increments[0]['inc'],c)].keys(): self.c_output_types.append(o) self.c_output_types = list(set(self.c_output_types)) # make unique self.m_output_types = [] for m in self.materialpoints: for o in f['inc{:05}/materialpoint/{}'.format(self.increments[0]['inc'],m)].keys(): self.m_output_types.append(o) self.m_output_types = list(set(self.m_output_types)) # make unique self.active= {'increments': self.increments, 'constituents': self.constituents, 'materialpoints': self.materialpoints, 'constituent': self.Nconstituents, 'c_output_types': self.c_output_types, 'm_output_types': self.m_output_types} self.filename = filename self.mode = mode def get_candidates(self,l): groups = [] if type(l) is not list: print('mist') with h5py.File(self.filename,'r') as f: for g in self.get_active_groups(): if set(l).issubset(f[g].keys()): groups.append(g) return groups def get_active_groups(self): groups = [] for i,x in enumerate(self.active['increments']): group_inc = 'inc{:05}'.format(self.active['increments'][i]['inc']) for c in self.active['constituents']: group_constituent = group_inc+'/constituent/'+c for t in self.active['c_output_types']: group_output_types = group_constituent+'/'+t groups.append(group_output_types) for m in self.active['materialpoints']: group_materialpoint = group_inc+'/materialpoint/'+m for t in self.active['m_output_types']: group_output_types = group_materialpoint+'/'+t groups.append(group_output_types) return groups def list_data(self): """Shows information on all datasets in the file.""" with h5py.File(self.filename,'r') as f: group_inc = 'inc{:05}'.format(self.active['increments'][0]['inc']) for c in self.active['constituents']: print('\n'+c) group_constituent = group_inc+'/constituent/'+c for t in self.active['c_output_types']: print(' {}'.format(t)) group_output_types = group_constituent+'/'+t try: for x in f[group_output_types].keys(): print(' {} ({})'.format(x,f[group_output_types+'/'+x].attrs['Description'].decode())) except KeyError: pass for m in self.active['materialpoints']: group_materialpoint = group_inc+'/materialpoint/'+m for t in self.active['m_output_types']: print(' {}'.format(t)) group_output_types = group_materialpoint+'/'+t try: for x in f[group_output_types].keys(): print(' {} ({})'.format(x,f[group_output_types+'/'+x].attrs['Description'].decode())) except KeyError: pass def get_dataset_location(self,label): """Returns the location of all active datasets with given label.""" path = [] with h5py.File(self.filename,'r') as f: for i in self.active['increments']: group_inc = 'inc{:05}'.format(i['inc']) for c in self.active['constituents']: group_constituent = group_inc+'/constituent/'+c for t in self.active['c_output_types']: try: f[group_constituent+'/'+t+'/'+label] path.append(group_constituent+'/'+t+'/'+label) except KeyError as e: print('unable to locate constituents dataset: '+ str(e)) for m in self.active['materialpoints']: group_materialpoint = group_inc+'/materialpoint/'+m for t in self.active['m_output_types']: try: f[group_materialpoint+'/'+t+'/'+label] path.append(group_materialpoint+'/'+t+'/'+label) except KeyError as e: print('unable to locate materialpoints dataset: '+ str(e)) return path def read_dataset(self,path,c): """ Dataset for all points/cells. If more than one path is given, the dataset is composed of the individual contributions """ with h5py.File(self.filename,'r') as f: shape = (self.Nmaterialpoints,) + np.shape(f[path[0]])[1:] if len(shape) == 1: shape = shape +(1,) dataset = np.full(shape,np.nan) for pa in path: label = pa.split('/')[2] try: p = np.where(f['mapping/cellResults/constituent'][:,c]['Name'] == str.encode(label))[0] u = (f['mapping/cellResults/constituent'][p,c]['Position']) a = np.array(f[pa]) if len(a.shape) == 1: a=a.reshape([a.shape[0],1]) dataset[p,:] = a[u,:] except KeyError as e: print('unable to read constituent: '+ str(e)) try: p = np.where(f['mapping/cellResults/materialpoint']['Name'] == str.encode(label))[0] u = (f['mapping/cellResults/materialpoint'][p.tolist()]['Position']) a = np.array(f[pa]) if len(a.shape) == 1: a=a.reshape([a.shape[0],1]) dataset[p,:] = a[u,:] except KeyError as e: print('unable to read materialpoint: '+ str(e)) return dataset def add_Cauchy(self,P='P',F='F'): """ Adds Cauchy stress calculated from 1st Piola-Kirchhoff stress and deformation gradient. Todo ---- The einsum formula is completely untested! """ def Cauchy(F,P): return np.einsum('i,ijk,ilk->ijl',1.0/np.linalg.det(F),F,P) args = [{'label':F,'shape':[3,3],'unit':'-'}, {'label':P,'shape':[3,3],'unit':'Pa'} ] result = {'label':'sigma', 'unit':'Pa', 'Description': 'Cauchy stress calculated from 1st Piola-Kirchhoff stress and deformation gradient'} self.add_generic_pointwise_vectorized(Cauchy,args,result) def add_Mises_stress(self,stress='sigma'): """Adds equivalent von Mises stress.""" def Mises_stress(stress): dev = stress - np.trace(stress)/3.0*np.eye(3) symdev = 0.5*(dev+dev.T) return np.sqrt(np.sum(symdev*symdev.T)*3.0/2.0) args = [{'label':stress,'shape':[3,3],'unit':'Pa'}] result = {'label':'Mises({})'.format(stress), 'unit':'Pa', 'Description': 'Equivalent Mises stress'} self.add_generic_pointwise(Mises_stress,args,result) def add_norm(self,x,ord=None): """Adds norm of vector or tensor or magnitude of a scalar.""" # ToDo: The output unit should be the input unit args = [{'label':x,'shape':None,'unit':None}] result = {'label':'norm_{}({})'.format(str(ord),x), 'unit':'n/a', 'Description': 'Norm of vector or tensor or magnitude of a scalar. See numpy.linalg.norm manual for details'} self.add_generic_pointwise_vectorized(np.linalg.norm,args,result) def add_determinant(self,a): """Adds the determinant of a tensor.""" # ToDo: The output unit should be the input unit args = [{'label':a,'shape':[3,3],'unit':None}] result = {'label':'det({})'.format(a), 'unit':'n/a', 'Description': 'Determinant of a tensor'} self.add_generic_pointwise_vectorized(np.linalg.det,args,result) def add_spherical(self,a): """Adds the spherical component of a tensor.""" def spherical(m): return (m[0,0]+m[1,1]+m[2,2])/3.0 # ToDo: The output unit should be the input unit args = [{'label':a,'shape':[3,3],'unit':None}] result = {'label':'sph({})'.format(a), 'unit':'n/a', 'Description': 'Spherical component of a tensor'} self.add_generic_pointwise(spherical,args,result) def add_deviator(self,a): """Adds the deviator of a tensor.""" def deviator(m): return m - np.eye(3)*(m[0,0]+m[1,1]+m[2,2])/3.0 # ToDo: The output unit should be the input unit args = [{'label':a,'shape':[3,3],'unit':'Pa'}] result = {'label':'dev({})'.format(a), 'unit':'n/a', 'Description': 'Deviatoric component of a tensor'} self.add_generic_pointwise(deviator,args,result) def add_strain_tensors(self,defgrad='F'): """Adds a strain definition.""" def strain(defgrad): (U,S,Vh) = np.linalg.svd(defgrad) # singular value decomposition R_inv = np.dot(U,Vh).T # inverse rotation of polar decomposition U = np.dot(R_inv,defgrad) # F = RU U = np.where(abs(U) < 1e-12, 0, U) # kill nasty noisy data (D,V) = np.linalg.eig(U) # eigen decomposition (of symmetric matrix) neg = np.where(D < 0.0) # find negative eigenvalues ... D[neg] *= -1. # ... flip value ... V[:,neg] *= -1. # ... and vector for i,eigval in enumerate(D): if np.dot(V[:,i],V[:,(i+1)%3]) != 0.0: # check each vector for orthogonality V[:,(i+1)%3] = np.cross(V[:,(i+2)%3],V[:,i]) # correct next vector V[:,(i+1)%3] /= np.sqrt(np.dot(V[:,(i+1)%3],V[:,(i+1)%3].conj())) # and renormalize (hyperphobic?) d = np.log(D) # operate on eigenvalues of U o r V return np.dot(V,np.dot(np.diag(d),V.T)).real # build tensor back from eigenvalue/vector basis # ToDo: The output unit should be the input unit args = [{'label':defgrad,'shape':[3,3],'unit':None}] result = {'label':'strain({})'.format(defgrad), 'unit':'-', 'Description': 'strain (ln(V)) of a deformation gradient'} self.add_generic_pointwise(strain,args,result) def get_fitting(self,data): groups = [] if type(data) is not list: print('mist') with h5py.File(self.filename,'r') as f: for g in self.get_candidates([l['label'] for l in data]): print(g) fits = True for d in data: # ToDo: check for unit if d['shape'] is not None: fits = fits and np.all(np.array(f[g+'/'+d['label']].shape[1:]) == np.array(d['shape'])) if fits: groups.append(g) return groups def add_generic_pointwise(self,func,args,result): """ General function to add pointwise data. function 'func' first needs to have data arguments before other arguments Works for functions that are pointwise defined. """ groups = self.get_fitting(args) def job(args): out = args['out'] datasets_in = args['dat'] func = args['fun'] for i in range(out.shape[0]): arg = tuple([d[i,] for d in datasets_in]) out[i,] = func(*arg) args['results'].put({'out':out,'group':args['group']}) Nthreads = 4 # ToDo: should be a parameter results = Queue(Nthreads+1) todo = [] for g in groups: with h5py.File(self.filename,'r') as f: datasets_in = [f[g+'/'+u['label']][()] for u in args] # figure out dimension of results testArg = tuple([d[0,] for d in datasets_in]) # to call function with first point out = np.empty([datasets_in[0].shape[0]] + list(func(*testArg).shape)) # shape is Npoints x shape of the results for one point todo.append({'dat':datasets_in,'fun':func,'out':out,'group':g,'results':results}) # Instantiate a thread pool with worker threads pool = util.ThreadPool(Nthreads) missingResults = len(todo) # Add the jobs in bulk to the thread pool. Alternatively you could use # `pool.add_task` to add single jobs. The code will block here, which # makes it possible to cancel the thread pool with an exception when # the currently running batch of workers is finished pool.map(job, todo[:Nthreads+1]) i = 0 while missingResults > 0: r=results.get() # noqa print(r['group']) with h5py.File(self.filename,'r+') as f: dataset_out = f[r['group']].create_dataset(result['label'],data=r['out']) dataset_out.attrs['Unit'] = result['unit'] dataset_out.attrs['Description'] = result['Description'] dataset_out.attrs['Creator'] = 'dadf5.py v{}'.format('n/a') missingResults-=1 try: pool.add_task(job,todo[Nthreads+1+i]) except IndexError: pass i+=1 pool.wait_completion() def add_generic_pointwise_vectorized(self,func,args,result): """ General function to add pointwise data. function 'func' first needs to have data arguments before other arguments Works for vectorized functions. """ groups = self.get_fitting(args) def job(args): out = args['out'] datasets_in = args['dat'] func = args['fun'] out = func(*datasets_in) args['results'].put({'out':out,'group':args['group']}) Nthreads = 4 # ToDo: should be a parameter results = Queue(Nthreads+1) todo = [] for g in groups: with h5py.File(self.filename,'r') as f: datasets_in = [f[g+'/'+u['label']][()] for u in args] # figure out dimension of results testArg = tuple([d[0:1,] for d in datasets_in]) # to call function with first point out = np.empty([datasets_in[0].shape[0]] + list(func(*testArg).shape[1:])) # shape is Npoints x shape of the results for one point todo.append({'dat':datasets_in,'fun':func,'out':out,'group':g,'results':results}) # Instantiate a thread pool with worker threads pool = util.ThreadPool(Nthreads) missingResults = len(todo) # Add the jobs in bulk to the thread pool. Alternatively you could use # `pool.add_task` to add single jobs. The code will block here, which # makes it possible to cancel the thread pool with an exception when # the currently running batch of workers is finished pool.map(job, todo[:Nthreads+1]) i = 0 while missingResults > 0: r=results.get() # noqa print(r['group']) with h5py.File(self.filename,'r+') as f: dataset_out = f[r['group']].create_dataset(result['label'],data=r['out']) dataset_out.attrs['Unit'] = result['unit'] dataset_out.attrs['Description'] = result['Description'] dataset_out.attrs['Creator'] = 'dadf5.py v{}'.format('n/a') missingResults-=1 try: pool.add_task(job,todo[Nthreads+1+i]) except IndexError: pass i+=1 pool.wait_completion()