polishing

This commit is contained in:
Martin Diehl 2020-02-21 23:16:25 +01:00
parent e9bf02a52c
commit f604314207
1 changed files with 46 additions and 36 deletions

View File

@ -949,21 +949,20 @@ class DADF5():
def _job(self,group,func,datasets,args,lock): def _job(self,group,func,datasets,args,lock):
def _read(group,datasets,lock): """
Execute job for _add_generic_pointwise
"""
try:
datasets_in = {} datasets_in = {}
lock.acquire() lock.acquire()
with h5py.File(self.fname,'r') as f: with h5py.File(self.fname,'r') as f:
for k,v in datasets.items(): for arg,label in datasets.items():
loc = f[group+'/'+v] loc = f[group+'/'+label]
datasets_in[k]={'data':loc[()], datasets_in[arg]={'data' :loc[()],
'label':v, 'label':label,
'meta':{k2:v2.decode() for k2,v2 in loc.attrs.items()}} 'meta': {k:v.decode() for k,v in loc.attrs.items()}}
lock.release() lock.release()
return datasets_in r = func(**datasets_in,**args)
try:
d = _read(group,datasets,lock)
r = func(**d,**args)
return [group,r] return [group,r]
except Exception as err: except Exception as err:
print('Error during calculation: {}.'.format(err)) print('Error during calculation: {}.'.format(err))
@ -971,18 +970,28 @@ class DADF5():
def _add_generic_pointwise(self,func,datasets,args={}): def _add_generic_pointwise(self,func,datasets,args={}):
"""
General function to add pointwise data.
env = Environment() Parameters
N_threads = int(env.options['DAMASK_NUM_THREADS']) ----------
func : function
Callback function that calculates a new dataset from one or more datasets per HDF5 group.
datasets : dictionary
Details of the datasets to be used: label (in HDF5 file) and arg (argument to which the data is parsed in func).
args : dictionary, optional
Arguments parsed to func.
"""
N_threads = int(Environment().options['DAMASK_NUM_THREADS'])
pool = multiprocessing.Pool(N_threads) pool = multiprocessing.Pool(N_threads)
m = multiprocessing.Manager() lock = multiprocessing.Manager().Lock()
lock = m.Lock()
groups = self.groups_with_datasets(datasets.values()) groups = self.groups_with_datasets(datasets.values())
default_arg = partial(self._job,func=func,datasets=datasets,args=args,lock=lock) default_arg = partial(self._job,func=func,datasets=datasets,args=args,lock=lock)
util.progressBar(iteration=0,total=len(groups)-1)
util.progressBar(iteration=0,total=len(groups))
for i,result in enumerate(pool.imap_unordered(default_arg,groups)): for i,result in enumerate(pool.imap_unordered(default_arg,groups)):
util.progressBar(iteration=i,total=len(groups)-1) util.progressBar(iteration=i+1,total=len(groups))
if not result: continue if not result: continue
lock.acquire() lock.acquire()
with h5py.File(self.fname, 'a') as f: with h5py.File(self.fname, 'a') as f:
@ -993,6 +1002,7 @@ class DADF5():
except OSError as err: except OSError as err:
print('Could not add dataset: {}.'.format(err)) print('Could not add dataset: {}.'.format(err))
lock.release() lock.release()
pool.close() pool.close()
pool.join() pool.join()