! Module-level state shared by the dispatcher6 procedures; every variable here
! has the SAVE attribute.
! NOTE(review): this listing is an elided extract -- the leading numbers are the
! original file's line numbers, and some original lines are not shown.
! n_state(0:4): counters updated in set_state (n_state(ip) is decremented,
! n_state(in) is incremented).
93 integer,
save:: n_state(0:4)=0
! d_state(0:4): incremented at index `in` in set_state.
94 integer,
save:: d_state(0:4)=0
! verbose: member of the dispatcher6_params namelist read in init.
95 integer,
save:: verbose=0
! max_stalled and retry_stalled are namelist members as well; no use of
! `stalled` is visible in this extract.
96 integer,
save:: stalled=0, max_stalled=10000, retry_stalled=100
! min_nq: large initial value (2**30); execute lowers it with
! min(min_nq,task_list%nq) and prints it when the task list has finished.
97 integer,
save:: min_nq=2**30
! do_delay: namelist member; its consumer is not visible in this extract.
98 logical,
save:: do_delay=.false.
! track_active: not referenced in the visible lines.
99 logical,
save:: track_active=.false.
! init: reads the dispatcher6_params namelist from io%input and writes it back
! to io%output.
! Arguments:
!   self -- declaration not shown in this extract
!   name -- optional character string; no use of it is visible here
109 SUBROUTINE init (self, name)
111 character(len=*),
optional:: name
113 class(
link_t),
pointer:: link
114 logical:: queue_unpack, send_priv, recv_active, recv_priv
! Run-time parameters; all four are module-level SAVE variables.
116 namelist /dispatcher6_params/ verbose, max_stalled, retry_stalled, do_delay
120 call trace%begin(
'dispatcher6_t%init')
! NOTE(review): the read status goes to iostat, but no check of it is visible
! in this extract -- confirm that a failed read is handled.
123 read(io%input, dispatcher6_params, iostat=iostat)
! Write the namelist group, with its current values, to io%output.
124 write (io%output, dispatcher6_params)
! execute: calls startup, then calls update_list repeatedly while
! task_list%na > 0 and wallclock() is below io%job_seconds. The visible calls
! after that are: MPI message diagnostics, a 'wall time' toc, an MPI barrier,
! and a print of min_nq.
! Arguments: self, task_list, test -- declarations are not shown in this
! extract; task_list and test are passed on to update_list.
! NOTE(review): lines are elided, so the exact nesting of the statements
! below cannot be confirmed from this view.
131 SUBROUTINE execute (self, task_list, test)
! itimer: SAVEd value handed to trace%begin and trace%end.
138 integer,
save:: itimer=0
140 call trace%begin (
'dispatcher6_t%execute', itimer=itimer)
141 call startup (task_list)
! Main loop: stops when no tasks remain in na or the job time is used up.
148 do while (task_list%na > 0 .and. wallclock() < io%job_seconds)
149 call update_list (task_list, test)
! Abort through mpi%abort when io%do_stop is set.
150 if (io%do_stop)
call mpi%abort (
'stop flag')
! When na equals n_tasks (n_tasks is set from na in startup), record the
! smallest task_list%nq seen so far in the module variable min_nq.
151 if (task_list%na == task_list%n_tasks)
then 153 min_nq = min(min_nq,task_list%nq)
156 write (io_unit%log,*)
'thread',omp%thread,
' arrived' 161 call mpi_mesg%diagnostics(1)
162 call toc (
'wall time', timer%n_update, time=sec)
163 call mpi%barrier (
'end')
164 write (io%output,*)
"task list finished, min_nq =", min_nq
165 call trace%end (itimer)
166 END SUBROUTINE execute
! update_list: detaches the link at the head of task_list%queue (under
! task_list%lock) and processes its task. The visible calls are: unpack for a
! task with the virtual bit, update_task, check_nbors when update_task did not
! report a refinement, and finally check_all.
! Arguments: task_list, test -- declarations are not shown in this extract;
! test is passed on to update_task.
! NOTE(review): else / end if lines are elided, so the exact branch structure
! cannot be confirmed from this view.
188 SUBROUTINE update_list (task_list, test)
192 class(
link_t),
pointer:: head, prev
193 class(
task_t),
pointer:: task, otask
194 class(
mesg_t),
pointer:: mesg
195 logical:: already_busy, was_refined
197 real(8),
save:: time, otime=0d0
198 integer,
save:: oid=0
200 integer,
save:: itimer=0
202 call trace%begin(
'dispatcher6::update_list', itimer=itimer)
! With more than one MPI rank, run the sent-list and iwrite-list checks.
206 if (mpi%size > 1)
then 209 call mpi_mesg%sent_list%check_sent (nq)
211 call mpi_io%iwrite_list%check
! Detach the queue head while holding the task-list lock; the queue then
! starts at head%next_time.
215 call task_list%lock%set()
216 head => task_list%queue
217 if (
associated(head))
then 218 task_list%queue => head%next_time
220 call task_list%lock%unset()
! Process the detached link, if there was one.
221 if (
associated(head))
then 223 task%atime = task%time
! A task found in state 1 is moved to state 2; the queue count nq is
! decremented and its new value is copied into task%nq.
226 if (task%state==1)
then 227 call set_state (task, 2)
229 task_list%nq = task_list%nq-1
230 task%nq = task_list%nq
231 call task%lock%unset()
! A task with the virtual bit has its message unpacked.
232 if (task%is_set(bits%virtual))
then 233 call unpack (task_list, task%mesg, head)
235 call update_task (task_list, head, test, was_refined)
! Neighbors are checked only when update_task did not report a refinement.
237 if (.not. was_refined)
then 238 call check_nbors (task_list, head)
241 call set_state (task, 0)
! Diagnostic that prints the task state; presumably the branch for a queue
! head that was not in state 1 -- confirm against the full source.
243 write (io%output,*)
'WARNING: queue head task was in state', task%state
250 call check_all (task_list)
! NOTE(review): the trace is opened with trace%begin but closed with
! trace_end -- confirm the two forms are equivalent.
252 call trace_end (itimer)
253 END SUBROUTINE update_list
! update_task: processes the task on link `head`. The visible steps are:
! refine%check_current, reporting `refined` through was_refined, task%info,
! a load_balance%check_load test for boundary tasks, send_to_vnbors, and the
! bookkeeping for a task whose has_finished() is true.
! Arguments:
!   task_list   -- declaration not shown in this extract
!   head        -- pointer to the link being processed
!   test        -- declaration not shown in this extract
!   was_refined -- optional logical; receives `refined` when present
! NOTE(review): else / end if / return lines are elided, so the exact branch
! structure cannot be confirmed from this view.
258 SUBROUTINE update_task (task_list, head, test, was_refined)
260 class(
link_t),
pointer:: head
262 logical,
optional:: was_refined
264 class(
task_t),
pointer:: task
266 logical:: refined, derefined
267 integer,
save:: itimer=0
269 call trace%begin(
'dispatcher6::update_task', itimer=itimer)
276 call refine%check_current(task_list, head, refined, derefined)
! NOTE(review): trace%end appears here and again at the end of the routine;
! presumably this one belongs to an early-return branch -- confirm.
278 call trace%end (itimer)
! was_refined is optional, so it is assigned only when the caller passed it.
281 if (
present(was_refined))
then 282 was_refined = refined
! The rotated flag is cleared and then tested; an elided line in between
! presumably may set it -- confirm.
293 task%rotated = .false.
295 if (.not.task%rotated)
then 298 call task%info (task_list%nq, task_list%na)
! Boundary tasks: store the current queue count in the task, then ask
! load_balance%check_load about this link.
306 if (task%is_set(bits%boundary))
then 307 head%task%nq = task_list%nq
308 if (load_balance%check_load (head))
! The na, nb and nv counters are decremented below and count_status is
! called; the conditions guarding each decrement are not visible here.
then 310 task_list%na = task_list%na-1
312 task_list%nb = task_list%nb-1
314 task_list%nv = task_list%nv-1
315 call task_list%count_status
! Debug output, written only for the task whose id equals io%id_debug.
317 if (task%id == io%id_debug) &
318 write(io_unit%mpi,*) &
319 'DBG task_list_t%update: calling send_to_vnbors', task%id
320 call task_list%send_to_vnbors (head)
! Finished task: set state 4, release the task lock, call
! load_balance%active(.false.) and decrement na.
328 if (task%has_finished())
then 329 call set_state (task, 4)
330 call task%lock%unset()
331 call load_balance%active (.false.)
333 task_list%na = task_list%na-1
335 call task%lock%unset()
337 call trace%end (itimer)
338 END SUBROUTINE update_task
! check_nbors: calls check_ready for the links reached through the `nbor`
! list (nbor%link), and then for `link` itself when its task is in state 0.
! Arguments:
!   task_list -- declaration not shown in this extract
!   link      -- pointer to the link whose neighbors are checked
! NOTE(review): the initial assignment of nbor and the step to the next list
! entry are elided from this view.
347 SUBROUTINE check_nbors (task_list, link)
349 class(
link_t),
pointer:: link
350 class(
link_t),
pointer:: nbor
351 class(
task_t),
pointer:: task
352 integer,
save:: itimer=0
354 call trace_begin(
'dispatcher6::check_nbors', itimer=itimer)
! Loop over the nbor list until the pointer is no longer associated.
357 do while (
associated (nbor))
359 call check_ready (task_list, nbor%link)
! The link's own task is checked only when it is in state 0.
362 if (link%task%state==0) &
363 call check_ready (task_list, link)
364 call trace_end (itimer)
365 END SUBROUTINE check_nbors
! check_ready: examines the task on `link` and, on the visible success path,
! moves it to state 1 and inserts it into the queue with queue_by_time.
! Only tasks in state 0 are examined. For a task with the virtual bit, `ok`
! comes from task%mesg%is_complete(), and an irecv is issued when
! task%mesg%req is 0. The nbor list is scanned with is_ahead_of for entries
! whose `needed` flag is set.
! Arguments:
!   task_list -- declaration not shown in this extract
!   link      -- pointer to the link being checked
! NOTE(review): else / end if / return lines and the lock%set calls are
! elided, so the exact branch structure cannot be confirmed from this view.
370 SUBROUTINE check_ready (task_list, link)
372 class(
link_t),
pointer:: link
373 class(
task_t),
pointer:: task
374 class(
link_t),
pointer:: nbor
377 integer,
save:: itimer=0
379 call trace%begin (
'dispatcher6::check_ready', itimer=itimer)
! NOTE(review): an unlock before the state test; presumably it belongs to an
! elided early-exit branch -- confirm.
383 call task%lock%unset()
387 if (task%state==0)
then 388 if (task%is_set(bits%virtual))
then 390 ok = task%mesg%is_complete()
! Issue a receive when the message's req field is 0.
392 if (task%mesg%req==0)
then 393 call task%mesg%irecv (task%rank, task%id)
! Scan the nbor list; only entries flagged `needed` are tested with
! is_ahead_of.
400 do while (
associated (nbor))
401 if (nbor%needed)
then 402 if (.not. is_ahead_of(nbor%task, task))
! Visible success path: state 1, insert into the queue, release the lock.
then 412 call set_state (task, 1)
413 call queue_by_time (task_list, link)
414 call task%lock%unset()
417 call trace%end (itimer)
418 END SUBROUTINE check_ready
! queue_by_time: inserts link `this` into task_list%queue, a list chained
! through next_time, while holding task_list%lock. The list is walked from
! the head; `this` is placed in front of the first entry whose task%time is
! greater than task%time. After the loop, the visible lines link `this` after
! `prev` (or make it the queue head) and nullify this%next_time.
! task_list%nq is incremented on insertion, and mpi_mesg%n_ready is
! incremented as well.
! Arguments:
!   task_list -- declaration not shown in this extract
!   this      -- pointer to the link to insert
! NOTE(review): the assignments of task and prev, and any return / end if
! lines, are elided from this view.
422 SUBROUTINE queue_by_time (task_list, this)
424 class(
link_t),
pointer:: this
425 class(
link_t),
pointer:: next, prev
426 class(
task_t),
pointer:: task
427 integer,
save:: itimer=0
430 call trace_begin (
'dispatcher6::queue_by_time ', itimer=itimer)
431 call task_list%lock%set
433 mpi_mesg%n_ready = mpi_mesg%n_ready+1
! Walk the queue from its head.
435 next => task_list%queue
436 do while (
associated(next))
! An entry that already points to this task produces a warning in the log;
! otherwise the time comparison decides whether to insert before `next`.
437 if (
associated(next%task, task))
then 438 write (io_unit%log,*) omp_mythread,
' WARNING: task', task%id,
' is already in ready queue' 440 else if (next%task%time > task%time)
then 441 this%next_time => next
442 if (
associated(prev))
then 443 prev%next_time => this
445 task_list%queue => this
448 task_list%nq = task_list%nq+1
452 next => next%next_time
! Lines after the loop: count the entry and link it after prev, or make it
! the queue head; it ends the chain, so its next_time is nullified.
454 task_list%nq = task_list%nq+1
455 if (
associated(prev))
then 456 prev%next_time => this
458 task_list%queue => this
460 nullify (this%next_time)
462 call task_list%lock%unset
463 call trace_end (itimer)
464 END SUBROUTINE queue_by_time
! is_ahead_of: logical function comparing the time of `source` with that of
! `target`. Visible rules:
!   * source%level /= target%level, or istep < 3:  nbtime >= tgtime
!   * otherwise:  nbtime + nbdtime*target%grace > tgtime
! nbdtime is read from source%dtime while source%lock is held; target%lock is
! set and released in the same way.
! Arguments: source, target -- class(task_t) tasks.
! NOTE(review): the assignments of nbtime, tgtime, state and istep, and the
! first branch of the if chain, are elided from this view.
472 LOGICAL FUNCTION is_ahead_of (source, target)
473 class(
task_t):: source,
target 475 real(8):: nbtime, nbdtime, tgtime
476 integer:: state, istep
! Read source values under the source lock.
478 call source%lock%set()
480 nbdtime = source%dtime
483 call source%lock%unset()
! Target values are read under the target lock (the reads are not shown).
485 call target%lock%set()
487 call target%lock%unset()
493 else if (source%level /=
target%level .or. istep < 3)
then 494 is_ahead_of = nbtime >= tgtime
497 is_ahead_of = nbtime + nbdtime*
target%grace > tgtime
! Debug print: needs io_unit%verbose > 1, and in addition either the target
! id equal to io%id_debug or io_unit%verbose > 4.
499 if (io_unit%verbose>1)
then 500 if (
target%id==io%id_debug.or.io_unit%verbose>4) &
501 print
'(i6,i4,2x,a,i6,3f9.5,l3)', source%id, omp_mythread,
'mk is_ahead_of: ', &
502 target%id, nbtime, nbdtime*
target%grace, tgtime, is_ahead_of
504 END FUNCTION is_ahead_of
! unpack: handles a received message `mesg` for the task on `link`. Visible
! steps:
!   * mesg%nbuf < 40: the buffer is passed to load_balance%unpack
!   * an error line is logged if the task has the boundary bit or the ready bit
!   * task%unpack(mesg) is called, then `id` is compared with mesg%id
!   * when not failed, mpi_mesg%n_unpk is incremented
!   * swap_request + boundary: na and nb are incremented, nv is decremented,
!     and the neighbor lists and status are refreshed
!   * swap_request + virtual: the neighbor lists and status are refreshed
! Arguments:
!   self -- declaration not shown; provides na, nb, nv and the init_nbors,
!           update_nbor_status and count_status procedures used below
!   mesg -- pointer to the message
!   link -- pointer to the link
! NOTE(review): the message texts name task_mesg_mod / task_mesg_t, while the
! trace label says dispatcher6; they are runtime strings and are left as is.
! NOTE(review): the assignments of task, id and failed are elided from this
! view, as are else / end if / return lines.
512 SUBROUTINE unpack (self, mesg, link)
514 class(
mesg_t),
pointer:: mesg
515 class(
link_t),
pointer:: link
516 class(
link_t),
pointer:: link2
517 class(
task_t),
pointer:: task
520 integer,
save:: itimer=0
522 call trace%begin (
'dispatcher6::unpack', itimer=itimer)
! Messages with nbuf below 40 go to the load balancer.
523 if (mesg%nbuf < 40)
then 524 call load_balance%unpack (mesg%buffer)
! Log an error when the message is for a task with the boundary bit; the
! five logicals printed are the internal, boundary, virtual, external and
! swap_request bits.
533 if (task%is_set (bits%boundary))
then 534 write (io_unit%log,
'(f12.6,2x,a,i9,3x,5l1)') wallclock(), &
535 'task_mesg_mod::unpack ERROR, received mpi_mesg for boundary task:', task%id, &
536 task%is_set(bits%internal), &
537 task%is_set(bits%boundary), &
538 task%is_set(bits%virtual), &
539 task%is_set(bits%external), &
540 task%is_set(bits%swap_request)
! Same report for a task with the ready bit.
! NOTE(review): the format has an i9 item, but no integer is visible between
! the message and the flags (original line 546 is not shown) -- confirm.
543 if (task%is_set (bits%ready))
then 544 write (io_unit%log,
'(f12.6,2x,a,i9,3x,5l1)') wallclock(), &
545 'task_mesg_mod::unpack ERROR, received mpi_mesg for task with ready bit:', &
547 task%is_set(bits%internal), &
548 task%is_set(bits%boundary), &
549 task%is_set(bits%virtual), &
550 task%is_set(bits%external), &
551 task%is_set(bits%swap_request)
561 call task%unpack (mesg)
562 if (mpi_mesg%debug) &
563 write (io_unit%log,
'(f12.6,2x,a,i9,1p,e18.6)') wallclock(), &
564 'unpk: id, time =', task%id, task%time
! An id mismatch is reported on both io%output and the log unit.
565 if (id /= mesg%id)
then 566 write(io%output,
'(i6,i4,2x,a,3i6)') mpi%rank, omp%thread, &
567 'unpack ERROR: wrong mesg%id', id, task%id, mesg%id
568 write(io_unit%log,
'(f12.6,i6,i4,2x,a,4i6)') wallclock(), mpi%rank, &
569 omp%thread,
'unpack ERROR: wrong mesg%id', mesg%sender, id, task%id, mesg%id
571 if (.not. failed)
then 573 mpi_mesg%n_unpk = mpi_mesg%n_unpk+1
! swap_request + boundary: adjust the counters (na+1, nb+1, nv-1), rebuild
! the neighbor information and clear the swap_request and ready bits.
581 if (task%is_set(bits%swap_request) .and. task%is_set(bits%boundary))
then 582 self%na = self%na+1; self%nb = self%nb+1; self%nv = self%nv-1
583 call self%init_nbors (link)
584 call task%clear (bits%swap_request+bits%ready)
585 call self%update_nbor_status (link)
586 call self%count_status
588 write(io%output,
'(f12.6,2x,a,i6,a,i9,a,i6)') &
589 wallclock(),
'LB: rank',mpi%rank,
' given patch',task%id,
' by',mesg%sender
591 write (io_unit%log,*)
'task_mesg_t%unpack: swapped virtual to boundary:', task%id
! swap_request + virtual: same neighbor refresh; no counter change is
! visible in this branch.
600 else if (task%is_set(bits%swap_request) .and. task%is_set(bits%virtual))
then 602 write (io_unit%log,*)
'task_mesg_t%unpack: new virtual patch:', task%id
604 call self%init_nbors (link)
605 call task%clear (bits%swap_request+bits%ready)
606 call self%update_nbor_status (link)
607 call self%count_status
611 call trace%end (itimer)
612 END SUBROUTINE unpack
! startup: walks the list starting at task_list%head twice. In the first
! walk, tasks with the virtual bit get a message allocated, wc_last set to
! wallclock(), and an irecv issued. In the second walk, set_state(task, 0)
! and check_ready are called. Then n_tasks is set from na and an MPI barrier
! is called.
! Argument: task_list -- declaration not shown in this extract.
! NOTE(review): the assignment of task, the step to the next link and the
! end do / end if lines are elided from this view.
617 SUBROUTINE startup (task_list)
619 class(
link_t),
pointer:: link
620 class(
task_t),
pointer:: task
622 call trace%begin (
'dispatcher6::startup')
! First walk: set up receives for virtual tasks.
627 link => task_list%head
628 do while (
associated(link))
630 if (task%is_set (bits%virtual))
then 631 call task%allocate_mesg
632 task%wc_last = wallclock()
633 call task%mesg%irecv (task%rank, task%id)
! Second walk: put tasks in state 0 and run the readiness check.
640 link => task_list%head
641 do while (
associated(link))
642 call set_state (task, 0)
643 call check_ready (task_list, link)
! Record the task count; execute compares na against this value.
647 task_list%n_tasks = task_list%na
651 call mpi%barrier (
'task_list%execute')
653 END SUBROUTINE startup
! check_all: calls check_ready for each link reached by the loop over `link`.
! Argument: list -- declaration not shown in this extract; it is passed to
! check_ready as the task list.
! NOTE(review): the initial assignment of link and the step to the next link
! are elided from this view.
659 SUBROUTINE check_all (list)
661 class(
link_t),
pointer:: link
662 integer,
save:: itimer=0
664 call trace_begin(
'dispatcher6::check_all', itimer=itimer)
! Loop until the link pointer is no longer associated.
667 do while (
associated (link))
668 call check_ready (list, link)
672 call trace_end (itimer)
673 END SUBROUTINE check_all
! rotate: advances the time bookkeeping of `self` by one step; returns at
! once when self%rotated is already true. Visible updates:
!   * dt(it) = dtime; dt(new) = dtime too, unless kind is 'zeus_mhd_patch'
!   * time = time + dtime, and t(new) = time
!   * new = mod(new,nt)+1
!   * iit(i) = iit(i+1), then iit(nt) = new
!   * istep is incremented and rotated is set to true
! Argument: self -- declaration not shown in this extract.
! NOTE(review): the loop around the iit shift and the matching lock%set are
! elided from this view.
681 SUBROUTINE rotate (self)
684 integer,
save:: itimer=0
! Nothing to do when the rotated flag is already set.
686 if (self%rotated)
return 687 call trace_begin (
'dispatcher6::rotate',itimer=itimer)
690 self%dt(self%it) = self%dtime
693 if (trim(self%kind) /=
'zeus_mhd_patch')
then 694 self%dt(self%new)= self%dtime
696 self%time = self%time + self%dtime
697 self%t(self%new) = self%time
! mod(new,nt)+1 maps nt back to 1.
699 self%new = mod(self%new,self%nt)+1
701 self%iit(i) = self%iit(i+1)
703 self%iit(self%nt) = self%new
704 self%istep = self%istep + 1
705 self%rotated = .true.
707 call self%lock%unset()
708 call trace_end (itimer)
709 END SUBROUTINE rotate
! set_state: updates the module-level state counters for a change to state
! `in`: n_state(ip) is decremented, n_state(in) and d_state(in) are
! incremented; a print statement then outputs the thread, wallclock(),
! n_state, ip and in.
! Arguments:
!   task -- pointer to the task
!   in   -- the new state; declaration not shown in this extract
! NOTE(review): the assignment of ip (presumably the task's previous state),
! the assignment of the task's state, and any condition on the print are
! elided from this view.
! NOTE(review): n_state and d_state are shared SAVE variables; no protection
! against concurrent updates is visible here -- confirm.
714 SUBROUTINE set_state (task, in)
715 class(
task_t),
pointer:: task
720 n_state(ip) = n_state(ip) - 1
721 n_state(in) = n_state(in) + 1
722 d_state(in) = d_state(in) + 1
724 print
'(a,i4,f12.6,2x,5i4,2x,2i3)',
' dispatcher6:', omp%thread, wallclock(), n_state, ip, in
725 END SUBROUTINE set_state
Each thread uses a private timer data type, with arrays for start time and total time for each regist...
Module for handling blocking and non-blocking MPI parallel I/O to a single file. The module is initia...
Support tic/toc timing, as in MATLAB, and accurate wallclock() function. The timing is generally much...
Module with list handling for generic class task_t objects.
Template module for patches, which adds pointers to memory and mesh, and number of dimensions and var...
This module handles checking max change between neighboring points. Each instance of it needs an inde...
Message handling for task lists. Some of the methods are used only by dispatcher0_t, so they should perhaps be moved to that file.
Task list data type, with methods for startup and updates. Message handling is inherited from the tas...
Module with list handling for generic class task_t objects.
Dispatcher method that relies on all threads maintaining a "ready queue", with tasks ready for updati...
Keeps track of neighbor ranks and their loads by sending and receiving short messages and storing the information in a linked list.
Template module for tasks.