DISPATCH: lists/task_mesg_mod.f90
!===============================================================================
!> Message handling for task lists. Some of the methods are only used by
!> dispatcher0_t, so should perhaps be moved over to that file.
!>
!> Depending on the method parameter, incoming MPI messages are handled by one
!> of the procedures below. Only the list-based methods ('irecv', 'improbe')
!> can (FIXME) be used in AMR runs.
!>
!> recv_improbe : master puts MPI_IMPROBE messages in recv_list and unpk_list
!> recv_irecv   : as recv_improbe, but using MPI_IRECV (master thread only)
!> recv_mirecv  : as recv_irecv, but on all threads, with thread-private lists
!> recv_virtual : master checks (only) for msgs to known virtual tasks
!> recv_private : ditto, but split up into thread-private lists
!>
!> When doing active receives (asking actively for the package to each virtual
!> task), the MPI message tag must equal (or be a function of) the task ID,
!> while another, unique ID should be reserved for load balance messages. To
!> allow detecting packages for new patches (AMR), and still use active
!> receives, one could reserve another unique ID for "new" patches, for which
!> the actual ID would then be extracted from the header part of the buffer.
!> Upon receipt of such a package, the new virtual patch would be created, the
!> first direct request would be sent, and the message would be added to the
!> (thread-private) list of messages.
!===============================================================================
MODULE task_mesg_mod
  USE io_mod
  USE io_unit_mod
  USE trace_mod
  USE omp_mod
  USE omp_timer_mod
  USE timer_mod
  USE mpi_mod
  USE mpi_mesg_mod
  USE link_mod
  USE list_mod
  USE task_mod
  USE patch_mod
  USE experiment_mod
  USE bits_mod
  USE load_balance_mod
  USE refine_mod
  USE hash_table_mod
  implicit none
  private
  type, public, extends(list_t):: task_mesg_t
    integer:: method=0
    type(hash_table_t):: hash_table
  contains
    procedure:: init
    procedure:: check_mpi
    procedure, private:: recv_improbe
    procedure, private:: recv_irecv
    procedure, private:: recv_mirecv
    procedure, private:: move_mesgs
    procedure, private:: unpk_mesgs
    procedure:: init_virtual
    procedure, private:: recv_virtual
    procedure, private:: find_task
    procedure, private:: unpack
  end type
  integer, save:: verbose=0
  logical, save:: use_hashtable=.true.
  type(mesg_list_t), save:: priv_list
  type(task_mesg_t), save:: private_list, master_list, remove_list
  !$omp threadprivate (private_list,priv_list)
  character(len=8), save:: method='irecv'
  public:: task2patch
  type(task_mesg_t), public:: task_mesg
CONTAINS

!===============================================================================
!> Initialize task message handling: read parameters and select the MPI
!> receive method (forced to a list-based method when AMR is on).
!===============================================================================
SUBROUTINE init (self, name)
  class(task_mesg_t):: self
  character(len=*), optional:: name
  integer:: iostat
  integer, save:: hash_table_size=10000
  type(link_t):: link
  namelist /task_mesg_params/ verbose, method, hash_table_size, use_hashtable
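  !-----------------------------------------------------------------------------
  ! Example namelist input (a sketch; the values shown equal the compiled-in
  ! defaults visible in the declarations above):
  !
  !   &TASK_MESG_PARAMS verbose=0 method='irecv' hash_table_size=10000
  !    use_hashtable=t /
  !-----------------------------------------------------------------------------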
  character(len=120):: id = &
    '$Id: 7b482c1f0d310cc777fb73836ce2956a1dcb3595 $ lists/task_mesg_mod.f90'
  !-----------------------------------------------------------------------------
  call trace%begin ('task_mesg_t%init')
  call trace%print_id (id)
  rewind (io%input)
  read (io%input, task_mesg_params, iostat=iostat)
  call link%init_verbose (verbose)
  write (io%output, task_mesg_params)
  if (use_hashtable) &
    call self%hash_table%init (hash_table_size)
  !-----------------------------------------------------------------------------
  ! Choose MPI recv method -- note that we force the hand in the AMR case.
  !-----------------------------------------------------------------------------
  if (refine%on .and. &
      trim(method) /= 'irecv' .and. &
      trim(method) /= 'improbe') then
    method = 'irecv'
  end if
  mpi_mesg%uniq_mesg = .true.
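  !-----------------------------------------------------------------------------
  ! tag_type=1 is used by the active-receive methods, where the tag is (a
  ! function of) the task id (cf. module header); tag_type=2 is used by the
  ! list-based methods, which presumably rely on unique per-message tags.
  !-----------------------------------------------------------------------------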
  select case (trim(method))
  case ('irecv')
    mpi_mesg%tag_type = 2
  case ('mirecv')
    mpi_mesg%max_recv = 5
    mpi_mesg%tag_type = 2
  case ('improbe')
    mpi_mesg%tag_type = 2
  case ('virtual')
    mpi_mesg%tag_type = 1
  case ('private')
    mpi_mesg%tag_type = 1
  case default
    call mpi%abort ('unknown method in TASK_MESG_PARAMS')
  end select
  call trace%end ()
END SUBROUTINE init

!===============================================================================
!> Set the mesg%test_time
!===============================================================================
SUBROUTINE set_test_time (self, task)
  class(task_mesg_t):: self
  class(patch_t):: task
  !---------------------------------------------------------------------------
  ! If TEST_TIME is specified in MPI_MESG_PARAMS, set that as the minimum time
  ! between MPI_TEST calls for the same task.
  !---------------------------------------------------------------------------
  if (mpi_mesg%test_time > 0.0) then
    task%mesg%test_time = mpi_mesg%test_time
  !---------------------------------------------------------------------------
  ! If the parameter is zero or negative, try to estimate an optimal time.
  ! The first test is allowed after half the estimated update_cadence, and
  ! after that the time interval is halved after every failed test. This
  ! point is reached about as often as a task update occurs on a single
  ! thread, which is shorter than the update_cadence by about the task
  ! overload factor.
  !---------------------------------------------------------------------------
  else
    task%update_cadence = 1e-6*product(task%n)*self%na/omp%nthreads
    task%mesg%test_time = task%update_cadence*0.5**(task%mesg%n_failed+1)
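    !---------------------------------------------------------------------------
    ! (e.g., with an estimated update_cadence of 0.1 s, the first test may
    ! occur after 0.05 s, then after 0.025 s following one failed test, etc.)
    !---------------------------------------------------------------------------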
  end if
END SUBROUTINE set_test_time

!===============================================================================
!> Check for and unpack MPI messages.
!===============================================================================
SUBROUTINE check_mpi (self, n_unpk)
  class(task_mesg_t):: self
  integer, optional:: n_unpk
  integer:: n_unpk_l
  integer:: nq
  !.............................................................................
  n_unpk_l = 0
  if (mpi%size <= 1) return
  call trace%begin ('task_mesg_t%check_mpi')
  !-----------------------------------------------------------------------------
  ! Check the list of sent messages
  !-----------------------------------------------------------------------------
  !$omp atomic read
  nq = self%nq
  call mpi_mesg%sent_list%check_sent (nq)
  !-----------------------------------------------------------------------------
  ! Check recv messages
  !-----------------------------------------------------------------------------
  select case (trim(method))
  case ('improbe')
    if (omp%master) &
      call self%recv_improbe (n_unpk_l)
  case ('irecv')
    if (omp%master) &
      call self%recv_irecv (n_unpk_l)
  case ('mirecv')
    call self%recv_mirecv (n_unpk_l)
  case ('virtual')
    if (omp%master) &
      call self%recv_virtual (master_list, n_unpk_l)
  case ('private')
    call self%recv_virtual (private_list, n_unpk_l)
  end select
  if (present(n_unpk)) then
    n_unpk = n_unpk_l
  end if
  call trace%end()
END SUBROUTINE check_mpi

!===============================================================================
!> Check for and unpack MPI messages, using only the master thread, to avoid
!> the need for critical regions and locks. The cadence of checking for
!> incoming messages is once per task update on one thread, which is suitable,
!> since each task will be "served" about once per task update time.
!===============================================================================
SUBROUTINE recv_improbe (self, n_unpk)
  class(task_mesg_t):: self
  integer:: n_unpk
  class(mesg_t), pointer:: mesg, next
  integer, save:: itimer=0
  integer:: n
  logical:: ok
  !.............................................................................
  if (omp%thread > 0) return
  call trace%begin ('task_mesg_t%recv_improbe', 1, itimer=itimer)
  mpi_mesg%uniq_mesg = .true.
  !-----------------------------------------------------------------------------
  ! Step 1-3: receive, move, and unpack
  !-----------------------------------------------------------------------------
  if (verbose > 0) &
    write (io_unit%log,*) wallclock(), &
      'recv_improbe: n =', mpi_mesg%recv_list%n, mpi_mesg%max_recv
  call recv_mesgs
  call self%move_mesgs (mpi_mesg%recv_list)
  call self%unpk_mesgs (n_unpk)
  if (verbose > 0) &
    write (io_unit%log,*) wallclock(), &
      'recv_improbe: n =', mpi_mesg%recv_list%n, mpi_mesg%min_nq
  !-----------------------------------------------------------------------------
  ! Step 4: check if the queue is getting short. If so, try recv again, then
  ! wait for messages and move + unpack immediately, one-by-one (in case a
  ! reverse order message is received and allows more to be unpacked)
  !-----------------------------------------------------------------------------
  if (self%nq < mpi_mesg%min_nq) then
    call recv_mesgs
    call self%move_mesgs (mpi_mesg%recv_list)
    !---------------------------------------------------------------------------
    mesg => mpi_mesg%recv_list%head
    do while (associated(mesg) .and. self%nq < mpi_mesg%min_nq)
      next => mesg%next
      call mesg%wait_for_completion()
      !$omp atomic update
      timer%n_master(4) = timer%n_master(4) + 1
      if (verbose > 1) &
        write (io_unit%log,*) 'recv_improbe: waited on mesg', &
          mesg%id, mesg%seq
      call mpi_mesg%recv_list%remove (mesg)
      call mpi_mesg%unpk_list%add (mesg)
      call self%unpk_mesgs (n_unpk)
      mesg => next
    end do
    if (verbose > 0) &
      write (io_unit%log,*) wallclock(), 'recv_improbe: n =', mpi_mesg%recv_list%n
  end if
  call trace%end (itimer)
contains
!===============================================================================
!> Step 1: Get new messages, as long as there are any -- this doesn't take
!> much time, and ensures that the master thread can keep up, even when it
!> has many virtual tasks to update.
!===============================================================================
subroutine recv_mesgs
  n = 0
  call mpi_mesg%get (mesg)
  do while (associated(mesg))
    !$omp atomic update
    timer%n_master(1) = timer%n_master(1) + 1
    call mpi_mesg%recv_list%add (mesg)
    n = n+1
    if (verbose > 1) &
      write (io_unit%log,*) 'recv_improbe: added mesg to recv_list', &
        mesg%id, mesg%seq
    call mpi_mesg%get (mesg)
  end do
end subroutine recv_mesgs
END SUBROUTINE recv_improbe

!===============================================================================
!> Check for and unpack MPI messages, using only the master thread, to avoid
!> the need for critical regions and locks. This method does not use
!> MPI_IMPROBE, which is not supported before IMPI/2019.x on systems with
!> OmniPath fabric.
!===============================================================================
SUBROUTINE recv_irecv (self, n_unpk)
  class(task_mesg_t):: self
  integer:: n_unpk
  class(mesg_t), pointer:: mesg, next
  integer, save:: itimer=0
  integer:: n
  logical:: ok
  !.............................................................................
  if (omp%thread > 0) return
  if (mpi_mesg%nbuf == 0) return
  call trace%begin ('task_mesg_t%recv_irecv', 1, itimer=itimer)
  mpi_mesg%uniq_mesg = .true.
  !-----------------------------------------------------------------------------
  ! Check existing recv_list for completed messages
  !-----------------------------------------------------------------------------
  call self%move_mesgs (mpi_mesg%recv_list)
  !-----------------------------------------------------------------------------
  ! Refill the list
  !-----------------------------------------------------------------------------
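  !-----------------------------------------------------------------------------
  ! (iget presumably posts a fresh nonblocking receive; a message that is
  ! already complete bypasses recv_list and goes directly to the unpack list)
  !-----------------------------------------------------------------------------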
  do while (mpi_mesg%recv_list%n < mpi_mesg%max_recv)
    call mpi_mesg%iget (mesg)
    if (mesg%is_complete('recv_irecv')) then
      call mpi_mesg%unpk_list%add (mesg)
    else
      call mpi_mesg%recv_list%add (mesg)
    end if
  end do
  !-----------------------------------------------------------------------------
  ! Unpack
  !-----------------------------------------------------------------------------
  call self%unpk_mesgs (n_unpk)
  call trace%end (itimer)
END SUBROUTINE recv_irecv

!===============================================================================
!> Check for and unpack MPI messages, using all threads. Each thread has a
!> thread-private recv_list, which the thread checks for completed messages;
!> these are passed on to a shared unpk_list. Each thread checks -- in a
!> critical region -- the unpk_list for messages in expected order; these are
!> unpacked in the correct order.
!===============================================================================
SUBROUTINE recv_mirecv (self, n_unpk)
  class(task_mesg_t):: self
  integer:: n_unpk
  class(mesg_t), pointer:: mesg, next
  type(mesg_list_t):: unpk_tmp
  integer, save:: itimer=0
  integer:: n
  logical:: ok
  !.............................................................................
  if (mpi_mesg%nbuf == 0) return
  call trace%begin ('task_mesg_t%recv_mirecv', 1, itimer=itimer)
  mpi_mesg%uniq_mesg = .true.
  !-----------------------------------------------------------------------------
  ! Check the thread-private list, and accumulate completed mesgs in unpk_tmp
  !-----------------------------------------------------------------------------
  mesg => priv_list%head
  do while (associated(mesg))
    next => mesg%next
    if (mesg%is_complete('recv_mirecv')) then
      call priv_list%remove (mesg)
      call unpk_tmp%add (mesg)
    end if
    mesg => next
  end do
  !-----------------------------------------------------------------------------
  ! Issue additional IRECV requests, until there are max_recv in the queue
  !-----------------------------------------------------------------------------
  do while (priv_list%n < mpi_mesg%max_recv)
    call mpi_mesg%iget (mesg)
    if (mesg%is_complete('recv_mirecv')) then
      call unpk_tmp%add (mesg)
    else
      call priv_list%add (mesg)
    end if
  end do
  !-----------------------------------------------------------------------------
  ! Add the messages accumulated in unpk_tmp to the unpk_list, and process that
  ! list. Since the unpk_list is shared, this must be done in a critical region
  !-----------------------------------------------------------------------------
  !$omp critical (mirecv_cr)
  mesg => unpk_tmp%head
  do while (associated(mesg))
    next => mesg%next
    call unpk_tmp%remove (mesg)
    call mpi_mesg%unpk_list%add (mesg)
    mesg => next
  end do
  call self%unpk_mesgs (n_unpk)
  !$omp end critical (mirecv_cr)
  call trace%end (itimer)
END SUBROUTINE recv_mirecv

!===============================================================================
!> Step 2: Move complete messages to unpk_list
!===============================================================================
SUBROUTINE move_mesgs (self, msg_list)
  class(task_mesg_t):: self
  type(mesg_list_t):: msg_list
  class(mesg_t), pointer:: mesg, next
  !-----------------------------------------------------------------------------
  mesg => msg_list%head
  do while (associated(mesg))
    next => mesg%next
    if (mesg%is_complete('recv_improbe')) then
      !$omp atomic update
      timer%n_master(2) = timer%n_master(2) + 1
      if (verbose > 1) &
        write (io_unit%log,*) 'recv_improbe: moved to unpk_list:', &
          mesg%id, mesg%seq, associated(next)
      call msg_list%remove (mesg)
      call mpi_mesg%unpk_list%add (mesg)
    else
      if (verbose > 2) &
        write (io_unit%log,*) 'recv_improbe: not yet complete:', &
          mesg%id, mesg%seq
    end if
    mesg => next
  end do
END SUBROUTINE move_mesgs

!===============================================================================
!> Step 3: Unpack in correct order
!===============================================================================
SUBROUTINE unpk_mesgs (self, n)
  class(task_mesg_t):: self
  integer:: n
  class(mesg_t), pointer:: mesg, next
  !-----------------------------------------------------------------------------
  mesg => mpi_mesg%unpk_list%head
  do while (associated(mesg))
    next => mesg%next
    if (mesg%is_in_order()) then
      !$omp atomic update
      timer%n_master(3) = timer%n_master(3) + 1
      call self%unpack (mesg)
      if (verbose > 1) &
        write (io_unit%log,*) 'recv_improbe: unpacked from list:', &
          mesg%id, mesg%seq
      call mpi_mesg%unpk_list%remove (mesg)
      call mpi_mesg%unpk_list%delete (mesg, .false.)
      n = n+1
    else
      if (verbose > 2) &
        write (io_unit%log,*) 'recv_improbe: kept in unpk_list:', &
          mesg%id, mesg%seq
    end if
    mesg => next
  end do
END SUBROUTINE unpk_mesgs

!===============================================================================
!> Run through all virtual patches, count them, and optionally create separate
!> threadprivate lists that together include all of them.
!===============================================================================
SUBROUTINE init_virtual (self)
  class(task_mesg_t):: self
  class(link_t), pointer:: link
  class(task_t), pointer:: task
  integer:: nvirt, per, i, i1, i2
  !.............................................................................
  if (mpi%size==1) return
  if (trim(method) /= 'virtual' .and. trim(method) /= 'private') return
  call trace%begin ('task_mesg_t%init_virtual')
  !---------------------------------------------------------------------------
  ! Count the number of virtual tasks, and assign a number per thread
  !---------------------------------------------------------------------------
  nvirt = 0
  link => self%head
  do while (associated(link))
    task => link%task
    if (task%is_set (bits%virtual)) then
      nvirt = nvirt+1
    end if
    link => link%next
  end do
  per = nvirt/omp%nthreads + 1
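  !---------------------------------------------------------------------------
  ! Example: nvirt=10 and omp%nthreads=4 gives per=3, so thread 0 covers the
  ! half-open index interval [1,4), thread 1 covers [4,7), etc. (cf. the
  ! i >= i1 .and. i < i2 test in init_virtual_list)
  !---------------------------------------------------------------------------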
  if (trim(method) == 'private') then
    !---------------------------------------------------------------------------
    ! Each thread copies its share of virtual patches to its private list, and
    ! initializes its private virtual task list
    !---------------------------------------------------------------------------
    !$omp parallel shared(verbose,nvirt,per,mpi,bits,self,io) private(i1,i2) default(none)
    i1 = 1 + per*omp%thread
    i2 = i1 + per
    call init_virtual_list (self, private_list, i1, i2)
    !$omp end parallel
  else if (trim(method) == 'virtual') then
    call init_virtual_list (self, master_list, 1, 1+nvirt)
  end if
  call trace%end ()
END SUBROUTINE init_virtual

!===============================================================================
!> Collect the virtual tasks whose enumeration index falls in the half-open
!> interval [i1,i2) onto a reduced list
!===============================================================================
SUBROUTINE init_virtual_list (self, list, i1, i2)
  class(task_mesg_t):: self, list
  integer, optional:: i1, i2
  class(link_t), pointer:: link
  class(task_t), pointer:: task
  integer:: i
  !-----------------------------------------------------------------------------
  call trace%begin ('task_mesg_t%init_virtual_list')
  i = 1
  link => self%head
  do while (associated(link))
    task => link%task
    if (task%is_set (bits%virtual)) then
      if (i >= i1 .and. i < i2) then
        ! -- append the task to the virtual list --
        call list%append (task, &
          nbor=link%nbor, needed=link%needed, needs_me=link%needs_me)
        ! -- allocate a task message --
        call task%allocate_mesg
        task%wc_last = wallclock()
        ! -- initiate the 1st receive --
        call task%mesg%irecv (task%rank, task%id)
        if (verbose > 0) &
          write (io_unit%log,*) &
            'task_mesg_t%init_virtual: id, mesg%id, mesg%tag =', &
            task%id, task%mesg%id, task%mesg%tag
      end if
      i = i+1
    end if
    link => link%next
  end do
  if (verbose > 0) &
    write (io_unit%log,'(a,2i5,2x,2i4)') &
      'task_mesg_t%init_virtual_list: n, thread, i1, i2 =', &
      list%n, omp%thread, i1, i2
  call trace%end()
END SUBROUTINE init_virtual_list

!===============================================================================
!> Run through all virtual patches, unpack completed receives, and issue new
!> receive requests. The list is either handled only by the master thread
!> ('virtual') or is threadprivate ('private'), so no critical region is
!> needed.
!===============================================================================
SUBROUTINE recv_virtual (self, list, n_unpk)
  class(task_mesg_t):: self, list
  class(link_t), pointer:: link
  class(task_t), pointer:: task
  integer:: n_unpk
  real(8):: wc
  integer, save:: itimer=0
  !.............................................................................
  call trace%begin ('task_mesg_t%recv_virtual', itimer=itimer)
  !-----------------------------------------------------------------------------
  ! Each thread runs through its list of virtual tasks, unpacks completed
  ! messages, and issues a new request
  !-----------------------------------------------------------------------------
  link => list%head
  do while (associated(link))
    task => link%task
    select type (task)
    class is (patch_t)
      call set_test_time (self, task)
      if (task%mesg%is_complete('virtual')) then
        if (verbose > 1 .or. task%id == io%id_debug) &
          write (io_unit%log,'(f12.6,2x,a,i6,i5,i9,i6,z12)') wallclock(), &
            'task_mesg_t%recv_virtual: recv id, seq, mesg%id, sender =', &
            task%id, mod(task%istep,100), task%mesg%id, task%mesg%sender
        call self%unpack (task%mesg, link=link)
        !$omp atomic
        mpi_mesg%n_recv = mpi_mesg%n_recv+1
        !$omp atomic
        timer%bytes_recv = timer%bytes_recv + 4.0_8*task%mesg%nbuf
        !$omp atomic
        timer%n_recv = timer%n_recv + 1_8
        wc = wallclock()
        if (wc-task%wc_last > 10.) write (io%output, &
          '("WARNING: virtual patch not updated in",f5.1," sec")') wc-task%wc_last
        task%wc_last = wc
        call task%mesg%irecv (task%rank, task%id)
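        !-----------------------------------------------------------------------
        ! The irecv above re-posts a nonblocking receive for this task's next
        ! update; with tag_type=1 the tag is (a function of) the task id
        !-----------------------------------------------------------------------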
        !write(stdout,*) 'ircv:', task%id, task%mesg%id, task%mesg%tag, task%rank
        n_unpk = n_unpk+1
      else if (verbose > 2 .or. task%id == io%id_debug) then
        write (io_unit%log,'(f12.6,2x,a,i6,i5,i9,i6,z12)') wallclock(), &
          'task_mesg_t%recv_virtual: fail id, seq, mesg%id, sender, req =', &
          task%id, mod(task%istep,100), task%mesg%id, task%rank, task%mesg%req
      end if
    end select
    link => link%next
  end do
  if (verbose > 0) &
    write (io_unit%log,*) ' task_mesg_t%recv_virtual, n_unpk =', n_unpk
  call trace%end (itimer)
END SUBROUTINE recv_virtual

!===============================================================================
!> Find the link with a specified task id. If there isn't any, create one!
!> If the patch already existed, its nbor list should be OK, but its status
!> bits, and those of all its nbors, need to be updated.
!===============================================================================
FUNCTION find_task (self, id, new) RESULT (link)
  class(task_mesg_t):: self
  integer:: id
  logical:: new
  logical:: error
  class(link_t), pointer:: link
  class(task_t), pointer:: task
  class(patch_t), pointer:: patch
  class(experiment_t), pointer:: exper
  class(*), pointer:: ptr
  integer, save:: itimer=0
  !.............................................................................
  call trace%begin ('task_mesg_t%find_task', itimer=itimer)
  if (verbose > 1) &
    write (io_unit%log,*) 'task_mesg_t%find_task: id =', id
  nullify(ptr)
  if (use_hashtable) &
    call self%hash_table%get ([id, 1], ptr)
  if (associated(ptr)) then
    new = .false.
    nullify(link)
    select type (ptr)
    class is (link_t)
      link => ptr
    class default
      call io%abort ('hash table link not useful')
    end select
    task => link%task
    if (verbose > 1) &
      write (io_unit%log,*) 'task_mesg_t%find_task: hash =', task%id
  else
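    !---------------------------------------------------------------------------
    ! Fall back to a linear search of the task list, caching a match in the
    ! hash table so later lookups of the same id avoid the search
    !---------------------------------------------------------------------------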
    call self%lock%set ('find_task')
    new = .true.
    link => self%head
    do while (associated(link))
      task => link%task
      if (verbose > 2) &
        write (io_unit%log,*) 'task_mesg_t%find_task: task%id =', task%id
      if (task%id == id) then
        if (verbose > 1) then
          write (io_unit%log,*) 'task_mesg_t%find_task: match =', id
          flush(io_unit%log)
        end if
        !-----------------------------------------------------------------------
        ! Save in hash table for next time
        !-----------------------------------------------------------------------
        if (use_hashtable) then
          ptr => link
          call self%hash_table%set ([id, 1], ptr)
        end if
        new = .false.
        exit
      end if
      link => link%next
    end do
    call self%lock%unset ('find_task')
  end if
  !-----------------------------------------------------------------------------
  ! If new, a virtual task needs to be created
  !-----------------------------------------------------------------------------
  if (new) then
    allocate (link)
    allocate (exper)
    link%task => exper
    exper%link => link
    exper%box = self%size
    !---------------------------------------------------------------------------
    ! Need a task id to assign the lock id in link%init
    !---------------------------------------------------------------------------
    exper%id = id
    call link%init
    !---------------------------------------------------------------------------
    ! The virtual task needs to have all of the capabilities and procedures of
    ! the original boundary task, since it may need to play a role in 'extras'
    ! features.
    !---------------------------------------------------------------------------
    call exper%init
    call exper%set (bits%virtual)
    if (verbose > 0) &
      write (io_unit%log,*) &
        'task_mesg_t%find_task: created new task, id =', id, exper%id
  end if
  call trace%end (itimer)
END FUNCTION find_task

!===============================================================================
!> Unpack a message, where the MPI tag is the task id. Use that to search
!> for the task, apply its unpack method, and check if any nbors become ready.
!===============================================================================
SUBROUTINE unpack (self, mesg, link)
  class(task_mesg_t):: self
  class(mesg_t), pointer:: mesg
  class(link_t), pointer, optional:: link
  class(link_t), pointer:: link1, link2
  class(experiment_t), pointer:: task
  logical:: found, failed, new
  integer:: id, n_added
  integer, save:: itimer=0
  character(len=24):: label
  real(8):: wc
  !-----------------------------------------------------------------------------
  call trace%begin ('task_mesg_t%unpack', itimer=itimer)
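  !-----------------------------------------------------------------------------
  ! Short messages (nbuf < 40) carry load balance information rather than
  ! patch data (cf. the unique load balance message ID in the module header)
  !-----------------------------------------------------------------------------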
  if (mesg%nbuf < 40) then
    call load_balance%unpack (mesg%buffer)
    call trace%end (itimer)
    return
  end if
  !-----------------------------------------------------------------------------
  ! This entire operation should be threadsafe, since no other thread should be
  ! working on the same message and the same patch. And find_task uses only the
  ! static links in the task list -- not the dynamic next_time links.
  !-----------------------------------------------------------------------------
  !!omp critical (unpack_cr)
  new = .false.
  if (present(link)) then
    link1 => link
    if (self%verbose > 1) &
      write (io_unit%log,'(f12.6,2x,a,2i5,z12)') &
        wallclock(), 'task_mesg_t%unpack: id =', mesg%id, link1%task%id
  else
    link1 => find_task(self, mesg%id, new)
    if (self%verbose > 1) &
      write (io_unit%log,'(f12.6,2x,a,2i5,z12)') &
        wallclock(), 'task_mesg_t%unpack: found id =', mesg%id, link1%task%id
  end if
  !-----------------------------------------------------------------------------
  ! Make sure the task pointer has all experiment_t attributes
  !-----------------------------------------------------------------------------
  associate(ltask=>link1%task)
  select type (ltask)
  class is (experiment_t)
    task => ltask
  end select
  end associate
  if (task%logging > 1) then
    write (label,'(a,i4,i8)') 'unpack ', mesg%sender, mesg%id
    call task%log (label)
  end if
  failed = .false.
  !-----------------------------------------------------------------------------
  if (associated(link1%task)) then
    if (verbose > 1) &
      write (io_unit%log,'(a,i6,2x,5l1)') 'unpack: id, bits BVRES =', mesg%id, &
        task%is_set (bits%boundary), &
        task%is_set (bits%virtual), &
        task%is_set (bits%ready), &
        task%is_set (bits%remove), &
        task%is_set (bits%swap_request)
  end if
  !-----------------------------------------------------------------------------
  ! Guard against lingering extra messages to a swapped patch
  !-----------------------------------------------------------------------------
  if (task%is_set (bits%boundary)) then
    write (io_unit%log,'(f12.6,2x,a,i9,3x,5l1)') wallclock(), &
      'task_mesg_mod::unpack ERROR, received mpi_mesg for boundary task:', task%id, &
      task%is_set(bits%internal), &
      task%is_set(bits%boundary), &
      task%is_set(bits%virtual), &
      task%is_set(bits%external), &
      task%is_set(bits%swap_request)
    failed = .true.
  end if
  if (task%is_set (bits%ready)) then
    found = .false.
    link2 => self%queue
    do while (associated(link2))
      if (link2%task%id == task%id) then
        found = .true.
        exit
      end if
      link2 => link2%next_time
    end do
    write (io_unit%log,'(f12.6,2x,a,i9,l3,3x,5l1)') wallclock(), &
      'task_mesg_mod::unpack ERROR, received mpi_mesg for task with ready bit:', task%id, found, &
      task%is_set(bits%internal), &
      task%is_set(bits%boundary), &
      task%is_set(bits%virtual), &
      task%is_set(bits%external), &
      task%is_set(bits%swap_request)
    failed = .true.
  end if
  !-----------------------------------------------------------------------------
  ! Measure the update cadence for this virtual task
  !-----------------------------------------------------------------------------
  wc = wallclock()
  task%update_cadence = wc - task%update_last
  task%update_last = wc
  !-----------------------------------------------------------------------------
  ! Unpack a patch message (which includes swapping the roles of boundary bits).
  ! Since an already existing patch may, at any one time, be under investigation
  ! by check_nbors, it must be protected by a critical region (or an OMP
  ! lock) while it is being updated here
  !-----------------------------------------------------------------------------
  id = task%id
  call task%unpack (mesg)
  if (verbose > 1) then
    write (io_unit%mpi,'(f12.6,3x,a,3i6,f12.6,2x,3f10.4,2x,5l1)') &
      wallclock(), 'unpack: after task%unpack BVRES =', &
      id, mesg%id, task%id, task%time, task%position, &
      task%is_set (bits%boundary), &
      task%is_set (bits%virtual), &
      task%is_set (bits%ready), &
      task%is_set (bits%remove), &
      task%is_set (bits%swap_request)
    flush (io_unit%mpi)
  end if
  !-----------------------------------------------------------------------------
  ! Check for suicide note
  !-----------------------------------------------------------------------------
  if (task%is_set (bits%remove)) then
    if (verbose > 0) then
      write (io_unit%log,'(f12.6,2x,a,i6)') &
        wallclock(), 'unpack: suicide note received for id =', task%id
    end if
    call self%remove_and_reset (link1)
    call trace%end (itimer)
    return
  end if
  if (io%log_sent > 0) then
    !$omp critical (log_sent_cr)
    call mpi_mesg%log_files()
    write (io_unit%sent,'(f16.6,i4,2x,a,i6,f16.6,8i5)') wallclock(), omp%thread, &
      'unp', task%id, task%time, task%rank
    flush (io_unit%sent)
    !$omp end critical (log_sent_cr)
  end if
  if (mpi_mesg%debug .or. id==io%id_debug) then
    !$omp critical (debug_cr)
    write (io_unit%log,'(f12.6,2x,a,2i9,z12,2x,5l1)') wallclock(), &
      'DBG unpk: id, sender, req =', mesg%id, task%rank, mesg%req, &
      task%is_set (bits%internal), &
      task%is_set (bits%boundary), &
      task%is_set (bits%virtual), &
      task%is_set (bits%external), &
      task%is_set (bits%swap_request)
    !$omp end critical (debug_cr)
  end if
  if (mpi_mesg%debug) &
    write (io_unit%log,'(f12.6,2x,a,i9,1p,e18.6)') wallclock(), &
      'unpk: id, time =', task%id, task%time
  if (id /= mesg%id) &
    write (io%output,*) 'unpack ERROR: wrong mesg%id', id, task%id, mesg%id
  if (.not. failed) then
    !$omp atomic
    mpi_mesg%n_unpk = mpi_mesg%n_unpk+1
    !---------------------------------------------------------------------------
    ! If the boundary+swap bits are set, this is a task that has just changed
    ! rank, and it needs to have its nbor relations re-initialized. This
    ! includes resorting (removing + re-adding) the nbors' nbor lists in rank
    ! order.
    ! FIXME: The load balancing steps should be checked for threadsafe operation
    !---------------------------------------------------------------------------
    if (task%is_set(bits%swap_request) .and. task%is_set(bits%boundary)) then
      !$omp atomic update
      self%na = self%na+1
      !$omp atomic update
      self%nb = self%nb+1
      !$omp atomic update
      self%nv = self%nv-1
      call self%init_nbors (link1)
      call self%check_ready (link1)
      call task%clear (bits%swap_request+bits%ready)
      call self%update_nbor_status (link1)
      call self%count_status
      if (verbose>1) &
        write (io%output,'(f12.6,2x,a,i6,a,i9,a,i6)') &
          wallclock(),'LB: rank',mpi%rank,' given patch',task%id,' by',mesg%sender
      if (verbose>0) &
        write (io_unit%log,*) 'task_mesg_t%unpack: swapped virtual to boundary:', task%id
    !---------------------------------------------------------------------------
    ! If link1 has no nbors it is a newly created virtual task. Does it need
    ! an nbor list? At least we can use the nbor list to check that link1 is
    ! in its nbors' nbor lists. A new virtual task (where no task existed) means
    ! that some nbor of it has changed from internal to boundary, which will be
    ! checked by the update_nbor_status call below, but only if an nbor list
    ! exists.
    !---------------------------------------------------------------------------
    !else if (.not.associated(link1%nbor)) then
    else if (task%is_set(bits%swap_request) .and. task%is_set(bits%virtual)) then
      if (verbose>0) &
        write (io_unit%log,*) 'task_mesg_t%unpack: new virtual patch:', task%id
      self%nv = self%nv+1
      call self%init_nbors (link1)
      call self%check_ready (link1)
      call task%clear (bits%swap_request+bits%ready)
      call self%update_nbor_status (link1)
      call self%count_status
    end if
  end if
  !-----------------------------------------------------------------------------
  ! If a virtual task was just created, it and its nbors need new nbor lists.
  ! Also, since the proper size and position were not known when the new task
  ! was created, but were set by the task%unpack, we need to redo the
  ! task%setup, which regenerates the meshes and sets up all geometric
  ! properties. New AMR tasks may be created both directly, via calls from
  ! selective_refine(), and indirectly, via calls in check_support(); at this
  ! point this makes little or no difference. Check_support is associated
  ! one-to-one with new AMR patches, so it should be sufficient to include the
  ! call under the if (new).
  !-----------------------------------------------------------------------------
  if (new) then
    call task%setup
    if (verbose > 0) then
      write (io_unit%mpi,'(f12.6,2x,a,3i6,2i4,f12.6,2x,3f10.5,2x,"new")') &
        wallclock(), 'task_mesg_t%unpack: ', id, task%id, link1%task%id, &
        mesg%seq, omp%thread, task%time, task%position
      flush(io_unit%mpi)
    end if
    call self%add_new_link (link1)
    !---------------------------------------------------------------------------
    ! If the task has the %support bit set, call refine_t%check_support on it
    !---------------------------------------------------------------------------
    if (task%is_set (bits%support)) then
      call refine%check_support (self, link1, n_added)
    end if
  else
    !---------------------------------------------------------------------------
    ! Patches that have the init_nbors bit set should get new nbor lists, in
    ! order to replicate the behavior on the owner rank
    !---------------------------------------------------------------------------
    if (task%is_set (bits%init_nbors)) then
      call self%init_nbors (link1)
      call self%check_ready (link1)
      if (verbose > 1) &
        write (io_unit%log,*) 'unpack: bits%init_nbors id =', task%id
    end if
    if (verbose > 0) then
      write (io_unit%mpi,'(f12.6,2x,a,i6,2i4,f12.6,2x,3f10.5,2x)') &
        wallclock(), 'task_mesg_t%unpack: ', id, mesg%seq, omp%thread, &
        task%time, task%position
      flush(io_unit%mpi)
    end if
  end if
  !!omp end critical (unpack_cr)
  !-----------------------------------------------------------------------------
  ! As the task has now been updated, we need to check if any of the neighbors
  ! have become ready to update. If the task has just been swapped into being
  ! a boundary patch, it could possibly also be ready to update.
  ! FIXME: Check how this applies to dispatcher methods other than method=0
  !-----------------------------------------------------------------------------
  if (self%method==0) then
    if (verbose > 1) &
      write (io_unit%log,*) 'unpack: check_nbors, id =', task%id
    call self%check_nbors (link1)
  end if
  call trace%end (itimer)
END SUBROUTINE unpack

END MODULE task_mesg_mod
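
!===============================================================================
!> Usage sketch (illustrative only, not part of the module): assuming io, mpi,
!> and the task list have been initialized elsewhere, a rank's driver loop
!> might exercise this module roughly as
!>
!>   call task_mesg%init ('task_mesg')   ! read TASK_MESG_PARAMS, pick method
!>   call task_mesg%init_virtual         ! build (per-thread) virtual lists
!>   do while (.not. finished)
!>     call task_mesg%check_mpi (n_unpk) ! receive + unpack incoming messages
!>     ! ... update the next ready task, send its boundary data, etc ...
!>   end do
!===============================================================================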