DISPATCH
mpi_io_mod.f90
1 !*******************************************************************************
2 !> Module for handling blocking and non-blocking MPI parallel I/O to a single file.
3 !> The module is initialized by specifying the record (snapshot) size and the
4 !> chunk size, both in words. A chunk could be just a patch, or a single chunk
5 !> of data per rank; chunks are passed as 3- or 4-dimensional real arrays.
6 !>
7 !> The module can be used to handle any collection of patches, by using the
8 !> patch%id number to map each patch into the file. If the patch sizes are fixed,
9 !> this is a unique mapping, and if each snapshot is written into a separate file
10 !> this is also sufficiently general. The header part of the file may be written
11 !> to a separate file.
12 !>
13 !> Typical multi-threaded use:
14 !>
15 !> type(mpi_file_t):: file ! holds file info
16 !> type(mpi_io_t):: thread_io ! holds thread info
17 !> !$omp threadprivate (thread_io) ! threadprivate data type
18 !> ...
19 !> call file%openw ('some_file.dat') ! implicit MPI barrier
20 !> call thread_io%use (file) ! all threads use the same file
21 !> ...
22 !> call thread_io%init; call thread_io%set (snapshot_size, patch_size) ! define offset factors
23 !> call thread_io%write (patch_data, iout1, id1) ! write snapshot iout1, patch id1
24 !> ...
25 !> call thread_io%read (patch_data, iout2, id2) ! read snapshot iout2, patch id2
26 !> ...
27 !> !$omp barrier ! all threads must agree
28 !> call file%close ! implicit MPI barrier
29 !>
30 !*******************************************************************************
31 MODULE mpi_io_mod
32  USE io_mod
33  USE io_unit_mod
34  USE omp_mod
35  USE omp_timer_mod
36  USE trace_mod
37  USE mpi_mod, only: mp=>mpi
38  USE mpi_file_mod
39  USE dll_mod
40 #ifdef MPI
41  USE mpi, only: mpi_offset_kind, mpi_real, mpi_status_size, mpi_thread_multiple
42 #endif MPI
43  implicit none
44  private
!-------------------------------------------------------------------------------
! Queue entry for a deferred (non-blocking) write: a dll_node_t extended with
! the target file handle, the byte offset, the word count, and a pointer to
! the buffer that is to be written and then released
!-------------------------------------------------------------------------------
type, public, extends(dll_node_t) :: iwrite_t
  integer :: req      ! request tag (iwrite3/iwrite4 store the chunk id here)
  integer :: handle   ! MPI file handle to write to
  integer :: words    ! number of 4-byte words to write
#ifdef MPI
  integer(kind=MPI_OFFSET_KIND) :: pos    ! byte offset in the file
#else
  integer(8) :: pos                       ! byte offset in the file
#endif
  real, dimension(:,:,:),   pointer :: buffer3 => null()  ! rank-3 payload (iwrite3)
  real, dimension(:,:,:,:), pointer :: buffer  => null()  ! rank-4 payload (iwrite4)
end type iwrite_t
!-------------------------------------------------------------------------------
! List of pending iwrite_t entries, drained by a single thread in check()
!-------------------------------------------------------------------------------
type, public, extends(dll_t) :: iwrite_list_t
  logical :: first_time = .true.    ! .true. until mpi_io_t%init has set up the list
  logical :: active     = .false.   ! becomes .true. once an entry has been appended
  integer :: thread     = -1        ! OMP thread currently draining the list, or -1
contains
  procedure :: check
end type iwrite_list_t
!-------------------------------------------------------------------------------
! MPI I/O object: the file descriptor, the file layout (record and chunk sizes
! in words), the result of the last MPI call, and the queue of pending writes
!-------------------------------------------------------------------------------
type, public :: mpi_io_t
  type(mpi_file_t) :: file                    ! file used by all reads and writes
  integer :: req                              ! NOTE(review): not referenced in this module
  integer :: nwrite = -1                      ! write-queue threshold, see set()
  integer(8) :: rec_words                     ! words per record (snapshot)
  integer :: chunk_words                      ! words per chunk; also the MPI count
  integer :: err                              ! error code of the last MPI call
  real, dimension(:,:,:), allocatable :: buf  ! NOTE(review): not referenced in this module
  type(iwrite_list_t), public :: iwrite_list  ! pending non-blocking writes
#ifdef MPI
  integer :: status(mpi_status_size)          ! status of the last MPI call
#else
  integer :: status
#endif
contains
  procedure :: use
  procedure :: init
  procedure :: set
  procedure :: check_init
  procedure, private :: write3
  procedure, private :: write4
  generic, public :: write => write3, write4
  procedure, private :: iwrite3
  procedure, private :: iwrite4
  generic, public :: iwrite => iwrite3, iwrite4
  procedure, private :: read3
  procedure, private :: read4
  generic, public :: read => read3, read4
  procedure :: assert
end type mpi_io_t
! Module-wide settings (initialized module variables are implicitly saved);
! verbose and direct may be overridden by namelist mpi_io_params in init()
character(len=32) :: fmt = '(1x,a,4i8,1p,5e12.3)'  ! NOTE(review): not referenced in this module
integer :: verbose = 0           ! diagnostic output level
logical :: direct = .false.      ! if .true., check() writes through io_unit%direct
type(mpi_io_t), public :: mpi_io ! shared default instance; check() reads mpi_io%nwrite
104 CONTAINS
105 
!===============================================================================
!> Make this I/O object operate on an already opened file, by copying the
!> mpi_file_t descriptor into self%file.  Several mpi_io_t objects (e.g. one
!> threadprivate object per thread) can in this way share the same file.
!===============================================================================
SUBROUTINE use (self, file)
  class(mpi_io_t):: self
  type(mpi_file_t):: file
  !-----------------------------------------------------------------------------
  call trace%begin ('mpi_io_t%use')
  self%file = file     ! keep a copy of the descriptor
  call trace%end()
END SUBROUTINE use
117 
!===============================================================================
!> One-time setup: read the optional namelist mpi_io_params (verbose, direct)
!> from io%input and echo it on the master rank, then initialize this object's
!> queue of pending writes.  Each step is guarded so it is done only once.
!===============================================================================
SUBROUTINE init (self)
  class(mpi_io_t):: self
  integer:: iostat
  logical, save:: first_time=.true.
  namelist /mpi_io_params/ verbose, direct
  !-----------------------------------------------------------------------------
  call trace%begin ('mpi_io_t%init')
  !-----------------------------------------------------------------------------
  ! Callers may already be inside a critical (input_cr) region, and OpenMP does
  ! not allow nesting critical regions of the same name, so the namelist is
  ! read under a region with its own name (mpi_io_cr)
  !-----------------------------------------------------------------------------
  !$omp critical (mpi_io_cr)
  if (first_time) then
    first_time = .false.
    rewind (io%input)
    read (io%input, mpi_io_params, iostat=iostat)   ! iostat ignored: namelist is optional
    if (io%master) write (io%output, mpi_io_params)
  end if
  !$omp end critical (mpi_io_cr)
  !-----------------------------------------------------------------------------
  ! Set up the list of pending non-blocking writes, once per object
  !-----------------------------------------------------------------------------
  !$omp critical (iwrite_cr)
  if (self%iwrite_list%first_time) then
    self%iwrite_list%first_time = .false.
    call self%iwrite_list%init ('iwrite_list')
    if (io%master) print *, 'mpi_io_t%init done'
  end if
  !$omp end critical (iwrite_cr)
  call trace%end()
END SUBROUTINE init
151 
!===============================================================================
!> Set the file layout and the write-queue threshold; only the arguments that
!> are present are changed.
!>   rec_words   : words per record (snapshot)
!>   chunk_words : words per chunk (e.g. a patch); chunk id of record rec starts
!>                 at word (rec-1)*rec_words + (id-1)*chunk_words
!>   nwrite      : iwrite_list_t%check only drains the queue once at least
!>                 nwrite items are pending.  check() reads the module-wide
!>                 instance mpi_io, so the value is stored there as well as
!>                 in self.
!===============================================================================
SUBROUTINE set (self, rec_words, chunk_words, nwrite)
  class(mpi_io_t):: self
  integer(8), optional:: rec_words, chunk_words
  integer, optional:: nwrite
  !-----------------------------------------------------------------------------
  call trace%begin ('mpi_io_t%set')
  if (present(rec_words)) then
    self%rec_words = rec_words
    if (verbose > 2) print *, 'mpi_io_t%set: setting rec_words =', rec_words
  end if
  if (present(chunk_words)) then
    !---------------------------------------------------------------------------
    ! self%chunk_words is a default integer (it is passed as the MPI element
    ! count), so reject values that do not fit instead of silently wrapping
    !---------------------------------------------------------------------------
    call self%assert ('mpi_io_t:chunk_words exceeds the default integer range', &
                      err=chunk_words > huge(self%chunk_words))
    self%chunk_words = int(chunk_words)
    if (verbose > 2) print *, 'mpi_io_t%set: setting chunk_words =', chunk_words
  end if
  if (present(nwrite)) then
    self%nwrite = nwrite
    mpi_io%nwrite = nwrite     ! the value actually used by iwrite_list_t%check
    if (verbose > 2) print *, 'mpi_io_t%set: setting nwrite =', nwrite
  end if
  call trace%end()
END SUBROUTINE set
175 
!===============================================================================
!> Abort unless the layout given to mpi_io_t%set is usable, i.e. both
!> rec_words and chunk_words are strictly positive
!===============================================================================
SUBROUTINE check_init (self)
  class(mpi_io_t):: self
  logical:: bad_rec, bad_chunk
  !-----------------------------------------------------------------------------
  bad_rec   = (self%rec_words   <= 0)
  bad_chunk = (self%chunk_words <= 0)
  call self%assert ('mpi_io_t:rec_words is not positive', err=bad_rec)
  call self%assert ('mpi_io_t:chunk_words is not positive', err=bad_chunk)
END SUBROUTINE check_init
185 
!===============================================================================
!> Blocking write of one rank-3 chunk f (self%chunk_words reals) into chunk id
!> of record rec, at byte offset 4*((rec-1)*rec_words + (id-1)*chunk_words).
!> Aborts on an MPI error.  Without MPI this routine does nothing.
!===============================================================================
SUBROUTINE write3 (self, f, rec, id)
  class(mpi_io_t):: self
  real(kind=4), dimension(:,:,:):: f
  integer:: rec, id
#ifdef MPI
  integer(kind=MPI_OFFSET_KIND):: offset
  logical:: thread_safe
  !-----------------------------------------------------------------------------
  call trace%begin ('mpi_io_t%write')
  call self%check_init
  offset = 4_8*((rec-1_8)*self%rec_words + (id-1_8)*self%chunk_words)
  if (verbose > 1) then
    write (io_unit%output, '(a,3i8,i15,1p,2e14.6)') &
      ' mpi_io_t%write3: rec, id, words, pos =', &
      rec, id, self%chunk_words, offset, minval(f), maxval(f)
  end if
  !-----------------------------------------------------------------------------
  ! When mp%mode is not mpi_thread_multiple, MPI calls from different OpenMP
  ! threads are serialized through the mpi_cr critical region
  !-----------------------------------------------------------------------------
  thread_safe = (mp%mode == mpi_thread_multiple)
  if (thread_safe) then
    call mpi_file_write_at (self%file%handle, offset, f, self%chunk_words, &
                            mpi_real, self%status, self%err)
  else
    !$omp critical (mpi_cr)
    call mpi_file_write_at (self%file%handle, offset, f, self%chunk_words, &
                            mpi_real, self%status, self%err)
    !$omp end critical (mpi_cr)
  end if
  call self%assert ('mpi_io_mod::write MPI_File_write_at '//trim(self%file%filename))
  call trace%end()
#endif
END SUBROUTINE write3
215 
!===============================================================================
!> Blocking write of one rank-4 chunk f (self%chunk_words reals) into chunk id
!> of record rec, at byte offset 4*((rec-1)*rec_words + (id-1)*chunk_words).
!> Aborts on an MPI error.  Without MPI this routine does nothing.
!===============================================================================
SUBROUTINE write4 (self, f, rec, id)
  class(mpi_io_t):: self
  real(kind=4), dimension(:,:,:,:):: f
  integer:: rec, id
#ifdef MPI
  integer(kind=MPI_OFFSET_KIND):: offset
  logical:: thread_safe
  !-----------------------------------------------------------------------------
  call trace%begin ('mpi_io_t%write')
  call self%check_init
  offset = 4_8*((rec-1_8)*self%rec_words + (id-1_8)*self%chunk_words)
  !-----------------------------------------------------------------------------
  ! Diagnostics: data range at verbose > 1, a timestamped note on io_unit%mpi
  ! at verbose > 0
  !-----------------------------------------------------------------------------
  if (verbose > 1) then
    write (io_unit%output, '(a,2i6,i12,i15,1p,2e14.6)') &
      ' mpi_io_t%write4: rec, id, words, pos, min/max =', &
      rec, id, self%chunk_words, offset, minval(f), maxval(f)
  end if
  if (verbose > 0) then
    write (io_unit%mpi, '(g12.5,i4,2x,a)') wallclock(), omp%thread, &
      'mpi_io_t%write4: writing to '//trim(self%file%filename)
    flush (io_unit%mpi)
  end if
  !-----------------------------------------------------------------------------
  ! When mp%mode is not mpi_thread_multiple, MPI calls from different OpenMP
  ! threads are serialized through the mpi_cr critical region
  !-----------------------------------------------------------------------------
  thread_safe = (mp%mode == mpi_thread_multiple)
  if (thread_safe) then
    call mpi_file_write_at (self%file%handle, offset, f, self%chunk_words, &
                            mpi_real, self%status, self%err)
  else
    !$omp critical (mpi_cr)
    call mpi_file_write_at (self%file%handle, offset, f, self%chunk_words, &
                            mpi_real, self%status, self%err)
    !$omp end critical (mpi_cr)
  end if
  call self%assert ('mpi_io_mod::write MPI_File_write_at '//trim(self%file%filename))
  call trace%end()
#endif
END SUBROUTINE write4
251 
!===============================================================================
!> Non-blocking write of a rank-3 chunk: queue f for chunk id of record rec
!> (byte offset 4*((rec-1)*rec_words + (id-1)*chunk_words)).  The data are
!> written later by iwrite_list_t%check, which then deallocates the array, so
!> f must point to an allocated array that the caller hands over and does not
!> modify or deallocate afterwards.  Without MPI this routine does nothing.
!===============================================================================
SUBROUTINE iwrite3 (self, f, rec, id)
  class(mpi_io_t):: self
  real(kind=4), dimension(:,:,:), pointer:: f
  integer:: rec, id
#ifdef MPI
  integer(kind=MPI_OFFSET_KIND):: pos
  class(iwrite_t), pointer:: item
  class(dll_node_t), pointer:: node
  integer:: n, unit
  integer, save:: itimer=0
  !-----------------------------------------------------------------------------
  call trace%begin ('mpi_io_t%iwrite', itimer=itimer)
  call self%check_init
  pos = 4_8*((rec-1_8)*self%rec_words + (id-1_8)*self%chunk_words)
  if (verbose > 1) then
    if (verbose > 2) then
      unit = io_unit%output
    else
      unit = io_unit%mpi
    end if
    write (unit,'(a,3i8,i15,1p,2e14.6)') &
      ' mpi_io_t%iwrite3: rec, id, words, pos =', &
      rec, id, self%chunk_words, pos, minval(f), maxval(f)
  end if
  !-----------------------------------------------------------------------------
  ! Record what check() needs to do the write and to release the buffer
  !-----------------------------------------------------------------------------
  allocate (item)
  item%handle = self%file%handle
  item%words = self%chunk_words
  item%pos = pos
  item%req = id
  item%buffer3 => f
  !-----------------------------------------------------------------------------
  ! Other threads append concurrently, so the list update is serialized
  !-----------------------------------------------------------------------------
  !$omp critical (iwrite_cr)
  node => item
  call self%iwrite_list%append (node)
  self%iwrite_list%active = .true.
  n = self%iwrite_list%n
  !$omp end critical (iwrite_cr)
  if (verbose > 1) &
    write (io%output,'(f12.6,i5,2x,a,i7,i4)') &
      wallclock(), omp%thread, 'appended req, n =', item%req, n
  call trace%end (itimer)
#endif
END SUBROUTINE iwrite3
300 
!===============================================================================
!> Non-blocking write of a rank-4 chunk: queue f for chunk id of record rec
!> (byte offset 4*((rec-1)*rec_words + (id-1)*chunk_words)).  The data are
!> written later by iwrite_list_t%check, which then deallocates the array, so
!> f must point to an allocated array that the caller hands over and does not
!> modify or deallocate afterwards.  Without MPI this routine does nothing.
!===============================================================================
SUBROUTINE iwrite4 (self, f, rec, id)
  class(mpi_io_t):: self
  real(kind=4), dimension(:,:,:,:), pointer:: f
  integer:: rec, id
#ifdef MPI
  integer(kind=MPI_OFFSET_KIND):: pos
  class(iwrite_t), pointer:: item
  class(dll_node_t), pointer:: node
  integer:: n, unit
  integer, save:: itimer=0
  !-----------------------------------------------------------------------------
  call trace%begin ('mpi_io_t%iwrite', itimer=itimer)
  call self%check_init
  pos = 4_8*((rec-1_8)*self%rec_words + (id-1_8)*self%chunk_words)
  if (verbose > 1) then
    if (verbose > 2) then
      unit = io_unit%output
    else
      unit = io_unit%mpi
    end if
    write (unit,'(a,3i8,i15,1p,2e14.6)') &
      ' mpi_io_t%iwrite4: rec, id, words, pos =', &
      rec, id, self%chunk_words, pos, minval(f), maxval(f)
  end if
  !-----------------------------------------------------------------------------
  ! Record what check() needs to do the write and to release the buffer
  !-----------------------------------------------------------------------------
  allocate (item)
  item%handle = self%file%handle
  item%words = self%chunk_words
  item%pos = pos
  item%req = id
  item%buffer => f
  !-----------------------------------------------------------------------------
  ! Other threads append concurrently, so the list update is serialized
  !-----------------------------------------------------------------------------
  !$omp critical (iwrite_cr)
  node => item
  call self%iwrite_list%append (node)
  self%iwrite_list%active = .true.
  n = self%iwrite_list%n
  !$omp end critical (iwrite_cr)
  if (verbose > 1) &
    write (io%output,'(f12.6,i5,2x,a,i7,i4)') &
      wallclock(), omp%thread, 'appended req, n =', item%req, n
  call trace%end (itimer)
#endif
END SUBROUTINE iwrite4
349 
!===============================================================================
!> Blocking read of one rank-3 chunk f (self%chunk_words reals) from chunk id
!> of record rec, at byte offset 4*((rec-1)*rec_words + (id-1)*chunk_words).
!> Aborts if rec < 1 or if the MPI read reports an error, matching the error
!> handling of the write routines.  Without MPI this routine does nothing.
!===============================================================================
SUBROUTINE read3 (self, f, rec, id)
  class(mpi_io_t):: self
  real(kind=4), dimension(:,:,:):: f
  integer:: rec, id
#ifdef MPI
  integer(kind=MPI_OFFSET_KIND):: pos
  integer:: unit
  !-----------------------------------------------------------------------------
  if (rec < 1) &
    call io%abort ('mpi_io_t%read3: illegal record number')
  call trace%begin ('mpi_io_t%read3')
  call self%check_init
  pos = 4_8*((rec-1_8)*self%rec_words + (id-1_8)*self%chunk_words)
  !-----------------------------------------------------------------------------
  ! When mp%mode is not mpi_thread_multiple, MPI calls from different OpenMP
  ! threads are serialized through the mpi_cr critical region
  !-----------------------------------------------------------------------------
  if (mp%mode == mpi_thread_multiple) then
    call mpi_file_read_at (self%file%handle, pos, f, self%chunk_words, mpi_real, &
                           self%status, self%err)
  else
    !$omp critical (mpi_cr)
    call mpi_file_read_at (self%file%handle, pos, f, self%chunk_words, mpi_real, &
                           self%status, self%err)
    !$omp end critical (mpi_cr)
  end if
  ! Previously the error code was ignored, so a failed read went unnoticed
  call self%assert ('mpi_io_mod::read MPI_File_read_at '//trim(self%file%filename))
  if (verbose > 1) then
    if (verbose > 2) then
      unit = io_unit%output
    else
      unit = io_unit%mpi
    end if
    write (unit,'(a,3i8,i15,1p,2e14.6)') &
      ' mpi_io_t%read3: rec, id, words, pos =', &
      rec, id, self%chunk_words, pos, minval(f), maxval(f)
  end if
  call trace%end()
#endif
END SUBROUTINE read3
387 
!===============================================================================
!> Blocking read of one rank-4 chunk f (self%chunk_words reals) from chunk id
!> of record rec, at byte offset 4*((rec-1)*rec_words + (id-1)*chunk_words).
!> Aborts if rec < 1 or if the MPI read reports an error, matching the error
!> handling of the write routines.  Without MPI this routine does nothing.
!===============================================================================
SUBROUTINE read4 (self, f, rec, id)
  class(mpi_io_t):: self
  real(kind=4), dimension(:,:,:,:):: f
  integer:: rec, id
#ifdef MPI
  integer(kind=MPI_OFFSET_KIND):: pos
  integer:: unit
  !-----------------------------------------------------------------------------
  if (rec < 1) &
    call io%abort ('mpi_io_t%read4: illegal record number')
  call trace%begin ('mpi_io_t%read4')
  call self%check_init
  pos = 4_8*((rec-1_8)*self%rec_words + (id-1_8)*self%chunk_words)
  !-----------------------------------------------------------------------------
  ! When mp%mode is not mpi_thread_multiple, MPI calls from different OpenMP
  ! threads are serialized through the mpi_cr critical region
  !-----------------------------------------------------------------------------
  if (mp%mode == mpi_thread_multiple) then
    call mpi_file_read_at (self%file%handle, pos, f, self%chunk_words, mpi_real, &
                           self%status, self%err)
  else
    !$omp critical (mpi_cr)
    call mpi_file_read_at (self%file%handle, pos, f, self%chunk_words, mpi_real, &
                           self%status, self%err)
    !$omp end critical (mpi_cr)
  end if
  ! Previously the error code was ignored, so a failed read went unnoticed
  call self%assert ('mpi_io_mod::read MPI_File_read_at '//trim(self%file%filename))
  if (verbose > 1) then
    if (verbose > 2) then
      unit = io_unit%output
    else
      unit = io_unit%mpi
    end if
    write (unit,'(a,3i8,i15,1p,2e14.6)') &
      ' mpi_io_t%read4: rec, id, words, pos =', &
      rec, id, self%chunk_words, pos, minval(f), maxval(f)
  end if
  call trace%end()
#endif
END SUBROUTINE read4
425 
!===============================================================================
!> Abort through mp%abort(label) on failure.  With err present, failure means
!> err is .true.; without it, failure means the error code of the last MPI
!> call (self%err) is non-zero, in which case rank, thread and code are
!> printed first.
!===============================================================================
SUBROUTINE assert (self, label, err)
  class(mpi_io_t):: self
  character(len=*):: label
  logical, optional:: err
  logical:: failed
  !-----------------------------------------------------------------------------
  if (present(err)) then
    failed = err
  else
    failed = (self%err /= 0)
    if (failed) print*,mp%rank,omp%thread,' error =',self%err
  end if
  if (failed) call mp%abort (label)
END SUBROUTINE assert
443 
!===============================================================================
!> Drain the queue of pending non-blocking writes with one single OpenMP
!> thread, to avoid slowing down the other threads.  Other threads are free
!> to append more items at the end of the iwrite_list in the meantime -- this
!> does not disturb the thread doing I/O.
!>
!> Returns at once unless io%do_output is set, the list is active, at least
!> mpi_io%nwrite items are queued, and no other thread is already draining.
!> Each item is written with MPI_File_write_at or, when the namelist flag
!> direct is set, to the direct-access unit io_unit%direct.  Its buffer
!> (rank-3 from iwrite3, or rank-4 from iwrite4) is then deallocated, and the
!> node is removed from the list and deallocated.
!===============================================================================
SUBROUTINE check (self)
  class(iwrite_list_t):: self
  class(dll_node_t), pointer:: node, next
  class(iwrite_t), pointer:: item
#ifdef MPI
  integer:: status(mpi_status_size)
#endif
  integer, save:: itimer=0, nprint=3
  integer:: err, done, omp_get_thread_num, rec
  logical:: ok
  real(8):: start, wc, dwc
  !-----------------------------------------------------------------------------
  ! Most threads will exit immediately here
  !-----------------------------------------------------------------------------
  if (.not.io%do_output .or. .not.self%active) &
    return
  if (self%n < mpi_io%nwrite .or. self%thread >=0) then
    if (verbose > 1 .and. self%thread >= 0) &
      write(io%output,'(f12.6,i5,2x,a,i7,i4)') &
        wallclock(), omp%thread, ' returning immediately', self%n, self%thread
    return
  end if
  call trace%begin ('iwrite_list_t%check', itimer=itimer)
  !-----------------------------------------------------------------------------
  ! More than one thread may get here; the first one to enter the critical
  ! region claims the list, the others find ok to be .false. and skip the I/O
  !-----------------------------------------------------------------------------
  !$omp critical (check_cr)
  ok = (self%thread == -1)
  if (ok) &
    self%thread = omp_get_thread_num()
  !$omp end critical (check_cr)
  !-----------------------------------------------------------------------------
  ! The thread that wins the race writes out the data, while other threads may
  ! keep appending buffers
  !-----------------------------------------------------------------------------
  if (ok) then
    if (verbose > 0 .and. .not.io_unit%do_validate) &
      write(io%output,'(1p,g14.6,i6,3x,a,i6)') &
        wallclock(), self%thread, 'I/O start, n =', self%n
    start = wallclock()
    done = 0
    !!omp critical (iwrite_cr)
    node => self%head
    !!omp end critical (iwrite_cr)
    do while (self%n > 0 .and. associated(node))
      err = 0
      select type (node)
      class is (iwrite_t)
        item => node
        if (associated(item%buffer) .or. associated(item%buffer3)) then
          wc = wallclock()
          if (direct) then
            !-------------------------------------------------------------------
            ! Direct-access record matching the byte offset.  Items queued by
            ! iwrite3 carry only buffer3, those by iwrite4 only buffer, so
            ! write (and release) whichever one is associated
            !-------------------------------------------------------------------
            rec = 1 + item%pos/item%words/4
            if (associated(item%buffer3)) then
              write (io_unit%direct, rec=rec) item%buffer3
              deallocate (item%buffer3)
              nullify (item%buffer3)
            else
              write (io_unit%direct, rec=rec) item%buffer
              deallocate (item%buffer)
            end if
          else if (associated(item%buffer3)) then
#ifdef MPI
            if (mp%mode == mpi_thread_multiple) then
              call mpi_file_write_at (item%handle, item%pos, item%buffer3, &
                                      item%words, mpi_real, status, err)
            else
              !$omp critical (mpi_cr)
              call mpi_file_write_at (item%handle, item%pos, item%buffer3, &
                                      item%words, mpi_real, status, err)
              !$omp end critical (mpi_cr)
            end if
#endif
            call io%assert (err==0, 'MPI_File_write_at: error code non-zero')
            deallocate (item%buffer3)
            nullify (item%buffer3)
          else
#ifdef MPI
            if (mp%mode == mpi_thread_multiple) then
              call mpi_file_write_at (item%handle, item%pos, item%buffer, &
                                      item%words, mpi_real, status, err)
            else
              !$omp critical (mpi_cr)
              call mpi_file_write_at (item%handle, item%pos, item%buffer, &
                                      item%words, mpi_real, status, err)
              !$omp end critical (mpi_cr)
            end if
#endif
            call io%assert (err==0, 'MPI_File_write_at: error code non-zero')
            deallocate (item%buffer)
          end if
          nullify (item%buffer)
          !---------------------------------------------------------------------
          ! Report the time of the first few writes
          !---------------------------------------------------------------------
          if (verbose > 0 .and. nprint > 0 .and. .not.io_unit%do_validate) then
            nprint = nprint-1
            dwc = wallclock()-wc
            if (direct) then
              write(io%output,'(10x,a,f12.6)') 'direct access write time:', dwc
            else
              write(io%output,'(10x,a,f12.6)') 'MPI_File_write_at time:', dwc
            end if
          end if
        else
          write(io%output,*) &
            self%thread, 'buffer not associated on req', item%req
        end if
      end select
      done = done+1
      if (.not.io_unit%do_validate) then
        if (verbose > 1) &
          write(io%output,'(1p,g14.6,i6,3x,a,2i7)') &
            wallclock(), self%thread, 'I/O req =', item%req, done
        if (verbose > 0 .and. mod(done,1000) == 0) &
          write(io%output,'(1p,g14.6,i6,3x,a,i7)') &
            wallclock(), self%thread, 'I/O done =', done
      end if
      !-------------------------------------------------------------------------
      ! Unlink and free the node.  The iwrite_cr region is deliberately left
      ! disabled here.  NOTE(review): confirm that dll_t%remove is safe against
      ! a concurrent append when the list holds a single node.
      !-------------------------------------------------------------------------
      !!omp critical (iwrite_cr)
      next => node%next
      call self%remove (node)
      deallocate (node)
      node => next
      !!omp end critical (iwrite_cr)
    end do
    if (verbose > 0 .and. .not.io_unit%do_validate) &
      write(io%output,'(1p,g14.6,i6,3x,a,i6,0p,f12.3," s")') &
        wallclock(), self%thread, 'I/O final, n =', done, wallclock()-start
    !!omp critical (iwrite_cr)
    self%thread = -1
    !!omp end critical (iwrite_cr)
  else
    if (verbose > 0) &
      write(io%output,'(1p,g14.6,i6,3x,a,i7)') wallclock(), omp_get_thread_num(), 'I/O skip'
  end if
  call trace%end (itimer)
END SUBROUTINE check
589 
590 END MODULE mpi_io_mod
Module for handling blocking and non-blocking MPI parallel I/O to a single file. The module is initia...
Definition: mpi_io_mod.f90:31
Support tic/toc timing, as in MATLAB, and accurate wallclock() function. The timing is generally much...
Module for handling blocking and non-blocking MPI parallel I/O to a single file.
Definition: mpi_file_mod.f90:4
Doubly linked list (DLL), carrying anything, as simply as possible.
Definition: dll_mod.f90:4
Definition: io_mod.f90:4