DISPATCH
mpi_mesg_mod.f90
1 !===============================================================================
2 !> $Id: eb4a832e49f174279be5680c7a92f2966eb4efa7 $
3 !===============================================================================
5  USE trace_mod
6  USE mpi_mod
7  USE io_mod
8  USE io_unit_mod
9  USE omp_mod
10  USE omp_timer_mod
11  USE timer_mod
12  implicit none
13  private
14 #ifdef MPI
15  include "mpif.h"
16 #endif
!-------------------------------------------------------------------------------
! A single message: an integer buffer plus the MPI request handle(s) of the
! non-blocking operations on it, doubly linked into a mesg_list_t.
! buffer, nbuf and nreq are default-initialized so that a freshly allocated
! mesg has a well-defined (disassociated) buffer and an empty request count;
! send() increments nreq and delete() tests associated(buffer).
!-------------------------------------------------------------------------------
  type, public:: mesg_t
    class(mesg_t), pointer:: next => null()            ! next mesg in a list
    class(mesg_t), pointer:: prev => null()            ! previous mesg in a list
    integer, dimension(:), pointer:: buffer => null()  ! message payload
    integer, pointer:: reqs(:) => null()               ! handles appended by send
    integer:: nbuf=0                                   ! payload length
    integer:: nreq=0                                   ! number of valid reqs(:)
    integer:: req=0                                    ! handle of a receive
    integer:: sender                                   ! source rank (get_id)
    integer:: id                                       ! message id (get_id/send)
    integer:: ntry=0
    integer:: n_failed=0
    integer:: seq=0                                    ! sequence (uniq_mesg)
    integer:: tag=2**30                                ! tag of last send
    real(8):: test_next=0d0
    real:: test_time=0.0
  contains
    procedure:: send
    procedure:: recv
    procedure:: test_all
    procedure:: wait_all
    procedure:: wait_for_completion
    procedure:: is_complete
    procedure, private:: get_id
    procedure:: completed
    procedure:: all_completed
    procedure:: irecv
    procedure:: is_in_order
  end type
  integer, dimension(:), allocatable, save:: expected  ! per rank; set to 1 in init
  integer(8) :: n_wait_all=0 , n_wait_for=0            ! call counts (n_wait_for presumably by wait_for_completion)
  real(8) :: t_wait_all=0d0, t_wait_for=0d0            ! accumulated wall time in the waits
!-------------------------------------------------------------------------------
! Doubly-linked list of messages. n is the running member count maintained by
! add/remove; name is used only in diagnostic output.
!-------------------------------------------------------------------------------
  type, public:: mesg_list_t
    class(mesg_t), pointer:: head => null()   ! oldest entry
    class(mesg_t), pointer:: tail => null()   ! newest entry
    character(len=16):: name = 'mesg_list'
    integer:: n = 0
    integer:: id = 0
  contains
    procedure:: add
    procedure:: remove_completed
    procedure:: remove
    procedure:: delete
    procedure:: reset
    procedure:: check_sent
    procedure:: print => print_list
    procedure:: count
  end type
!-------------------------------------------------------------------------------
! Data type collecting parameters, counters and message lists into one object
!-------------------------------------------------------------------------------
  type, public:: mpi_mesg_t
    logical:: initialized=.false.
    type(mesg_list_t):: sent_list, recv_list, unpk_list
    ! --- event counters
    integer:: n_check=0, n_ready=0, n_update=0, n_send=0, n_recv=0, &
              n_delay=0, n_unpk=0
    ! --- messages currently queued (incremented by sent/get/iget, decremented in delete)
    integer:: nq_send=0, nq_recv=0
    ! --- parameters; most are copied from the namelist in init
    integer:: max_recv
    integer:: max_probe
    integer:: min_nq
    integer:: tag_type=0
    integer:: nbuf=0                 ! receive buffer size used by iget
    logical:: recv_wait=.false.
    logical:: send_priv=.true.       ! use thread-private sent_list
    logical:: debug=.false.
    logical:: uniq_mesg=.true.       ! tag = 100*id + seq (see get_id)
    real:: delay_ms=0.0
    real:: test_time=0.0
  contains
    procedure:: init
    procedure:: get
    procedure:: iget
    procedure:: sent
    procedure:: delay
    procedure:: diagnostics
    procedure:: test
    procedure:: log_files
  end type
98  type(mpi_mesg_t), public:: mpi_mesg
99  type(mesg_list_t), save, public:: unpk_list, sent_list, recv_list
100  !$omp threadprivate (unpk_list,sent_list,recv_list)
101  real, save:: delay_ms=0.0 ! allow automatic setting
102  integer, save:: min_nq=10 ! min ready q size for MPI_Test
103  integer, save:: max_sent=10 ! max outstanding patch sends
104  integer, save:: max_recv=100 ! max outstanding patch recvs
105  integer, save:: max_probe=10 ! max loop count in MPI_Probe
106  integer, save:: every_send=2 ! how often to check send_list
107  integer, save:: every_recv=1 ! how often to check recv_list
108  logical, save:: recv_wait=.false. ! if true, always use MPI_Wait
109  logical, save:: send_wait=.false. ! if true, wait in OMP task
110  integer, save:: id=0
111  integer, save:: verbose=0
112  logical, save:: detailed_timer=.false.
113 CONTAINS
114 
115 !===============================================================================
!> Send a buffer, already prepared with size and id tag, appending the request handle to self%reqs
117 !===============================================================================
SUBROUTINE send (self, rank, tag)
  class(mesg_t):: self
  integer, optional:: tag
  integer:: rank, req, ierr, ltag
  !.............................................................................
  ! Post a non-blocking send of self%buffer(1:self%nbuf) to 'rank', with the
  ! given tag or else self%id, and append the request handle to self%reqs
  ! (self%nreq counts the handles). NOTE(review): self%reqs is assumed to be
  ! allocated with room for one more handle; this is not checked here.
  !.............................................................................
  call trace%begin ('mesg_t%send')
  if (present(tag)) then
    ltag = tag
  else
    ltag = self%id
  end if
  self%tag = ltag
  if (verbose>0 .or. self%id==io%id_debug) then
    write (io_unit%log,*) &
      'mpi_mesg_t%send id, tag, nbuf, to =', self%id, ltag, self%nbuf, rank
    flush (io_unit%log)
  end if
  !-----------------------------------------------------------------------------
  ! Give req a defined value: without MPI nothing else assigns it, yet it is
  ! still recorded in self%reqs below.
  !-----------------------------------------------------------------------------
  req = 0
#ifdef MPI
  if (mpi%mode == mpi_thread_multiple) then
    call mpi_isend (self%buffer, self%nbuf, mpi_integer, rank, ltag, &
                    mpi%comm, req, ierr)
    self%nreq = self%nreq+1                    ! count reqs
    self%reqs(self%nreq) = req                 ! collect reqs
  else
    !$omp critical (mpi_cr)
    call mpi_isend (self%buffer, self%nbuf, mpi_integer, rank, ltag, &
                    mpi%comm, req, ierr)
    self%nreq = self%nreq+1                    ! count reqs
    self%reqs(self%nreq) = req                 ! collect reqs
    !$omp end critical (mpi_cr)
  end if
#else
  !$omp critical (mpi_cr)
  self%nreq = self%nreq+1                      ! count reqs
  self%reqs(self%nreq) = req                   ! collect reqs
  !$omp end critical (mpi_cr)
#endif
  !$omp atomic
  mpi_mesg%n_send = mpi_mesg%n_send+1
  call trace%end()
END SUBROUTINE send
159 
160 !===============================================================================
!> Post a non-blocking receive into a buffer already prepared with size and id tag; the request handle is stored in self%req
162 !===============================================================================
SUBROUTINE recv (self, rank, tag)
  class(mesg_t):: self
  integer, optional:: tag
  integer:: rank, ierr, use_tag
  !.............................................................................
  ! Start a non-blocking receive of self%nbuf integers from 'rank' into
  ! self%buffer, matching 'tag' if given, else self%id. The request handle is
  ! left in self%req. Without MPI this only does tracing.
  !.............................................................................
  call trace%begin ('mesg_t%recv')
  use_tag = self%id
  if (present(tag)) use_tag = tag
  if (verbose > 2) then
    write (io_unit%log,*) &
      'mpi_mesg%recv id, nbuf, to =', self%id, self%nbuf, rank
    flush (io_unit%log)
  end if
#ifdef MPI
  if (mpi%mode /= mpi_thread_multiple) then
    ! MPI calls must be serialized through the mpi_cr region
    !$omp critical (mpi_cr)
    call mpi_irecv (self%buffer, self%nbuf, mpi_integer, rank, use_tag, &
                    mpi%comm, self%req, ierr)
    !$omp end critical (mpi_cr)
  else
    call mpi_irecv (self%buffer, self%nbuf, mpi_integer, rank, use_tag, &
                    mpi%comm, self%req, ierr)
  end if
#endif
  call trace%end()
END SUBROUTINE recv
195 
196 !===============================================================================
197 !> Remove and delete completed messages in a message list
198 !===============================================================================
SUBROUTINE remove_completed (self)
  class(mesg_list_t):: self
  class(mesg_t), pointer:: mesg, next
  !-----------------------------------------------------------------------------
  ! Walk the list once, unlinking and deleting every message whose requests
  ! have all completed. The successor is saved before a possible delete, and
  ! the walk always advances, so a non-completed message no longer makes the
  ! loop spin forever on the same entry.
  !-----------------------------------------------------------------------------
  call trace%begin ('mesg_list%remove_completed')
  mesg => self%head
  do while (associated(mesg))
    next => mesg%next                 ! save: mesg may be deallocated below
    if (mesg%all_completed()) then
      call self%remove (mesg)
      call self%delete (mesg)
    end if
    mesg => next
  end do
  call trace%end()
END SUBROUTINE remove_completed
213 
214 !===============================================================================
215 !> Check messages on a sent_list for completeness
216 !===============================================================================
SUBROUTINE check_sent (self, nq)
  class(mesg_list_t):: self
  integer:: nq
  !.............................................................................
  ! Retire completed sends from a shared sent list. With thread-private send
  ! lists (mpi_mesg%send_priv) the work is delegated to check_priv. Otherwise:
  ! if the list holds more than max_sent entries, the oldest surplus entries
  ! are detached under the sent_cr region and then waited on outside it; and
  ! once every every_send+1 calls (counted per thread) the remaining entries
  ! are tested and the completed ones deleted. The nq argument is not used.
  !.............................................................................
  class(mesg_t), pointer:: cur, nxt
  type(mesg_list_t):: tmp_list
  logical:: done
  integer:: n_now, n_max
  integer, save:: countdown=0
  integer, save:: itimer=0
  !$omp threadprivate (countdown)
  !-----------------------------------------------------------------------------
  if (mpi_mesg%send_priv) then
    call check_priv
    return
  end if
  if (verbose > 0) &
    write (io_unit%log,*) wallclock(), omp%thread, &
      'check_sent: n =', self%n, max_sent, associated(self%head)
  if (mpi%size <= 1 .or. .not.associated(self%head)) return
  call trace%begin ('mpi_mesg_t%check_sent', itimer=itimer)
  !-----------------------------------------------------------------------------
  ! Bring the number of pending sends down to max_sent
  !-----------------------------------------------------------------------------
  n_max = max_sent
  !$omp atomic read
  n_now = self%n
  if (n_now > n_max) call drain_excess
  !-----------------------------------------------------------------------------
  ! Testing is not urgent and can be costly, so skip it on most calls
  !-----------------------------------------------------------------------------
  if (countdown > 0) then
    countdown = countdown-1
    call trace%end (itimer)
    return
  end if
  countdown = every_send
  if (associated(self%head)) call retire_completed
  if (verbose > 0) &
    write (io_unit%log,*) wallclock(), omp%thread, 'check_sent: n =', self%n
  call trace%end (itimer)
contains
  !-----------------------------------------------------------------------------
  ! Move the oldest entries beyond n_max to tmp_list inside sent_cr, then wait
  ! for and delete them outside any critical region.
  !-----------------------------------------------------------------------------
  subroutine drain_excess
    !$omp critical (sent_cr)
    call tmp_list%reset
    tmp_list%name = 'sent_tmp'
    cur => self%head
    do while (self%n > n_max)
      nxt => cur%next                   ! add() nullifies cur%next
      call self%remove (cur)
      call tmp_list%add (cur)
      cur => nxt
    end do
    !$omp end critical (sent_cr)
    write (io_unit%log,*) 'WAITALL: n,m,sent_list%n,sent_tmp%n =', n_now, n_max, self%n, tmp_list%n
    cur => tmp_list%head
    do while (associated(cur))
      nxt => cur%next
      call cur%wait_all
      call tmp_list%remove (cur)
      call tmp_list%delete (cur, send=.true.)
      cur => nxt
    end do
  end subroutine drain_excess
  !-----------------------------------------------------------------------------
  ! Test every entry and delete the finished ones; the list may be modified by
  ! other threads, so the traversal is done inside sent_cr.
  !-----------------------------------------------------------------------------
  subroutine retire_completed
    !$omp critical (sent_cr)
    cur => self%head
    do while (associated(cur))
      nxt => cur%next                   ! cur may be deleted
      call cur%test_all (done)
      if (done) then
        call self%remove (cur)
        call self%delete (cur, send=.true.)
      end if
      cur => nxt
    end do
    !$omp end critical (sent_cr)
  end subroutine retire_completed
END SUBROUTINE check_sent
306 
307 !===============================================================================
!> Check messages on the thread-private sent_list for completeness, waiting instead of testing while it holds more than max_sent messages
309 !===============================================================================
SUBROUTINE check_priv
  class(mesg_t), pointer:: cur, nxt
  logical:: done
  logical:: show
  integer, save:: itimer=0
  !-----------------------------------------------------------------------------
  ! Walk the thread-private sent_list. While the list is longer than max_sent
  ! each visited message is waited on; otherwise it is only tested. Finished
  ! messages are unlinked and deleted.
  !-----------------------------------------------------------------------------
  show = (verbose > 0) .and. (sent_list%n > 0)
  if (show) &
    write (io_unit%log,*) wallclock(), omp%thread, &
      'check_priv: n =', sent_list%n, associated(sent_list%head)
  if (mpi%size <= 1 .or. .not.associated(sent_list%head)) return
  call trace%begin ('mpi_mesg_t%check_priv', itimer=itimer)
  cur => sent_list%head
  do while (associated(cur))
    nxt => cur%next                               ! cur may be deleted
    if (sent_list%n <= max_sent) then
      call cur%test_all (done)
    else
      call cur%wait_all                           ! list too long: force wait
      done = .true.
    end if
    if (done) then
      call sent_list%remove (cur)
      call sent_list%delete (cur, send=.true.)
    end if
    cur => nxt
  end do
  if (show) &
    write (io_unit%log,*) wallclock(), omp%thread, &
      'check_priv: n =', sent_list%n
  call trace%end (itimer)
END SUBROUTINE check_priv
344 
345 !===============================================================================
346 !> Test if all requests (sending to several ranks) are complete for this mesg
347 !===============================================================================
SUBROUTINE test_all (self, flag)
  class(mesg_t):: self
  logical:: flag
  integer:: ierr
  !.............................................................................
  ! Set flag to .true. when all send requests in self%reqs(1:self%nreq) have
  ! completed, else .false. A message with no requests has nothing pending,
  ! so it reports completion; before, flag was left undefined in that case
  ! although callers branch on it. Without MPI, flag is always .true.
  !.............................................................................
  if ((self%nreq <= 0) .or. (.not.associated(self%reqs))) then
    flag = .true.
    if (verbose > 2) &
      write (stdout,*) 'mesg_t%test_all: WARNING', &
        self%nreq, associated(self%reqs)
    return
  end if
#ifdef MPI
  if (mpi%mode == mpi_thread_multiple) then
    call mpi_testall (self%nreq, self%reqs, flag, mpi_statuses_ignore, ierr)
  else
    !$omp critical (mpi_cr)
    call mpi_testall (self%nreq, self%reqs, flag, mpi_statuses_ignore, ierr)
    !$omp end critical (mpi_cr)
  end if
#else
  flag = .true.
#endif
  if (verbose>2) then
    write (io_unit%log,*) wallclock(), self%id, ' test_all flag =', flag
    flush (io_unit%log)
  end if
END SUBROUTINE test_all
376 
377 !===============================================================================
378 !> Wait for all requests related to this mesg to complete
379 !===============================================================================
SUBROUTINE wait_all (self)
  class(mesg_t):: self
  integer:: ierr
  logical:: done
  real(8):: wc
  integer, save:: itimer=0
  !.............................................................................
  ! Block until all requests in self%reqs(1:self%nreq) have completed, and
  ! accumulate the call count and wall time in n_wait_all / t_wait_all.
  !
  ! The serialization rule matches every other MPI call in this module: with
  ! MPI_THREAD_MULTIPLE, call MPI directly; otherwise, go through the mpi_cr
  ! critical region. A blocking mpi_waitall must not hold mpi_cr, because that
  ! would stall all other threads' MPI calls for the whole wait (and could
  ! deadlock with a peer rank in the same state). So in that mode the requests
  ! are polled with mpi_testall, releasing mpi_cr between polls.
  !.............................................................................
  call trace%begin ('mesg_t%wait_all', itimer=itimer)
  if (verbose >= 0) wc = wallclock()
#ifdef MPI
  if (self%nreq > 0 .and. associated(self%reqs)) then
    if (mpi%mode == mpi_thread_multiple) then
      call mpi_waitall (self%nreq, self%reqs, mpi_statuses_ignore, ierr)
    else
      done = .false.
      do while (.not.done)
        !$omp critical (mpi_cr)
        call mpi_testall (self%nreq, self%reqs, done, mpi_statuses_ignore, ierr)
        !$omp end critical (mpi_cr)
      end do
    end if
  end if
#endif
  if (verbose >= 0) then
    wc = wallclock()-wc
    !$omp atomic
    n_wait_all = n_wait_all + 1
    !$omp atomic
    t_wait_all = t_wait_all + wc
    if (verbose > 0) then
      write (io_unit%log,*) wallclock(), ' wait_all:', self%id, wc
      flush (io_unit%log)
    end if
  end if
  call trace%end (itimer)
END SUBROUTINE wait_all
410 
411 !===============================================================================
!> Append a mesg at the end of the message list. Since adding and removing use
!> the same critical region to protect the list handling, the atomic update of
!> the counter is not strictly needed, but it is kept for good measure.
415 !===============================================================================
SUBROUTINE add (self, mesg)
  class(mesg_list_t):: self
  class(mesg_t), pointer:: mesg
  !.............................................................................
  ! Link mesg in as the new tail of the list and bump the member count.
  ! Expected to be called from within the region protecting the list.
  !.............................................................................
  nullify (mesg%next)                     ! the new tail has no successor
  if (.not.associated(self%head)) then
    nullify (mesg%prev)                   ! empty list: mesg becomes the head
    self%head => mesg
  else
    mesg%prev => self%tail                ! hang mesg behind the old tail
    self%tail%next => mesg
  end if
  self%tail => mesg
  !$omp atomic
  self%n = self%n+1
  if (verbose > 1) then
    write (io_unit%log,*) trim(self%name)//' added mesg', mesg%id, self%n
    flush (io_unit%log)
  end if
END SUBROUTINE add
436 
437 !===============================================================================
438 !> Remove a message from a message list. This routine is assumed to be called
439 !> from inside a critical region.
440 !===============================================================================
SUBROUTINE remove (self, mesg)
  class(mesg_list_t):: self
  class(mesg_t), pointer:: mesg
  !.............................................................................
  ! Unlink mesg from the list, fixing head/tail as needed, and decrement the
  ! count. mesg itself is not deallocated and its next/prev are left as-is.
  !.............................................................................
  if (.not.associated(mesg%prev)) then    ! mesg was the head
    self%head => mesg%next
  else
    mesg%prev%next => mesg%next
  end if
  if (.not.associated(mesg%next)) then    ! mesg was the tail
    self%tail => mesg%prev
  else
    mesg%next%prev => mesg%prev
  end if
  if (verbose > 1) then
    write (io_unit%log,*) trim(self%name)//' remove mesg OK', mesg%id, self%n
    flush (io_unit%log)
  end if
  !$omp atomic
  self%n = self%n-1
END SUBROUTINE remove
462 
463 !===============================================================================
464 !> Deallocate the message buffer and request array, and then the message itself
465 !===============================================================================
SUBROUTINE delete (self, mesg, send)
  class(mesg_list_t):: self
  class(mesg_t), pointer:: mesg
  logical, optional:: send
  !.............................................................................
  ! Free the buffer (updating io%bits_mem accounting), the request array and
  ! the message itself. If 'send' is given, also record the queue high-water
  ! mark and decrement nq_send (send=.true.) or nq_recv (send=.false.).
  !.............................................................................
  if (associated(mesg%buffer)) then
    call io%bits_mem (-storage_size(mesg%buffer), product(shape(mesg%buffer)), 'mem')
    deallocate (mesg%buffer)
  end if
  if (associated(mesg%reqs)) then
    deallocate (mesg%reqs)
  end if
  deallocate (mesg)
  if (.not.present(send)) return
  if (send) then
    timer%nq_send_max = max(timer%nq_send_max,mpi_mesg%nq_send)
    !$omp atomic
    mpi_mesg%nq_send = mpi_mesg%nq_send-1
  else
    timer%nq_recv_max = max(timer%nq_recv_max,mpi_mesg%nq_recv)
    !$omp atomic
    mpi_mesg%nq_recv = mpi_mesg%nq_recv-1
  end if
END SUBROUTINE delete
489 
490 !===============================================================================
491 !> Reset the mesg_list
492 !===============================================================================
SUBROUTINE reset (self)
  class(mesg_list_t):: self
  !.............................................................................
  ! Forget all members (without deallocating them) and zero the count.
  !.............................................................................
  self%n = 0
  self%head => null()
  self%tail => null()
END SUBROUTINE reset
500 
501 !===============================================================================
502 !> Print a list of messages
503 !===============================================================================
SUBROUTINE print_list (self, label)
  class(mesg_list_t):: self
  character(len=*), optional:: label
  class(mesg_t), pointer:: item
  !-----------------------------------------------------------------------------
  ! Write one line per message (list count, id, sender, ntry) to the log,
  ! preceded by an optional banner line.
  !-----------------------------------------------------------------------------
  if (present(label)) then
    write (io_unit%log,*) '------------------ '//label//' ------------------'
  end if
  item => self%head
  do while (associated(item))
    write (io_unit%log,'(a,i5,i9,2i5)') ' mesg_list: '//self%name, &
      self%n, item%id, item%sender, item%ntry
    item => item%next
  end do
END SUBROUTINE print_list
518 
519 !===============================================================================
520 !> Count messages, with warning if the %n does not agree with the actual number
521 !===============================================================================
SUBROUTINE count (self, label)
  class(mesg_list_t):: self
  character(len=*):: label
  class(mesg_t), pointer:: item
  integer:: n_found
  !-----------------------------------------------------------------------------
  ! When verbose, walk the list and warn if the number of linked messages
  ! differs from self%n. The label argument is currently unused.
  !-----------------------------------------------------------------------------
  if (verbose >= 1) then
    n_found = 0
    item => self%head
    do while (associated(item))
      n_found = n_found + 1
      item => item%next
    end do
    if (n_found /= self%n) &
      write (io_unit%log,*) 'WARNING: inconsistent '//self%name, n_found, self%n
  end if
END SUBROUTINE count
539 
540 !===============================================================================
!> Read the mpi_mesg_params namelist, copy the parameters into self, and name the message lists (including the thread-private ones)
542 !===============================================================================
SUBROUTINE init (self)
  class(mpi_mesg_t):: self
  !.............................................................................
  ! One-time setup: read namelist mpi_mesg_params (defaults come from the
  ! module variables, adjusted when MPI lacks MPI_THREAD_MULTIPLE, and from
  ! self for debug, uniq_mesg and send_priv), copy the results into self,
  ! name the shared and thread-private lists, and allocate 'expected'.
  !.............................................................................
  logical, save:: debug
  logical, save:: send_priv
  logical, save:: uniq_mesg
  real, save:: test_time=20e-3
  namelist /mpi_mesg_params/ min_nq, max_sent, max_probe, max_recv, every_recv, &
                             every_send, delay_ms, recv_wait, send_wait, send_priv, &
                             test_time, uniq_mesg, debug, verbose, detailed_timer
  integer:: iostat
  character(len=120):: id = &
    '$Id: eb4a832e49f174279be5680c7a92f2966eb4efa7 $ mpi_mesg_mod.f90'
  !-----------------------------------------------------------------------------
  ! Prevent initializing more than once
  !-----------------------------------------------------------------------------
  call trace%print_id (id)
  if (self%initialized) return
  self%initialized = .true.
  call trace%begin ('mpi_mesg_t%init')
  !-----------------------------------------------------------------------------
  ! Default values from dispatcher. debug is seeded here as well; otherwise it
  ! would be undefined whenever the namelist does not set it.
  !-----------------------------------------------------------------------------
  debug = self%debug
  uniq_mesg = self%uniq_mesg
  send_priv = self%send_priv
  !-----------------------------------------------------------------------------
  ! Namelist input
  !-----------------------------------------------------------------------------
#ifdef MPI
  if (mpi%mode /= mpi_thread_multiple) then
    min_nq = 0
    max_sent = 1000
    max_recv = 1000
    recv_wait = .false.
    send_wait = .false.
  end if
#endif
  rewind(io%input)
  read (io%input, mpi_mesg_params, iostat=iostat)
  ! A missing group typically gives iostat < 0 (end of file) and keeps the
  ! defaults; iostat > 0 means the group could not be parsed -- report it.
  if (iostat > 0) &
    write (io%output,*) 'mpi_mesg_t%init: WARNING, error reading mpi_mesg_params, iostat =', iostat
  write (io%output, mpi_mesg_params)
  self%debug = debug
  self%min_nq = min_nq
  self%max_recv = max_recv
  self%max_probe = max_probe
  self%recv_wait = recv_wait
  self%uniq_mesg = uniq_mesg
  self%delay_ms = delay_ms
  mpi_mesg%test_time = test_time
  !-----------------------------------------------------------------------------
  ! Default values from dispatcher, possibly changed here
  !-----------------------------------------------------------------------------
  self%send_priv = send_priv
  !-----------------------------------------------------------------------------
  ! Initialize thread-private lists
  !-----------------------------------------------------------------------------
  self%sent_list%name = 'sent_list'
  self%recv_list%name = 'recv_list'
  self%unpk_list%name = 'unpk_list'
  !$omp parallel
  recv_list%name = 'recv_list'
  sent_list%name = 'sent_list'
  unpk_list%name = 'unpk_list'
  sent_list%id = omp_get_thread_num()
  !$omp end parallel
  !call self%test
  !-----------------------------------------------------------------------------
  allocate (expected(0:mpi%size-1))
  expected(:) = 1
  call trace%end()
END SUBROUTINE init
616 
617 !===============================================================================
!> Alternate between the files sent_rrrrr_1.log and sent_rrrrr_2.log (rrrrr = MPI rank),
!> switching once per minute, so that the last 1-2 minutes of logging are always available
620 !===============================================================================
SUBROUTINE log_files (self)
  class(mpi_mesg_t):: self
  character(len=120):: filename
  integer:: one_two
  integer, save:: previous=-1
  !-----------------------------------------------------------------------------
  ! When io%log_sent > 0, keep unit io_unit%sent connected to
  ! <outputname>/sent_<rank>_<1|2>.log, where the suffix alternates every
  ! wall-clock minute. On a switch, the unit itself is closed; previously the
  ! code closed unit number 'previous' (1 or 2), which is an unrelated unit.
  !-----------------------------------------------------------------------------
  if (io%log_sent > 0) then
    one_two = mod(int(wallclock()/60.), 2) + 1
    if (one_two /= previous) then
      if (previous > 0) close (io_unit%sent)
      write (filename,'(a,"/sent_",i5.5,"_",i1,".log")') &
        trim(io%outputname), mpi%rank, one_two
      open (io_unit%sent, file=filename, form='formatted', status='unknown')
      previous = one_two
    end if
  end if
END SUBROUTINE log_files
640 
641 !===============================================================================
642 !> Test the components
643 !===============================================================================
SUBROUTINE test (self)
  class(mpi_mesg_t):: self
  type(mesg_list_t):: test_list
  class(mesg_t), pointer:: mesg, next
  integer, parameter:: n=3
  integer:: i
  !.............................................................................
  ! Self-test of the list primitives: add n messages, remove/delete all but the
  ! last one from the head, then remove/delete the last, printing the list at
  ! each stage. These are not real sends, so they are deleted without send=
  ! and mpi_mesg%nq_send is left untouched; their buffers are registered with
  ! io%bits_mem like get/iget do, so the release done by delete is matched.
  !.............................................................................
  do i=1,n
    allocate (mesg)
    allocate (mesg%buffer(10))
    call io%bits_mem (storage_size(mesg%buffer), product(shape(mesg%buffer)), 'buf')
    mesg%id = i
    call test_list%add (mesg)
  end do
  call test_list%print ('test1')
  mesg => test_list%head
  do i=1,n-1
    next => mesg%next
    call test_list%remove (mesg)
    call test_list%delete (mesg)
    mesg => next
  end do
  call test_list%print ('test2')
  mesg => test_list%head
  call test_list%remove (mesg)
  call test_list%delete (mesg)
  if (io%master) write (io_unit%log,*) &
    associated(test_list%head), associated(test_list%tail)
  call test_list%print ('test3')
END SUBROUTINE test
672 
673 !===============================================================================
674 !> Add a message to the sent_list, in a critical region, or wait for all sends
675 !> in a background task, or use a threadprivate list
676 !===============================================================================
SUBROUTINE sent (self, mesg)
  class(mpi_mesg_t):: self
  class(mesg_t), pointer:: mesg
  integer:: mesg_id
  !-----------------------------------------------------------------------------
  ! Hand a sent mesg over for completion tracking: to the thread-private
  ! sent_list (send_priv), to an OpenMP task that waits and frees it
  ! (send_wait), or to the shared self%sent_list inside sent_cr.
  ! The id is captured first: once mesg is handed off, another thread or the
  ! task may complete and deallocate it before the diagnostic print below.
  !-----------------------------------------------------------------------------
  mesg_id = mesg%id
  !$omp atomic
  mpi_mesg%nq_send = mpi_mesg%nq_send+1
  if (mpi_mesg%send_priv) then
    call sent_list%add (mesg)
  else if (send_wait) then
    !$omp task firstprivate(mesg)
    call mesg%wait_all
    if (associated(mesg%buffer)) then
      call io%bits_mem (-storage_size(mesg%buffer), product(shape(mesg%buffer)), 'mem')
      deallocate (mesg%buffer)
    end if
    if (associated(mesg%reqs)) deallocate (mesg%reqs)    ! was leaked before
    deallocate (mesg)
    !$omp atomic
    mpi_mesg%nq_send = mpi_mesg%nq_send-1
    !$omp end task
  else
    !$omp critical (sent_cr)
    call self%sent_list%add (mesg)
    !$omp end critical (sent_cr)
  end if
  if (verbose > 0) then
    write (io_unit%log,'(f12.6,2x,a,i9,2i6)') &
      wallclock(), 'mpi_mesg_t%sent: id, thread, n =', &
      mesg_id, omp_get_thread_num(), sent_list%n
    flush (io_unit%log)
  end if
END SUBROUTINE sent
706 
707 !===============================================================================
!> Check for new incoming messages. Note that, ideally, we want to probe for
!> messages just before doing a task update, and then check afterwards, so that
!> if the delay doing the work is enough, some probed messages will have arrived.
!> If a message is pending (matched probe), a new mesg is allocated, its
!> non-blocking matched receive is started, and it is returned in mesg; the
!> caller must check completion via mesg%req. Otherwise mesg is returned nullified.
714 !===============================================================================
SUBROUTINE get (self, mesg)
  class(mpi_mesg_t):: self
  class(mesg_t), pointer:: mesg
  !.............................................................................
  ! Returns mesg nullified on a single rank or when no message is pending;
  ! otherwise a new mesg whose receive was started with mpi_imrecv.
  !.............................................................................
  logical:: flag
#ifdef MPI
  integer:: stat(mpi_status_size)
#endif
  integer:: msg, ierr, nbuf, req
  integer, save:: itimer=0
  !-----------------------------------------------------------------------------
  nullify(mesg)
  if (mpi%size <= 1) return
  call trace%begin ('mpi_mesg_t%get', itimer=itimer)
  !-----------------------------------------------------------------------------
  ! Probe for incoming messages
  !-----------------------------------------------------------------------------
#ifdef MPI
  if (mpi%mode == mpi_thread_multiple) then
    call probe_for
  else
    !$omp critical (mpi_cr)
    call probe_for
    !$omp end critical (mpi_cr)
  end if
#endif
  call trace%end (itimer)
contains
  !-----------------------------------------------------------------------------
  ! Matched probe for any source/tag; if found, size the buffer from the
  ! status and start the matched receive. The datatype is mpi_integer, the
  ! same as in mesg_t%send/recv, so sender and receiver type signatures match.
  !-----------------------------------------------------------------------------
  subroutine probe_for
#ifdef MPI
    call mpi_improbe (mpi_any_source, mpi_any_tag, mpi%comm, flag, msg, stat, ierr)
    if (flag) then
      allocate (mesg)
      call mpi_get_count (stat, mpi_integer, nbuf, ierr)
      allocate (mesg%buffer(nbuf))
      call io%bits_mem (storage_size(mesg%buffer),product(shape(mesg%buffer)), 'buf')
      call mesg%get_id (stat)
      !$omp atomic
      mpi_mesg%nq_recv = mpi_mesg%nq_recv+1
      call mpi_imrecv (mesg%buffer, nbuf, mpi_integer, msg, req, ierr)
      mesg%req = req
      if (verbose > 0) then
        write (io_unit%mpi,'(f12.6,2x,"get: id, seq, sender =",i9,2i6)') &
          wallclock(), mesg%id, mesg%seq, mesg%sender
        flush(io_unit%mpi)
      end if
      mesg%nbuf = nbuf
      !$omp atomic
      mpi_mesg%n_recv = mpi_mesg%n_recv+1
      !$omp atomic
      timer%bytes_recv = timer%bytes_recv + 4.0_8*nbuf
      !$omp atomic
      timer%n_recv = timer%n_recv + 1_8
    end if
#endif
  end subroutine probe_for
END SUBROUTINE get
777 
778 !===============================================================================
!> Start a non-blocking wildcard receive (any source, any tag) into a newly
!> allocated buffer of mpi_mesg%nbuf integers and return it in mesg; the caller
!> must check completion via mesg%req, and sender/id are only known after the
!> receive completes. On a single rank (or without MPI) mesg is returned nullified.
!> Unlike get, no probe is done, so a receive is always posted.
!> The buffer must be large enough for any incoming message.
785 !===============================================================================
SUBROUTINE iget (self, mesg)
  class(mpi_mesg_t):: self
  class(mesg_t), pointer:: mesg
  !.............................................................................
  ! Returns mesg nullified on a single rank; otherwise a new mesg with a
  ! posted wildcard mpi_irecv (handle in mesg%req).
  !.............................................................................
  integer:: ierr
  integer, save:: itimer=0
  !-----------------------------------------------------------------------------
  nullify(mesg)
  if (mpi%size <= 1) return
  ! Own trace label, so iget time is not lumped together with get
  call trace%begin ('mpi_mesg_t%iget', itimer=itimer)
#ifdef MPI
  if (mpi%mode == mpi_thread_multiple) then
    call post_recv
  else
    !$omp critical (mpi_cr)
    call post_recv
    !$omp end critical (mpi_cr)
  end if
#endif
  call trace%end (itimer)
contains
  !-----------------------------------------------------------------------------
  ! Allocate the mesg and its buffer and post the receive. mpi_integer
  ! matches the datatype used by mesg_t%send.
  !-----------------------------------------------------------------------------
  subroutine post_recv
#ifdef MPI
    allocate (mesg)
    mesg%nbuf = mpi_mesg%nbuf
    allocate (mesg%buffer(mesg%nbuf))
    call io%bits_mem (storage_size(mesg%buffer),product(shape(mesg%buffer)), 'buf')
    call mpi_irecv (mesg%buffer, mesg%nbuf, mpi_integer, mpi_any_source, &
                    mpi_any_tag, mpi%comm, mesg%req, ierr)
    !$omp atomic
    mpi_mesg%nq_recv = mpi_mesg%nq_recv+1
    !$omp atomic
    mpi_mesg%n_recv = mpi_mesg%n_recv+1
    !$omp atomic
    timer%bytes_recv = timer%bytes_recv + 4.0_8*mesg%nbuf
    !$omp atomic
    timer%n_recv = timer%n_recv + 1_8
#endif
  end subroutine post_recv
END SUBROUTINE iget
837 
!===============================================================================
!> Decode the MPI status of a received message: the tag gives mesg%id and the
!> source gives mesg%sender.  When mpi_mesg%uniq_mesg is set, the tag is split
!> into an id part (tag/100) and a sequence part (mod(tag,100), in mesg%seq).
!===============================================================================
SUBROUTINE get_id (self, stat)
  class(mesg_t):: self
  integer:: stat(:)
#ifdef MPI
  integer:: tag
  !-----------------------------------------------------------------------------
  tag = stat(mpi_tag)
  self%sender = stat(mpi_source)
  if (.not. mpi_mesg%uniq_mesg) then
    ! The whole tag is the message id
    self%id = tag
    if (verbose > 1) &
      write (io_unit%log,*) 'mpi_mesg_t%get_id: id, sender =', &
        self%id, self%sender
  else
    ! The two low decimal digits carry the sequence number
    self%seq = mod(tag,100)
    self%id = tag/100
    if (verbose > 1) &
      write (io_unit%log,*) 'mpi_mesg_t%get_id: id, sender, seq =', &
        self%id, self%sender, self%seq
  end if
#endif
END SUBROUTINE get_id
859 
!===============================================================================
!> Return true or false, depending on whether the (send!) mesg request self%req
!> is complete or not -- this is used by load_balance_mod.  With a single rank
!> the result is always .false.  Each MPI test is counted in timer%mpi_test.
!===============================================================================
FUNCTION completed (self) RESULT(flag)
  class(mesg_t):: self
  logical:: flag
  integer:: ierr                 ! MPI error codes are INTEGER, not LOGICAL
  integer, save:: itimer=0
#ifdef MPI
  integer:: stat(mpi_status_size)
#endif
  !-----------------------------------------------------------------------------
  flag = .false.
  if (mpi%size <= 1) return
  call trace%begin ('mesg_t%completed', itimer=itimer)
#ifdef MPI
  !-----------------------------------------------------------------------------
  ! Serialize the MPI call unless MPI_THREAD_MULTIPLE is available
  !-----------------------------------------------------------------------------
  if (mpi%mode == mpi_thread_multiple) then
    call mpi_test (self%req, flag, stat, ierr)
  else
    !$omp critical (mpi_cr)
    call mpi_test (self%req, flag, stat, ierr)
    !$omp end critical (mpi_cr)
  end if
  !$omp atomic
  timer%mpi_test = timer%mpi_test + 1
#endif
  if (flag .and. verbose > 0) then
    write (io_unit%log,'(f12.6,2x,a,i6,i4,l2)') wallclock(), &
      'mesg_t%completed:', self%id, mpi_mesg%recv_list%n, mpi_mesg%recv_wait
  end if
  call trace%end (itimer)
END FUNCTION completed
892 
!===============================================================================
!> Return true or false, depending on whether all self%nreq requests in
!> self%reqs are complete or not (MPI_TESTALL, statuses ignored).  With a
!> single rank the result is always .false.
!===============================================================================
FUNCTION all_completed (self) RESULT(flag)
  class(mesg_t):: self
  logical:: flag
  integer:: ierr                 ! MPI error codes are INTEGER, not LOGICAL
  integer, save:: itimer=0
  !-----------------------------------------------------------------------------
  flag = .false.
  if (mpi%size <= 1) return
  call trace%begin ('mesg_t%all_completed', itimer=itimer)
#ifdef MPI
  !-----------------------------------------------------------------------------
  ! Serialize the MPI call unless MPI_THREAD_MULTIPLE is available
  !-----------------------------------------------------------------------------
  if (mpi%mode == mpi_thread_multiple) then
    call mpi_testall (self%nreq, self%reqs, flag, mpi_statuses_ignore, ierr)
  else
    !$omp critical (mpi_cr)
    call mpi_testall (self%nreq, self%reqs, flag, mpi_statuses_ignore, ierr)
    !$omp end critical (mpi_cr)
  end if
#endif
  call trace%end (itimer)
END FUNCTION all_completed
918 
!===============================================================================
!> Return true or false, depending on whether the mesg is complete or not. This
!> procedure respects the test_time, max_recv, and recv_wait settings:
!>  - the request is not re-tested before self%test_next, which is set to
!>    self%test_time seconds after each test; earlier calls return .false.
!>  - when mpi_mesg%recv_list holds more than max_recv messages, or when
!>    mpi_mesg%recv_wait is set, it blocks in MPI_WAIT instead of calling
!>    MPI_TEST, and adds the wait time to n_wait_for / t_wait_for.
!> On completion mesg%id and mesg%sender (and, if mpi_mesg%uniq_mesg, also
!> mesg%seq) are decoded from the MPI status.  With a single rank the result is
!> always .true.
!===============================================================================
FUNCTION is_complete (self, parentname)
  class(mesg_t):: self
  logical:: is_complete
  character(len=*), optional:: parentname
  !.............................................................................
  real(8):: now, test_next
  integer, save:: itimer=0
#ifdef MPI
  integer:: stat(mpi_status_size)
#endif
  !-----------------------------------------------------------------------------
  is_complete = .false.
  if (mpi%size <= 1) then
    is_complete = .true.
    return
  end if
  now = wallclock()
  if (now < self%test_next) &
    return
  if (present(parentname) .and. detailed_timer) then
    call trace%begin ('mesg_t%is_complete('//trim(parentname)//')', 2, itimer=itimer)
  else
    call trace%begin ('mesg_t%is_complete', 2, itimer=itimer)
  end if
  !-----------------------------------------------------------------------------
  ! Check for completion
  !-----------------------------------------------------------------------------
#ifdef MPI
  if (mpi%mode == mpi_thread_multiple) then
    call check_recv
  else
    !$omp critical (mpi_cr)
    call check_recv
    !$omp end critical (mpi_cr)
  end if
  if (is_complete) then
    !---------------------------------------------------------------------------
    ! In cases when MPI_IMPROBE has not already done so, get ID and sequence
    !---------------------------------------------------------------------------
    call self%get_id (stat)
    self%n_failed = 0
  else
    self%n_failed = self%n_failed+1
  end if
#endif
  test_next = now + self%test_time
  !$omp atomic write
  self%test_next = test_next
  call trace%end (itimer)
contains
  !-----------------------------------------------------------------------------
  ! Check for completed messages, independent of MPI_THREAD_MULTIPLE
  !-----------------------------------------------------------------------------
  subroutine check_recv
    integer:: ierr
    real(8):: wc
    !---------------------------------------------------------------------------
    is_complete = .true.
#ifdef MPI
    if (mpi_mesg%recv_list%n > max_recv .or. mpi_mesg%recv_wait) then
      !-------------------------------------------------------------------------
      ! The start time is taken unconditionally, since it is needed for
      ! t_wait_for below and not only for the verbose printout
      !-------------------------------------------------------------------------
      wc = wallclock()
      if (verbose > 1) then
        write (io_unit%log,*) wc, &
          'is_complete waiting for id', self%id, mpi_mesg%recv_list%n, mpi_mesg%recv_wait
        flush (io_unit%log)
      end if
      call mpi_wait (self%req, stat, ierr)
      wc = wallclock()-wc
      !$omp atomic
      n_wait_for = n_wait_for + 1
      !$omp atomic
      t_wait_for = t_wait_for + wc
      if (verbose > 0) then
        write (io_unit%log,'(f12.6,2x,a,i6,i4,l2)') wallclock(), &
          'is_complete:', self%id, mpi_mesg%recv_list%n, mpi_mesg%recv_wait
      end if
    else
      call mpi_test (self%req, is_complete, stat, ierr)
    end if
#endif
    !$omp atomic
    timer%mpi_test = timer%mpi_test + 1
    if (is_complete) then
      !$omp atomic
      timer%mpi_hit = timer%mpi_hit + 1
    end if
  end subroutine check_recv
END FUNCTION is_complete
1013 
!===============================================================================
!> Issue a non-blocking message receive request from a specific rank.  The tag
!> depends on mpi_mesg%tag_type:
!>   1         : self%seq is incremented;       tag = mod(self%seq,100) + id*100
!>   2         : expected(rank) is incremented; tag = mod(expected(rank),100) + id*100
!>   otherwise : tag = id
!> The receive writes up to self%nbuf integers into self%buffer, which must
!> already be allocated (it is not allocated here); the request handle is
!> returned in self%req.
!===============================================================================
SUBROUTINE irecv (self, rank, id)
  class(mesg_t):: self
  integer:: id
  integer:: rank, tag, ierr
  !-----------------------------------------------------------------------------
  ! Construct the tag from the message id and, for tag types 1 and 2, the next
  ! sequence number, reduced modulo 100 -- this matches the decoding in get_id
  ! (id = tag/100, seq = mod(tag,100))
  !-----------------------------------------------------------------------------
  self%id = id
  !-----------------------------------------------------------------------------
  ! Debug output only at high verbosity, as elsewhere in this module, since
  ! this runs on every call
  !-----------------------------------------------------------------------------
  if (verbose > 1) &
    write (io_unit%log,*) self%seq, mpi_mesg%tag_type
  if (mpi_mesg%tag_type == 1) then
    self%seq = self%seq + 1
    tag = mod(self%seq,100) + id*100
  else if (mpi_mesg%tag_type == 2) then
    expected(rank) = expected(rank) + 1
    tag = mod(expected(rank),100) + id*100
  else
    tag = id
  end if
  if (verbose > 1) &
    write (io_unit%log,*) &
      'irecv: seq, tag_type, tag', self%seq, mpi_mesg%tag_type, tag
  self%tag = tag
  self%sender = rank
#ifdef MPI
  !-----------------------------------------------------------------------------
  ! Serialize the MPI call unless MPI_THREAD_MULTIPLE is available
  !-----------------------------------------------------------------------------
  if (mpi%mode == mpi_thread_multiple) then
    call mpi_irecv (self%buffer, self%nbuf, mpi_int, rank, tag, mpi%comm, self%req, ierr)
  else
    !$omp critical (mpi_cr)
    call mpi_irecv (self%buffer, self%nbuf, mpi_int, rank, tag, mpi%comm, self%req, ierr)
    !$omp end critical (mpi_cr)
  end if
  !-----------------------------------------------------------------------------
  if (verbose > 1) then
    write (io_unit%log,'(f12.6,2x,a,i6,i9,i5,z12)') &
      wallclock(), 'mesg_t%irecv: id, tag, rank, req =', &
      self%id, tag, rank, self%req
    flush (io_unit%log)
  end if
#endif
END SUBROUTINE irecv
1056 
!===============================================================================
!> Block until the request self%req has completed (MPI_WAIT).  Unless verbose
!> is negative, the time spent is added to the module totals n_wait_for and
!> t_wait_for, which mpi_mesg_t%diagnostics reports as averages.
!===============================================================================
SUBROUTINE wait_for_completion (self)
  class(mesg_t):: self
  !.............................................................................
  real(8):: t_start, t_spent
  integer:: ierr
  integer, save:: itimer=0
#ifdef MPI
  integer:: stat(mpi_status_size)
#endif
  !-----------------------------------------------------------------------------
  if (mpi%size > 1) then
    call trace%begin ('mesg_t%wait_for_completion', itimer=itimer)
    if (verbose >= 0) t_start = wallclock()
#ifdef MPI
    ! Without MPI_THREAD_MULTIPLE the wait must be serialized with other MPI calls
    if (mpi%mode /= mpi_thread_multiple) then
      !$omp critical (mpi_cr)
      call mpi_wait (self%req, stat, ierr)
      !$omp end critical (mpi_cr)
    else
      call mpi_wait (self%req, stat, ierr)
    end if
#endif
    if (verbose >= 0) then
      t_spent = wallclock() - t_start
      !$omp atomic
      n_wait_for = n_wait_for + 1
      !$omp atomic
      t_wait_for = t_wait_for + t_spent
      if (verbose > 1) &
        write (io_unit%log,*) 'wait_for_completion', self%id, t_spent
    end if
    call trace%end (itimer)
  end if
END SUBROUTINE wait_for_completion
1093 
!===============================================================================
!> Delay for about ms milliseconds (default: mpi_mesg%delay_ms), to encourage
!> incoming messages to complete.  Returns immediately when the delay is zero.
!> Each delay actually performed is counted in self%n_delay.  The optional
!> argument n is accepted for interface compatibility, but is not used.
!===============================================================================
SUBROUTINE delay (self, n, ms)
  class(mpi_mesg_t):: self
  integer, optional:: n
  real, optional:: ms
  real:: ms_l
  integer, save:: itimer=0
  !-----------------------------------------------------------------------------
  if (present(ms)) then
    ms_l = ms
  else
    ms_l = mpi_mesg%delay_ms
  end if
  if (ms_l==0.0) return
  !-----------------------------------------------------------------------------
  ! The trace label names this procedure, like the other trace%begin calls in
  ! this module, so the delay time is not booked under an unrelated name
  !-----------------------------------------------------------------------------
  call trace%begin ('mpi_mesg_t%delay', itimer=itimer)
  call mpi%delay (ms_l)
  !$omp atomic
  self%n_delay = self%n_delay + 1
  call trace%end (itimer)
END SUBROUTINE delay
1116 
!===============================================================================
!> Diagnostic printout, called when task_list_mod::update stalls.
!>   flag == 9 : print the sent and received message lists, labelled 'ABORT'
!>   flag == 1 : report the average wait_for / wait_all times and their counts
!>               on io_unit%mpi and, on the master rank unless validating,
!>               also on io_unit%output
!> Any other value of flag does nothing.
!===============================================================================
SUBROUTINE diagnostics (self, flag)
  class(mpi_mesg_t):: self
  integer:: flag
  integer, save:: itimer=0
  character(len=*), parameter:: wait_fmt='(a,1p,2(e10.2,a))'
  !-----------------------------------------------------------------------------
  select case (flag)
  case (9)
    !$omp critical (abort_cr)
    call trace%begin ('mpi_mesg%diagnostics', itimer=itimer)
    call self%sent_list%print ('ABORT')
    call self%recv_list%print ('ABORT')
    flush (io_unit%log)
    call trace%end (itimer)
    !$omp end critical (abort_cr)
  case (1)
    !$omp single
    call report_waits (io_unit%mpi)
    if (io%master .and. .not. io_unit%do_validate) &
      call report_waits (io_unit%output)
    !$omp end single
  end select
contains
  !-----------------------------------------------------------------------------
  ! Write the average wait_for and wait_all times, and the number of waits,
  ! to unit iu (counts are clamped to 1 in the divisor to avoid 0/0)
  !-----------------------------------------------------------------------------
  subroutine report_waits (iu)
    integer:: iu
    write (iu,wait_fmt) 'average wait_for time:', t_wait_for/max(n_wait_for,1_8), &
      ', with', real(n_wait_for), ' waits'
    write (iu,wait_fmt) 'average wait_all time:', t_wait_all/max(n_wait_all,1_8), &
      ', with', real(n_wait_all), ' waits'
  end subroutine report_waits
END SUBROUTINE diagnostics
1150 
!===============================================================================
!> Check that a package is in the expected order: true when self%seq equals the
!> sequence number currently expected from self%sender, in which case the
!> expected number is advanced by one (modulo 100).  Note that only the master
!> thread executes this function, so no atomic constructs are needed.
!===============================================================================
LOGICAL FUNCTION is_in_order (self)
  class(mesg_t):: self
  !.............................................................................
  integer:: next_seq
  !-----------------------------------------------------------------------------
  next_seq = expected(self%sender)
  is_in_order = (self%seq == next_seq)
  if (is_in_order) &
    expected(self%sender) = mod(next_seq+1, 100)
END FUNCTION is_in_order
1163 
1164 END MODULE mpi_mesg_mod
Each thread uses a private timer data type, with arrays for start time and total time for each regist...
Definition: timer_mod.f90:11
Support tic/toc timing, as in MATLAB, and accurate wallclock() function. The timing is generally much...
Definition: io_mod.f90:4