import bz2
import pickle
import time
import shutil
import os
import sys
import hashlib
from datetime import datetime

import pytest
import numpy as np
import h5py

from damask import Result
from damask import Rotation
from damask import Orientation
from damask import tensor
from damask import mechanics
from damask import grid_filters


@pytest.fixture
def default(tmp_path,ref_path):
    """Small Result file in temp location for modification."""
    fname = '12grains6x7x8_tensionY.hdf5'
    shutil.copy(ref_path/fname,tmp_path)
    f = Result(tmp_path/fname)
    f.view('times',20.0)
    return f


@pytest.fixture
def single_phase(tmp_path,ref_path):
    """Single phase Result file in temp location for modification."""
    fname = '6grains6x7x8_single_phase_tensionY.hdf5'
    shutil.copy(ref_path/fname,tmp_path)
    return Result(tmp_path/fname)


@pytest.fixture
def ref_path(ref_path_base):
    """Directory containing reference results."""
    return ref_path_base/'Result'


def dict_equal(d1, d2):
    """
    Check whether every entry of d1 has a matching entry in d2.

    Values that are dictionaries are compared recursively, all other
    values are compared with np.allclose.
    Keys that exist only in d2 are not examined.

    Parameters
    ----------
    d1 : dict
        Dictionary whose keys drive the comparison.
    d2 : dict
        Dictionary to compare against.

    Returns
    -------
    equal : bool
        False as soon as a key of d1 is missing in d2 or a value differs,
        True otherwise.

    """
    for k in d1:
        if k not in d2:
            return False
        if isinstance(d1[k],dict):
            # only a mismatch ends the comparison early,
            # a matching nested dict must not hide differences in later keys
            if not dict_equal(d1[k],d2[k]):
                return False
        elif not np.allclose(d1[k],d2[k]):
            return False
    return True


def _creation_time(fname,loc):
    """Read the 'created' attribute of dataset loc in HDF5 file fname and parse it as datetime."""
    with h5py.File(fname,'r') as f:
        created = f[loc].attrs['created']
        # h5py3 compatibility: decode if the attribute offers it, use as-is otherwise
        try:
            created = created.decode()
        except AttributeError:
            pass
    return datetime.strptime(created,'%Y-%m-%d %H:%M:%S%z')


class TestResult:
    """Tests for damask.Result working on reference HDF5 files."""

    def test_self_report(self,default):
        """Printing a Result must not fail."""
        print(default)


    def test_view_all(self,default):
        """All ways of selecting every increment/time give the same locations of 'F'."""
        default.view('increments',True)
        a = default.get_dataset_location('F')
        default.view('increments','*')
        b = default.get_dataset_location('F')
        default.view('increments',default.increments_in_range(0,np.iinfo(int).max))
        c = default.get_dataset_location('F')

        default.view('times',True)
        d = default.get_dataset_location('F')
        default.view('times','*')
        e = default.get_dataset_location('F')
        default.view('times',default.times_in_range(0.0,np.inf))
        f = default.get_dataset_location('F')
        assert a == b == c == d == e ==f

    @pytest.mark.parametrize('what',['increments','times','phases'])                                # ToDo: discuss homogenizations
    def test_view_none(self,default,what):
        """Selecting False or an empty list leaves no location of 'F'."""
        default.view(what,False)
        a = default.get_dataset_location('F')
        default.view(what,[])
        b = default.get_dataset_location('F')

        assert a == b == []

    @pytest.mark.parametrize('what',['increments','times','phases'])                                # ToDo: discuss homogenizations
    def test_view_more(self,default,what):
        """Adding '*' to an empty selection equals selecting everything."""
        default.view(what,False)
        default.view_more(what,'*')
        a = default.get_dataset_location('F')

        default.view(what,True)
        b = default.get_dataset_location('F')

        assert a == b

    @pytest.mark.parametrize('what',['increments','times','phases'])                                # ToDo: discuss homogenizations
    def test_view_less(self,default,what):
        """Removing '*' from a full selection equals selecting nothing."""
        default.view(what,True)
        default.view_less(what,'*')
        a = default.get_dataset_location('F')

        default.view(what,False)
        b = default.get_dataset_location('F')

        assert a == b == []

    def test_view_invalid(self,default):
        """An unknown selection category raises AttributeError."""
        with pytest.raises(AttributeError):
            default.view('invalid',True)

    def test_add_absolute(self,default):
        """'|F_e|' in file equals np.abs of 'F_e'."""
        default.add_absolute('F_e')
        loc = {'F_e':   default.get_dataset_location('F_e'),
               '|F_e|': default.get_dataset_location('|F_e|')}
        in_memory = np.abs(default.read_dataset(loc['F_e'],0))
        in_file   = default.read_dataset(loc['|F_e|'],0)
        assert np.allclose(in_memory,in_file)

    @pytest.mark.parametrize('mode',['direct','function'])
    def test_add_calculation(self,default,tmp_path,mode):
        """Formula given directly or via a user function gives the same dataset 'x'."""
        if mode == 'direct':
            default.add_calculation('x','2.0*np.abs(#F#)-1.0','-','my notes')
        else:
            # write a small module with the user function and make it importable
            with open(tmp_path/'f.py','w') as f:
                f.write("import numpy as np\ndef my_func(field):\n return 2.0*np.abs(field)-1.0\n")
            sys.path.insert(0,str(tmp_path))
            import f
            default.enable_user_function(f.my_func)
            default.add_calculation('x','my_func(#F#)','-','my notes')

        loc = {'F': default.get_dataset_location('F'),
               'x': default.get_dataset_location('x')}
        in_memory = 2.0*np.abs(default.read_dataset(loc['F'],0))-1.0
        in_file   = default.read_dataset(loc['x'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_stress_Cauchy(self,default):
        """'sigma' in file equals mechanics.stress_Cauchy of 'P' and 'F'."""
        default.add_stress_Cauchy('P','F')
        loc = {'F':    default.get_dataset_location('F'),
               'P':    default.get_dataset_location('P'),
               'sigma':default.get_dataset_location('sigma')}
        in_memory = mechanics.stress_Cauchy(default.read_dataset(loc['P'],0),
                                            default.read_dataset(loc['F'],0))
        in_file   = default.read_dataset(loc['sigma'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_determinant(self,default):
        """'det(P)' in file equals np.linalg.det of 'P'."""
        default.add_determinant('P')
        loc = {'P':     default.get_dataset_location('P'),
               'det(P)':default.get_dataset_location('det(P)')}
        in_memory = np.linalg.det(default.read_dataset(loc['P'],0)).reshape(-1,1)
        in_file   = default.read_dataset(loc['det(P)'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_deviator(self,default):
        """'s_P' in file equals tensor.deviatoric of 'P'."""
        default.add_deviator('P')
        loc = {'P'  :default.get_dataset_location('P'),
               's_P':default.get_dataset_location('s_P')}
        in_memory = tensor.deviatoric(default.read_dataset(loc['P'],0))
        in_file   = default.read_dataset(loc['s_P'],0)
        assert np.allclose(in_memory,in_file)

    @pytest.mark.parametrize('eigenvalue,function',[('max',np.amax),('min',np.amin)])
    def test_add_eigenvalue(self,default,eigenvalue,function):
        """Largest/smallest eigenvalue in file equals the one picked from tensor.eigenvalues."""
        default.add_stress_Cauchy('P','F')
        default.add_eigenvalue('sigma',eigenvalue)
        loc = {'sigma' :default.get_dataset_location('sigma'),
               'lambda':default.get_dataset_location(f'lambda_{eigenvalue}(sigma)')}
        in_memory = function(tensor.eigenvalues(default.read_dataset(loc['sigma'],0)),axis=1,keepdims=True)
        in_file   = default.read_dataset(loc['lambda'],0)
        assert np.allclose(in_memory,in_file)

    @pytest.mark.parametrize('eigenvalue,idx',[('max',2),('mid',1),('min',0)])
    def test_add_eigenvector(self,default,eigenvalue,idx):
        """Eigenvector in file equals column idx of tensor.eigenvectors."""
        default.add_stress_Cauchy('P','F')
        default.add_eigenvector('sigma',eigenvalue)
        loc = {'sigma'   :default.get_dataset_location('sigma'),
               'v(sigma)':default.get_dataset_location(f'v_{eigenvalue}(sigma)')}
        in_memory = tensor.eigenvectors(default.read_dataset(loc['sigma'],0))[:,idx]
        in_file   = default.read_dataset(loc['v(sigma)'],0)
        assert np.allclose(in_memory,in_file)

    @pytest.mark.parametrize('d',[[1,0,0],[0,1,0],[0,0,1]])
    def test_add_IPF_color(self,default,d):
        """IPF color in file equals Orientation.IPF_color scaled to uint8."""
        default.add_IPF_color(d,'O')
        loc = {'O':     default.get_dataset_location('O'),
               'color': default.get_dataset_location('IPFcolor_[{} {} {}]'.format(*d))}
        qu = default.read_dataset(loc['O']).view(np.double).squeeze()
        crystal_structure = default._get_attribute(default.get_dataset_location('O')[0],'lattice')
        c = Orientation(rotation=qu,lattice=crystal_structure)
        in_memory = np.uint8(c.IPF_color(np.array(d))*255)
        in_file = default.read_dataset(loc['color'])
        assert np.allclose(in_memory,in_file)

    def test_add_maximum_shear(self,default):
        """'max_shear(sigma)' in file equals mechanics.maximum_shear of 'sigma'."""
        default.add_stress_Cauchy('P','F')
        default.add_maximum_shear('sigma')
        loc = {'sigma'           :default.get_dataset_location('sigma'),
               'max_shear(sigma)':default.get_dataset_location('max_shear(sigma)')}
        in_memory = mechanics.maximum_shear(default.read_dataset(loc['sigma'],0)).reshape(-1,1)
        in_file   = default.read_dataset(loc['max_shear(sigma)'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_Mises_strain(self,default):
        """Mises equivalent of a randomly parametrized strain equals mechanics.equivalent_strain_Mises."""
        t = ['V','U'][np.random.randint(0,2)]
        m = np.random.random()*2.0 - 1.0
        default.add_strain('F',t,m)
        label = f'epsilon_{t}^{m}(F)'
        default.add_equivalent_Mises(label)
        loc = {label      :default.get_dataset_location(label),
               label+'_vM':default.get_dataset_location(label+'_vM')}
        in_memory = mechanics.equivalent_strain_Mises(default.read_dataset(loc[label],0)).reshape(-1,1)
        in_file   = default.read_dataset(loc[label+'_vM'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_Mises_stress(self,default):
        """'sigma_vM' in file equals mechanics.equivalent_stress_Mises of 'sigma'."""
        default.add_stress_Cauchy('P','F')
        default.add_equivalent_Mises('sigma')
        loc = {'sigma'   :default.get_dataset_location('sigma'),
               'sigma_vM':default.get_dataset_location('sigma_vM')}
        in_memory = mechanics.equivalent_stress_Mises(default.read_dataset(loc['sigma'],0)).reshape(-1,1)
        in_file   = default.read_dataset(loc['sigma_vM'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_Mises_invalid(self,default):
        """No Mises equivalent is stored for a dataset with unit 'y' when no kind is given."""
        default.add_stress_Cauchy('P','F')
        default.add_calculation('sigma_y','#sigma#',unit='y')
        default.add_equivalent_Mises('sigma_y')
        assert default.get_dataset_location('sigma_y_vM') == []

    def test_add_Mises_stress_strain(self,default):
        """Explicit kind 'strain' and 'stress' give different results for the same data."""
        default.add_stress_Cauchy('P','F')
        default.add_calculation('sigma_y','#sigma#',unit='y')
        default.add_calculation('sigma_x','#sigma#',unit='x')
        default.add_equivalent_Mises('sigma_y',kind='strain')
        default.add_equivalent_Mises('sigma_x',kind='stress')
        loc = {'y' :default.get_dataset_location('sigma_y_vM'),
               'x' :default.get_dataset_location('sigma_x_vM')}
        assert not np.allclose(default.read_dataset(loc['y'],0),default.read_dataset(loc['x'],0))

    def test_add_norm(self,default):
        """'|F|_1' in file equals np.linalg.norm of 'F' with ord=1."""
        default.add_norm('F',1)
        loc = {'F':    default.get_dataset_location('F'),
               '|F|_1':default.get_dataset_location('|F|_1')}
        in_memory = np.linalg.norm(default.read_dataset(loc['F'],0),ord=1,axis=(1,2),keepdims=True)
        in_file   = default.read_dataset(loc['|F|_1'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_stress_second_Piola_Kirchhoff(self,default):
        """'S' in file equals mechanics.stress_second_Piola_Kirchhoff of 'P' and 'F'."""
        default.add_stress_second_Piola_Kirchhoff('P','F')
        loc = {'F':default.get_dataset_location('F'),
               'P':default.get_dataset_location('P'),
               'S':default.get_dataset_location('S')}
        in_memory = mechanics.stress_second_Piola_Kirchhoff(default.read_dataset(loc['P'],0),
                                                            default.read_dataset(loc['F'],0))
        in_file   = default.read_dataset(loc['S'],0)
        assert np.allclose(in_memory,in_file)

    @pytest.mark.skip(reason='requires rework of lattice.f90')
    @pytest.mark.parametrize('polar',[True,False])
    def test_add_pole(self,default,polar):
        """Pole in file equals the rotated pole projected to xy or polar coordinates (currently skipped)."""
        pole = np.array([1.,0.,0.])
        default.add_pole('O',pole,polar)
        loc = {'O':    default.get_dataset_location('O'),
               'pole': default.get_dataset_location('p^{}_[1 0 0)'.format(u'rφ' if polar else 'xy'))}
        rot = Rotation(default.read_dataset(loc['O']).view(np.double))
        rotated_pole = rot * np.broadcast_to(pole,rot.shape+(3,))
        xy = rotated_pole[:,0:2]/(1.+abs(pole[2]))
        in_memory = xy if not polar else \
                    np.block([np.sqrt(xy[:,0:1]*xy[:,0:1]+xy[:,1:2]*xy[:,1:2]),np.arctan2(xy[:,1:2],xy[:,0:1])])
        in_file = default.read_dataset(loc['pole'])
        assert np.allclose(in_memory,in_file)

    def test_add_rotation(self,default):
        """'R(F)' in file equals mechanics.rotation of 'F' as matrix."""
        default.add_rotation('F')
        loc = {'F':    default.get_dataset_location('F'),
               'R(F)': default.get_dataset_location('R(F)')}
        in_memory = mechanics.rotation(default.read_dataset(loc['F'],0)).as_matrix()
        in_file   = default.read_dataset(loc['R(F)'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_spherical(self,default):
        """'p_P' in file equals tensor.spherical of 'P'."""
        default.add_spherical('P')
        loc = {'P':   default.get_dataset_location('P'),
               'p_P': default.get_dataset_location('p_P')}
        in_memory = tensor.spherical(default.read_dataset(loc['P'],0),False).reshape(-1,1)
        in_file   = default.read_dataset(loc['p_P'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_strain(self,default):
        """Randomly parametrized strain in file equals mechanics.strain of 'F'."""
        t = ['V','U'][np.random.randint(0,2)]
        m = np.random.random()*2.0 - 1.0
        default.add_strain('F',t,m)
        label = f'epsilon_{t}^{m}(F)'
        loc = {'F':   default.get_dataset_location('F'),
               label: default.get_dataset_location(label)}
        in_memory = mechanics.strain(default.read_dataset(loc['F'],0),t,m)
        in_file   = default.read_dataset(loc[label],0)
        assert np.allclose(in_memory,in_file)

    def test_add_stretch_right(self,default):
        """'U(F)' in file equals mechanics.stretch_right of 'F'."""
        default.add_stretch_tensor('F','U')
        loc = {'F':    default.get_dataset_location('F'),
               'U(F)': default.get_dataset_location('U(F)')}
        in_memory = mechanics.stretch_right(default.read_dataset(loc['F'],0))
        in_file   = default.read_dataset(loc['U(F)'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_stretch_left(self,default):
        """'V(F)' in file equals mechanics.stretch_left of 'F'."""
        default.add_stretch_tensor('F','V')
        loc = {'F':    default.get_dataset_location('F'),
               'V(F)': default.get_dataset_location('V(F)')}
        in_memory = mechanics.stretch_left(default.read_dataset(loc['F'],0))
        in_file   = default.read_dataset(loc['V(F)'],0)
        assert np.allclose(in_memory,in_file)

    def test_add_invalid(self,default):
        """add_calculation with only a formula raises TypeError."""
        with pytest.raises(TypeError):
            default.add_calculation('#invalid#*2')

    @pytest.mark.parametrize('overwrite',['off','on'])
    def test_add_overwrite(self,default,overwrite):
        """An existing dataset is replaced only if modification is allowed."""
        last = default.times_in_range(0,np.inf)[-1]
        default.view('times',last)

        default.add_stress_Cauchy()
        loc = default.get_dataset_location('sigma')
        created_first = _creation_time(default.fname,loc[0])

        if overwrite == 'on':
            default.allow_modification()
        else:
            default.disallow_modification()

        time.sleep(2.)                                                                              # ensure that a new time stamp differs
        try:
            default.add_calculation('sigma','#sigma#*0.0+311.','not the Cauchy stress')
        except ValueError:
            # tolerated on purpose (presumably raised when modification is disallowed - confirm)
            pass
        created_second = _creation_time(default.fname,loc[0])

        if overwrite == 'on':
            assert created_first < created_second and np.allclose(default.read_dataset(loc),311.)
        else:
            assert created_first == created_second and not np.allclose(default.read_dataset(loc),311.)

    @pytest.mark.parametrize('allowed',['off','on'])
    def test_rename(self,default,allowed):
        """Renaming keeps the data if allowed and raises PermissionError if not."""
        if allowed == 'on':
            F = default.read_dataset(default.get_dataset_location('F'))
            default.allow_modification()
            default.rename('F','new_name')
            assert np.all(F == default.read_dataset(default.get_dataset_location('new_name')))
            default.disallow_modification()

        with pytest.raises(PermissionError):
            default.rename('P','another_new_name')

    @pytest.mark.parametrize('mode',['cell','node'])
    def test_coordinates(self,default,mode):
        """Coordinates of the Result equal those computed by grid_filters."""
        if   mode == 'cell':
            a = grid_filters.coordinates0_point(default.cells,default.size,default.origin)
            b = default.coordinates0_point.reshape(tuple(default.cells)+(3,),order='F')
        elif mode == 'node':
            a = grid_filters.coordinates0_node(default.cells,default.size,default.origin)
            b = default.coordinates0_node.reshape(tuple(default.cells+1)+(3,),order='F')
        assert np.allclose(a,b)

    # need to wait for writing in parallel, output order might change if select more than one
    @pytest.mark.parametrize('output',['F','*',['P']],ids=range(3))
    @pytest.mark.parametrize('fname',['12grains6x7x8_tensionY.hdf5'],ids=range(1))
    @pytest.mark.parametrize('inc',[4,0],ids=range(2))
    def test_vtk(self,request,tmp_path,ref_path,update,output,fname,inc):
        """MD5 hash of the written VTK file equals the stored reference hash."""
        result = Result(ref_path/fname)
        result.view('increments',inc)
        os.chdir(tmp_path)
        result.save_VTK(output)
        fname = fname.split('.')[0]+f'_inc{(inc if isinstance(inc,int) else inc[0]):0>2}.vtr'
        cur  = None
        last = ''
        # poll until the hash of the file is the same in two consecutive reads
        for _ in range(10):
            if os.path.isfile(tmp_path/fname):
                with open(tmp_path/fname) as f:
                    cur = hashlib.md5(f.read().encode()).hexdigest()
                if cur == last:
                    break
                last = cur
            time.sleep(.5)
        assert cur is not None, f'{fname} was not written'                                          # fail clearly instead of NameError
        reference = (ref_path/'save_VTK'/request.node.name).with_suffix('.md5')
        if update:
            with open(reference,'w') as f:
                f.write(cur)
        with open(reference) as f:
            assert cur == f.read()

    @pytest.mark.parametrize('mode',['point','cell'])
    def test_vtk_mode(self,tmp_path,single_phase,mode):
        """save_VTK accepts mode 'point' and 'cell'."""
        os.chdir(tmp_path)
        single_phase.save_VTK(mode=mode)

    def test_XDMF(self,tmp_path,single_phase,update,ref_path):
        """XDMF file written for datasets of various shape and dtype has the reference content."""
        for shape in [('scalar',()),('vector',(3,)),('tensor',(3,3)),('matrix',(12,))]:
            for dtype in ['f4','f8','i1','i2','i4','i8','u1','u2','u4','u8']:
                single_phase.add_calculation(f'{shape[0]}_{dtype}',f"np.ones(np.shape(#F#)[0:1]+{shape[1]},'{dtype}')")
        fname = os.path.splitext(os.path.basename(single_phase.fname))[0]+'.xdmf'
        os.chdir(tmp_path)
        single_phase.save_XDMF()
        if update:
            shutil.copy(tmp_path/fname,ref_path/fname)
        # read_text closes the files, the original bare open() calls leaked both handles
        current   = (tmp_path/fname).read_text()
        reference = (ref_path/fname).read_text()
        assert sorted(current) == sorted(reference)                                                 # XML is not ordered

    def test_XDMF_invalid(self,default):
        """save_XDMF raises TypeError for the 'default' Result."""
        with pytest.raises(TypeError):
            default.save_XDMF()

    @pytest.mark.parametrize('view,output,compress,strip',
            [({},['F','P','F','L_p','F_e','F_p'],True,True),
             ({'increments':3},'F',True,True),
             ({'increments':[1,8,3,4,5,6,7]},['F','P'],True,True),
             ({'phases':['A','B']},['F','P'],True,True),
             ({'phases':['A','C'],'homogenizations':False},['F','P','O'],True,True),
             ({'phases':False,'homogenizations':False},['F','P','O'],True,True),
             ({'phases':False},['Delta_V'],True,True),
             ({},['u_p','u_n'],False,False)],
            ids=list(range(8)))
    def test_read(self,update,request,ref_path,view,output,compress,strip):
        """Result.read for various views and outputs matches the pickled reference."""
        result = Result(ref_path/'4grains2x4x3_compressionY.hdf5')
        for key,value in view.items():
            result.view(key,value)

        fname = request.node.name
        cur = result.read(output,compress,strip)
        if update:
            with bz2.BZ2File((ref_path/'read'/fname).with_suffix('.pbz2'),'w') as f:
                pickle.dump(cur,f)

        # reference files are part of the repository, i.e. trusted input for pickle
        with bz2.BZ2File((ref_path/'read'/fname).with_suffix('.pbz2')) as f:
            assert dict_equal(cur,pickle.load(f))

    # NOTE(review): the names below list 'constituents' before 'strip', but place() is called
    # with (output,compress,strip,constituents). The 4th tuple entry hence ends up as
    # 'constituents' and the 5th as 'strip' - confirm that this is intended before changing it,
    # the stored references were created with this order.
    @pytest.mark.parametrize('view,output,compress,constituents,strip',
            [({},['F','P','F','L_p','F_e','F_p'],True,True,None),
             ({'increments':3},'F',True,True,[0,1,2,3,4,5,6,7]),
             ({'increments':[1,8,3,4,5,6,7]},['F','P'],True,True,1),
             ({'phases':['A','B']},['F','P'],True,True,[1,2]),
             ({'phases':['A','C'],'homogenizations':False},['F','P','O'],True,True,[0,7]),
             ({'phases':False,'homogenizations':False},['F','P','O'],True,True,[1,2,3,4]),
             ({'phases':False},['Delta_V'],True,True,[1,2,4]),
             ({},['u_p','u_n'],False,False,None)],
            ids=list(range(8)))
    def test_place(self,update,request,ref_path,view,output,compress,strip,constituents):
        """Result.place for various views and outputs matches the pickled reference."""
        result = Result(ref_path/'4grains2x4x3_compressionY.hdf5')
        for key,value in view.items():
            result.view(key,value)

        fname = request.node.name
        cur = result.place(output,compress,strip,constituents)
        if update:
            with bz2.BZ2File((ref_path/'place'/fname).with_suffix('.pbz2'),'w') as f:
                pickle.dump(cur,f)

        # reference files are part of the repository, i.e. trusted input for pickle
        with bz2.BZ2File((ref_path/'place'/fname).with_suffix('.pbz2')) as f:
            assert dict_equal(cur,pickle.load(f))