MPI has a 2GB limit for writing at once, now chunking

This commit is contained in:
Martin Diehl 2015-08-21 17:51:05 +00:00
parent 00b8660203
commit 1171dc4344
1 changed files with 61 additions and 49 deletions

View File

@ -13,6 +13,7 @@ program DAMASK_spectral_Driver
iso_fortran_env ! to get compiler_version and compiler_options (at least for gfortran >4.6 at the moment) iso_fortran_env ! to get compiler_version and compiler_options (at least for gfortran >4.6 at the moment)
use prec, only: & use prec, only: &
pInt, & pInt, &
pLongInt, &
pReal, & pReal, &
tol_math_check tol_math_check
use DAMASK_interface, only: & use DAMASK_interface, only: &
@ -117,7 +118,8 @@ program DAMASK_spectral_Driver
timeIncOld = 0.0_pReal, & !< previous time interval timeIncOld = 0.0_pReal, & !< previous time interval
remainingLoadCaseTime = 0.0_pReal !< remaining time of current load case remainingLoadCaseTime = 0.0_pReal !< remaining time of current load case
logical :: & logical :: &
guess !< guess along former trajectory guess, & !< guess along former trajectory
stagIterate
integer(pInt) :: & integer(pInt) :: &
i, j, k, l, field, & i, j, k, l, field, &
errorID, & errorID, &
@ -131,17 +133,26 @@ program DAMASK_spectral_Driver
notConvergedCounter = 0_pInt, & !< # of non-converged increments notConvergedCounter = 0_pInt, & !< # of non-converged increments
resUnit = 0_pInt, & !< file unit for results writing resUnit = 0_pInt, & !< file unit for results writing
statUnit = 0_pInt, & !< file unit for statistics output statUnit = 0_pInt, & !< file unit for statistics output
lastRestartWritten = 0_pInt !< total increment # at which last restart information was written lastRestartWritten = 0_pInt, & !< total increment # at which last restart information was written
stagIter
character(len=6) :: loadcase_string character(len=6) :: loadcase_string
character(len=1024) :: incInfo !< string parsed to solution with information about current load case character(len=1024) :: incInfo !< string parsed to solution with information about current load case
type(tLoadCase), allocatable, dimension(:) :: loadCases !< array of all load cases type(tLoadCase), allocatable, dimension(:) :: loadCases !< array of all load cases
type(tSolutionState), allocatable, dimension(:) :: solres type(tSolutionState), allocatable, dimension(:) :: solres
integer(kind=MPI_OFFSET_KIND) :: my_offset integer(MPI_OFFSET_KIND) :: fileOffset
integer, dimension(:), allocatable :: outputSize integer(MPI_OFFSET_KIND), dimension(:), allocatable :: outputSize
integer(pInt) :: stagIter integer(pInt), parameter :: maxByteOut = 2147483647-4096 !< limit of one file output write https://trac.mpich.org/projects/mpich/ticket/1742
logical :: stagIterate integer(pLongInt), dimension(2) :: outputIndex
PetscErrorCode :: ierr PetscErrorCode :: ierr
external :: quit external :: &
quit, &
MPI_file_open, &
MPI_file_close, &
MPI_file_seek, &
MPI_file_get_position, &
MPI_file_write, &
MPI_allreduce
!-------------------------------------------------------------------------------------------------- !--------------------------------------------------------------------------------------------------
! init DAMASK (all modules) ! init DAMASK (all modules)
call CPFEM_initAll(el = 1_pInt, ip = 1_pInt) call CPFEM_initAll(el = 1_pInt, ip = 1_pInt)
@ -375,10 +386,8 @@ program DAMASK_spectral_Driver
!-------------------------------------------------------------------------------------------------- !--------------------------------------------------------------------------------------------------
! write header of output file ! write header of output file
allocate(outputSize(worldsize), source = 0_pInt); outputSize(worldrank+1) = size(materialpoint_results)*8 if (worldrank == 0) then
call MPI_Allreduce(MPI_IN_PLACE,outputSize,worldsize,MPI_INT,MPI_SUM,PETSC_COMM_WORLD,ierr) if (.not. appendToOutFile) then ! after restart, append to existing results file
if (.not. appendToOutFile) then ! after restart, append to existing results file
if (worldrank == 0) then
open(newunit=resUnit,file=trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//& open(newunit=resUnit,file=trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//&
'.spectralOut',form='UNFORMATTED',status='REPLACE') '.spectralOut',form='UNFORMATTED',status='REPLACE')
write(resUnit) 'load:', trim(loadCaseFile) ! ... and write header write(resUnit) 'load:', trim(loadCaseFile) ! ... and write header
@ -395,47 +404,44 @@ program DAMASK_spectral_Driver
write(resUnit) 'startingIncrement:', restartInc - 1_pInt ! start with writing out the previous inc write(resUnit) 'startingIncrement:', restartInc - 1_pInt ! start with writing out the previous inc
write(resUnit) 'eoh' write(resUnit) 'eoh'
close(resUnit) ! end of header close(resUnit) ! end of header
endif
call MPI_File_open(PETSC_COMM_WORLD, &
trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//'.spectralOut', &
MPI_MODE_WRONLY + MPI_MODE_APPEND, &
MPI_INFO_NULL, &
resUnit, &
ierr)
call MPI_File_get_position(resUnit,my_offset,ierr)
my_offset = my_offset + sum(outputSize(1:worldrank))
call MPI_File_seek (resUnit,my_offset,MPI_SEEK_SET,ierr)
call MPI_File_write(resUnit, materialpoint_results, size(materialpoint_results), &
MPI_DOUBLE, MPI_STATUS_IGNORE, ierr)
my_offset = my_offset + sum(outputSize)
call MPI_File_seek (resUnit,my_offset,MPI_SEEK_SET,ierr)
else
call MPI_File_open(PETSC_COMM_WORLD, &
trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//'.spectralOut', &
MPI_MODE_WRONLY + MPI_MODE_APPEND, &
MPI_INFO_NULL, &
resUnit, &
ierr)
call MPI_File_get_position(resUnit,my_offset,ierr)
my_offset = my_offset + sum(outputSize(1:worldrank))
call MPI_File_seek (resUnit,my_offset,MPI_SEEK_SET,ierr)
endif
if (iand(debug_level(debug_spectral),debug_levelBasic) /= 0 .and. worldrank == 0_pInt) &
write(6,'(/,a)') ' header of result file written out'
flush(6)
if (worldrank == 0) then
if (appendToOutFile) then ! after restart, append to existing results file
open(newunit=statUnit,file=trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//&
'.sta',form='FORMATTED', position='APPEND', status='OLD')
else ! open new files ...
open(newunit=statUnit,file=trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//& open(newunit=statUnit,file=trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//&
'.sta',form='FORMATTED',status='REPLACE') '.sta',form='FORMATTED',status='REPLACE')
write(statUnit,'(a)') 'Increment Time CutbackLevel Converged IterationsNeeded' ! statistics file write(statUnit,'(a)') 'Increment Time CutbackLevel Converged IterationsNeeded' ! statistics file
if (iand(debug_level(debug_spectral),debug_levelBasic) /= 0) &
write(6,'(/,a)') ' header of result and statistics file written out'
flush(6)
else ! open new files ...
open(newunit=statUnit,file=trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//&
'.sta',form='FORMATTED', position='APPEND', status='OLD')
endif endif
endif endif
!-------------------------------------------------------------------------------------------------- !--------------------------------------------------------------------------------------------------
! prepare MPI parallel out (including opening of file)
allocate(outputSize(worldsize), source = 0_MPI_OFFSET_KIND)
outputSize(worldrank+1) = int(size(materialpoint_results)*pReal,MPI_OFFSET_KIND)
call MPI_allreduce(MPI_IN_PLACE,outputSize,worldsize,MPI_INT,MPI_SUM,PETSC_COMM_WORLD,ierr) ! get total output size over each process
call MPI_file_open(PETSC_COMM_WORLD, &
trim(getSolverWorkingDirectoryName())//trim(getSolverJobName())//'.spectralOut', &
MPI_MODE_WRONLY + MPI_MODE_APPEND, &
MPI_INFO_NULL, &
resUnit, &
ierr)
call MPI_file_get_position(resUnit,fileOffset,ierr) ! get offset from header
fileOffset = fileOffset + sum(outputSize(1:worldrank)) ! offset of my process in file (header + processes before me)
call MPI_file_seek (resUnit,fileOffset,MPI_SEEK_SET,ierr)
if (.not. appendToOutFile) then ! if not restarting, write 0th increment
do i=1, size(materialpoint_results,3)/(maxByteOut/(materialpoint_sizeResults*pReal))+1 ! slice the output of my process in chunks not exceeding the limit for one output
outputIndex=[(i-1)*((maxByteOut/pReal)/materialpoint_sizeResults)+1, &
min(i*((maxByteOut/pReal)/materialpoint_sizeResults),size(materialpoint_results,3))]
call MPI_file_write(resUnit,reshape(materialpoint_results(:,:,outputIndex(1):outputIndex(2)),&
[(outputIndex(2)-outputIndex(1)+1)*materialpoint_sizeResults]), &
(outputIndex(2)-outputIndex(1)+1)*materialpoint_sizeResults,&
MPI_DOUBLE, MPI_STATUS_IGNORE, ierr)
enddo
endif
!--------------------------------------------------------------------------------------------------
! loopping over loadcases ! loopping over loadcases
loadCaseLooping: do currentLoadCase = 1_pInt, size(loadCases) loadCaseLooping: do currentLoadCase = 1_pInt, size(loadCases)
time0 = time ! currentLoadCase start time time0 = time ! currentLoadCase start time
@ -628,10 +634,16 @@ program DAMASK_spectral_Driver
if (mod(inc,loadCases(currentLoadCase)%outputFrequency) == 0_pInt) then ! at output frequency if (mod(inc,loadCases(currentLoadCase)%outputFrequency) == 0_pInt) then ! at output frequency
if (worldrank == 0) & if (worldrank == 0) &
write(6,'(1/,a)') ' ... writing results to file ......................................' write(6,'(1/,a)') ' ... writing results to file ......................................'
call MPI_File_write(resUnit, materialpoint_results, size(materialpoint_results), & fileOffset = fileOffset + sum(outputSize) ! forward to current file position
MPI_DOUBLE, MPI_STATUS_IGNORE, ierr) call MPI_file_seek (resUnit,fileOffset,MPI_SEEK_SET,ierr)
my_offset = my_offset + sum(outputSize) do i=1, size(materialpoint_results,3)/(maxByteOut/(materialpoint_sizeResults*pReal))+1 ! slice the output of my process in chunks not exceeding the limit for one output
call MPI_File_seek (resUnit,my_offset,MPI_SEEK_SET,ierr) outputIndex=[(i-1)*maxByteOut/pReal/materialpoint_sizeResults+1, &
min(i*maxByteOut/pReal/materialpoint_sizeResults,size(materialpoint_results,3))]
call MPI_file_write(resUnit,reshape(materialpoint_results(:,:,outputIndex(1):outputIndex(2)),&
[(outputIndex(2)-outputIndex(1)+1)*materialpoint_sizeResults]), &
(outputIndex(2)-outputIndex(1)+1)*materialpoint_sizeResults,&
MPI_DOUBLE, MPI_STATUS_IGNORE, ierr)
enddo
endif endif
if( loadCases(currentLoadCase)%restartFrequency > 0_pInt .and. & ! at frequency of writing restart information set restart parameter for FEsolving if( loadCases(currentLoadCase)%restartFrequency > 0_pInt .and. & ! at frequency of writing restart information set restart parameter for FEsolving
mod(inc,loadCases(currentLoadCase)%restartFrequency) == 0_pInt) then ! first call to CPFEM_general will write? mod(inc,loadCases(currentLoadCase)%restartFrequency) == 0_pInt) then ! first call to CPFEM_general will write?