19 class(
mesg_t),
pointer:: next => null()
20 class(
mesg_t),
pointer:: prev => null()
21 integer,
dimension(:),
pointer:: buffer
22 integer,
pointer:: reqs(:) => null()
23 integer:: nbuf, nreq, req=0, sender, id, ntry=0
27 real(8):: test_next=0d0
34 procedure:: wait_for_completion
35 procedure:: is_complete
36 procedure,
private:: get_id
38 procedure:: all_completed
40 procedure:: is_in_order
42 integer,
dimension(:),
allocatable,
save:: expected
43 integer(8) :: n_wait_all=0 , n_wait_for=0
44 real(8) :: t_wait_all=0d0, t_wait_for=0d0
47 class(
mesg_t),
pointer:: head => null()
48 class(
mesg_t),
pointer:: tail => null()
49 character(len=16):: name=
'mesg_list' 54 procedure:: remove_completed
58 procedure:: check_sent
59 procedure:: print => print_list
66 logical:: initialized=.false.
82 logical:: recv_wait=.false.
83 logical:: send_priv=.true.
84 logical:: debug=.false.
85 logical:: uniq_mesg=.true.
94 procedure:: diagnostics
99 type(
mesg_list_t),
save,
public:: unpk_list, sent_list, recv_list
101 real,
save:: delay_ms=0.0
102 integer,
save:: min_nq=10
103 integer,
save:: max_sent=10
104 integer,
save:: max_recv=100
105 integer,
save:: max_probe=10
106 integer,
save:: every_send=2
107 integer,
save:: every_recv=1
108 logical,
save:: recv_wait=.false.
109 logical,
save:: send_wait=.false.
111 integer,
save:: verbose=0
112 logical,
save:: detailed_timer=.false.
118 SUBROUTINE send (self, rank, tag)
120 integer,
optional:: tag
121 integer:: rank, req, ierr,ltag
123 call trace%begin (
'mesg_t%send')
124 if (
present(tag))
then 130 if (verbose>0 .or. self%id==io%id_debug)
then 131 write (io_unit%log,*) &
132 'mpi_mesg_t%send id, tag, nbuf, to =', self%id, ltag, self%nbuf, rank
136 if (mpi%mode == mpi_thread_multiple)
then 137 call mpi_isend (self%buffer, self%nbuf, mpi_integer, rank, ltag, &
139 self%nreq = self%nreq+1
140 self%reqs(self%nreq) = req
143 call mpi_isend (self%buffer, self%nbuf, mpi_integer, rank, ltag, &
145 self%nreq = self%nreq+1
146 self%reqs(self%nreq) = req
151 self%nreq = self%nreq+1
152 self%reqs(self%nreq) = req
156 mpi_mesg%n_send = mpi_mesg%n_send+1
163 SUBROUTINE recv (self, rank, tag)
165 integer,
optional:: tag
166 integer:: rank, req, ierr,ltag
168 call trace%begin (
'mesg_t%recv')
169 if (
present(tag))
then 174 if (verbose > 2)
then 175 write (io_unit%log,*) &
176 'mpi_mesg%recv id, nbuf, to =', self%id, self%nbuf, rank
183 if (mpi%mode == mpi_thread_multiple)
then 184 call mpi_irecv (self%buffer, self%nbuf, mpi_integer, rank, ltag, &
185 mpi%comm, self%req, ierr)
188 call mpi_irecv (self%buffer, self%nbuf, mpi_integer, rank, ltag, &
189 mpi%comm, self%req, ierr)
199 SUBROUTINE remove_completed (self)
201 class(
mesg_t),
pointer:: mesg
203 call trace%begin (
'mesg_list%remove_completed')
205 do while (
associated(mesg))
206 if (mesg%all_completed())
then 207 call self%remove (mesg)
208 call self%delete (mesg)
212 END SUBROUTINE remove_completed
217 SUBROUTINE check_sent (self, nq)
220 class(
mesg_t),
pointer:: mesg, next
223 integer,
save:: every=0
224 integer,
save:: itimer=0
229 if (mpi_mesg%send_priv)
then 234 write (io_unit%log,*) wallclock(), omp%thread, &
235 'check_sent: n =', self%n, max_sent,
associated(self%head)
236 if (mpi%size <= 1 .or. .not.
associated(self%head))
return 237 call trace%begin (
'mpi_mesg_t%check_sent', itimer=itimer)
249 sent_tmp%name =
'sent_tmp' 251 do while (self%n > m)
253 call self%remove (mesg)
254 call sent_tmp%add (mesg)
261 write (io_unit%log,*)
'WAITALL: n,m,sent_list%n,sent_tmp%n =', n, m, self%n, sent_tmp%n
262 mesg => sent_tmp%head
263 do while (
associated(mesg))
266 call sent_tmp%remove (mesg)
267 call sent_tmp%delete (mesg, send=.true.)
278 call trace%end (itimer)
288 if (
associated(self%head))
then 291 do while (
associated(mesg))
293 call mesg%test_all (flag)
295 call self%remove (mesg)
296 call self%delete (mesg, send=.true.)
303 write (io_unit%log,*) wallclock(), omp%thread,
'check_sent: n =', self%n
304 call trace%end (itimer)
305 END SUBROUTINE check_sent
310 SUBROUTINE check_priv
311 class(
mesg_t),
pointer:: mesg, next
314 integer,
save:: itimer=0
317 debug = (verbose > 0) .and. (sent_list%n > 0)
319 write (io_unit%log,*) wallclock(), omp%thread, &
320 'check_priv: n =', sent_list%n,
associated(sent_list%head)
321 if (mpi%size <= 1 .or. .not.
associated(sent_list%head))
return 322 call trace%begin (
'mpi_mesg_t%check_priv', itimer=itimer)
324 mesg => sent_list%head
325 do while (
associated(mesg))
327 if (sent_list%n > max_sent)
then 331 call mesg%test_all (flag)
334 call sent_list%remove (mesg)
335 call sent_list%delete (mesg, send=.true.)
340 write (io_unit%log,*) wallclock(), omp%thread, &
341 'check_priv: n =', sent_list%n
342 call trace%end (itimer)
343 END SUBROUTINE check_priv
348 SUBROUTINE test_all (self, flag)
351 integer:: rank, req, ierr
352 integer,
save:: itimer=0
354 if ((self%nreq <= 0) .or. (.not.
associated(self%reqs)))
then 356 write (stdout,*)
'mesg_t%test_all: WARNING', &
357 self%nreq,
associated(self%reqs)
361 if (mpi%mode == mpi_thread_multiple)
then 362 call mpi_testall (self%nreq, self%reqs, flag, mpi_statuses_ignore, ierr)
365 call mpi_testall (self%nreq, self%reqs, flag, mpi_statuses_ignore, ierr)
372 write (io_unit%log,*) wallclock(), self%id,
' test_all flag =', flag
375 END SUBROUTINE test_all
380 SUBROUTINE wait_all (self)
382 integer:: rank, req, ierr
384 integer,
save:: itimer=0
386 call trace%begin (
'mesg_t%wait_all', itimer=itimer)
387 if (verbose >= 0) wc = wallclock()
389 if (mpi%mode == mpi_thread_multiple)
then 391 call mpi_waitall (self%nreq, self%reqs, mpi_statuses_ignore, ierr)
394 call mpi_waitall (self%nreq, self%reqs, mpi_statuses_ignore, ierr)
397 if (verbose >= 0)
then 400 n_wait_all = n_wait_all + 1
402 t_wait_all = t_wait_all + wc
403 if (verbose > 0)
then 404 write (io_unit%log,*) wallclock(),
' wait_all:', self%id, wc
408 call trace_end (itimer)
409 END SUBROUTINE wait_all
416 SUBROUTINE add (self, mesg)
418 class(
mesg_t),
pointer:: mesg
421 if (
associated(self%head))
then 422 self%tail%next => mesg
423 mesg%prev => self%tail
431 if (verbose > 1)
then 432 write (io_unit%log,*) trim(self%name)//
' added mesg', mesg%id, self%n
441 SUBROUTINE remove (self, mesg)
443 class(
mesg_t),
pointer:: mesg
445 if (
associated(mesg%prev))
then 446 mesg%prev%next => mesg%next
448 self%head => mesg%next
450 if (
associated(mesg%next))
then 451 mesg%next%prev => mesg%prev
453 self%tail => mesg%prev
455 if (verbose > 1)
then 456 write (io_unit%log,*) trim(self%name)//
' remove mesg OK', mesg%id, self%n
461 END SUBROUTINE remove
466 SUBROUTINE delete (self, mesg, send)
468 class(
mesg_t),
pointer:: mesg
469 logical,
optional:: send
471 if (
associated(mesg%buffer))
then 472 call io%bits_mem (-storage_size(mesg%buffer), product(shape(mesg%buffer)),
'mem')
473 deallocate (mesg%buffer)
475 if (
associated(mesg%reqs))
deallocate (mesg%reqs)
477 if (
present(send))
then 479 timer%nq_send_max = max(timer%nq_send_max,mpi_mesg%nq_send)
481 mpi_mesg%nq_send = mpi_mesg%nq_send-1
483 timer%nq_recv_max = max(timer%nq_recv_max,mpi_mesg%nq_recv)
485 mpi_mesg%nq_recv = mpi_mesg%nq_recv-1
488 END SUBROUTINE delete
493 SUBROUTINE reset (self)
504 SUBROUTINE print_list (self, label)
506 character(len=*),
optional:: label
507 class(
mesg_t),
pointer:: mesg
509 if (
present(label)) &
510 write (io_unit%log,*)
'------------------ '//label//
' ------------------' 512 do while (
associated(mesg))
513 write (io_unit%log,
'(a,i5,i9,2i5)')
' mesg_list: '//self%name, &
514 self%n, mesg%id, mesg%sender, mesg%ntry
517 END SUBROUTINE print_list
522 SUBROUTINE count (self, label)
524 character(len=*):: label
525 class(
mesg_t),
pointer:: mesg
528 if (verbose < 1)
return 531 do while (
associated(mesg))
535 if (n /= self%n)
then 536 write (io_unit%log,*)
'WARNING: inconsistent '//self%name, n, self%n
543 SUBROUTINE init (self)
546 logical,
save:: debug
547 logical,
save:: recv_priv
548 logical,
save:: recv_active
549 logical,
save:: send_priv
550 logical,
save:: queue_unpack
551 logical,
save:: uniq_mesg
552 real,
save:: test_time=20e-3
553 namelist /mpi_mesg_params/ min_nq, max_sent, max_probe, max_recv, every_recv, &
554 every_send, delay_ms, recv_wait, send_wait, send_priv, &
555 test_time, uniq_mesg, debug, verbose, detailed_timer
557 character(len=120):: id = &
558 '$Id: eb4a832e49f174279be5680c7a92f2966eb4efa7 $ mpi_mesg_mod.f90' 562 call trace%print_id (id)
563 if (self%initialized)
return 564 self%initialized = .true.
565 call trace%begin (
'mpi_mesg_t%init')
569 uniq_mesg = self%uniq_mesg
570 send_priv = self%send_priv
575 if (mpi%mode /= mpi_thread_multiple)
then 584 read (io%input, mpi_mesg_params,iostat=iostat)
585 write (io%output, mpi_mesg_params)
588 self%max_recv = max_recv
589 self%max_probe = max_probe
590 self%recv_wait = recv_wait
591 self%uniq_mesg = uniq_mesg
592 self%delay_ms = delay_ms
593 mpi_mesg%test_time = test_time
597 self%send_priv = send_priv
601 self%sent_list%name =
'sent_list' 602 self%recv_list%name =
'recv_list' 603 self%unpk_list%name =
'unpk_list' 605 recv_list%name =
'recv_list' 606 sent_list%name =
'sent_list' 607 unpk_list%name =
'unpk_list' 608 sent_list%id = omp_get_thread_num()
612 allocate (expected(0:mpi%size-1))
621 SUBROUTINE log_files (self)
623 character(len=120):: filename
625 integer,
save:: previous=-1
627 if (io%log_sent > 0)
then 628 one_two = wallclock()/60.
629 one_two = mod(one_two,2) + 1
630 if (one_two /= previous)
then 633 write (filename,
'(a,"/sent_",i5.5,"_",i1,".log")') &
634 trim(io%outputname), mpi%rank, one_two
635 open (io_unit%sent,file=filename, form=
'formatted', status=
'unknown')
639 END SUBROUTINE log_files
644 SUBROUTINE test (self)
647 class(
mesg_t),
pointer:: mesg, next
652 allocate (mesg%buffer(10))
654 call test_list%add (mesg)
656 call test_list%print (
'test1')
657 mesg => test_list%head
660 call test_list%remove (mesg)
661 call test_list%delete (mesg, .true.)
664 call test_list%print (
'test2')
665 mesg => test_list%head
666 call test_list%remove (mesg)
667 call test_list%delete (mesg, .true.)
668 if (io%master)
write (io_unit%log,*) &
669 associated(test_list%head),
associated(test_list%tail)
670 call test_list%print (
'test3')
677 SUBROUTINE sent (self, mesg)
679 class(
mesg_t),
pointer:: mesg
682 mpi_mesg%nq_send = mpi_mesg%nq_send+1
683 if (mpi_mesg%send_priv)
then 684 call sent_list%add (mesg)
685 else if (send_wait)
then 688 call io%bits_mem (-storage_size(mesg%buffer), product(shape(mesg%buffer)),
'mem')
689 deallocate (mesg%buffer)
692 mpi_mesg%nq_send = mpi_mesg%nq_send-1
696 call self%sent_list%add (mesg)
699 if (verbose > 0)
then 700 write (io_unit%log,
'(f12.6,2x,a,i9,2i6)') &
701 wallclock(),
'mpi_mesg_t%sent: id, thread, n =', &
702 mesg%id, omp_get_thread_num(), sent_list%n
715 SUBROUTINE get (self, mesg)
717 class(
mesg_t),
pointer:: mesg
721 integer:: stat(mpi_status_size)
723 integer:: msg ,ierr, nbuf, req
724 integer,
save:: itimer=0
727 if (mpi%size <= 1)
return 728 call trace%begin (
'mpi_mesg_t%get', itimer=itimer)
733 if (mpi%mode == mpi_thread_multiple)
then 741 call trace%end (itimer)
750 call mpi_improbe (mpi_any_source, mpi_any_tag, mpi%comm, flag, msg, stat, ierr)
753 call mpi_get_count (stat, mpi_int, nbuf, ierr)
754 allocate (mesg%buffer(nbuf))
755 call io%bits_mem (storage_size(mesg%buffer),product(shape(mesg%buffer)),
'buf')
756 call mesg%get_id (stat)
758 mpi_mesg%nq_recv = mpi_mesg%nq_recv+1
759 call mpi_imrecv (mesg%buffer, nbuf, mpi_int, msg, req, ierr)
761 if (verbose > 0)
then 762 write (io_unit%mpi,
'(f12.6,2x,"get: id, seq, sender =",i9,2i6)') &
763 wallclock(), mesg%id, mesg%seq, mesg%sender
768 mpi_mesg%n_recv = mpi_mesg%n_recv+1
770 timer%bytes_recv = timer%bytes_recv + 4.0_8*nbuf
772 timer%n_recv = timer%n_recv + 1_8
775 end subroutine probe_for
786 SUBROUTINE iget (self, mesg)
788 class(
mesg_t),
pointer:: mesg
792 integer:: msg ,ierr, req
793 integer,
save:: itimer=0
796 if (mpi%size <= 1)
return 797 call trace%begin (
'mpi_mesg_t%get', itimer=itimer)
802 if (mpi%mode == mpi_thread_multiple)
then 810 call trace%end (itimer)
820 mesg%nbuf = mpi_mesg%nbuf
821 allocate (mesg%buffer(mesg%nbuf))
822 call io%bits_mem (storage_size(mesg%buffer),product(shape(mesg%buffer)),
'buf')
824 call mpi_irecv (mesg%buffer, mesg%nbuf, mpi_int, mpi_any_source, &
825 mpi_any_tag, mpi%comm, mesg%req, ierr)
827 mpi_mesg%nq_recv = mpi_mesg%nq_recv+1
829 mpi_mesg%n_recv = mpi_mesg%n_recv+1
831 timer%bytes_recv = timer%bytes_recv + 4.0_8*mesg%nbuf
833 timer%n_recv = timer%n_recv + 1_8
835 end subroutine probe_for
841 SUBROUTINE get_id (self, stat)
845 self%id = stat(mpi_tag)
846 self%sender = stat(mpi_source)
847 if (mpi_mesg%uniq_mesg)
then 848 self%seq = mod(self%id,100)
849 self%id = self%id/100
851 write (io_unit%log,*)
'mpi_mesg_t%get_id: id, sender, seq =', &
852 self%id, self%sender, self%seq
853 else if (verbose > 1)
then 854 write (io_unit%log,*)
'mpi_mesg_t%get_id: id, sender =', &
858 END SUBROUTINE get_id
864 FUNCTION completed (self)
RESULT(flag)
867 integer,
save:: itimer=0
869 integer:: stat(mpi_status_size)
873 if (mpi%size <= 1)
return 874 call trace%begin (
'mesg_t%completed', itimer=itimer)
876 if (mpi%mode == mpi_thread_multiple)
then 877 call mpi_test (self%req, flag, stat, ierr)
880 call mpi_test (self%req, flag, stat, ierr)
884 timer%mpi_test = timer%mpi_test + 1
886 if (flag .and. verbose > 0)
then 887 write (io_unit%log,
'(f12.6,2x,a,i6,i4,l2)') wallclock(), &
888 'mesg_t%completed:', self%id, mpi_mesg%recv_list%n, mpi_mesg%recv_wait
890 call trace%end (itimer)
891 END FUNCTION completed
896 FUNCTION all_completed (self)
RESULT(flag)
899 integer,
save:: itimer=0
901 integer:: stat(mpi_status_size)
905 if (mpi%size <= 1)
return 906 call trace%begin (
'mesg_t%all_completed', itimer=itimer)
908 if (mpi%mode == mpi_thread_multiple)
then 909 call mpi_testall (self%nreq, self%reqs, flag, mpi_statuses_ignore, ierr)
912 call mpi_testall (self%nreq, self%reqs, flag, mpi_statuses_ignore, ierr)
916 call trace%end (itimer)
917 END FUNCTION all_completed
923 FUNCTION is_complete (self, parentname)
925 logical:: is_complete
926 character(len=*),
optional:: parentname
929 real(8):: now, test_next
930 integer,
save:: itimer=0
932 integer:: stat(mpi_status_size)
935 is_complete = .false.
936 if (mpi%size <= 1)
then 941 if (now < self%test_next) &
943 if (
present(parentname) .and. detailed_timer)
then 944 call trace%begin (
'mesg_t%is_complete('//trim(parentname)//
')', 2, itimer=itimer)
946 call trace%begin (
'mesg_t%is_complete', 2, itimer=itimer)
952 if (mpi%mode == mpi_thread_multiple)
then 959 if (is_complete)
then 963 call self%get_id (stat)
966 self%n_failed = self%n_failed+1
969 test_next = now + self%test_time
971 self%test_next = test_next
972 call trace%end (itimer)
978 subroutine check_recv
984 if (mpi_mesg%recv_list%n > max_recv .or. mpi_mesg%recv_wait)
then 985 if (verbose > 1)
then 987 write (io_unit%log,*) wc, &
988 'is_complete waiting for id', self%id, mpi_mesg%recv_list%n, mpi_mesg%recv_wait
991 call mpi_wait (self%req, stat, ierr)
994 n_wait_for = n_wait_for + 1
996 t_wait_for = t_wait_for + wc
997 if (verbose > 0)
then 998 write (io_unit%log,
'(f12.6,2x,a,i6,i4,l2)') wallclock(), &
999 'is_complete:', self%id, mpi_mesg%recv_list%n, mpi_mesg%recv_wait
1002 call mpi_test (self%req, is_complete, stat, ierr)
1006 timer%mpi_test = timer%mpi_test + 1
1007 if (is_complete)
then 1009 timer%mpi_hit = timer%mpi_hit + 1
1011 end subroutine check_recv
1012 END FUNCTION is_complete
1017 SUBROUTINE irecv (self, rank, id)
1020 integer:: rank, tag, ierr, seq
1025 write (io_unit%log,*) self%seq, mpi_mesg%tag_type
1026 if (mpi_mesg%tag_type == 1)
then 1027 self%seq = self%seq + 1
1028 tag = mod(self%seq,100) + id*100
1029 else if (mpi_mesg%tag_type == 2)
then 1030 expected(rank) = expected(rank) + 1
1031 tag = mod(expected(rank),100) + id*100
1035 write (io_unit%log,*) &
1036 'irecv: seq, tag_type, tag', self%seq, mpi_mesg%tag_type, tag
1040 if (mpi%mode == mpi_thread_multiple)
then 1041 call mpi_irecv (self%buffer, self%nbuf, mpi_int, rank, tag, mpi%comm, self%req, ierr)
1044 call mpi_irecv (self%buffer, self%nbuf, mpi_int, rank, tag, mpi%comm, self%req, ierr)
1048 if (verbose > 1)
then 1049 write (io_unit%log,
'(f12.6,2x,a,i6,i9,i5,z12)') &
1050 wallclock(),
'mesg_t%irecv: id, tag, rank, req =', &
1051 self%id, tag, rank, self%req
1055 END SUBROUTINE irecv
1060 SUBROUTINE wait_for_completion (self)
1064 integer,
save:: itimer=0
1067 integer:: stat(mpi_status_size)
1070 if (mpi%size <= 1)
return 1071 call trace%begin (
'mesg_t%wait_for_completion', itimer=itimer)
1072 if (verbose>=0) wc=wallclock()
1074 if (mpi%mode == mpi_thread_multiple)
then 1075 call mpi_wait (self%req, stat, ierr)
1078 call mpi_wait (self%req, stat, ierr)
1082 if (verbose >= 0)
then 1085 n_wait_for = n_wait_for + 1
1087 t_wait_for = t_wait_for + wc
1089 write (io_unit%log,*)
'wait_for_completion', self%id, wc
1091 call trace%end (itimer)
1092 END SUBROUTINE wait_for_completion
1097 SUBROUTINE delay (self, n, ms)
1099 integer,
optional:: n
1102 integer,
save:: itimer=0
1104 if (
present(ms))
then 1107 ms_l = mpi_mesg%delay_ms
1109 if (ms_l==0.0)
return 1110 call trace%begin (
'mpi_mesg_t%no_queue', itimer=itimer)
1111 call mpi%delay (ms_l)
1113 self%n_delay = self%n_delay + 1
1114 call trace%end (itimer)
1115 END SUBROUTINE delay
1120 SUBROUTINE diagnostics (self, flag)
1123 integer,
save:: itimer=0
1127 call trace%begin (
'mpi_mesg%diagnostics', itimer=itimer)
1128 call self%sent_list%print (
'ABORT')
1129 call self%recv_list%print (
'ABORT')
1131 call trace%end (itimer)
1136 write (io_unit%mpi,1)
'average wait_for time:', t_wait_for/max(n_wait_for,1_8), &
1137 ', with',
real(n_wait_for),
' waits' 1138 write (io_unit%mpi,1)
'average wait_all time:', t_wait_all/max(n_wait_all,1_8), &
1139 ', with',
real(n_wait_all),
' waits' 1140 1
format(a,1p,2(e10.2,a))
1141 if (io%master.and..not.io_unit%do_validate)
then 1142 write (io_unit%output,1)
'average wait_for time:', t_wait_for/max(n_wait_for,1_8), &
1143 ', with',
real(n_wait_for),
' waits' 1144 write (io_unit%output,1)
'average wait_all time:', t_wait_all/max(n_wait_all,1_8), &
1145 ', with',
real(n_wait_all),
' waits' 1149 END SUBROUTINE diagnostics
1155 LOGICAL FUNCTION is_in_order (self)
1158 is_in_order = self%seq == expected(self%sender)
1159 if (is_in_order)
then 1160 expected(self%sender) = mod(expected(self%sender)+1,100)
1162 END FUNCTION is_in_order
Each thread uses a private timer data type, with arrays for start time and total time for each regist...
Support tic/toc timing, as in MATLAB, and accurate wallclock() function. The timing is generally much...