no multiprocessing for adding datasets

multiprocessing was not reliable across platforms and caused all kinds of weird behavior
commit b0bb904c89
parent 0604e510ec
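The change below drops the `multiprocessing` pool from `Result._add_generic_pointwise` and runs the per-group jobs one after another. As a minimal sketch of the two dispatch patterns (hypothetical names, not the DAMASK code): the pool version hands a `partial`-bound job to worker processes via `imap_unordered`, the sequential version simply loops.

import multiprocessing as mp
from functools import partial

def job(group, callback, args):
    # stand-in for Result._job_pointwise: run the callback for one group
    return [group, callback(group, **args)]

def scale_label(group, factor=1):
    # hypothetical pointwise callback
    return f'{group} (x{factor})'

def process_all(groups, callback, args, parallel=False):
    bound = partial(job, callback=callback, args=args)
    if parallel:                           # old behavior: worker processes, results in arbitrary order
        with mp.Pool(4) as pool:
            return list(pool.imap_unordered(bound, groups))
    return [bound(g) for g in groups]      # new behavior: one job at a time, deterministic order

if __name__ == '__main__':
    print(process_all(['increment_0/phase/Al', 'increment_1/phase/Al'],
                      scale_label, {'factor': 2}))

Sequential execution trades speed for predictable behavior across platforms (e.g. fork vs. spawn process start methods), which is the motivation stated in the commit message.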
@@ -1,5 +1,3 @@
-import multiprocessing as mp
-from multiprocessing.synchronize import Lock
 import re
 import fnmatch
 import os
@@ -1450,12 +1448,10 @@ class Result:
                        group: str,
                        callback: Callable,
                        datasets: Dict[str, str],
-                       args: Dict[str, str],
-                       lock: Lock) -> List[Union[None, Any]]:
+                       args: Dict[str, str]) -> List[Union[None, Any]]:
         """Execute job for _add_generic_pointwise."""
         try:
             datasets_in = {}
-            lock.acquire()
             with h5py.File(self.fname,'r') as f:
                 for arg,label in datasets.items():
                     loc = f[group+'/'+label]
@@ -1463,7 +1459,6 @@ class Result:
                                       'label':label,
                                       'meta': {k:(v.decode() if not h5py3 and type(v) is bytes else v) \
                                                for k,v in loc.attrs.items()}}
-            lock.release()
             r = callback(**datasets_in,**args)
             return [group,r]
         except Exception as err:
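The lock removed in this hunk existed only to serialize HDF5 access among processes: the workers opened the file for reading while the parent appended results, and mixing readers with a writer on the same HDF5 file is not safe without special support (e.g. SWMR), so each job held a manager lock around its read. With a single process, reads and writes can never overlap. A hedged sketch of the before/after read pattern (file and dataset paths hypothetical):

import multiprocessing as mp
import h5py

def read_locked(fname, path, lock):
    # old: every worker serialized its read against the parent's writes
    with lock:
        with h5py.File(fname, 'r') as f:
            return f[path][...]

def read_plain(fname, path):
    # new: single process, no concurrent access to guard against
    with h5py.File(fname, 'r') as f:
        return f[path][...]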
@@ -1490,9 +1485,6 @@ class Result:
             Arguments parsed to func.

         """
-        pool = mp.Pool(int(os.environ.get('OMP_NUM_THREADS',4)))
-        lock = mp.Manager().Lock()
-
         groups = []
         with h5py.File(self.fname,'r') as f:
             for inc in self.visible['increments']:
@@ -1506,12 +1498,12 @@ class Result:
             print('No matching dataset found, no data was added.')
             return

-        default_arg = partial(self._job_pointwise,callback=func,datasets=datasets,args=args,lock=lock)
+        default_arg = partial(self._job_pointwise,callback=func,datasets=datasets,args=args)

-        for group,result in util.show_progress(pool.imap_unordered(default_arg,groups),len(groups)):   # type: ignore
+        for grp in util.show_progress(groups):
+            group, result = default_arg(grp)                                                           # type: ignore
             if not result:
                 continue
-            lock.acquire()
             with h5py.File(self.fname, 'a') as f:
                 try:
                     if not self._protected and '/'.join([group,result['label']]) in f:
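Note that `functools.partial` keeps the call site stable: everything except the group is pre-bound, so `default_arg(grp)` returns the same `[group, result]` pair that `pool.imap_unordered` used to yield. A small usage sketch (hypothetical callback and labels, not the DAMASK API):

from functools import partial

def job(group, callback, datasets, args):
    # pre-bound worker: only the group varies per call
    return [group, callback(**datasets, **args)]

def norm(sigma, ord=2):                  # hypothetical pointwise callback
    return f'|{sigma}|_{ord}'

default_arg = partial(job, callback=norm, datasets={'sigma': 'sigma'}, args={'ord': 1})
group, result = default_arg('increment_0/phase/Al')
print(group, result)                     # increment_0/phase/Al |sigma|_1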
@@ -1543,10 +1535,6 @@ class Result:

                 except (OSError,RuntimeError) as err:
                     print(f'Could not add dataset: {err}.')
-            lock.release()
-
-        pool.close()
-        pool.join()


     def _mappings(self):
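With no pool left, the shutdown calls in the last hunk go too. For reference, `close()` stops new task submissions and `join()` blocks until the worker processes exit; a minimal standalone sketch of that lifecycle (not the DAMASK code):

import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(2)
    try:
        print(pool.map(square, range(4)))   # [0, 1, 4, 9]
    finally:
        pool.close()   # no more tasks may be submitted
        pool.join()    # wait for worker processes to terminate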