DISPATCH
dispatcher4_mod.f90
1 !===============================================================================
2 !> Execute a task list. Since only the master thread is calling check_mpi(), we
3 !> need to make sure that it handles all send/recv, hence send_priv=.false. and
4 !> recv_priv=.false., and we can use the simplest receive mechanism (recv_active
5 !> =.true.), with no buffering needed (queue_unpack=.false.). These values are
6 !> imposed after the input namelists are read, to make sure the choices are
7 !> correct.
8 !>
9 !> Strategy: Keep track of, separately, the number of ready tasks, the number
10 !> of currently spawned tasks, and the number of currently busy tasks. To this
11 !> end, define the following states:
12 !>
13 !> state=0: task is inactive or new
14 !> state=1: the task is ready to update, but not yet spawned
15 !> state=2: the task has been spawned
16 !> state=3: the task is busy updating
17 !> state=4: the task has finished updating
18 !>
19 !> The simplest strategy for counting is to keep track of the number of tasks
20 !> in each state, by incrementing the count for the new state and decrementing
21 !> the count for the old state, whenever the state changes. These counts should
22 !> be part of the dispatcher data type.
23 !>
24 !> On the balance between spawning tasks for other treads and actually working
25 !> on an update: If a process has N threads, and has N tasks active, evenly
26 !> spread in their progress, then essentially all of these tasks will have
27 !> finished updating while the master thread works on its update. Hence there
28 !> must be approximately N additional tasks spawned, but not yet busy updating.
29 !> Hence, the criterion for the master thread to take on work itself is that
30 !> there are at least N tasks in state 2 (not counting those in state 3 or 4).
31 !>
32 !> If the master thread is unable to find enough tasks in state 1 for it to be
33 !> able to reach N in state 2, then it is certainly not a good idea to take on
34 !> one of the tasks, since there is already a surplus of threads available.
35 !>
36 !> If, on the other hand, there is only single task available in state 2, then
37 !> it IS a good idea to let the master thread update it, since it then has some
38 !> useful work to do, while other tasks are finishing, so when the master thread
39 !> looks at the situation again, there are likely to be a number of new tasks
40 !> ready to be updated.
41 !===============================================================================
43  USE io_mod
44  USE io_unit_mod
45  USE trace_mod
46  USE omp_mod
47  USE omp_timer_mod
48  USE mpi_mod
49  USE mpi_io_mod
50  USE timer_mod
51  USE link_mod
52  USE mpi_mesg_mod
53  USE task_list_mod
54  USE bits_mod
55  USE global_mod
56  USE task_mod
57  implicit none
58  private
59  type, public:: dispatcher4_t
60  integer:: verbose=0
61  integer:: n_spawn=0
62  type(task_list_t), pointer:: task_list => null()
63  contains
64  procedure:: init
65  procedure:: execute
66  procedure:: update
67  procedure:: spawn_update
68  procedure:: set_state
69  procedure:: is_ready
70  procedure:: print_state
71  procedure:: diagnostics
72  end type
73  type(global_t):: average_density
74  integer:: n_fail=0
75  integer:: n_state(0:4)=0
76  integer:: d_state(0:4)=0
77  integer:: verbose=0
78  integer:: n_write=2000
79  integer(8):: n_check=0, n_ready=0
80  real:: f_over=2.0
81  real(8):: wc_failed=0d0
82  real(8):: start
83  logical:: master_works=.true.
84  logical:: use_locks=.false.
85  logical:: use_critical=.true.
86  logical:: use_taskyield=.false.
87  character(len=32):: fmt0='( f12.6,i4,2x,a,2x,i7,2i4)'
88  character(len=32):: fmt1='(40x,f12.6,i4,2x,a,2x,i7,2i4)'
89  character(len=48):: fmt2='(40x,f12.6,i4,2x,a,2x,i7,2x,a,1p,g14.5)'
90  type(dispatcher4_t), public:: dispatcher4
91 CONTAINS
92 
93 !===============================================================================
94 !===============================================================================
95 SUBROUTINE init (self, task_list)
96  class(dispatcher4_t):: self
97  type(task_list_t), pointer:: task_list
98  integer:: iostat
99  namelist /dispatcher4_params/ use_locks, use_critical, use_taskyield, f_over, &
100  master_works, n_write, verbose
101  !.............................................................................
102  rewind(io%input); read (io%input, dispatcher4_params, iostat=iostat)
103  if (io%master) write (io%output, dispatcher4_params)
104  self%verbose = verbose
105 END SUBROUTINE init
106 
107 !===============================================================================
108 !> Simple dispatcher, which in each loop through the task list takes on the
109 !> first task that is ready, and then spawns other threads that handle the
110 !> remaining tasks that are ready.
111 !===============================================================================
112 SUBROUTINE execute (self, task_list, test)
113  class(dispatcher4_t):: self
114  type(task_list_t), pointer:: task_list
115  logical:: test, fell_through
116  !.............................................................................
117  class(link_t), pointer:: link, nbor
118  class(task_t), pointer:: task
119  logical:: mytask
120  integer:: n1, n2
121  real(8):: start_fail, start_iter
122  integer, save:: itimer=0
123  !-----------------------------------------------------------------------------
124  call trace%begin ('dispatcher4_t%execute', itimer=itimer)
125  call io%header('begin dispatcher4_t%execute: Any opportunity dispatcher')
126  call self%init (task_list)
127  self%task_list => task_list
128  call task_list%info
129  n_state = 0; n_state(0) = io%ntask
130  call tic (time=start)
131  call timer%print()
132  !$omp parallel private(link,nbor,task,mytask) shared(n_state) default(shared)
133  !$omp master
134  do while (task_list%na>0 .and. wallclock()-start < io%job_seconds)
135  start_iter = wallclock()
136  !---------------------------------------------------------------------------
137  ! Check if there is MPI work
138  !---------------------------------------------------------------------------
139  call task_list%check_mpi()
140  !---------------------------------------------------------------------------
141  ! Run through the task list, looking for and counting tasks that are ready
142  !---------------------------------------------------------------------------
143  fell_through = .true.
144  link => task_list%head
145  do while (associated(link))
146  task => link%task
147  if (.not.task%is_set (bits%virtual)) then
148  !-----------------------------------------------------------------------
149  ! If task is updated, set it to idle; this should ONLY be done by the
150  ! master thread
151  !-----------------------------------------------------------------------
152  if (link%task%state==4) then
153  call self%set_state (link%task, 0)
154  end if
155  !-----------------------------------------------------------------------
156  ! If task is idle, check if it is ready to update
157  !-----------------------------------------------------------------------
158  if (task%state==0) then
159  if (self%is_ready (link)) then
160  call self%set_state (link%task, 1)
161  else if (link%task%time == 0.0_8) then
162  write(io_unit%queue,*) link%task%id, 'ERROR: at time=0, but not ready!'
163  nbor => link%nbor
164  do while (associated(nbor))
165  write(io_unit%queue,'(i7,i3,2f10.6,2x,2l3)') &
166  nbor%task%id, nbor%task%state, (nbor%task%time-link%task%time), nbor%task%dtime, &
167  nbor%needed, nbor%task%is_ahead_of (link%task)
168  nbor => nbor%next
169  end do
170  end if
171  end if
172  !-----------------------------------------------------------------------
173  ! If task is ready to update, pawn a background task
174  !-----------------------------------------------------------------------
175  if (task%state==1) then
176  fell_through = .false.
177  !$omp atomic read
178  task_list%nq = n_state(1)
179  !$omp atomic
180  task_list%nq = task_list%nq + n_state(2)
181  if (omp%nthreads > 1) then
182  call self%spawn_update (task_list, link, test)
183  else
184  call self%update (task_list, link, test)
185  end if
186  end if
187  end if
188  link => link%next
189  end do
190  call write_state (self, io_unit%queue, '1')
191  if (self%verbose>1) call self%print_state ('1')
192  end do
193  !$omp end master
194  !$omp end parallel
195  call timer%print()
196  call finalize
197  call mpi_mesg%diagnostics(1)
198  call toc ('wall time', timer%n_update, time=start)
199  call trace%end (itimer)
200 END SUBROUTINE execute
201 
202 !===============================================================================
203 !===============================================================================
204 SUBROUTINE print_state (self, label)
205  class(dispatcher4_t):: self
206  character(len=*):: label
207  !-----------------------------------------------------------------------------
208  call write_state (self, io_unit%output, label)
209 END SUBROUTINE
210 
211 !===============================================================================
212 !===============================================================================
213 SUBROUTINE write_state (self, unit, label)
214  class(dispatcher4_t):: self
215  integer:: unit
216  character(len=*):: label
217  !-----------------------------------------------------------------------------
218  if (n_write <= 0) return
219  !$omp critical (set_state_cr)
220  n_write = n_write-1
221  write(unit,'("DISPATCHER3: ",a,f12.6,f9.3,2x,3(2x,a,i6),9(2x,a,i5))') &
222  label, wallclock(), &
223  wc_failed/max(1d-10,wallclock()-start), &
224  'fail:',n_fail , &
225  'idl:', n_state(0), &
226  'rdy:', n_state(1), &
227  'spw:', n_state(2), &
228  'act:', n_state(3), &
229  'upd:', n_state(4), &
230  'idl:', d_state(0), &
231  'rdy:', d_state(1), &
232  'spw:', d_state(2), &
233  'act:', d_state(3), &
234  'upd:', d_state(4)
235  d_state = 0
236  !$omp end critical (set_state_cr)
237 END SUBROUTINE write_state
238 
239 !===============================================================================
240 !> A task that is being actively updated is set in state 3, and when finished
241 !> it switched to state 4. We then use the same OMP task to check if any if
242 !> the nbors have beome ready to update, switching the state if it has, and
243 !> leaving it to the master thread to actually activate the task.
244 !===============================================================================
245 SUBROUTINE update (self, task_list, link, test)
246  class(dispatcher4_t):: self
247  type(task_list_t), pointer:: task_list
248  class(link_t), pointer:: link, nbor
249  class(task_t), pointer:: task
250  logical:: test
251  integer:: n_ready
252  !.............................................................................
253  call self%set_state (link%task, 3)
254  call task_list%update (link, test)
255  call self%set_state (link%task, 4)
256 END SUBROUTINE update
257 
258 !===============================================================================
259 !> Spawn an OpenMP task, for updating a DISPATCH task
260 !===============================================================================
261 SUBROUTINE spawn_update (self, task_list, link, test)
262  class(dispatcher4_t):: self
263  type(task_list_t), pointer:: task_list
264  class(link_t), pointer:: link
265  logical:: test
266  !.............................................................................
267  call self%set_state (link%task, 2)
268  !$omp task firstprivate(link) default(shared)
269  call self%set_state (link%task, 3)
270  call task_list%update (link, test)
271  call self%set_state (link%task, 4)
272  !$omp end task
273 END SUBROUTINE spawn_update
274 
275 !===============================================================================
276 !> Set a new task state, while keeping track of the number of tasks in each state
277 !===============================================================================
278 SUBROUTINE set_state (self, task, in)
279  class(dispatcher4_t):: self
280  class(task_t), pointer:: task
281  integer:: in, ip
282  !...........................................................................
283  if (use_critical) then
284  !$omp critical (set_state_cr)
285  ip = task%state
286  task%state = in
287  n_state(ip) = n_state(ip) - 1
288  n_state(in) = n_state(in) + 1
289  d_state(in) = d_state(in) + 1
290  if (self%verbose>1) &
291  print '(a,i4,f12.6,2x,5i4,2x,2i3)', ' dispatcher4:', omp%thread, wallclock(), n_state, ip, in
292  !$omp end critical (set_state_cr)
293  else
294  !$omp atomic read
295  ip = task%state
296  !$omp atomic
297  n_state(ip) = n_state(ip) - 1
298  !$omp atomic
299  n_state(in) = n_state(in) + 1
300  !$omp atomic
301  d_state(in) = d_state(in) + 1
302  !$omp atomic write
303  task%state = in
304  if (self%verbose>1) &
305  write (io%output,'(a,i4,f12.6,2x,5i4,2x,2i3)') &
306  ' dispatcher4:', omp%thread, wallclock(), n_state, ip, in
307  end if
308 END SUBROUTINE
309 
310 !===============================================================================
311 !> Check if a task is ready to update, by changing if all nbor task are "ahead
312 !> of" the task in question
313 !===============================================================================
314 LOGICAL FUNCTION is_ready (self, link)
315  class(dispatcher4_t):: self
316  class(link_t), pointer:: link, nbor
317  integer:: state
318  !...........................................................................
319  !$omp atomic
320  n_check = n_check+1
321  call io%assert (associated(link), 'is_ready: link missing')
322  call io%assert (associated(link%task), 'is_ready: link%task missing')
323  !$omp atomic read
324  state = link%task%state
325  if (link%task%is_set (bits%virtual)) then
326  is_ready = .false.
327  else
328  is_ready = .true.
329  nbor => link%nbor
330  do while (associated(nbor))
331  if (nbor%needed) then
332  !$omp atomic read
333  state = nbor%task%state
334  if (state==3) then
335  is_ready = .false.
336  else
337  is_ready = is_ready .and. nbor%task%is_ahead_of (link%task)
338  end if
339  if (.not.is_ready) then
340  if (verbose > 2) &
341  print *, link%task%id, 'is_ready: failed on', nbor%task%id, &
342  nbor%task%time, link%task%time
343  exit
344  end if
345  end if
346  nbor => nbor%next
347  end do
348  end if
349  if (is_ready) then
350  !$omp atomic
351  n_ready = n_ready+1
352  end if
353 END FUNCTION is_ready
354 
355 !===============================================================================
356 !> Print statistics
357 !===============================================================================
358 SUBROUTINE finalize
359  if (omp%master) &
360  write(io%output,'(a,i6,4(a,f7.4))') &
361  ' dispatcher4_t%finalize: rank =',mpi%rank, &
362  ', fraction of ready tasks =', real(n_ready)/real(n_check), &
363  ', fraction of dispatcher idle time =', wc_failed/max(1d-10,wallclock()-start)
364 END SUBROUTINE finalize
365 
366 !===============================================================================
367 !===============================================================================
368 SUBROUTINE diagnostics (self, task_list)
369  class(dispatcher4_t):: self
370  type(task_list_t), pointer:: task_list
371  class(link_t), pointer:: link, nbor, culprit
372  !-----------------------------------------------------------------------------
373  !write (io_unit%queue,*) 'begin starved analysis: nstate =', nstate
374  link => task_list%head
375  do while (associated(link))
376  nbor => link%nbor
377  link => link%next
378  end do
379  !write (io_unit%queue,*) 'end starved analysis: nstate =', nstate
380 END SUBROUTINE diagnostics
381 
382 END MODULE dispatcher4_mod
Each thread uses a private timer data type, with arrays for start time and total time for each regist...
Definition: timer_mod.f90:11
Module for handling blocking and non-blocking MPI parallel I/O to a single file. The module is initia...
Definition: mpi_io_mod.f90:31
Support tic/toc timing, as in MATLAB, and accurate wallclock() function. The timing is generally much...
Data type for computing global average, while using only nearest node nbor communication.
Definition: global_mod.f90:5
Task list data type, with methods for startup and updates. Message handling is inherited from the tas...
Execute a task list. Since only the master thread is calling check_mpi(), we need to make sure that i...
Definition: io_mod.f90:4
Template module for tasks.
Definition: task_mod.f90:4