! Tail of a type-bound procedure list, followed by saved variables.
! Bindings shown: recv_improbe, recv_irecv, recv_mirecv, move_mesgs,
! unpk_mesgs, recv_virtual, find_task and unpack are declared private;
! init_virtual carries no private attribute.
! Saved variables shown: verbose (default 0), use_hashtable (default .true.),
! three lists (private_list, master_list, remove_list) and the 8-character
! string "method", whose initializer continues on the following line.
! NOTE(review): the enclosing type header and any "contains" statement are
! outside this excerpt; the variables are presumably module scope -- confirm.
50 procedure,
private:: recv_improbe
51 procedure,
private:: recv_irecv
52 procedure,
private:: recv_mirecv
53 procedure,
private:: move_mesgs
54 procedure,
private:: unpk_mesgs
55 procedure:: init_virtual
56 procedure,
private:: recv_virtual
57 procedure,
private:: find_task
58 procedure,
private:: unpack
60 integer,
save:: verbose=0
61 logical,
save:: use_hashtable=.true.
63 type(
task_mesg_t),
save:: private_list, master_list, remove_list
65 character(len=8),
save:: method=
'irecv' 73 SUBROUTINE init (self, name)
! init: read the task_mesg_params namelist and configure message handling.
!   self -- passed object (declaration not visible in this excerpt)
!   name -- optional character string; no use of it is visible here
! Visible steps: read the namelist (verbose, method, hash_table_size,
! use_hashtable) from io%input, forward verbose to link%init_verbose, echo
! the namelist to io%output, size self%hash_table with hash_table_size
! (default 10000), set mpi_mesg%uniq_mesg, then select on trim(method) to set
! mpi_mesg%tag_type (1 or 2) and, in one branch, mpi_mesg%max_recv = 5.
! An unrecognised method calls mpi%abort.
! NOTE(review): the embedded line numbers skip values, so the case labels,
! the start of the condition on method, and the end of the routine are not
! visible; which tag_type belongs to which method cannot be stated from here.
! NOTE(review): iostat is captured by the read but no check of it is visible.
! NOTE(review): the abort text names MPI_MESG_PARAMS while the namelist read
! here is task_mesg_params -- confirm which name users should see.
75 character(len=*),
optional:: name
77 integer,
save:: hash_table_size=10000
79 namelist /task_mesg_params/ verbose, method, hash_table_size, use_hashtable
80 character(len=120):: id = &
81 '$Id: 7b482c1f0d310cc777fb73836ce2956a1dcb3595 $ lists/task_mesg_mod.f90' 83 call trace%begin (
'task_mesg_t%init')
84 call trace%print_id (id)
86 read (io%input, task_mesg_params, iostat=iostat)
87 call link%init_verbose (verbose)
88 write (io%output, task_mesg_params)
90 call self%hash_table%init (hash_table_size)
95 trim(method) /=
'irecv' .and. &
96 trim(method) /=
'improbe')
then 99 mpi_mesg%uniq_mesg = .true.
100 select case (trim(method))
102 mpi_mesg%tag_type = 2
104 mpi_mesg%max_recv = 5
105 mpi_mesg%tag_type = 2
107 mpi_mesg%tag_type = 2
109 mpi_mesg%tag_type = 1
111 mpi_mesg%tag_type = 1
113 call mpi%abort (
'unknown method in MPI_MESG_PARAMS')
! set_test_time: choose the value stored in task%mesg%test_time.
!   self -- passed object; self%na enters the cadence estimate
!   task -- task whose mesg%test_time (and possibly update_cadence) is set
! When mpi_mesg%test_time is positive it is copied to the task's message.
! The remaining two assignments estimate update_cadence as
! 1e-6*product(task%n)*self%na/omp%nthreads and halve the test time once more
! for every recorded failure (0.5**(n_failed+1)).
! NOTE(review): the else/end-if lines are not visible in this excerpt;
! presumably the estimate is the fallback branch -- confirm.
121 SUBROUTINE set_test_time (self, task)
128 if (mpi_mesg%test_time > 0.0)
then 129 task%mesg%test_time = mpi_mesg%test_time
138 task%update_cadence = 1e-6*product(task%n)*self%na/omp%nthreads
139 task%mesg%test_time = task%update_cadence*0.5**(task%mesg%n_failed+1)
141 END SUBROUTINE set_test_time
! check_mpi: check sent messages, then dispatch to the receive routine that
! matches the configured method.
!   self   -- passed object
!   n_unpk -- optional integer; tested with present() near the end
! Returns immediately when mpi%size <= 1.  Otherwise calls
! mpi_mesg%sent_list%check_sent (nq) and selects on trim(method) between
! recv_improbe, recv_irecv, recv_mirecv and recv_virtual (the latter with
! either master_list or private_list), each receiving the local n_unpk_l.
! NOTE(review): the case labels, the declarations of nq and n_unpk_l, and
! the body of the present(n_unpk) test are not visible in this excerpt.
146 SUBROUTINE check_mpi (self, n_unpk)
148 integer,
optional:: n_unpk
153 if (mpi%size <= 1)
return 154 call trace%begin (
'task_mesg_t%check_mpi')
160 call mpi_mesg%sent_list%check_sent (nq)
164 select case (trim(method))
167 call self%recv_improbe (n_unpk_l)
170 call self%recv_irecv (n_unpk_l)
172 call self%recv_mirecv (n_unpk_l)
175 call self%recv_virtual (master_list, n_unpk_l)
177 call self%recv_virtual (private_list, n_unpk_l)
179 if (
present(n_unpk))
then 183 END SUBROUTINE check_mpi
! recv_improbe: receive path that keeps pending messages on
! mpi_mesg%recv_list and completed ones on mpi_mesg%unpk_list.
!   self   -- passed object; self%nq is compared with mpi_mesg%min_nq
!   n_unpk -- forwarded to unpk_mesgs
! Threads with omp%thread > 0 return at once, so only thread 0 does the work.
! Visible steps: set mpi_mesg%uniq_mesg, move completed messages off
! recv_list (move_mesgs) and unpack them (unpk_mesgs).  If self%nq is below
! mpi_mesg%min_nq, walk recv_list and call wait_for_completion on messages
! while that condition holds, moving each one to unpk_list and unpacking it.
! timer%n_master(4) counts those waits.
! The internal subroutine recv_mesgs repeatedly calls mpi_mesg%get and
! appends every returned message to recv_list, counting in timer%n_master(1).
! NOTE(review): the call site of recv_mesgs, the conditions guarding the log
! writes, and the statements advancing mesg are not visible in this excerpt.
! NOTE(review): the log text "addded mesg" contains a typo; it is a runtime
! string and is left unchanged here.
191 SUBROUTINE recv_improbe (self, n_unpk)
194 class(
mesg_t),
pointer:: mesg, next
195 integer,
save:: itimer=0
199 if (omp%thread > 0)
return 200 call trace%begin (
'task_mesg_t%recv_improbe', 1, itimer=itimer)
201 mpi_mesg%uniq_mesg = .true.
206 write (io_unit%log,*) wallclock(), &
207 'recv_improbe: n =', mpi_mesg%recv_list%n, mpi_mesg%max_recv
209 call self%move_mesgs (mpi_mesg%recv_list)
210 call self%unpk_mesgs (n_unpk)
212 write (io_unit%log,*) wallclock(), &
213 'recv_improbe: n =', mpi_mesg%recv_list%n, mpi_mesg%min_nq
219 if (self%nq < mpi_mesg%min_nq)
then 221 call self%move_mesgs (mpi_mesg%recv_list)
223 mesg => mpi_mesg%recv_list%head
224 do while (
associated(mesg) .and. self%nq < mpi_mesg%min_nq)
226 call mesg%wait_for_completion()
228 timer%n_master(4) = timer%n_master(4) + 1
230 write (io_unit%log,*)
'recv_improbe: waited on mesg', &
232 call mpi_mesg%recv_list%remove (mesg)
233 call mpi_mesg%unpk_list%add (mesg)
234 call self%unpk_mesgs (n_unpk)
238 write (io_unit%log,*) wallclock(),
'recv_improbe: n =', mpi_mesg%recv_list%n
240 call trace%end (itimer)
247 subroutine recv_mesgs
249 call mpi_mesg%get (mesg)
250 do while (
associated(mesg))
252 timer%n_master(1) = timer%n_master(1) + 1
253 call mpi_mesg%recv_list%add (mesg)
256 write (io_unit%log,*)
'recv_improbe: addded mesg to recv_list', &
258 call mpi_mesg%get (mesg)
260 end subroutine recv_mesgs
261 END SUBROUTINE recv_improbe
! recv_irecv: receive path that tops up mpi_mesg%recv_list to max_recv.
!   self   -- passed object
!   n_unpk -- forwarded to unpk_mesgs
! Returns at once for threads with omp%thread > 0 and when
! mpi_mesg%nbuf == 0.  Otherwise: set mpi_mesg%uniq_mesg, move completed
! messages from recv_list to the unpack list (move_mesgs), then call
! mpi_mesg%iget while recv_list holds fewer than mpi_mesg%max_recv entries.
! A message that is already complete is added to unpk_list; the other add
! targets recv_list.  Finally the unpack list is processed (unpk_mesgs).
! NOTE(review): the else/end-if/end-do lines are not visible in this
! excerpt; the add to recv_list is presumably the else branch -- confirm.
269 SUBROUTINE recv_irecv (self, n_unpk)
272 class(
mesg_t),
pointer:: mesg, next
273 integer,
save:: itimer=0
277 if (omp%thread > 0)
return 278 if (mpi_mesg%nbuf == 0)
return 279 call trace%begin (
'task_mesg_t%recv_irecv', 1, itimer=itimer)
280 mpi_mesg%uniq_mesg = .true.
284 call self%move_mesgs (mpi_mesg%recv_list)
288 do while (mpi_mesg%recv_list%n < mpi_mesg%max_recv)
289 call mpi_mesg%iget (mesg)
290 if (mesg%is_complete(
'recv_irecv'))
then 291 call mpi_mesg%unpk_list%add (mesg)
293 call mpi_mesg%recv_list%add (mesg)
299 call self%unpk_mesgs (n_unpk)
300 call trace%end (itimer)
301 END SUBROUTINE recv_irecv
! recv_mirecv: variant of the irecv path that works on the lists priv_list
! and unpk_tmp before handing messages to mpi_mesg%unpk_list.
!   self   -- passed object
!   n_unpk -- forwarded to unpk_mesgs
! Returns at once when mpi_mesg%nbuf == 0; unlike recv_irecv, no test on
! omp%thread is visible here.  Visible steps:
!   1. walk priv_list; completed messages go from priv_list to unpk_tmp
!   2. call mpi_mesg%iget while priv_list holds fewer than max_recv entries;
!      a complete message is added to unpk_tmp, the other add targets
!      priv_list
!   3. move every entry of unpk_tmp to mpi_mesg%unpk_list
!   4. process the unpack list with unpk_mesgs
! NOTE(review): declarations of priv_list and unpk_tmp, any locking around
! step 3, and the else/end lines are not visible in this excerpt.
! NOTE(review): the refill loop passes the label 'recv_irecv' to is_complete
! while the first loop passes 'recv_mirecv' -- confirm whether intended.
310 SUBROUTINE recv_mirecv (self, n_unpk)
313 class(
mesg_t),
pointer:: mesg, next
315 integer,
save:: itimer=0
319 if (mpi_mesg%nbuf == 0)
return 320 call trace%begin (
'task_mesg_t%recv_mirecv', 1, itimer=itimer)
321 mpi_mesg%uniq_mesg = .true.
325 mesg => priv_list%head
326 do while (
associated(mesg))
328 if (mesg%is_complete(
'recv_mirecv'))
then 329 call priv_list%remove (mesg)
330 call unpk_tmp%add (mesg)
337 do while (priv_list%n < mpi_mesg%max_recv)
338 call mpi_mesg%iget (mesg)
339 if (mesg%is_complete(
'recv_irecv'))
then 340 call unpk_tmp%add (mesg)
342 call priv_list%add (mesg)
350 mesg => unpk_tmp%head
351 do while (
associated(mesg))
353 call unpk_tmp%remove (mesg)
354 call mpi_mesg%unpk_list%add (mesg)
357 call self%unpk_mesgs (n_unpk)
359 call trace%end (itimer)
360 END SUBROUTINE recv_mirecv
! move_mesgs: transfer completed messages from msg_list to
! mpi_mesg%unpk_list.
!   self     -- passed object
!   msg_list -- list to scan, starting at msg_list%head
! For each message, is_complete('recv_improbe') decides: a complete message
! is counted in timer%n_master(2), removed from msg_list and added to
! unpk_list; otherwise a "not yet complete" line may be logged.
! NOTE(review): the assignment of "next", the statement advancing mesg, and
! the conditions guarding the log writes are not visible in this excerpt.
365 SUBROUTINE move_mesgs (self, msg_list)
368 class(
mesg_t),
pointer:: mesg, next
370 mesg => msg_list%head
371 do while (
associated(mesg))
373 if (mesg%is_complete(
'recv_improbe'))
then 375 timer%n_master(2) = timer%n_master(2) + 1
377 write (io_unit%log,*)
'recv_improbe: moved to unpk_list:', &
378 mesg%id, mesg%seq,
associated(next)
379 call msg_list%remove (mesg)
380 call mpi_mesg%unpk_list%add (mesg)
383 write (io_unit%log,*)
'recv_improbe: not yet complete:', &
388 END SUBROUTINE move_mesgs
! unpk_mesgs: unpack the messages on mpi_mesg%unpk_list for which
! is_in_order() is true.
!   self -- passed object; self%unpack is called per message
!   n    -- dummy argument; no use of it is visible in this excerpt
! A message that passes is_in_order() is counted in timer%n_master(3),
! unpacked, removed from unpk_list and then passed to
! unpk_list%delete (mesg, .false.).  The other visible branch only logs that
! the message was kept in unpk_list.
! NOTE(review): the meaning of the .false. argument to delete, the statement
! advancing mesg, and the log conditions are not visible here.
393 SUBROUTINE unpk_mesgs (self, n)
396 class(
mesg_t),
pointer:: mesg, next
398 mesg => mpi_mesg%unpk_list%head
399 do while (
associated(mesg))
401 if (mesg%is_in_order())
then 403 timer%n_master(3) = timer%n_master(3) + 1
404 call self%unpack (mesg)
406 write (io_unit%log,*)
'recv_improbe: unpacked from list:', &
408 call mpi_mesg%unpk_list%remove (mesg)
409 call mpi_mesg%unpk_list%delete (mesg, .false.)
413 write (io_unit%log,*)
'recv_improbe: kept in unpk_list:', &
418 END SUBROUTINE unpk_mesgs
! init_virtual: build the list used by recv_virtual for the 'virtual' and
! 'private' methods.
!   self -- passed object
! Returns at once when mpi%size == 1 or when method is neither 'virtual'
! nor 'private'.  The link list is scanned for tasks with bits%virtual set,
! and per = nvirt/omp%nthreads + 1 is the per-thread share.
!   'private': calls init_virtual_list on private_list with a range starting
!              at i1 = 1 + per*omp%thread
!   'virtual': calls init_virtual_list on master_list with range 1 .. 1+nvirt
! NOTE(review): the statements that count nvirt, compute i2, and any OpenMP
! directives around the 'private' branch are not visible in this excerpt.
424 SUBROUTINE init_virtual (self)
426 class(
link_t),
pointer:: link
427 class(
task_t),
pointer:: task
428 integer:: nvirt, per, i, i1, i2
430 if (mpi%size==1)
return 431 if (trim(method) /=
'virtual' .and. trim(method) /=
'private')
return 432 call trace%begin (
'task_mesg_t%init_virtual')
438 do while (
associated(link))
440 if (task%is_set (bits%virtual))
then 445 per = nvirt/omp%nthreads + 1
446 if (trim(method) ==
'private')
then 452 i1 = 1 + per*omp%thread
454 call init_virtual_list (self, private_list, i1, i2)
456 else if (trim(method) ==
'virtual')
then 457 call init_virtual_list (self, master_list, 1, 1+nvirt)
460 END SUBROUTINE init_virtual
! init_virtual_list: append virtual tasks with index in [i1, i2) to list and
! post a receive for each of them.
!   self   -- passed object
!   list   -- list receiving the selected tasks (list%append)
!   i1, i2 -- optional integers; half-open index range (i >= i1, i < i2)
! For every link whose task has bits%virtual set and whose index i falls in
! the range, the task is appended together with the link's nbor, needed and
! needs_me fields, its message is allocated (task%allocate_mesg),
! task%wc_last is set to wallclock(), and task%mesg%irecv (task%rank,
! task%id) is called.  A summary line with list%n, omp%thread, i1 and i2 is
! written to the log.
! NOTE(review): i1 and i2 are declared optional but are used with no visible
! present() guard; both visible callers pass them -- confirm that holds for
! all callers.
! NOTE(review): the counter i, its increment, and the list-advance statement
! are not visible in this excerpt.
465 SUBROUTINE init_virtual_list (self, list, i1, i2)
467 integer,
optional:: i1, i2
468 class(
link_t),
pointer:: link
469 class(
task_t),
pointer:: task
472 call trace%begin (
'task_mesg_t%init_virtual_list')
475 do while (
associated(link))
477 if (task%is_set (bits%virtual))
then 478 if (i >= i1 .and. i < i2)
then 480 call list%append (task, &
481 nbor=link%nbor, needed=link%needed, needs_me=link%needs_me)
483 call task%allocate_mesg
484 task%wc_last = wallclock()
486 call task%mesg%irecv (task%rank, task%id)
488 write (io_unit%log,*) &
489 'task_mesg_t%init_virtual: id, mesg%id, mesg%tag =', &
490 task%id, task%mesg%id, task%mesg%tag
497 write (io_unit%log,
'(a,2i5,2x,2i4)') &
498 'task_mesg_t%init_virtual_list: n, thread, i1, i2 =', &
499 list%n, omp%thread, i1, i2
501 END SUBROUTINE init_virtual_list
! recv_virtual: poll the per-task receives of the tasks on list and unpack
! those that have completed.
!   self   -- passed object
!   list   -- list whose links are scanned
!   n_unpk -- written to the log at the end; its update is not visible here
! For each task: set_test_time is called, then task%mesg%is_complete
! ('virtual') is tested.  On completion the message is unpacked with
! self%unpack (task%mesg, link=link), the counters mpi_mesg%n_recv,
! timer%bytes_recv and timer%n_recv are advanced, a warning is written when
! wc - task%wc_last exceeds 10., and a new irecv is posted for the task.
! Otherwise a "fail" line is logged when verbose > 2 or the task is
! io%id_debug.
! NOTE(review): bytes_recv adds 4.0_8*nbuf, i.e. assumes 4 bytes per buffer
! element -- TODO confirm against the buffer type.
! NOTE(review): the warning's format says "not updated in ... sec" but the
! value written is wc, not wc - task%wc_last -- confirm.
! NOTE(review): the assignment of wc and any update of task%wc_last are not
! visible in this excerpt.
508 SUBROUTINE recv_virtual (self, list, n_unpk)
510 class(
link_t),
pointer:: link
511 class(
task_t),
pointer:: task
514 integer,
save:: itimer=0
516 call trace%begin (
'task_mesg_t%recv_virtual', itimer=itimer)
522 do while (
associated(link))
526 call set_test_time (self, task)
527 if (task%mesg%is_complete(
'virtual'))
then 528 if (verbose > 1 .or. task%id == io%id_debug) &
529 write (io_unit%log,
'(f12.6,2x,a,i6,i5,i9,i6,z12)') wallclock(), &
530 'task_mesg_t%recv_virtual: recv id, seq, mesg%id, sender =', &
531 task%id, mod(task%istep,100), task%mesg%id, task%mesg%sender
532 call self%unpack (task%mesg, link=link)
534 mpi_mesg%n_recv = mpi_mesg%n_recv+1
536 timer%bytes_recv = timer%bytes_recv + 4.0_8*task%mesg%nbuf
538 timer%n_recv = timer%n_recv + 1_8
540 if (wc-task%wc_last > 10.)
write(io%output, &
541 '("WARNING: virtual patch not updated in",f5.1," sec")') wc
543 call task%mesg%irecv (task%rank, task%id)
546 else if (verbose > 2 .or. task%id == io%id_debug)
then 547 write (io_unit%log,
'(f12.6,2x,a,i6,i5,i9,i6,z12)') wallclock(), &
548 'task_mesg_t%recv_virtual: fail id, seq, mesg%id, sender, req =', &
549 task%id, mod(task%istep,100), task%mesg%id, task%rank, task%mesg%req
555 write (io_unit%log,*)
' task_mesg_t%recv_virtual, n_unpk =', n_unpk
556 call trace%end (itimer)
557 END SUBROUTINE recv_virtual
! find_task: return the link for the task with the given id.
!   self -- passed object (hash table, lock and task list)
!   id   -- task id to look up
!   new  -- dummy argument; its assignment is not visible in this excerpt
!   link -- function result
! Visible steps: look up the key [id, 1] in self%hash_table; an associated
! result is examined, with io%abort ('hash table link not useful') on one
! path.  Then, with self%lock set under the label 'find_task', the list is
! scanned for task%id == id; on a match, if use_hashtable is true, the key
! [id, 1] is stored in the hash table.  The lock is released afterwards.
! The remaining visible lines set exper%box = self%size, set bits%virtual
! on exper, and log "created new task".
! NOTE(review): presumably the exper lines run only when no task matched and
! also set new -- the guarding conditions, the allocation of exper, and the
! select-type handling of ptr are not visible here; confirm.
564 FUNCTION find_task (self, id, new)
RESULT (link)
569 class(
link_t),
pointer:: link
570 class(
task_t),
pointer:: task
571 class(
patch_t),
pointer:: patch
572 class(experiment_t),
pointer:: exper
573 class(*),
pointer:: ptr
574 integer,
save:: itimer=0
576 call trace%begin (
'task_mesg_t%find_task', itimer=itimer)
578 write (io_unit%log,*)
'task_mesg_t%find_task: id =', id
581 call self%hash_table%get ([id, 1], ptr)
582 if (
associated(ptr))
then 589 call io%abort (
'hash table link not useful')
593 write (io_unit%log,*)
'task_mesg_t%find_task: hash =', task%id
595 call self%lock%set (
'find_task')
598 do while (
associated(link))
601 write (io_unit%log,*)
'task_mesg_t%find_task: task%id =', task%id
602 if (task%id == id)
then 603 if (verbose > 1)
then 604 write (io_unit%log,*)
'task_mesg_t%find_task: match =', id
610 if (use_hashtable)
then 612 call self%hash_table%set ([id, 1], ptr)
619 call self%lock%unset (
'find_task')
629 exper%box = self%size
640 call exper%set (bits%virtual)
642 write (io_unit%log,*) &
643 'task_mesg_t%find: created new task, id =', id, exper%id
645 call trace%end (itimer)
646 END FUNCTION find_task
! unpack: apply a received message to the task it belongs to and update the
! task list accordingly.
!   self -- passed object
!   mesg -- pointer to the received message
!   link -- optional pointer to the task's link; when absent the link is
!           obtained from find_task (self, mesg%id, new)
! Visible stages, in order:
!   * messages with mesg%nbuf < 40 are handed to load_balance%unpack
!   * link1 is resolved (from link or via find_task) and its task is viewed
!     as experiment_t inside an associate / class is construct
!   * diagnostics are logged when the task has bits%boundary or bits%ready
!   * task%update_cadence and task%update_last are refreshed from wc
!   * task%unpack (mesg) is called
!   * a task with bits%remove set goes to self%remove_and_reset (link1)
!   * bits%swap_request combined with bits%boundary or bits%virtual triggers
!     init_nbors, check_ready, clearing of swap_request+ready,
!     update_nbor_status and count_status
!   * a new link is added with add_new_link; bits%support and
!     bits%init_nbors trigger refine%check_support and init_nbors/check_ready
!   * check_nbors runs when self%method == 0
! NOTE(review): the embedded line numbers skip values; the else/end lines,
! the return statements, and the assignments of id, wc, found and failed are
! not visible, so the exact branch structure cannot be stated from here.
! NOTE(review): this routine uses both self%verbose and the bare name
! verbose, and tests self%method against the integer 0 -- confirm these are
! the intended variables.
652 SUBROUTINE unpack (self, mesg, link)
654 class(
mesg_t),
pointer:: mesg
655 class(
link_t),
pointer,
optional:: link
656 class(
link_t),
pointer:: link1, link2
657 class(experiment_t),
pointer:: task
658 logical:: found, failed, new
659 integer:: id, n_added
660 integer,
save:: itimer=0
661 character(len=24):: label
664 call trace%begin (
'task_mesg_t%unpack', itimer=itimer)
! NOTE(review): task%logging is read here, before any visible statement
! associates the local pointer task -- confirm task is defined at this point.
665 if (task%logging > 1)
then 666 write (label,
'(a,i4,i8)')
'unpack ', mesg%sender, mesg%id
667 call task%log (label)
! Short messages (nbuf < 40) are passed to the load balancer.
669 if (mesg%nbuf < 40)
then 670 call load_balance%unpack (mesg%buffer)
681 if (
present(link))
then 683 if (self%verbose > 1) &
684 write (io_unit%log,
'(f12.6,2x,a,2i5,z12)') &
685 wallclock(),
'task_mesg_t%unpack: id =', mesg%id, link1%task%id
687 link1 => find_task(self, mesg%id, new)
688 if (self%verbose > 1) &
689 write (io_unit%log,
'(f12.6,2x,a,2i5,z12)') &
690 wallclock(),
'task_mesg_t%unpack: found id =', mesg%id, link1%task%id
695 associate(ltask=>link1%task)
697 class is (experiment_t)
703 if (
associated(link1%task))
then 705 write (io_unit%log,
'(a,i6,2x,5l1)')
'unpack: id, bits BVRES =', mesg%id, &
706 task%is_set (bits%boundary), &
707 task%is_set (bits%virtual), &
708 task%is_set (bits%ready), &
709 task%is_set (bits%remove), &
710 task%is_set (bits%swap_request)
! A message arriving for a boundary task is reported as an error.
715 if (task%is_set (bits%boundary))
then 716 write (io_unit%log,
'(f12.6,2x,a,i9,3x,5l1)') wallclock(), &
717 'task_mesg_mod::unpack ERROR, received mpi_mesg for boundary task:', task%id, &
718 task%is_set(bits%internal), &
719 task%is_set(bits%boundary), &
720 task%is_set(bits%virtual), &
721 task%is_set(bits%external), &
722 task%is_set(bits%swap_request)
! A message arriving for a task with the ready bit is reported as an error;
! the loop follows next_time links looking for the same task id.
725 if (task%is_set (bits%ready))
then 728 do while (
associated(link2))
729 if (link2%task%id == task%id)
then 733 link2 => link2%next_time
735 write (io_unit%log,
'(f12.6,2x,a,i9,l3,3x,5l1)') wallclock(), &
736 'task_mesg_mod::unpack ERROR, received mpi_mesg for task with ready bit:', task%id, found, &
737 task%is_set(bits%internal), &
738 task%is_set(bits%boundary), &
739 task%is_set(bits%virtual), &
740 task%is_set(bits%external), &
741 task%is_set(bits%swap_request)
! Record the time since the previous update of this task.
748 task%update_cadence = wc - task%update_last
749 task%update_last = wc
757 call task%unpack (mesg)
758 if (verbose > 1)
then 759 write (io_unit%mpi,
'(f12.6,3x,a,3i6,f12.6,2x,3f10.4,2x,5l1)') &
760 wallclock(),
'unpack: after task%unpack BVRES =', &
761 id, mesg%id, task%id, task%time, task%position, &
762 task%is_set (bits%boundary), &
763 task%is_set (bits%virtual), &
764 task%is_set (bits%ready), &
765 task%is_set (bits%remove), &
766 task%is_set (bits%swap_request)
! The remove bit on the unpacked task leads to remove_and_reset.
772 if (task%is_set (bits%remove))
then 773 if (verbose > 0)
then 774 write (io_unit%log,
'(f12.6,2x,a,i6)') &
775 wallclock(),
'unpack: suicide note received for id =', task%id
777 call self%remove_and_reset (link1)
778 call trace%end (itimer)
781 if (io%log_sent > 0)
then 783 call mpi_mesg%log_files()
784 write (io_unit%sent,
'(f16.6,i4,2x,a,i6,f16.6,8i5)') wallclock(), omp%thread,
'unp', task%id, task%time, task%rank
788 if (mpi_mesg%debug .or. id==io%id_debug)
then 790 write (io_unit%log,
'(f12.6,2x,a,2i9,z12,2x,5l1)') wallclock(), &
791 'DBG unpk: id, sender, req =', mesg%id, task%rank, mesg%req, &
792 task%is_set (bits%internal), &
793 task%is_set (bits%boundary), &
794 task%is_set (bits%virtual), &
795 task%is_set (bits%external), &
796 task%is_set (bits%swap_request)
799 if (mpi_mesg%debug) &
800 write (io_unit%log,
'(f12.6,2x,a,i9,1p,e18.6)') wallclock(),
'unpk: id, time =', task%id, task%time
802 write(io%output,*)
'unpack ERROR: wrong mesg%id', id, task%id, mesg%id
803 if (.not. failed)
then 805 mpi_mesg%n_unpk = mpi_mesg%n_unpk+1
! swap_request + boundary: the log text below calls this "swapped virtual to
! boundary"; neighbor lists and status are rebuilt for link1.
812 if (task%is_set(bits%swap_request) .and. task%is_set(bits%boundary))
then 819 call self%init_nbors (link1)
820 call self%check_ready (link1)
821 call task%clear (bits%swap_request+bits%ready)
822 call self%update_nbor_status (link1)
823 call self%count_status
825 write(io%output,
'(f12.6,2x,a,i6,a,i9,a,i6)') &
826 wallclock(),
'LB: rank',mpi%rank,
' given patch',task%id,
' by',mesg%sender
828 write (io_unit%log,*)
'task_mesg_t%unpack: swapped virtual to boundary:', task%id
837 else if (task%is_set(bits%swap_request) .and. task%is_set(bits%virtual))
then 839 write (io_unit%log,*)
'task_mesg_t%unpack: new virtual patch:', task%id
841 call self%init_nbors (link1)
842 call self%check_ready (link1)
843 call task%clear (bits%swap_request+bits%ready)
844 call self%update_nbor_status (link1)
845 call self%count_status
860 if (verbose > 0)
then 861 write (io_unit%mpi,
'(f12.6,2x,a,3i6,2i4,f12.6,2x,3f10.5,2x,"new")') &
862 wallclock(),
'task_mesg_t%unpack: ', id, task%id, link1%task%id, &
863 mesg%seq, omp%thread, task%time, task%position
866 call self%add_new_link (link1)
870 if (task%is_set (bits%support))
then 871 call refine%check_support (self, link1, n_added)
878 if (task%is_set (bits%init_nbors))
then 879 call self%init_nbors (link1)
880 call self%check_ready (link1)
882 write (io_unit%log,*)
'unpack: bits%init_nbors id =', task%id
884 if (verbose > 0)
then 885 write (io_unit%mpi,
'(f12.6,2x,a,i6,2i4,f12.6,2x,3f10.5,2x)') &
886 wallclock(),
'task_mesg_t%unpack: ', id, mesg%seq, omp%thread, &
887 task%time, task%position
898 if (self%method==0)
then 900 write (io_unit%log,*)
'unpack: check_nbors, id =', task%id
901 call self%check_nbors (link1)
903 call trace%end (itimer)
904 END SUBROUTINE unpack
Each thread uses a private timer data type, with arrays for start time and total time for each regist...
Support tic/toc timing, as in MATLAB, and accurate wallclock() function. The timing is generally much...
Module with list handling for generic class task_t objects.
Hash table module for use inside DISPATCH.
Template module for patches, which adds pointers to memory and mesh, and number of dimensions and var...
This module handles checking max change between neighboring points. Each instance of it needs an inde...
Message handling for task lists. Some of the methods are only used by dispatcher0_t, so should perhaps be moved over to that file.
Module with list handling for generic class task_t objects.
Keep track of neighbor ranks and their loads, by sending and receiving short messages, storing the info in a linked list.
Template module for tasks.