!> Module-level saved state for the dispatcher.  Of these, verbose, max_stalled,
!> retry_stalled and do_delay are members of the dispatcher5_params namelist
!> that init reads.
! Verbosity level; compared against 1 in check_nbors to enable extra output.
34 integer,
save:: verbose=0
! Stall bookkeeping; the code that uses these counters is not visible in this
! excerpt.
35 integer,
save:: stalled=0, max_stalled=10000, retry_stalled=100
! Running minimum of task_list%nq, updated in execute and reported when the
! task list finishes.
36 integer,
save:: min_nq=2**30
! Namelist switch; its use is not visible in this excerpt.
37 logical,
save:: do_delay=.false.
! Tested in update_list; selects the queue diagnostics that also use
! task_list%active%task.  Not part of the namelist.
38 logical,
save:: track_active=.false.
!> Initialize the dispatcher: open a trace region, read the dispatcher5_params
!> namelist from io%input, and echo the resulting values to io%output.
!>
!> name : optional character string; its use is not visible in this excerpt.
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip), so the declaration of self, the use of link, and the end of the
!> routine are not shown here.
48 SUBROUTINE init (self, name)
50 character(len=*),
optional:: name
52 class(
link_t),
pointer:: link
! Run-time parameters that can be set from the input file.
54 namelist /dispatcher5_params/ verbose, max_stalled, retry_stalled, do_delay
58 call trace%begin(
'dispatcher5_t%init')
! The read status is stored in iostat; no test of it is visible in this excerpt.
61 read(io%input, dispatcher5_params, iostat=iostat)
62 write (io%output, dispatcher5_params)
!> Run the task list: call startup once, then call update_list repeatedly
!> while task_list%na > 0 and wallclock() is below io%job_seconds.  Afterwards
!> diagnostics are written, the ranks meet at the 'end' barrier, and the
!> smallest recorded value of task_list%nq is reported.
!>
!> task_list : the task list being executed (passed on to startup/update_list)
!> test      : passed through unchanged to update_list
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip); argument declarations and the closing end if / end do statements are
!> not visible, so exact nesting cannot be confirmed from here.
70 SUBROUTINE execute (self, task_list, test)
77 integer,
save:: itimer=0
79 call trace%begin (
'dispatcher5_t%execute', itimer=itimer)
! One-time preparation of the task list before entering the update loop.
80 call startup (task_list)
! Main loop: ends when task_list%na reaches zero or the wall-clock budget
! io%job_seconds is used up.
87 do while (task_list%na > 0 .and. wallclock() < io%job_seconds)
88 call update_list (task_list, test)
89 if (io%do_stop)
call mpi%abort (
'stop flag')
90 if (task_list%na == task_list%n_tasks)
then 92 min_nq = min(min_nq,task_list%nq)
! Wrap-up diagnostics and final synchronization.
96 write (io_unit%log,*)
'thread',omp%thread,
' arrived' 101 call mpi_mesg%diagnostics(1)
102 call toc (
'wall time', timer%n_update, time=sec)
103 call mpi%barrier (
'end')
104 write (io%output,*)
"task list finished, min_nq =", min_nq
105 call trace%end (itimer)
106 END SUBROUTINE execute
!> Take one task from task_list%queue and process it.  Under task_list%lock a
!> queue entry is selected, marked busy, unlinked from the queue and handed to
!> task_list%queue_active; after the lock is released the task is either
!> unpacked (virtual bit set) or updated via update_task, and neighbors are
!> re-checked with check_nbors unless the task was refined.
!>
!> task_list : the task list whose queue is served
!> test      : passed through unchanged to update_task
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip); else / end if / end do / return statements and the assignments of
!> task, prev and nq are not visible, so the nesting described in the comments
!> below is partly inferred and should be confirmed against the full file.
128 SUBROUTINE update_list (task_list, test)
132 class(
link_t),
pointer:: head, prev
133 class(
task_t),
pointer:: task, otask
134 class(
mesg_t),
pointer:: mesg
135 logical:: already_busy, was_refined
137 real(8),
save:: time, otime=0d0
138 integer,
save:: oid=0
140 integer,
save:: itimer=0
142 call trace%begin(
'dispatcher5::update_list', itimer=itimer)
! With more than one MPI rank, sent messages are checked; the iwrite list check
! follows (whether it sits inside the same if is not visible here).
147 if (mpi%size > 1)
then 151 call mpi_mesg%sent_list%check_sent (nq)
153 call mpi_io%iwrite_list%check
! Everything from here to lock%unset manipulates the shared queue.
157 call task_list%lock%set
159 head => task_list%queue
160 if (
associated(head) .and. .not.task_list%syncing)
then 162 write(io_unit%log,
'(f12.6,i6,i7)') wallclock(), task_list%na, head%task%id
! Walk the queue via next_time, stopping at the first task that either has no
! thread preference (mem_thread == -1) or prefers the calling thread.
166 do while (
associated(head))
168 if (task%mem_thread==-1 .or. omp%thread == task%mem_thread)
exit 170 head => head%next_time
! If the walk ran off the end of the queue, restart from the queue head.
175 if (.not.
associated(head))
then 177 head => task_list%queue
180 task%atime = task%time
181 if (io%verbose > 1)
then 182 write (io_unit%log,
'(f12.6,2x,a,i4,2x,a,i6,2x,a,1p,g16.6,2x,a,i5)') &
183 wallclock(),
'thread', omp%thread,
'takes', task%id, &
184 'time', task%time,
'nq', task%nq
! Remember whether the task was already marked busy before we touch it.
215 already_busy = task%is_set (bits%busy)
! Claim the task: set the busy bit, unlink it from the ready queue (through
! prev when there is a predecessor, otherwise by moving the queue head), clear
! the ready bit, and pass it to queue_active.
217 if (.not.already_busy)
then 218 call task%set (bits%busy)
219 if (
associated(prev))
then 220 prev%next_time => head%next_time
222 task_list%queue => head%next_time
224 call task%clear (bits%ready)
226 call task_list%queue_active (head)
228 task_list%nq = task_list%nq-1
230 task%nq = task_list%nq
! Queue diagnostics written to io_unit%queue; with track_active the record
! also includes otask%atime taken from task_list%active%task.
231 if (io%verbose >= 0)
then 232 if (track_active)
then 233 otask => task_list%active%task
236 write (*,
'(a,2(f12.6,i6))')
'TIME ERROR: otime, oid, time, id =', otime, oid, time, otask%id
244 write (io_unit%queue,
'(f12.6,i6,2f12.6,2i5)') &
245 wallclock(), task%istep, task%time, &
246 otask%atime, task_list%nq, task_list%nac
247 flush (io_unit%queue)
251 write (io_unit%queue,
'(f12.6,i6,f12.6,2i5)') &
252 wallclock(), task%istep, task%time, &
253 task_list%nq, task_list%nac
254 flush (io_unit%queue)
! A task whose time equals task_list%sync_next switches the list into syncing
! mode.  NOTE(review): this is an exact equality test on what appears to be a
! real-valued time - confirm that sync times are hit exactly.
266 if (task%time == task_list%sync_next)
then 267 task_list%syncing = .true.
268 write (io_unit%mpi,*) task%id,omp%thread, &
269 ' is triggering a sync at t =', task_list%sync_next
273 write(io_unit%log,
'(f12.6,i6,2x,a)') wallclock(), task_list%na,
'no queue' 275 call task_list%lock%unset
! Outside the lock: process the selected task, if any.  A task that was already
! busy only produces a warning.
281 if (
associated(head))
then 282 if (already_busy)
then 284 write (io_unit%output,*) mpi%rank,
' WARNING: thread',omp%thread, &
285 ' tried to update busy task', task%id
287 call trace_end (itimer)
! Tasks with the virtual bit get their message unpacked; the update_task call
! is presumably the alternative branch (the else is not visible here).
290 if (task%is_set(bits%virtual))
then 291 call unpack (task_list, task%mesg, head)
293 call update_task (task_list, head, test, was_refined)
301 call task%clear (bits%busy)
302 if (.not. was_refined)
then 303 call check_nbors (task_list, head)
! The condition under which check_all runs is not visible in this excerpt.
312 call check_all (task_list)
314 call trace_end (itimer)
315 END SUBROUTINE update_list
!> Update a single task and do the associated bookkeeping: refinement check,
!> frozen-task handling, update counters, load-balance check, sending to
!> neighbors via send_to_vnbors, marking finished tasks as frozen, and the
!> optional MPI 'sync' barrier.
!>
!> self        : the task list (its na, nq, n, syncing, sync_next and sync_time
!>               members are read or modified here)
!> head        : link whose task is updated
!> test        : not referenced in the visible lines
!> was_refined : optional; when present it receives the value of refined as
!>               set by refine%check_current
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip); the task pointer assignment, the actual task update call, return
!> statements and closing end if lines are not visible.  The trace label used
!> below is 'dispatcher5::update2', not the routine name.
320 SUBROUTINE update_task (self, head, test, was_refined)
322 class(
link_t),
pointer:: head
324 logical,
optional:: was_refined
326 class(
task_t),
pointer:: task
328 logical:: refined, derefined
329 integer,
save:: itimer=0
335 call trace%begin(
'dispatcher5::update2', itimer=itimer)
! Toggle the track flag for the task selected by io%id_track.
338 if (task%id==io%id_track)
then 339 task%track = .not.task%track
! Refinement check; the trace%end that follows is presumably part of an early
! exit whose condition is not visible here.
347 call refine%check_current(self, head, refined, derefined)
349 call trace%end (itimer)
352 if (
present(was_refined))
then 353 was_refined = refined
! Frozen tasks: if the ready bit is set, self%na is decremented; the
! trace%end is presumably followed by a return that is not visible here.
358 if (task%is_set (bits%frozen))
then 361 if (task%is_set(bits%ready))
then 363 self%na = self%na - 1
366 call trace%end (itimer)
379 task%rotated = .false.
382 mpi_mesg%n_update = mpi_mesg%n_update+1
385 write (io_unit%log,
'(a,i4,2x,a,i7,2x,a,2x,a,i7,2x,a,1p,2g14.6,2x,a,2i5,l3)') &
386 'thread', omp_mythread,
'task', task%id, trim(task%type), &
387 'step', task%istep,
'dt, time:', task%dtime, task%time, &
388 'n, nq', self%n, self%nq,
associated(head%nbor)
! When the task has not been rotated, product(task%n) is added to the update
! counter in timer%n_update.
396 if (.not.task%rotated)
then 401 timer%n_update = timer%n_update + product(task%n)
403 call task%info (self%nq, self%na)
! Boundary tasks: record the queue length on the task, let load_balance check
! the load, and send the task to its neighbors with send_to_vnbors.
411 if (task%is_set(bits%boundary))
then 412 head%task%nq = self%nq
413 if (load_balance%check_load (head))
then 420 call self%count_status
422 if (task%id == io%id_debug) &
423 write(io_unit%mpi,*) &
424 'DBG task_list_t%update: calling send_to_vnbors', task%id
425 call self%send_to_vnbors (head)
! A finished task is frozen and load_balance%active(.false.) is called.
432 if (task%has_finished())
then 433 call task%set (bits%frozen)
434 call load_balance%active (.false.)
! While the list is syncing: wait at the 'sync' barrier, then advance
! sync_next by sync_time and leave syncing mode.
444 if (self%syncing)
then 445 write (io_unit%mpi,*) task%id,omp%thread, &
446 ' is waiting on a sync at t =', self%sync_next
448 call mpi%barrier (
'sync')
450 write (io_unit%mpi,*) task%id,omp%thread, &
451 ' finished wating on a sync at t =', self%sync_next
452 self%sync_next = self%sync_next + self%sync_time
453 self%syncing = .false.
! Store the list's next sync time on the task.
455 task%sync_time = self%sync_next
456 call trace%end (itimer)
457 END SUBROUTINE update_task
!> Check whether the task of a link can be put on the ready queue and, if so,
!> add it with self%queue_by_time.  Only tasks with none of the ready, busy,
!> frozen and external bits set are examined.  For virtual tasks the test is
!> whether the task's message is complete; otherwise the neighbor list is
!> scanned using nbor%needed and nbor%task%is_ahead_of(task).
!>
!> self : the task list (provides queue_by_time)
!> link : the link whose task is checked
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip); the assignments of task, nbor and ok, and the else / end if / end do
!> statements are not visible, so the exact success condition cannot be
!> confirmed from here.
462 SUBROUTINE check_ready (self, link)
464 class(
link_t),
pointer:: link
465 class(
task_t),
pointer:: task
466 class(
link_t),
pointer:: nbor
467 logical:: ok, debug, debug1
468 integer,
save:: itimer=0
470 call trace%begin (
'dispatcher5::check_ready', itimer=itimer)
! Debug output is enabled globally by io%verbose or for one task by io%id_debug.
472 debug = io%verbose > 1 .or. task%id==io%id_debug
! Virtual tasks: ok is the message-complete status; when the message request
! handle is 0 a new non-blocking receive is posted.
476 if (task%is_clear(bits%ready+bits%busy+bits%frozen+bits%external))
then 477 if (task%is_set(bits%virtual))
then 479 ok = task%mesg%is_complete()
481 if (task%mesg%req==0)
then 482 call task%mesg%irecv (task%rank, task%id)
! Neighbor scan: for needed neighbors, is_ahead_of(task) decides between the
! ' is OK on' and ' failed on' debug messages.
489 do while (
associated (nbor))
490 debug1 = nbor%task%id==io%id_debug
491 if (debug1)
write(io_unit%log,*)
'DBG check_ready: id, nbor, needed, ahead', &
492 task%id, nbor%task%id, nbor%needed, nbor%task%is_ahead_of(task)
493 if (nbor%needed)
then 494 if (nbor%task%is_ahead_of(task))
then 496 write (io_unit%log,
'("DBG list_t%check_ready:",i5,f10.6,a,i6,2f10.6,f6.2,l4)') &
497 task%id, task%time,
' is OK on', nbor%task%id, nbor%task%time, &
498 nbor%task%dtime, nbor%task%grace, nbor%task%is_set(bits%virtual)
501 write (io_unit%log,
'("DBG list_t%check_ready:",i5,f10.6,a,i6,2f10.6,f6.2,l4)') &
502 task%id, task%time,
' failed on', nbor%task%id, nbor%task%time, &
503 nbor%task%dtime, nbor%task%grace, nbor%task%is_set(bits%virtual)
! Success path: report and insert the link into the time-ordered queue.
513 write (io_unit%output,
'("list_t%check_ready:",i5,f10.6,a)') &
514 task%id, task%time,
' succeded' 515 call self%queue_by_time (link)
518 call trace%end (itimer)
519 END SUBROUTINE check_ready
!> Run check_ready on the neighbors of a link and then on the link itself.
!>
!> self : the task list, passed on to check_ready
!> link : the link whose neighbor list is walked
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip); the assignments of task and nbor, the write statement that the
!> continued if at original line 540 belongs to, any condition on the
!> neighbor check, and the loop advance / end do are not visible.
529 SUBROUTINE check_nbors (self, link)
531 class(
link_t),
pointer:: link
532 class(
link_t),
pointer:: nbor
533 class(
task_t),
pointer:: task
534 integer,
save:: itimer=0
536 call trace_begin(
'dispatcher5::check_nbors', itimer=itimer)
! Walk the neighbor list; optional debug output is controlled by the module
! variable verbose or by io%id_debug.
539 do while (
associated (nbor))
540 if (verbose > 1 .or. nbor%task%id==io%id_debug) &
542 'task', task%id,
' needs task ', nbor%task%id, nbor%needs_me
543 call check_ready (self, nbor%link)
! Finally check the task of the link itself.
546 call check_ready (self, link)
547 call trace_end (itimer)
548 END SUBROUTINE check_nbors
!> Unpack a received message into the task of a link.  Short messages
!> (mesg%nbuf < 40) are handed to load_balance%unpack; otherwise the task is
!> checked for unexpected boundary/ready bits, task%unpack(mesg) is called,
!> the message id is compared with id, and tasks carrying a swap_request bit
!> get their neighbor information rebuilt.
!>
!> self : the task list (na, nb, nv counters and neighbor methods are used)
!> mesg : the message to unpack
!> link : the link whose task receives the message
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip); the declarations and assignments of task, id and failed, the use of
!> link2, and else / end if / return statements are not visible.  The log
!> strings refer to 'task_mesg_mod' / 'task_mesg_t' rather than to this module.
556 SUBROUTINE unpack (self, mesg, link)
558 class(
mesg_t),
pointer:: mesg
559 class(
link_t),
pointer:: link
560 class(
link_t),
pointer:: link2
561 class(
task_t),
pointer:: task
564 integer,
save:: itimer=0
566 call trace%begin (
'dispatcher5::unpack', itimer=itimer)
! Messages with fewer than 40 buffer entries go to the load balancer.
567 if (mesg%nbuf < 40)
then 568 call load_balance%unpack (mesg%buffer)
! Sanity check: a message arrived for a task that has the boundary bit set.
577 if (task%is_set (bits%boundary))
then 578 write (io_unit%log,
'(f12.6,2x,a,i9,3x,5l1)') wallclock(), &
579 'task_mesg_mod::unpack ERROR, received mpi_mesg for boundary task:', task%id, &
580 task%is_set(bits%internal), &
581 task%is_set(bits%boundary), &
582 task%is_set(bits%virtual), &
583 task%is_set(bits%external), &
584 task%is_set(bits%swap_request)
! Sanity check: a message arrived for a task that has the ready bit set.
587 if (task%is_set (bits%ready))
then 588 write (io_unit%log,
'(f12.6,2x,a,i9,3x,5l1)') wallclock(), &
589 'task_mesg_mod::unpack ERROR, received mpi_mesg for task with ready bit:', &
591 task%is_set(bits%internal), &
592 task%is_set(bits%boundary), &
593 task%is_set(bits%virtual), &
594 task%is_set(bits%external), &
595 task%is_set(bits%swap_request)
! Let the task unpack the message contents.
605 call task%unpack (mesg)
606 if (mpi_mesg%debug) &
607 write (io_unit%log,
'(f12.6,2x,a,i9,1p,e18.6)') wallclock(), &
608 'unpk: id, time =', task%id, task%time
! Report a mismatch between id and the id carried by the message.
609 if (id /= mesg%id)
then 610 write(io%output,
'(i6,i4,2x,a,3i6)') mpi%rank, omp%thread, &
611 'unpack ERROR: wrong mesg%id', id, task%id, mesg%id
612 write(io_unit%log,
'(f12.6,i6,i4,2x,a,4i6)') wallclock(), mpi%rank, &
613 omp%thread,
'unpack ERROR: wrong mesg%id', mesg%sender, id, task%id, mesg%id
! Count the unpack when failed is false.
615 if (.not. failed)
then 617 mpi_mesg%n_unpk = mpi_mesg%n_unpk+1
! swap_request together with boundary: adjust the na/nb/nv counters, rebuild
! neighbor lists, clear the swap_request and ready bits, and refresh status.
625 if (task%is_set(bits%swap_request) .and. task%is_set(bits%boundary))
then 626 self%na = self%na+1; self%nb = self%nb+1; self%nv = self%nv-1
627 call self%init_nbors (link)
628 call task%clear (bits%swap_request+bits%ready)
629 call self%update_nbor_status (link)
630 call self%count_status
632 write(io%output,
'(f12.6,2x,a,i6,a,i9,a,i6)') &
633 wallclock(),
'LB: rank',mpi%rank,
' given patch',task%id,
' by',mesg%sender
635 write (io_unit%log,*)
'task_mesg_t%unpack: swapped virtual to boundary:', task%id
! swap_request together with virtual: same neighbor rebuild, with no counter
! changes in the visible lines.
644 else if (task%is_set(bits%swap_request) .and. task%is_set(bits%virtual))
then 646 write (io_unit%log,*)
'task_mesg_t%unpack: new virtual patch:', task%id
648 call self%init_nbors (link)
649 call task%clear (bits%swap_request+bits%ready)
650 call self%update_nbor_status (link)
651 call self%count_status
655 call trace%end (itimer)
656 END SUBROUTINE unpack
!> Prepare the task list before the update loop in execute.  A first pass over
!> the list, starting at task_list%head, allocates a message for every task
!> with the virtual bit set and posts a non-blocking receive for it.  A second
!> pass clears the ready bit of every task and calls check_ready.  Finally
!> task_list%n_tasks is set to task_list%na and the ranks meet at a barrier.
!>
!> task_list : the task list to prepare
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip); the assignment of task, the loop advance statements, the end if /
!> end do lines and any trace end call are not visible.
661 SUBROUTINE startup (task_list)
663 class(
link_t),
pointer:: link
664 class(
task_t),
pointer:: task
666 call trace%begin (
'dispatcher5::startup')
! Pass 1: set up receives for virtual tasks.
670 link => task_list%head
671 do while (
associated(link))
673 if (task%is_set (bits%virtual))
then 674 call task%allocate_mesg
675 task%wc_last = wallclock()
676 call task%mesg%irecv (task%rank, task%id)
! Pass 2: reset the ready bit and re-evaluate readiness of every task.
683 link => task_list%head
684 do while (
associated(link))
685 call link%task%clear(bits%ready)
686 call check_ready (task_list, link)
! Record the current na as the task count that execute compares against.
690 task_list%n_tasks = task_list%na
694 call mpi%barrier (
'task_list%execute')
696 END SUBROUTINE startup
!> Re-evaluate the readiness of tasks in a list: for each visited link the
!> ready bit of its task is cleared and check_ready is called.
!>
!> list : the task list, passed on to check_ready
!>
!> The trace/timer label is spelled 'dispatcher5::check_all', so that it
!> carries the same 'dispatcher5::' prefix as the other routines' labels.
!>
!> NOTE(review): this listing is abridged (the embedded original line numbers
!> skip); the declaration of list, the initial assignment of link, and the
!> loop advance / end do are not visible.
702 SUBROUTINE check_all (list)
704 class(
link_t),
pointer:: link
705 integer,
save:: itimer=0
707 call trace_begin(
'dispatcher5::check_all', itimer=itimer)
! Visit the links; each task has its ready bit reset before being re-checked.
710 do while (
associated (link))
711 call link%task%clear (bits%ready)
712 call check_ready (list, link)
716 call trace_end (itimer)
717 END SUBROUTINE check_all
Each thread uses a private timer data type, with arrays for start time and total time for each regist...
Module for handling blocking and non-blocking MPI parallel I/O to a single file. The module is initia...
Support tic/toc timing, as in MATLAB, and accurate wallclock() function. The timing is generally much...
Module with list handling for generic class task_t objects.
Template module for patches, which adds pointers to memory and mesh, and number of dimensions and var...
This module handles checking max change between neighboring points. Each instance of it needs an inde...
Message handling for task lists. Some of the methods are only used by dispatcher0_t, so they should perhaps be moved over to that file.
Task list data type, with methods for startup and updates. Message handling is inherited from the tas...
Module with list handling for generic class task_t objects.
Keep track of neighbor ranks and their loads, by sending and receiving short messages, storing the info in a linked list.
Template module for tasks.
Dispatcher method that relies on all threads maintaining a "ready queue", with tasks ready for updati...