DISPATCH
dispatcher0_mod.f90
1 !===============================================================================
2 !> Dispatcher method that relies on all threads maintaining a "ready queue", with
3 !> tasks ready for updating. Each thread picks the task at the head of the queue,
4 !> and checks all nbors for tasks that are ready, putting them back in the queue.
5 !===============================================================================
7  USE io_mod
8  USE trace_mod
9  USE omp_mod
10  USE timer_mod
11  USE omp_timer_mod
12  USE mpi_mod
13  USE mpi_io_mod
14  USE mpi_mesg_mod
15  USE link_mod
16  USE list_mod
17  USE task_mod
18  USE experiment_mod
19  USE bits_mod
20  USE task_list_mod
21  USE data_io_mod
22  USE omp_lock_mod
23  USE validate_mod
24  USE refine_mod
25  implicit none
26  private
27  type, public:: dispatcher0_t
28  integer:: verbose=0
29  integer:: n_spawn=0
30  type(lock_t):: lock
31  contains
32  procedure:: init
33  procedure:: execute
34  procedure:: update
35  end type
36  real(8):: stall_start=0d0, max_stalled=600d0, retry_stalled=30d0
37  integer, save:: mpi_only_master=100
38  integer, save:: verbose=0
39  integer, save:: stalled=0
40  integer, save:: n_spin=0
41  integer, save:: n_update=0
42  integer, save:: min_nq=2**30
43  logical, save:: debug=.false.
44  logical, save:: do_delay=.false.
45  logical, save:: track_active=.false.
46  logical, save:: omp_pick=.false.
47  logical, save:: detailed_timer=.false.
48  type(dispatcher0_t), save:: virtual_list
49  !$omp threadprivate (virtual_list)
50  type(dispatcher0_t), public:: dispatcher0
51 CONTAINS
52 
53 !===============================================================================
54 !> Initialize the dispatcher: read the optional dispatcher0_params namelist and
55 !> echo it to the output, then initialize MPI message handling and the dispatcher lock
56 !===============================================================================
subroutine init (self, name)
  class(dispatcher0_t):: self
  character(len=*), optional:: name
  !.............................................................................
  ! Set up the dispatcher: read the optional dispatcher0_params namelist (which
  ! sets module-level variables, not components of self), echo it to io%output,
  ! initialize MPI message handling, and initialize and register the dispatcher
  ! lock.  The name argument is accepted but not used here.
  !.............................................................................
  character(len=120):: ids = &
    '$Id: c68cfcf81b7fe3a0816f8608d703bded382b1f0c $ dispatchers/dispatcher0_mod.f90'
  integer:: ios
  namelist /dispatcher0_params/ verbose, max_stalled, retry_stalled, do_delay, &
    mpi_only_master, debug, detailed_timer
  !-----------------------------------------------------------------------------
  call trace%print_id (ids)
  call trace%begin('dispatcher0_t%init')
  !-----------------------------------------------------------------------------
  ! The namelist is optional; the read status is deliberately not acted upon,
  ! so a missing namelist leaves the defaults in place
  !-----------------------------------------------------------------------------
  rewind(io%input)
  read(io%input, dispatcher0_params, iostat=ios)
  write (io%output, dispatcher0_params)
  !-----------------------------------------------------------------------------
  ! Message handling and the dispatcher lock
  !-----------------------------------------------------------------------------
  call mpi_mesg%init
  call self%lock%init ('disp')
  call self%lock%append
  call trace_end
end subroutine init
81 
82 !===============================================================================
83 !> Execute the task list, updating it until it is empty. With !$omp parallel here,
84 !> everything local to self%update is thread private.
85 !===============================================================================
subroutine execute (self, task_list, test)
  class(dispatcher0_t):: self
  type(task_list_t), pointer:: task_list
  logical:: test
  !.............................................................................
  ! Run the dispatcher: start up the task list, then let all OMP threads call
  ! self%update repeatedly, until there are no active tasks left (task_list%na)
  ! or the wallclock limit io%job_seconds is reached.  Afterwards print timer
  ! and MPI message diagnostics, wait at an MPI barrier, and report the
  ! validation status, if any.
  !.............................................................................
  real(8):: t_tic
  integer, save:: itimer=0
  !-----------------------------------------------------------------------------
  call trace%begin ('dispatcher0_t%execute', itimer=itimer)
  call io%header('begin dispatcher0_t%execute:')
  call task_list%startup
  call tic (time=t_tic)
  call timer%print()
  !-----------------------------------------------------------------------------
  ! Task loop; each thread keeps updating until the exit condition is met
  !-----------------------------------------------------------------------------
  !$omp parallel
  do
    if (.not. (task_list%na > 0 .and. wallclock() < io%job_seconds)) exit
    call self%update (task_list, test)
    !---------------------------------------------------------------------------
    ! Record the smallest ready queue length, while all tasks are still active
    !---------------------------------------------------------------------------
    if (task_list%na == task_list%n_tasks) then
      !$omp atomic
      min_nq = min(min_nq,task_list%nq)
    end if
  end do
  write (io_unit%log,*) 'thread',omp%thread,' arrived'
  flush (io_unit%log)
  !$omp barrier
  !$omp end parallel
  !-----------------------------------------------------------------------------
  ! Diagnostics and synchronization with the other MPI ranks
  !-----------------------------------------------------------------------------
  call timer%print()
  call mpi_mesg%diagnostics(1)
  call toc ('wall time', timer%n_update, time=t_tic)
  write (io_unit%mpi,*) 'at mpi%barrier'
  flush (io_unit%mpi)
  call mpi%barrier ('end')
  write (io%output,*) "task list finished, min_nq =", min_nq
  !-----------------------------------------------------------------------------
  ! Validation summary
  !-----------------------------------------------------------------------------
  if (validate%mode == "write") then
    call io%print_hl()
    write (io%output,'(a)') &
      ' validate file '//trim(io%outputname)//'/rank_00000.val written'
    call io%print_hl()
  else if (validate%mode == "compare") then
    call io%print_hl()
    write (io%output,*) "validate%ok =", validate%ok
    call io%print_hl()
  end if
  call trace%end (itimer)
  write (io_unit%mpi,*) 'end dispatcher0_t%execute'
  flush (io_unit%mpi)
end subroutine execute
136 
137 !===============================================================================
138 !> Update the state of the task list, taking the steps necessary to update the
139 !> head task on the ready_queue, and check for consequences. Two strategies:
140 !>
141 !> 1) The threads pick up tasks from the queue themselves, and put itself or
142 !> other tasks back onto the queue, based on the results of list_t%check_ready
143 !>
144 !> In this case the threads are prevented from messing up for each other by
145 !> using critical regions when manipulating the queue, and by using status bits
146 !> to indicate the state each task is in. These can be 1) in the queue, and
147 !> ready to be updated, 2) not in the queue, and busy, 3) not in the queue, and
148 !> not busy. The 'ready' bit is set while the task is in the queue, and the
149 !> 'busy' bit is set while the task is busy being updated. A task should be
150 !> checked for being ready only when in state 3, meaning only when none of the
151 !> two bits are set.
152 !>
153 !> 2) The master thread takes care of picking tasks from the queue, starting an
154 !> OMP thread to update it, and checking for nbors that are ready to be
155 !> updated as a consequence
156 !===============================================================================
subroutine update (self, task_list, test)
  class(dispatcher0_t):: self
  type(task_list_t), pointer:: task_list
  logical:: test
  !.............................................................................
  ! One dispatcher step for the calling thread:
  !   1) check incoming MPI messages and pending I/O
  !   2) if the ready queue is non-empty, take one task off it, under the task
  !      list lock
  !   3) update that task, and check it and its nbors for return to the queue
  !   4) otherwise, do stall bookkeeping (see stall_handler below)
  !
  ! self      : the dispatcher (not otherwise referenced here)
  ! task_list : the task list with the ready queue
  ! test      : passed on unchanged to task_list%update
  !.............................................................................
  class(link_t), pointer:: head, prev
  class(task_t), pointer:: task, otask
  logical:: picked, already_busy, was_refined, was_derefined
  real(8):: start
  real(8), save:: time, otime=0d0
  integer, save:: oid=0
  integer:: i, nq, n_unpk
  integer:: stalled_l
  integer, save:: itimer(5)=0
  !.............................................................................
  i = 1
  call trace%begin('dispatcher0_t%update(1)', itimer=itimer(i))
  start = wallclock()
  !-----------------------------------------------------------------------------
  ! Make sure the local state is well defined on all paths, including the path
  ! where the task list is syncing and no task is taken off the queue
  !-----------------------------------------------------------------------------
  picked = .false.
  already_busy = .false.
  nullify (head, prev, task, otask)
  !$omp atomic update
  n_update = n_update+1
  !-----------------------------------------------------------------------------
  ! Check incoming MPI, which may free up tasks for execution
  !-----------------------------------------------------------------------------
  call task_list%check_mpi (n_unpk)                     ! check incoming MPI
  call mpi_io%iwrite_list%check                         ! I/O check
  if (omp%nthreads >= mpi_only_master .and. omp%master) then
    call trace%end (itimer(i))
    return
  end if
  !-----------------------------------------------------------------------------
  ! Now is a convenient time to check if a file should be opened or closed. No
  ! harm is done if that makes the thread hang for a while; other threads work
  !-----------------------------------------------------------------------------
  call data_io%open_and_close ()
  if (task_list%verbose > 2) &
    write (io%output,*) wallclock(),' thread',omp%thread,' waiting for tasklist(1)'
  if (detailed_timer) then
    call trace%end (itimer(i))
    i = 2
    call trace%begin('dispatcher0_t%update(2)', itimer=itimer(i))
  end if
  !-----------------------------------------------------------------------------
  ! Check atomically if the ready queue is empty and return w/o locking if so,
  ! to avoid hammering the task list lock, delaying other threads
  !-----------------------------------------------------------------------------
  !$omp atomic read
  nq = task_list%nq
  if (nq == 0) then
    call stall_handler
    call trace%end(itimer(i))
    return
  end if
  !-----------------------------------------------------------------------------
  ! Pick a task off the queue, in what is normally a very brief OMP lock region
  !-----------------------------------------------------------------------------
  call task_list%lock%set ('dispatch0_t%update 1')
  head => task_list%queue                               ! queue start
  if (detailed_timer) then
    call trace%end (itimer(i))
    i = 3
    call trace%begin('dispatcher0_t%update(3)', itimer=itimer(i))
  end if
  if (associated(head) .and. .not.task_list%syncing) then
    if (verbose > 0) &
      write(io_unit%log,'(f12.6,2x,a,i6,i7)') wallclock(), &
        'dispather0_t%update: na, id, time =', &
        task_list%na, head%task%id, head%task%time
    !---------------------------------------------------------------------------
    ! If the patch mem was OMP placed, search ready queue for matching thread
    !---------------------------------------------------------------------------
    if (omp_pick) then
      do while (associated(head))
        task => head%task
        if (task%mem_thread==-1) exit
        if (omp%thread == task%mem_thread) then
          !$omp atomic update
          timer%mem_hit = timer%mem_hit+1
          exit
        end if
        prev => head
        head => head%next_time
      end do
      !$omp atomic update
      timer%mem_test = timer%mem_test+1
      !-------------------------------------------------------------------------
      ! If not found, fall back on first task in queue
      !-------------------------------------------------------------------------
      if (.not. associated(head)) then
        nullify (prev)
        head => task_list%queue
        task => head%task                               ! head task
      end if
    else
      task => head%task
    end if
    call task%log ('dispatcher')
    if (debug) then
      !$omp critical (wrt_cr)
      write (io_unit%queue,'(f12.6,2i7,3i5,f12.6,i6,2i9,7i7)') &
        wallclock(), task%id, task%istep, task%it, task%new, omp%thread, &
        task%time, task_list%nq, n_spin, timer%n_master
      timer%n_master(:) = 0
      n_spin = 0
      !$omp end critical (wrt_cr)
    end if
    !---------------------------------------------------------------------------
    ! A task was found, so any ongoing stall has ended; reset the stall counter
    !---------------------------------------------------------------------------
    if (debug) then
      !$omp atomic read
      stalled_l = stalled
      if (stalled_l > 0) then
        write (io_unit%log,*) wallclock(), ' stall ended', stalled_l
      end if
    end if
    !$omp atomic write
    stalled = 0
    !---------------------------------------------------------------------------
    task%atime = task%time
    if (verbose > 1) then
      write (io_unit%log,'(f12.6,2x,a,i4,2x,a,i6,2x,a,1p,g16.6,2x,a,i5)') &
        wallclock(), 'thread', omp%thread,'takes', task%id, &
        'time', task%time, 'nq', task%nq
      flush (io_unit%log)
    end if
    !---------------------------------------------------------------------------
    ! Set the head of the queue to the next task in time. The ready bit needs
    ! to remain on for now, to prevent other threads from checking this task
    ! before it is updated. After the update, the clearing of the ready bit
    ! allows the state of the current task to be evaluated by other threads, as
    ! well as by the current thread. Locking should be used to allow only a
    ! single thread and task to check and change the ready bit status, at any
    ! one time.
    !
    ! The busy bit is set on tasks that a thread is updating, to check that a
    ! task that has already been taken off the queue by another thread is not
    ! taken again. This should in principle not be possible, since removing
    ! the task from the queue is done in a critical region, and removing a
    ! task can only be done once.
    !
    ! A task generates a critical region three times: 1) when taken off the queue,
    ! 2) when its time info is updated, 3) when it is added back to the queue.
    !
    ! To ensure 100% consistency, the task time information should not be
    ! allowed to change during the testing going on in list_t%check_ready().
    ! This could possibly be critical, if the fact that a task is / becomes
    ! ready is missed, because of synchronization issues. Several of the tasks
    ! that form the nbor list of a task are typically undergoing updates at the
    ! same time. In rare cases, but only if the update time of task time info
    ! is not uniquely defined, a task could thus be deemed not ready by the
    ! last of its nbors that has just been updated, because another task in the
    ! nbor list had not yet propagated its updated time info.
    !---------------------------------------------------------------------------
    already_busy = task%is_set (bits%busy)
    if (.not.already_busy) then
      picked = .true.                                   ! this thread owns the task
      call task%set (bits%busy)                         ! mark task busy
      if (associated(prev)) then
        prev%next_time => head%next_time                ! skip over
      else
        task_list%queue => head%next_time               ! chop head off
      end if
      call task%clear (bits%ready)                      ! not in queue
      if (track_active) &
        call task_list%queue_active (head)
      !$omp atomic
      task_list%nq = task_list%nq-1                     ! decrement queue count
      task%nq = task_list%nq                            ! for info print
      if (verbose > 0) then
        if (track_active) then
          otask => task_list%active%task
          time = otask%atime
          if (time < otime) &
            write (*,'(a,2(f12.6,i6))') 'TIME ERROR: otime, oid, time, id =', otime, oid, time, otask%id
          !$omp atomic write
          otime = time
          !$omp atomic write
          oid = otask%id
          if (omp%master) then
            write (io_unit%queue,'(f12.6,2i7,2f12.6,2i5)') &
              wallclock(), task%id, task%istep, task%time, &
              otask%atime, task_list%nq, task_list%nac
            flush (io_unit%queue)
          end if
        else
          if (omp%master) then
            write (io_unit%queue,'(f12.6,2i7,f12.6,2i5)') &
              wallclock(), task%id, task%istep, task%time, &
              task_list%nq, task_list%nac
            flush (io_unit%queue)
          end if
        end if
      end if
    end if
    !---------------------------------------------------------------------------
    ! The first time a task is at the head of the ready queue with time =
    ! sync_time we can be sure that all other tasks are also being updated
    ! (by other threads) to arrive at this time. We can then temporarily
    ! halt updating, and wait for this task to finish its update, at a barrier
    ! where all other ranks are doing the same.
    !---------------------------------------------------------------------------
    if (task%time == task_list%sync_next) then
      task_list%syncing = .true.
      write (io_unit%mpi,*) task%id,omp%thread, &
        ' is triggering a sync at t =', task_list%sync_next
    end if
  else
    if (verbose > 0) &
      write(io_unit%log,'(f12.6,i6,2x,a)') wallclock(), task_list%na, 'no queue'
  end if
  call task_list%lock%unset ('dispatch0_t%update 1')
  if (task_list%verbose > 2) &
    write (io%output,*) wallclock(),' thread',omp%thread,' unlocked tasklist(1)'
  !-----------------------------------------------------------------------------
  ! Three possible outcomes of the locked region above:
  !
  ! 1) The queue turned out to be empty (head not associated): handle as stall.
  ! 2) A task was examined: either it was picked (and is then updated below), or
  !    it was unexpectedly already busy (warn and return).
  ! 3) The queue was not empty, but the task list is syncing: no task was taken
  !    off the queue, so skip the update and come back. The thread that hit the
  !    sync time has set the syncing flag.
  !    NOTE(review): the flag is presumably cleared in task_list%update, which
  !    is not visible here -- confirm. Updating in this case would act on a
  !    task that is still in the queue, via undefined local state.
  !-----------------------------------------------------------------------------
  if (.not. associated(head)) then
    call stall_handler
  else if (picked .or. already_busy) then
    if (detailed_timer) then
      call trace%end (itimer(i))
      i = 4
      call trace%begin('dispatcher0_t%update(4)', itimer=itimer(i))
    end if
    if (already_busy) then
      !$omp critical (stderr_cr)
      write (io_unit%log,*) mpi%rank,' WARNING: thread', omp%thread, &
        ' tried to update busy task', task%id
      write (stderr,*) mpi%rank,' WARNING: thread', omp%thread, &
        ' tried to update busy task', task%id
      !$omp end critical (stderr_cr)
      call trace%end (itimer(i))
      return
    end if
    call task_list%update (head, test, was_refined, was_derefined)
    !---------------------------------------------------------------------------
    ! The task should now again be checked, before it can be returned to the
    ! queue, and its neighbors should also be checked for return to the queue.
    ! If the task was turned into a virtual task by the load balance, its virtual
    ! bit will prevent it being checked, but its nbors still need to be checked
    !---------------------------------------------------------------------------
    if (.not. was_derefined) then
      call task%clear (bits%busy)
      call task_list%check_nbors (head)                 ! any nbors ready?
    end if
    !$omp atomic update
    timer%busy_time = timer%busy_time + (wallclock()-start)
  end if
  call trace%end (itimer(i))
contains
!-------------------------------------------------------------------------------
!> Stall handling, when the queue is empty. Counts the spin, registers the start
!> of a stall, aborts if the stall exceeds max_stalled, and tries to revive the
!> queue with check_all / check_oldest if it exceeds retry_stalled. Uses the
!> host variables i, itimer, start, stalled_l and task_list.
!-------------------------------------------------------------------------------
subroutine stall_handler
  character(len=*), parameter:: fmt_revived = &
    '("rank:",i5,2x,"thread:",i4,3x,"STALL revived by ",a," at",f12.3)'
  !.............................................................................
  !$omp atomic update
  n_spin = n_spin+1
  !---------------------------------------------------------------------------
  ! The stall phase has its own timer slot (5), matching its label, so it does
  ! not share a slot with phases 3 or 4, whichever phase the call came from
  !---------------------------------------------------------------------------
  if (detailed_timer) then
    call trace%end (itimer(i))
    i = 5
    call trace%begin('dispatcher0_t%update(5)', itimer=itimer(i))
  end if
  !---------------------------------------------------------------------------
  ! As long as there are tasks, the stalled value remains zero. The test
  ! below is triggered the first time a thread enters here, and the start
  ! time of stalling is registered, before incrementing the counter, which
  ! prevents the stall_start time from being set again.
  !---------------------------------------------------------------------------
  !$omp atomic read
  stalled_l = stalled
  if (stalled_l == 0) then
    !$omp atomic write
    stall_start = wallclock()
    if (debug) &
      write (io_unit%log,*) wallclock(), 'queue stalled', stalled_l
  end if
  !$omp atomic update
  stalled = stalled+1
  if (wallclock()-stall_start > max_stalled) then
    !-------------------------------------------------------------------------
    ! Stalled for too long; print diagnostics and abort
    !-------------------------------------------------------------------------
    print *, mpi%rank, 'STALLED diagnostics'
    call mpi_mesg%diagnostics (1)
    print *, mpi%rank, 'STALLED bailing out'
    call mpi%abort ('exceeded max_stalled')
  else if (wallclock()-stall_start > retry_stalled) then
    !-------------------------------------------------------------------------
    ! Try to revive the queue, one thread at a time. The condition is tested
    ! again inside the critical region, since another thread may have revived
    ! the queue and reset stall_start in the meantime
    !-------------------------------------------------------------------------
    !$omp critical (stall_cr)
    if (wallclock()-stall_start > retry_stalled) then
      call task_list%check_all
      if (associated(task_list%queue)) then
        write(stderr,fmt_revived) mpi%rank, omp%thread, 'check_all', wallclock()
        if (verbose > 1) &
          call io%abort ('STALLED revided by check_all -- check thread log')
        stall_start = wallclock()
      else
        call task_list%check_oldest
        if (associated(task_list%queue)) then
          write(stderr,fmt_revived) mpi%rank, omp%thread, 'check_oldest', wallclock()
          if (verbose > 1) &
            call io%abort ('STALLED revided by check_oldest -- check thread log')
          stall_start = wallclock()
        else
          write (stderr,*) mpi%rank, omp%thread, 'STALLED, revived FAILED'
          call io%abort ('STALLED, revived FAILED')
        end if
      end if
    end if
    !$omp end critical (stall_cr)
  else if (do_delay) then
    !$omp atomic update
    timer%spin_time = timer%spin_time + (wallclock()-start)
    call mpi_mesg%delay (stalled)
    return
  end if
  !$omp atomic update
  timer%spin_time = timer%spin_time + (wallclock()-start)
end subroutine stall_handler
end subroutine update
490 
491 END MODULE
Each thread uses a private timer data type, with arrays for start time and total time for each regist...
Definition: timer_mod.f90:11
Module for handling blocking and non-blocking MPI parallel I/O to a single file. The module is initia...
Definition: mpi_io_mod.f90:31
Support tic/toc timing, as in MATLAB, and accurate wallclock() function. The timing is generally much...
Generic validation module. The general idea is to be able to compare two runs at critical points in t...
Module with list handling for generic class task_t objects.
Definition: list_mod.f90:4
Interface from gpatch_mod to a choice of binary data I/O methods, controlled by the iomethod text str...
Definition: data_io_mod.f90:17
This module handles checking max change between neighboring points. Each instance of it needs an inde...
Definition: refine_mod.f90:168
Task list data type, with methods for startup and updates. Message handling is inherited from the tas...
Definition: io_mod.f90:4
The lock module uses nested locks, to allow versatile use of locks, where a procedure may want to mak...
Dispatcher method that relies on all threads maintaining a "ready queue", with tasks ready for updati...
Template module for tasks.
Definition: task_mod.f90:4