DISPATCH
buffered_io_mod.f90
1 !===============================================================================
2 !> $Id: eb90519f67f73959d43f9476317a256ab739df87 $
3 !> This module handles buffering and output of data to disk. It receives
4 !> call of the type "call output%buffer (id, it, data)", where id is the task
5 !> id, iout is the output number, and data is a pointer to a one-dimensional
6 !> buffer. The actual content of the buffers has no relevance for this module,
7 !> and is left to the procedure that packs the information into the buffer.
8 !> For simplicity, the buffer size (ndata) is assumed to remain constant.
9 !===============================================================================
11  USE io_mod
12  USE patch_mod
13  USE trace_mod
14  implicit none
15  private
16  !-----------------------------------------------------------------------------
17  ! The dump data type holds a pointer to the data to be dumped to disk, with id
18  ! the task id, it the time slot, and buffer the packed task info. It is
19  ! assumed that the caller packed data into a temporarily allocated buffer,
20  ! which is deallocated once the data has been written to disk.
21  !-----------------------------------------------------------------------------
22  type, public:: dump_t
23  class(dump_t), pointer:: next => null()
24  integer:: id, iout
25  real(8):: time
26  integer:: ndata
27  real(4), dimension(:), pointer:: data
28  end type
29  !-----------------------------------------------------------------------------
30  ! The output object contains procedures for opening, buffering, writing, and
31  ! closing buffered output to a direct access file dumps.dat, with index data
32  ! saved to a separate index.dat file.
33  !-----------------------------------------------------------------------------
34  type, public:: buffered_io_t
35  integer:: ndata
36  integer:: iout = -1
37  integer(8):: record = 0
38  integer:: n_buffer = 0
39  integer:: max_buffer = 10
40  class(dump_t), pointer:: head => null()
41  class(dump_t), pointer:: tail => null()
42  contains
43  procedure:: open
44  procedure, nopass:: output
45  procedure:: buffer
46  procedure:: check
47  procedure:: write
48  procedure:: close
49  procedure:: find
50  procedure:: test
51  end type
52  integer(8):: revision=1
53  type(buffered_io_t), public:: buffered_io
54 CONTAINS
55 
56 !===============================================================================
57 !> Write results to disk, using the buffered I/O in output_mod
58 !===============================================================================
59 SUBROUTINE output (patch)
60  class(patch_t):: patch
61  !.............................................................................
62  type(header_t):: header
63  integer:: n_buf, n_data, ibuf, it1, it2, iv
64  real(4), dimension(:), pointer:: buffer
65  real, dimension(:,:,:), pointer:: var
66  real:: pt
67  !-----------------------------------------------------------------------------
68  ! Compute size of buffer and allocate -- it will be freed by the writer
69  !-----------------------------------------------------------------------------
70  call trace_begin ('patch_t%output_new')
71  !$omp critical (buffered_cr)
72  n_buf = product(patch%gn)
73  n_data = n_header + patch%nv*n_buf
74  allocate (buffer(n_data), var(patch%gn(1),patch%gn(2),patch%gn(3)))
75  !-----------------------------------------------------------------------------
76  ! Copy relevant patch info to sequenced header, and copy that to the buffer
77  !-----------------------------------------------------------------------------
78  call patch%patch_to_header (header)
79  ibuf = 1
80  call anonymous_copy (n_header, header, buffer(ibuf))
81  ibuf = ibuf + n_header
82  !-----------------------------------------------------------------------------
83  ! Copy the variables to the output buffer; these are first interpolated in time
84  !-----------------------------------------------------------------------------
85  it1 = patch%iit(patch%nt-2)
86  it2 = patch%iit(patch%nt-1)
87  pt = (patch%out_next-patch%t(it1))/max(patch%t(it2)-patch%t(it1),1d-30)
88  do iv=1,patch%nv
89  var = patch%mem(:,:,:,iv,it1,1)*(1.-pt) + patch%mem(:,:,:,iv,it2,1)*pt
90  call anonymous_copy (n_buf, var, buffer(ibuf))
91  ibuf = ibuf + n_buf
92  end do
93  deallocate (var)
94  call buffered_io%buffer (patch%id, patch%iout, patch%out_next, buffer)
95  call buffered_io%check
96  !$omp end critical (buffered_cr)
97  call trace_end
98 END SUBROUTINE output
99 
100 !===============================================================================
101 !> Open an index file and a data file. The first record of the index file starts
102 !> with the data format revision number and the buffer size.
103 !===============================================================================
104 SUBROUTINE open (self, nwords, iout)
105  class(buffered_io_t):: self
106  integer:: nwords, iout
107  logical, save:: first_time=.true., exist
108  character(len=64):: filename
109  !-----------------------------------------------------------------------------
110  call trace_begin('buffered_io_mod::open')
111  self%iout= iout
112  self%ndata = nwords
113  write (filename,'(a,i5.5,"/buffered")') trim(io%outputname), iout
114  if (io%verbose>0) &
115  print'(a)', 'OPEN: '//trim(filename)
116  !---------------------------------------------------------------------------
117  ! Check if the index file exists. If it does, read the record counter, and
118  ! if not, reset the counter and write it out.
119  !---------------------------------------------------------------------------
120  if (io%verbose>1) &
121  print*, 'index file: ', trim(filename)//'.idx'
122  inquire (file=trim(filename)//'.idx', exist=exist)
123  if (exist) then
124  open (io_unit%index, file=trim(filename)//'.idx', &
125  form='unformatted', access='direct', status='unknown', recl=2*io%word_size)
126  read (io_unit%index, rec=3) self%record
127  else
128  open (io_unit%index, file=trim(filename)//'.idx', &
129  form='unformatted', access='direct', status='unknown', recl=2*io%word_size)
130  self%record = 0
131  write (io_unit%index, rec=3) self%record
132  end if
133  write (io_unit%index, rec=1) revision
134  write (io_unit%index, rec=2) int(nwords,kind=8)
135  !---------------------------------------------------------------------------
136  ! Open the data file for writing
137  !---------------------------------------------------------------------------
138  if (io%verbose>1) &
139  print*, ' dump file: ', trim(filename)//'.dat'
140  open (io_unit%dump, file=trim(filename)//'.dat', &
141  form='unformatted', access='direct', status='unknown', recl=nwords*io%word_size)
142  call trace_end
143 END SUBROUTINE open
144 
145 !===============================================================================
146 !> Append a dump instance to a linked list.
147 !===============================================================================
148 SUBROUTINE buffer (self, id, iout, time, data)
149  class(buffered_io_t):: self
150  integer:: id, iout
151  real(8):: time
152  real(4), dimension(:), pointer:: data
153  class(dump_t), pointer:: dump
154  !-----------------------------------------------------------------------------
155  call trace_begin('buffered_io_mod::buffer')
156  allocate (dump)
157  dump%id = id
158  dump%iout = iout
159  dump%time = time
160  dump%ndata = size(data)
161  dump%data => data
162  if (associated(self%tail)) then
163  self%tail%next => dump
164  else
165  self%head => dump
166  end if
167  self%tail => dump
168  self%n_buffer = self%n_buffer+1
169  if (io%verbose>1) &
170  print*,'buffered_io_mod::buffer: id, iout, n_buffer=', id, iout, self%n_buffer
171  call trace_end
172  call self%check
173 END SUBROUTINE buffer
174 
175 !===============================================================================
176 !> Check if the number of dumps has reached the maximum => write dumps to disk.
177 !> This is done as a task-list independent step by all threads.
178 !===============================================================================
179 SUBROUTINE check (self)
180  class(buffered_io_t):: self
181  class(dump_t), pointer:: dump
182  !-----------------------------------------------------------------------------
183  if (self%n_buffer >= self%max_buffer) then
184  call self%write
185  end if
186 END SUBROUTINE check
187 
188 !===============================================================================
189 !> Write the buffers in the linked list to disk and deallocate the buffers.
190 !> The index file records, sequentially, the task id and dump number of each
191 !> dump written. By reading the entire index file one can search very rapidly
192 !> for a specific dump, by treating the id+iout combination as an 8 byte integer
193 !===============================================================================
194 SUBROUTINE write (self)
195  class(buffered_io_t):: self
196  class(dump_t), pointer:: dump, old
197  !-----------------------------------------------------------------------------
198  call trace_begin('buffered_io_mod::write')
199  dump => self%head
200  if (io%verbose>0) &
201  print'(a,i7,i6,i9,g15.6)', 'output: it, nbuf, ndata, time', &
202  dump%id, self%n_buffer, dump%ndata, dump%time
203  do while (associated(dump))
204  !---------------------------------------------------------------------------
205  ! If time slot differs, close the previous one and open the current one
206  !---------------------------------------------------------------------------
207  if (dump%iout /= self%iout) then
208  if (self%iout >= 0) call self%close
209  call self%open (dump%ndata, dump%iout)
210  end if
211  !---------------------------------------------------------------------------
212  ! Increment the record counter, record its value, and write the data
213  !---------------------------------------------------------------------------
214  !$omp atomic
215  self%record = self%record+1
216  write (io_unit%index, rec=3) self%record
217  write (io_unit%index, rec=self%record+3) dump%id, dump%iout
218  flush (io_unit%index)
219  write (io_unit%dump, rec=self%record) dump%data
220  flush (io_unit%dump)
221  !$omp atomic
222  self%n_buffer = self%n_buffer-1
223  if (io%verbose>1) &
224  print'(a,i7,i6,i9,g15.6)', 'output: id, it, ndata, time', &
225  dump%id, dump%iout, dump%ndata, dump%time
226  !---------------------------------------------------------------------------
227  ! Delete the buffer data, and the buffer
228  !---------------------------------------------------------------------------
229  old => dump
230  dump => dump%next
231  deallocate (old%data)
232  deallocate (old)
233  end do
234  nullify (self%head)
235  nullify (self%tail)
236  call trace_end
237 END SUBROUTINE write
238 
239 !===============================================================================
240 !> Close the files, writing out the total number of records in slot 3 of the
241 !> index file.
242 !===============================================================================
243 SUBROUTINE close (self)
244  class(buffered_io_t):: self
245  !-----------------------------------------------------------------------------
246  if (io%do_legacy) return
247  call trace_begin('buffered_io_mod::close')
248  close (io_unit%index)
249  close (io_unit%dump)
250  call trace_end
251 END SUBROUTINE close
252 
253 !===============================================================================
254 !> Find the record that contains the combination of task id and dump number iout.
255 !> This will typically be done in Python or IDL; this is just a demo.
256 !===============================================================================
257 FUNCTION find (self, id, iout)
258  class(buffered_io_t):: self
259  integer, optional:: id, iout
260  integer:: find, loc(1)
261  integer(8), dimension(:), pointer:: index
262  integer(8):: key, ndata, nrecord, revision, i
263  !-----------------------------------------------------------------------------
264  call trace_begin('buffered_io_mod::find')
265  !-----------------------------------------------------------------------------
266  ! Read the number of records and the buffer size
267  !-----------------------------------------------------------------------------
268  open (io_unit%index, file=trim(io%outputname)//'index.dat', &
269  form='unformatted', access='direct', status='old', recl=8)
270  read (io_unit%index,rec=1) revision
271  read (io_unit%index,rec=2) ndata
272  read (io_unit%index,rec=3) nrecord
273  print *, revision, ndata, nrecord
274  close (io_unit%index)
275  !-----------------------------------------------------------------------------
276  ! Get the entire index in one read
277  !-----------------------------------------------------------------------------
278  allocate (index(nrecord))
279  open (io_unit%index, file=trim(io%outputname)//'index.dat', &
280  form='unformatted', access='direct', status='old', recl=8*(nrecord+3))
281  read (io_unit%index,rec=1) revision, ndata, nrecord, index
282  close (io_unit%index)
283  !-----------------------------------------------------------------------------
284  ! Find the id+iout combo
285  !-----------------------------------------------------------------------------
286  key = int(id,kind=8) + int(iout,kind=8)*2_8**32
287  if (io%verbose>1) &
288  print*, 'searching for id, iout, key =', id, iout, key
289  if (io%verbose>2) then
290  do i=1,nrecord
291  print *, i, index(i)
292  end do
293  end if
294  loc = minloc(abs(index-key))
295  find = loc(1)
296  if (io%verbose>1) &
297  print*, 'record =', find
298  deallocate (index)
299  call trace_end
300 END FUNCTION find
301 
302 !===============================================================================
303 !> Test the output module, by buffering enough dumps for a self-triggered write,
304 !> followed by a write triggered by a close.
305 !===============================================================================
306 SUBROUTINE test (self)
307  class(buffered_io_t):: self
308  real(4), dimension(:), pointer:: data
309  integer, dimension(:), pointer:: p
310  integer:: n=20, nv=8, id, it, ndata, record
311  real(8):: time
312  !-----------------------------------------------------------------------------
313  call trace_begin ('buffered_io_mod::test')
314  ndata = n**3*nv+n**2
315  call self%open (ndata,0)
316  do it=1,3
317  time = it-1d0
318  do id=1,9
319  allocate (data(ndata))
320  call self%buffer (id, it, time, data)
321  end do
322  end do
323  call self%close
324  record = self%find (id=2,iout=3)
325  print *,'record =', record
326  call trace_end
327 END SUBROUTINE test
328 
329 END MODULE buffered_io_mod
Template module for patches, which adds pointers to memory and mesh, and number of dimensions and var...
Definition: patch_mod.f90:6
Definition: io_mod.f90:4