shorter function name; setup_files is property

Philip Eisenlohr 2022-11-09 09:39:47 -05:00
parent 836feaa5f4
commit 1a748ec5aa
2 changed files with 190 additions and 193 deletions
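In short, the user-facing change (a hedged sketch; the result file name is illustrative, only the renamed members are taken from this commit):

    import damask

    r = damask.Result('my_job.hdf5')             # hypothetical DADF5 result file

    # formerly the method r.list_simulation_setup_files(); now a read-only property:
    print(r.simulation_setup_files)              # e.g. ['4grains2x4x3.vti', 'compressionY.yaml', 'material.yaml']

    # formerly r.export_simulation_setup_files(...); now with a shorter name:
    r.export_simulation_setup('material.yaml')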

python/damask/_result.py

@@ -109,7 +109,7 @@ class Result:
             if self.version_major != 0 or not 12 <= self.version_minor <= 14:
                 raise TypeError(f'unsupported DADF5 version "{self.version_major}.{self.version_minor}"')
             if self.version_major == 0 and self.version_minor < 14:
-                self.export_simulation_setup_files = None  # type: ignore
+                self.export_simulation_setup = None  # type: ignore
 
             self.structured = 'cells' in f['geometry'].attrs.keys()
@@ -561,6 +561,14 @@ class Result:
             print(f'Function {func.__name__} enabled in add_calculation.')
 
+    @property
+    def simulation_setup_files(self):
+        """Simulation setup files used to generate the Result object."""
+        files = []
+        with h5py.File(self.fname,'r') as f_in:
+            f_in['setup'].visit(lambda name: files.append(name))
+        return files
+
     @property
     def incs(self):
         return [int(i.split(prefix_inc)[-1]) for i in self.increments]
@@ -1515,6 +1523,166 @@ class Result:
             pool.join()
 
 
+    def _mappings(self):
+        """Mappings to place data spatially."""
+        with h5py.File(self.fname,'r') as f:
+
+            at_cell_ph = []
+            in_data_ph = []
+            for c in range(self.N_constituents):
+                at_cell_ph.append({label: np.where(self.phase[:,c] == label)[0] \
+                                   for label in self.visible['phases']})
+                in_data_ph.append({label: f['/'.join(['cell_to','phase'])]['entry'][at_cell_ph[c][label]][:,c] \
+                                   for label in self.visible['phases']})
+
+            at_cell_ho = {label: np.where(self.homogenization[:] == label)[0] \
+                          for label in self.visible['homogenizations']}
+            in_data_ho = {label: f['/'.join(['cell_to','homogenization'])]['entry'][at_cell_ho[label]] \
+                          for label in self.visible['homogenizations']}
+
+        return at_cell_ph,in_data_ph,at_cell_ho,in_data_ho
+
+
+    def get(self,
+            output: Union[str, List[str]] = '*',
+            flatten: bool = True,
+            prune: bool = True):
+        """
+        Collect data per phase/homogenization reflecting the group/folder structure in the DADF5 file.
+
+        Parameters
+        ----------
+        output : (list of) str, optional
+            Names of the datasets to read.
+            Defaults to '*', in which case all datasets are read.
+        flatten : bool, optional
+            Remove singular levels of the folder hierarchy.
+            This might be beneficial in case of single increment,
+            phase/homogenization, or field. Defaults to True.
+        prune : bool, optional
+            Remove branches with no data. Defaults to True.
+
+        Returns
+        -------
+        data : dict of numpy.ndarray
+            Datasets structured by phase/homogenization and according to selected view.
+
+        """
+        r = {}  # type: ignore
+        with h5py.File(self.fname,'r') as f:
+            for inc in util.show_progress(self.visible['increments']):
+                r[inc] = {'phase':{},'homogenization':{},'geometry':{}}
+
+                for out in _match(output,f['/'.join([inc,'geometry'])].keys()):
+                    r[inc]['geometry'][out] = _read(f['/'.join([inc,'geometry',out])])
+
+                for ty in ['phase','homogenization']:
+                    for label in self.visible[ty+'s']:
+                        r[inc][ty][label] = {}
+                        for field in _match(self.visible['fields'],f['/'.join([inc,ty,label])].keys()):
+                            r[inc][ty][label][field] = {}
+                            for out in _match(output,f['/'.join([inc,ty,label,field])].keys()):
+                                r[inc][ty][label][field][out] = _read(f['/'.join([inc,ty,label,field,out])])
+
+        if prune:   r = util.dict_prune(r)
+        if flatten: r = util.dict_flatten(r)
+
+        return None if (type(r) == dict and r == {}) else r
+
+
+    def place(self,
+              output: Union[str, List[str]] = '*',
+              flatten: bool = True,
+              prune: bool = True,
+              constituents: IntSequence = None,
+              fill_float: float = np.nan,
+              fill_int: int = 0):
+        """
+        Merge data into spatial order that is compatible with the damask.VTK geometry representation.
+
+        The returned data structure reflects the group/folder structure in the DADF5 file.
+
+        Multi-phase data is fused into a single output.
+        `place` is equivalent to `get` if only one phase/homogenization
+        and one constituent is present.
+
+        Parameters
+        ----------
+        output : (list of) str, optional
+            Names of the datasets to read.
+            Defaults to '*', in which case all visible datasets are placed.
+        flatten : bool, optional
+            Remove singular levels of the folder hierarchy.
+            This might be beneficial in case of single increment or field.
+            Defaults to True.
+        prune : bool, optional
+            Remove branches with no data. Defaults to True.
+        constituents : (list of) int, optional
+            Constituents to consider.
+            Defaults to None, in which case all constituents are considered.
+        fill_float : float, optional
+            Fill value for non-existent entries of floating point type.
+            Defaults to NaN.
+        fill_int : int, optional
+            Fill value for non-existent entries of integer type.
+            Defaults to 0.
+
+        Returns
+        -------
+        data : dict of numpy.ma.MaskedArray
+            Datasets structured by spatial position and according to selected view.
+
+        """
+        r = {}  # type: ignore
+
+        constituents_ = list(map(int,constituents)) if isinstance(constituents,Iterable) else \
+                        (range(self.N_constituents) if constituents is None else [constituents])  # type: ignore
+
+        suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \
+                   [f'#{c}' for c in constituents_]
+
+        at_cell_ph,in_data_ph,at_cell_ho,in_data_ho = self._mappings()
+
+        with h5py.File(self.fname,'r') as f:
+            for inc in util.show_progress(self.visible['increments']):
+                r[inc] = {'phase':{},'homogenization':{},'geometry':{}}
+
+                for out in _match(output,f['/'.join([inc,'geometry'])].keys()):
+                    r[inc]['geometry'][out] = ma.array(_read(f['/'.join([inc,'geometry',out])]),fill_value = fill_float)
+
+                for ty in ['phase','homogenization']:
+                    for label in self.visible[ty+'s']:
+                        for field in _match(self.visible['fields'],f['/'.join([inc,ty,label])].keys()):
+                            if field not in r[inc][ty].keys():
+                                r[inc][ty][field] = {}
+
+                            for out in _match(output,f['/'.join([inc,ty,label,field])].keys()):
+                                data = ma.array(_read(f['/'.join([inc,ty,label,field,out])]))
+
+                                if ty == 'phase':
+                                    if out+suffixes[0] not in r[inc][ty][field].keys():
+                                        for c,suffix in zip(constituents_,suffixes):
+                                            r[inc][ty][field][out+suffix] = \
+                                                _empty_like(data,self.N_materialpoints,fill_float,fill_int)
+
+                                    for c,suffix in zip(constituents_,suffixes):
+                                        r[inc][ty][field][out+suffix][at_cell_ph[c][label]] = data[in_data_ph[c][label]]
+
+                                if ty == 'homogenization':
+                                    if out not in r[inc][ty][field].keys():
+                                        r[inc][ty][field][out] = \
+                                            _empty_like(data,self.N_materialpoints,fill_float,fill_int)
+
+                                    r[inc][ty][field][out][at_cell_ho[label]] = data[in_data_ho[label]]
+
+        if prune:   r = util.dict_prune(r)
+        if flatten: r = util.dict_flatten(r)
+
+        return None if (type(r) == dict and r == {}) else r
+
+
     def export_XDMF(self,
                     output: Union[str, List[str]] = '*',
                     target_dir: Union[str, Path] = None,
@@ -1642,26 +1810,6 @@ class Result:
             f.write(xml.dom.minidom.parseString(ET.tostring(xdmf).decode()).toprettyxml())
 
 
-    def _mappings(self):
-        """Mappings to place data spatially."""
-        with h5py.File(self.fname,'r') as f:
-
-            at_cell_ph = []
-            in_data_ph = []
-            for c in range(self.N_constituents):
-                at_cell_ph.append({label: np.where(self.phase[:,c] == label)[0] \
-                                   for label in self.visible['phases']})
-                in_data_ph.append({label: f['/'.join(['cell_to','phase'])]['entry'][at_cell_ph[c][label]][:,c] \
-                                   for label in self.visible['phases']})
-
-            at_cell_ho = {label: np.where(self.homogenization[:] == label)[0] \
-                          for label in self.visible['homogenizations']}
-            in_data_ho = {label: f['/'.join(['cell_to','homogenization'])]['entry'][at_cell_ho[label]] \
-                          for label in self.visible['homogenizations']}
-
-        return at_cell_ph,in_data_ph,at_cell_ho,in_data_ho
-
-
     def export_VTK(self,
                    output: Union[str,List[str]] = '*',
                    mode: str = 'cell',
@@ -1682,7 +1830,7 @@ class Result:
         ----------
         output : (list of) str, optional
             Names of the datasets to export to the VTK file.
-            Defaults to '*', in which case all datasets are exported.
+            Defaults to '*', in which case all visible datasets are exported.
         mode : {'cell', 'point'}, optional
             Export in cell format or point format.
             Defaults to 'cell'.
@@ -1766,54 +1914,6 @@ class Result:
             v.save(vtk_dir/f'{self.fname.stem}_inc{inc.split(prefix_inc)[-1].zfill(N_digits)}',
                    parallel=parallel)
 
 
-    def get(self,
-            output: Union[str, List[str]] = '*',
-            flatten: bool = True,
-            prune: bool = True):
-        """
-        Collect data per phase/homogenization reflecting the group/folder structure in the DADF5 file.
-
-        Parameters
-        ----------
-        output : (list of) str, optional
-            Names of the datasets to read.
-            Defaults to '*', in which case all datasets are read.
-        flatten : bool, optional
-            Remove singular levels of the folder hierarchy.
-            This might be beneficial in case of single increment,
-            phase/homogenization, or field. Defaults to True.
-        prune : bool, optional
-            Remove branches with no data. Defaults to True.
-
-        Returns
-        -------
-        data : dict of numpy.ndarray
-            Datasets structured by phase/homogenization and according to selected view.
-
-        """
-        r = {}  # type: ignore
-        with h5py.File(self.fname,'r') as f:
-            for inc in util.show_progress(self.visible['increments']):
-                r[inc] = {'phase':{},'homogenization':{},'geometry':{}}
-
-                for out in _match(output,f['/'.join([inc,'geometry'])].keys()):
-                    r[inc]['geometry'][out] = _read(f['/'.join([inc,'geometry',out])])
-
-                for ty in ['phase','homogenization']:
-                    for label in self.visible[ty+'s']:
-                        r[inc][ty][label] = {}
-                        for field in _match(self.visible['fields'],f['/'.join([inc,ty,label])].keys()):
-                            r[inc][ty][label][field] = {}
-                            for out in _match(output,f['/'.join([inc,ty,label,field])].keys()):
-                                r[inc][ty][label][field][out] = _read(f['/'.join([inc,ty,label,field,out])])
-
-        if prune:   r = util.dict_prune(r)
-        if flatten: r = util.dict_flatten(r)
-
-        return None if (type(r) == dict and r == {}) else r
-
-
     def export_DADF5(self,
                      fname,
                      output: Union[str, List[str]] = '*'):
@@ -1858,99 +1958,7 @@ class Result:
                     f_in[p].copy(out,f_out[p])
 
 
-    def place(self,
-              output: Union[str, List[str]] = '*',
-              flatten: bool = True,
-              prune: bool = True,
-              constituents: IntSequence = None,
-              fill_float: float = np.nan,
-              fill_int: int = 0):
-        """
-        Merge data into spatial order that is compatible with the damask.VTK geometry representation.
-
-        The returned data structure reflects the group/folder structure in the DADF5 file.
-
-        Multi-phase data is fused into a single output.
-        `place` is equivalent to `get` if only one phase/homogenization
-        and one constituent is present.
-
-        Parameters
-        ----------
-        output : (list of) str, optional
-            Names of the datasets to read.
-            Defaults to '*', in which case all datasets are placed.
-        flatten : bool, optional
-            Remove singular levels of the folder hierarchy.
-            This might be beneficial in case of single increment or field.
-            Defaults to True.
-        prune : bool, optional
-            Remove branches with no data. Defaults to True.
-        constituents : (list of) int, optional
-            Constituents to consider.
-            Defaults to None, in which case all constituents are considered.
-        fill_float : float, optional
-            Fill value for non-existent entries of floating point type.
-            Defaults to NaN.
-        fill_int : int, optional
-            Fill value for non-existent entries of integer type.
-            Defaults to 0.
-
-        Returns
-        -------
-        data : dict of numpy.ma.MaskedArray
-            Datasets structured by spatial position and according to selected view.
-
-        """
-        r = {}  # type: ignore
-
-        constituents_ = list(map(int,constituents)) if isinstance(constituents,Iterable) else \
-                        (range(self.N_constituents) if constituents is None else [constituents])  # type: ignore
-
-        suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \
-                   [f'#{c}' for c in constituents_]
-
-        at_cell_ph,in_data_ph,at_cell_ho,in_data_ho = self._mappings()
-
-        with h5py.File(self.fname,'r') as f:
-            for inc in util.show_progress(self.visible['increments']):
-                r[inc] = {'phase':{},'homogenization':{},'geometry':{}}
-
-                for out in _match(output,f['/'.join([inc,'geometry'])].keys()):
-                    r[inc]['geometry'][out] = ma.array(_read(f['/'.join([inc,'geometry',out])]),fill_value = fill_float)
-
-                for ty in ['phase','homogenization']:
-                    for label in self.visible[ty+'s']:
-                        for field in _match(self.visible['fields'],f['/'.join([inc,ty,label])].keys()):
-                            if field not in r[inc][ty].keys():
-                                r[inc][ty][field] = {}
-
-                            for out in _match(output,f['/'.join([inc,ty,label,field])].keys()):
-                                data = ma.array(_read(f['/'.join([inc,ty,label,field,out])]))
-
-                                if ty == 'phase':
-                                    if out+suffixes[0] not in r[inc][ty][field].keys():
-                                        for c,suffix in zip(constituents_,suffixes):
-                                            r[inc][ty][field][out+suffix] = \
-                                                _empty_like(data,self.N_materialpoints,fill_float,fill_int)
-
-                                    for c,suffix in zip(constituents_,suffixes):
-                                        r[inc][ty][field][out+suffix][at_cell_ph[c][label]] = data[in_data_ph[c][label]]
-
-                                if ty == 'homogenization':
-                                    if out not in r[inc][ty][field].keys():
-                                        r[inc][ty][field][out] = \
-                                            _empty_like(data,self.N_materialpoints,fill_float,fill_int)
-
-                                    r[inc][ty][field][out][at_cell_ho[label]] = data[in_data_ho[label]]
-
-        if prune:   r = util.dict_prune(r)
-        if flatten: r = util.dict_flatten(r)
-
-        return None if (type(r) == dict and r == {}) else r
-
-
-    def export_simulation_setup_files(self,
+    def export_simulation_setup(self,
                                 output: Union[str, List[str]] = '*',
                                 target_dir: Union[str, Path] = None,
                                 overwrite: bool = False,
@@ -1962,11 +1970,11 @@ class Result:
         ----------
         output : (list of) str, optional
             Names of the datasets to export to the file.
-            Defaults to '*', in which case all datasets are exported.
+            Defaults to '*', in which case all setup files are exported.
         target_dir : str or pathlib.Path, optional
-            Directory to save configuration files. Will be created if non-existent.
+            Directory to save setup files. Will be created if non-existent.
         overwrite : bool, optional
-            Overwrite existing configuration files.
+            Overwrite any existing setup files.
             Defaults to False.
 
         """
@@ -1982,27 +1990,16 @@ class Result:
                 d = obj.attrs['description'] if h5py3 else obj.attrs['description'].decode()
                 if overwrite or not cfg.exists():
                     with util.open_text(cfg,'w') as f_out: f_out.write(obj[0].decode())
-                    print(f'Exported {d} to "{cfg}".')
+                    print(f'{d} --> "{cfg}"')
                 else:
-                    print(f'"{cfg}" exists, {d} not exported.')
+                    print(f'{d} --x "{cfg}" exists!')
             elif type(obj) == h5py.Group:
                 cfg.mkdir(parents=True,exist_ok=True)
 
-        cfg_dir = (Path.cwd() if target_dir is None else Path(target_dir))
+        cfg_dir = (Path.cwd() if target_dir is None else Path(target_dir))
         cfg_dir.mkdir(parents=True,exist_ok=True)
         with h5py.File(self.fname,'r') as f_in:
             f_in['setup'].visititems(partial(export,
                                              output=output,
                                              cfg_dir=cfg_dir,
                                              overwrite=overwrite))
-
-
-    def list_simulation_setup_files(self):
-        """List available simulation setup files used to generate the Result object."""
-        simulation_datasets = []
-        def retrieve_dataset(name, node):
-            simulation_datasets.append(name)
-        with h5py.File(self.fname,'r') as f_in:
-            f_in['setup'].visititems(retrieve_dataset)
-        return simulation_datasets
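The relocated get/place pair is unchanged in behavior (only place's docstring now reads "all visible datasets"). A usage sketch, assuming a result file and the dataset label 'sigma' that also appears in the test suite below:

    import damask

    r = damask.Result('my_job.hdf5')    # hypothetical result file

    # get(): data per phase/homogenization, mirroring the HDF5 group/folder layout
    per_phase = r.get('sigma')          # dict of numpy.ndarray, or None if nothing matches

    # place(): the same data merged into spatial (cell) order for damask.VTK;
    # multi-phase data is fused, non-existent entries are filled via fill_float/fill_int
    in_space = r.place('sigma')         # dict of numpy.ma.MaskedArray, or None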

python/tests/test_Result.py

@@ -294,7 +294,7 @@ class TestResult:
         default.add_curl('x')
         in_file = default.place('curl(x)')
         in_memory = grid_filters.curl(default.size,x.reshape(tuple(default.cells)+x.shape[1:])).reshape(in_file.shape)
-        assert (in_file==in_memory).all()
+        assert (in_file == in_memory).all()
 
     @pytest.mark.parametrize('shape',['vector','tensor'])
     def test_add_divergence(self,default,shape):
@@ -304,7 +304,7 @@ class TestResult:
         default.add_divergence('x')
         in_file = default.place('divergence(x)')
         in_memory = grid_filters.divergence(default.size,x.reshape(tuple(default.cells)+x.shape[1:])).reshape(in_file.shape)
-        assert (in_file==in_memory).all()
+        assert (in_file == in_memory).all()
 
     @pytest.mark.parametrize('shape',['scalar','pseudo_scalar','vector'])
     def test_add_gradient(self,default,shape):
@@ -315,7 +315,7 @@ class TestResult:
         default.add_gradient('x')
         in_file = default.place('gradient(x)')
         in_memory = grid_filters.gradient(default.size,x.reshape(tuple(default.cells)+x.shape[1:])).reshape(in_file.shape)
-        assert (in_file==in_memory).all()
+        assert (in_file == in_memory).all()
 
     @pytest.mark.parametrize('overwrite',['off','on'])
     def test_add_overwrite(self,default,overwrite):
@@ -338,7 +338,7 @@ class TestResult:
         created_second = datetime.strptime(created_second,'%Y-%m-%d %H:%M:%S%z')
 
         if overwrite == 'on':
-            assert created_first < created_second and np.allclose(last.place('sigma'),311.)
+            assert created_first < created_second and np.allclose(last.place('sigma'),311.)
         else:
-            assert created_first == created_second and not np.allclose(last.place('sigma'),311.)
+            assert created_first == created_second and not np.allclose(last.place('sigma'),311.)
@@ -418,7 +418,7 @@ class TestResult:
     def test_vtk_custom_path(self,tmp_path,single_phase):
         export_dir = tmp_path/'export_dir'
         single_phase.export_VTK(mode='point',target_dir=export_dir,parallel=False)
-        assert set(os.listdir(export_dir))==set([f'{single_phase.fname.stem}_inc{i:02}.vtp' for i in range(0,40+1,4)])
+        assert set(os.listdir(export_dir)) == set([f'{single_phase.fname.stem}_inc{i:02}.vtp' for i in range(0,40+1,4)])
 
     def test_XDMF_datatypes(self,tmp_path,single_phase,update,ref_path):
         for what,shape in {'scalar':(),'vector':(3,),'tensor':(3,3),'matrix':(12,)}.items():
@@ -543,30 +543,30 @@ class TestResult:
                                       '6grains6x7x8_single_phase_tensionY.hdf5'])
     @pytest.mark.parametrize('output',['material.yaml','*'])
     @pytest.mark.parametrize('overwrite',[True,False])
-    def test_export_simulation_setup_files(self,ref_path,tmp_path,fname,output,overwrite):
+    def test_export_simulation_setup(self,ref_path,tmp_path,fname,output,overwrite):
         r = Result(ref_path/fname)
-        r.export_simulation_setup_files(output,target_dir=tmp_path)
+        r.export_simulation_setup(output,target_dir=tmp_path)
         with h5py.File(ref_path/fname,'r') as f_hdf5:
             for file in fnmatch.filter(f_hdf5['setup'].keys(),output):
                 with open(tmp_path/file) as f:
                     assert f_hdf5[f'setup/{file}'][()][0].decode() == f.read()
-        r.export_simulation_setup_files(output,target_dir=tmp_path,overwrite=overwrite)
+        r.export_simulation_setup(output,target_dir=tmp_path,overwrite=overwrite)
 
-    def test_export_simulation_setup_files_custom_path(self,ref_path,tmp_path):
+    def test_export_simulation_setup_custom_path(self,ref_path,tmp_path):
         src = ref_path/'4grains2x4x3_compressionY.hdf5'
         subdir = 'export_dir'
         absdir = tmp_path/subdir
-        absdir.mkdir()
+        absdir.mkdir(exist_ok=True)
 
         r = Result(src)
         for t,cwd in zip([absdir,subdir,None],[tmp_path,tmp_path,absdir]):
             os.chdir(cwd)
-            r.export_simulation_setup_files('material.yaml',target_dir=t)
+            r.export_simulation_setup('material.yaml',target_dir=t)
             assert 'material.yaml' in os.listdir(absdir); (absdir/'material.yaml').unlink()
 
-    def test_list_simulation_setup_files(self,ref_path):
+    def test_simulation_setup_files(self,ref_path):
         r = Result(ref_path/'4grains2x4x3_compressionY.hdf5')
-        assert r.list_simulation_setup_files()==['4grains2x4x3.vti', 'compressionY.yaml', 'material.yaml']
+        assert set(r.simulation_setup_files) == set(['4grains2x4x3.vti', 'compressionY.yaml', 'material.yaml'])
 
     @pytest.mark.parametrize('fname',['4grains2x4x3_compressionY.hdf5',
                                       '6grains6x7x8_single_phase_tensionY.hdf5'])
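Putting the rename together with its overwrite semantics (a sketch against the reference file used in these tests; the target directory name is illustrative):

    import damask

    r = damask.Result('4grains2x4x3_compressionY.hdf5')
    r.export_simulation_setup(target_dir='setup')   # directory created if non-existent;
                                                    # prints '<description> --> "<path>"' per exported file
    r.export_simulation_setup(target_dir='setup')   # second run keeps existing files (overwrite=False):
                                                    # prints '<description> --x "<path>" exists!'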