polished and added tests

This commit is contained in:
Martin Diehl 2021-04-01 15:52:43 +02:00
parent 71d1a341e4
commit 84e117c6b3
10 changed files with 77 additions and 57 deletions

View File

@ -1326,7 +1326,7 @@ class Result:
v.save(f'{self.fname.stem}_inc{inc[ln:].zfill(N_digits)}')
def read(self,labels,compress=True,strip=True):
def read(self,output,compress=True,strip=True):
"""
Export data from file per phase/homogenization.
@ -1335,8 +1335,8 @@ class Result:
Parameters
----------
labels : str or list of, optional
Labels of the datasets to be read.
output : str or list of, optional
Name of the datasets to include.
compress : bool
Squeeze out dictionaries that are not needed for a unique
structure. This might be beneficial in the case of single
@ -1348,22 +1348,22 @@ class Result:
"""
r = {}
labels_ = set([labels] if isinstance(labels,str) else labels)
output_ = set([output] if isinstance(output,str) else output)
with h5py.File(self.fname,'r') as f:
for inc in util.show_progress(self.visible['increments']):
r[inc] = {'phase':{},'homogenization':{},'geometry':{}}
for la in labels_.intersection(f['/'.join((inc,'geometry'))].keys()):
r[inc]['geometry'][la] = _read(f,'/'.join((inc,'geometry',la)))
for out in output_.intersection(f['/'.join((inc,'geometry'))].keys()):
r[inc]['geometry'][out] = _read(f,'/'.join((inc,'geometry',out)))
for ty in ['phase','homogenization']:
for na in self.visible[ty+'s']:
r[inc][ty][na] = {}
for field in f['/'.join((inc,ty,na))].keys():
r[inc][ty][na][field] = {}
for la in labels_.intersection(f['/'.join((inc,ty,na,field))].keys()):
r[inc][ty][na][field][la] = _read(f,'/'.join((inc,ty,na,field,la)))
for label in self.visible[ty+'s']:
r[inc][ty][label] = {}
for field in f['/'.join((inc,ty,label))].keys():
r[inc][ty][label][field] = {}
for out in output_.intersection(f['/'.join((inc,ty,label,field))].keys()):
r[inc][ty][label][field][out] = _read(f,'/'.join((inc,ty,label,field,out)))
if strip: r = util.dict_strip(r)
if compress: r = util.dict_compress(r)
@ -1371,7 +1371,7 @@ class Result:
return r
def place(self,labels,compress=True,strip=True,constituents=None,fill_float=0.0,fill_int=0):
def place(self,output,compress=True,strip=True,constituents=None,fill_float=0.0,fill_int=0):
"""
Export data from file suitable sorted for spatial operations.
@ -1384,7 +1384,7 @@ class Result:
Parameters
----------
labels : str or list of, optional
output : str or list of, optional
Labels of the datasets to be read.
compress : bool
Squeeze out dictionaries that are not needed for a unique
@ -1403,16 +1403,17 @@ class Result:
fill_int : int
Fill value for non existent entries of integer type.
Defaults to 0.
"""
r = {}
labels_ = set([labels] if isinstance(labels,str) else labels)
output_ = set([output] if isinstance(output,str) else output)
if constituents is None:
constituents_ = range(self.N_constituents)
else:
constituents_ = constituents if isinstance(constituents,Iterable) else [constituents]
suffixes = [''] if self.N_constituents == 1 or len(constituents_) == 1 else \
suffixes = [''] if self.N_constituents == 1 or isinstance(constituents,int) else \
[f'#{c}' for c in constituents_]
grp = 'mapping' if self.version_minor < 12 else 'cell_to'
@ -1423,10 +1424,10 @@ class Result:
at_cell_ph = []
in_data_ph = []
for c in constituents_:
for c in range(self.N_constituents):
at_cell_ph.append({label: np.where(f[os.path.join(grp,'phase')][:,c][name] == label.encode())[0] \
for label in self.visible['phases']})
in_data_ph.append({label: f[os.path.join(grp,'phase')][member][at_cell_ph[c][label]][...,0] \
in_data_ph.append({label: f[os.path.join(grp,'phase')][member][at_cell_ph[c][label]][...,c] \
for label in self.visible['phases']})
at_cell_ho = {label: np.where(f[os.path.join(grp,'homogenization')][:][name] == label.encode())[0] \
@ -1437,45 +1438,41 @@ class Result:
for inc in util.show_progress(self.visible['increments']):
r[inc] = {'phase':{},'homogenization':{},'geometry':{}}
for la in labels_.intersection(f[os.path.join(inc,'geometry')].keys()):
r[inc]['geometry'][la] = _read(f,os.path.join(inc,'geometry',la))
for out in output_.intersection(f[os.path.join(inc,'geometry')].keys()):
r[inc]['geometry'][out] = _read(f,os.path.join(inc,'geometry',out))
for ph in self.visible['phases']:
for field in f[os.path.join(inc,'phase',ph)].keys():
if field not in r[inc]['phase'].keys():
r[inc]['phase'][field] = {}
for ty in ['phase','homogenization']:
for label in self.visible[ty+'s']:
for field in f[os.path.join(inc,ty,label)].keys():
if field not in r[inc][ty].keys():
r[inc][ty][field] = {}
for la in labels_.intersection(f[os.path.join(inc,'phase',ph,field)].keys()):
data = ma.array(_read(f,os.path.join(inc,'phase',ph,field,la)))
for out in output_.intersection(f[os.path.join(inc,ty,label,field)].keys()):
data = ma.array(_read(f,os.path.join(inc,ty,label,field,out)))
if la+suffixes[0] not in r[inc]['phase'][field].keys():
container = np.empty((self.N_materialpoints,)+data.shape[1:],dtype=data.dtype)
fill_value = fill_float if data.dtype in np.sctypes['float'] else \
fill_int
for c,suffix in zip(constituents_, suffixes):
r[inc]['phase'][field][la+suffix] = \
ma.array(container,fill_value=fill_value,mask=True)
if ty == 'phase':
if out+suffixes[0] not in r[inc][ty][field].keys():
container = np.empty((self.N_materialpoints,)+data.shape[1:],dtype=data.dtype)
fill_value = fill_float if data.dtype in np.sctypes['float'] else \
fill_int
for c,suffix in zip(constituents_, suffixes):
r[inc][ty][field][out+suffix] = \
ma.array(container,fill_value=fill_value,mask=True)
for c,suffix in zip(constituents_, suffixes):
r[inc]['phase'][field][la+suffix][at_cell_ph[c][ph]] = data[in_data_ph[c][ph]]
for c,suffix in zip(constituents_, suffixes):
r[inc][ty][field][out+suffix][at_cell_ph[c][label]] = data[in_data_ph[c][label]]
for ho in self.visible['homogenizations']:
for field in f[os.path.join(inc,'homogenization',ho)].keys():
if field not in r[inc]['homogenization'].keys():
r[inc]['homogenization'][field] = {}
if ty == 'homogenization':
if out not in r[inc][ty][field].keys():
container = np.empty((self.N_materialpoints,)+data.shape[1:],dtype=data.dtype)
fill_value = fill_float if data.dtype in np.sctypes['float'] else \
fill_int
r[inc][ty][field][out] = \
ma.array(container,fill_value=fill_value,mask=True)
for la in labels_.intersection(f[os.path.join(inc,'homogenization',ho,field)].keys()):
data = ma.array(_read(f,os.path.join(inc,'homogenization',ho,field,la)))
if la not in r[inc]['homogenization'][field].keys():
container = np.empty((self.N_materialpoints,)+data.shape[1:],dtype=data.dtype)
fill_value = fill_float if data.dtype in np.sctypes['float'] else \
fill_int
r[inc]['homogenization'][field][la] = \
ma.array(container,fill_value=fill_value,mask=True)
r[inc]['homogenization'][field][la][at_cell_ho[ho]] = data[in_data_ho[ho]]
r[inc][ty][field][out][at_cell_ho[label]] = data[in_data_ho[label]]
if strip: r = util.dict_strip(r)
if compress: r = util.dict_compress(r)
return r

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -401,7 +401,7 @@ class TestResult:
with pytest.raises(TypeError):
default.save_XDMF()
@pytest.mark.parametrize('view,labels,compress,strip',
@pytest.mark.parametrize('view,output,compress,strip',
[({},['F','P','F','L_p','F_e','F_p'],True,True),
({'increments':3},'F',True,True),
({'increments':[1,8,3,4,5,6,7]},['F','P'],True,True),
@ -411,18 +411,41 @@ class TestResult:
({'phases':False},['Delta_V'],True,True),
({},['u_p','u_n'],False,False)],
ids=list(range(8)))
def test_read(self,update,request,ref_path,view,labels,compress,strip):
def test_read(self,update,request,ref_path,view,output,compress,strip):
result = Result(ref_path/'4grains2x4x3_compressionY.hdf5')
for key,value in view.items():
result.view(key,value)
N = request.node.name[8:].split('[')[1].split(']')[0]
cur = result.read(labels,compress,strip)
cur = result.read(output,compress,strip)
if update:
with bz2.BZ2File(ref_path/f'read_{N}.pbz2','w') as f:
pickle.dump(cur,f)
with bz2.BZ2File(ref_path/f'read_{N}.pbz2') as f:
ref = pickle.load(f)
assert dict_equal(cur,pickle.load(f))
assert dict_equal(cur,ref)
@pytest.mark.parametrize('view,output,compress,constituents,strip',
[({},['F','P','F','L_p','F_e','F_p'],True,True,None),
({'increments':3},'F',True,True,[0,1,2,3,4,5,6,7]),
({'increments':[1,8,3,4,5,6,7]},['F','P'],True,True,1),
({'phases':['A','B']},['F','P'],True,True,[1,2]),
({'phases':['A','C'],'homogenizations':False},['F','P','O'],True,True,[0,7]),
({'phases':False,'homogenizations':False},['F','P','O'],True,True,[1,2,3,4]),
({'phases':False},['Delta_V'],True,True,[1,2,4]),
({},['u_p','u_n'],False,False,None)],
ids=list(range(8)))
def test_place(self,update,request,ref_path,view,output,compress,strip,constituents):
result = Result(ref_path/'4grains2x4x3_compressionY.hdf5')
for key,value in view.items():
result.view(key,value)
N = request.node.name[8:].split('[')[1].split(']')[0]
cur = result.place(output,compress,strip,constituents)
if update:
with bz2.BZ2File(ref_path/f'place_{N}.pbz2','w') as f:
pickle.dump(cur,f)
with bz2.BZ2File(ref_path/f'place_{N}.pbz2') as f:
assert dict_equal(cur,pickle.load(f))