From f604314207d5a698cc386cc8f383a17b16c12029 Mon Sep 17 00:00:00 2001 From: Martin Diehl Date: Fri, 21 Feb 2020 23:16:25 +0100 Subject: [PATCH] polishing --- python/damask/dadf5.py | 82 +++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/python/damask/dadf5.py b/python/damask/dadf5.py index c13cf527f..f1476af61 100644 --- a/python/damask/dadf5.py +++ b/python/damask/dadf5.py @@ -949,21 +949,20 @@ class DADF5(): def _job(self,group,func,datasets,args,lock): - def _read(group,datasets,lock): - datasets_in = {} - lock.acquire() - with h5py.File(self.fname,'r') as f: - for k,v in datasets.items(): - loc = f[group+'/'+v] - datasets_in[k]={'data':loc[()], - 'label':v, - 'meta':{k2:v2.decode() for k2,v2 in loc.attrs.items()}} - lock.release() - return datasets_in - + """ + Execute job for _add_generic_pointwise + """ try: - d = _read(group,datasets,lock) - r = func(**d,**args) + datasets_in = {} + lock.acquire() + with h5py.File(self.fname,'r') as f: + for arg,label in datasets.items(): + loc = f[group+'/'+label] + datasets_in[arg]={'data' :loc[()], + 'label':label, + 'meta': {k:v.decode() for k,v in loc.attrs.items()}} + lock.release() + r = func(**datasets_in,**args) return [group,r] except Exception as err: print('Error during calculation: {}.'.format(err)) @@ -971,30 +970,41 @@ class DADF5(): def _add_generic_pointwise(self,func,datasets,args={}): + """ + General function to add pointwise data. - env = Environment() - N_threads = int(env.options['DAMASK_NUM_THREADS']) - pool = multiprocessing.Pool(N_threads) - m = multiprocessing.Manager() - lock = m.Lock() + Parameters + ---------- + func : function + Callback function that calculates a new dataset from one or more datasets per HDF5 group. + datasets : dictionary + Details of the datasets to be used: label (in HDF5 file) and arg (argument to which the data is parsed in func). + args : dictionary, optional + Arguments parsed to func. + """ + N_threads = int(Environment().options['DAMASK_NUM_THREADS']) + pool = multiprocessing.Pool(N_threads) + lock = multiprocessing.Manager().Lock() - groups = self.groups_with_datasets(datasets.values()) - default_arg = partial(self._job,func=func,datasets=datasets,args=args,lock=lock) - util.progressBar(iteration=0,total=len(groups)-1) - for i,result in enumerate(pool.imap_unordered(default_arg,groups)): - util.progressBar(iteration=i,total=len(groups)-1) - if not result: continue - lock.acquire() - with h5py.File(self.fname, 'a') as f: - try: - dataset = f[result[0]].create_dataset(result[1]['label'],data=result[1]['data']) - for l,v in result[1]['meta'].items(): - dataset.attrs[l]=v.encode() - except OSError as err: - print('Could not add dataset: {}.'.format(err)) - lock.release() - pool.close() - pool.join() + groups = self.groups_with_datasets(datasets.values()) + default_arg = partial(self._job,func=func,datasets=datasets,args=args,lock=lock) + + util.progressBar(iteration=0,total=len(groups)) + for i,result in enumerate(pool.imap_unordered(default_arg,groups)): + util.progressBar(iteration=i+1,total=len(groups)) + if not result: continue + lock.acquire() + with h5py.File(self.fname, 'a') as f: + try: + dataset = f[result[0]].create_dataset(result[1]['label'],data=result[1]['data']) + for l,v in result[1]['meta'].items(): + dataset.attrs[l]=v.encode() + except OSError as err: + print('Could not add dataset: {}.'.format(err)) + lock.release() + + pool.close() + pool.join() def to_vtk(self,labels,mode='cell'):