Rheolef  7.2
an efficient C++ finite element environment
mpi_scatter_init.h
Go to the documentation of this file.
1 #ifndef _RHEO_MPI_SCATTER_INIT_H
2 #define _RHEO_MPI_SCATTER_INIT_H
23 
24 #include "rheolef/compiler.h"
25 #include "rheolef/communicator.h"
26 #include "rheolef/scatter_message.h"
27 
28 #include "rheolef/msg_sort_with_permutation.h"
29 #include "rheolef/msg_to_context.h"
30 #include "rheolef/msg_from_context_pattern.h"
31 #include "rheolef/msg_from_context_indices.h"
32 #include "rheolef/msg_local_context.h"
33 #include "rheolef/msg_local_optimize.h"
34 
35 #include "rheolef/msg_util.h"
36 
37 #pragma GCC diagnostic push
38 #pragma GCC diagnostic ignored "-Weffc++"
39 #pragma GCC diagnostic ignored "-Wparentheses"
40 #pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
41 #include <boost/functional.hpp>
42 #include <boost/iterator/transform_iterator.hpp>
43 #pragma GCC diagnostic pop
44 
45 /*F:
46 NAME: mpi_scatter_init -- gather/scatter initialize (@PACKAGE@ @VERSION@)
47 DESCRIPTION:
48  Initialize communication
49  for distributed to sequential scatter context.
50 COMPLEXITY:
51  Time and memory complexity is O(nidx+nproc).
52  For finite-element problems in d dimensions
53 
54 | nidx ~ N^((d-1)/d)
55 
56  where N is the number of degrees of freedom.
57 
58 IMPLEMENTATION
59  Inspired by petsc-2.0/vpscat.c: VecScatterCreate_PtoS()
60 AUTHORS:
61  LMC-IMAG, 38041 Grenoble cedex 9, France
62  | Pierre.Saramito@imag.fr
63 DATE: 23 march 1999
64 END:
65 */
66 
67 namespace rheolef {
68 
69 //<mpi_scatter_init:
70 template <class Message, class Size, class SizeRandomIterator1,
71  class SizeRandomIterator2, class SizeRandomIterator3, class Tag>
72 void
74 // input:
75  Size nidx,
76  SizeRandomIterator1 idx,
77  Size nidy,
78  SizeRandomIterator2 idy,
79  Size idy_maxval,
80  SizeRandomIterator3 ownership,
81  Tag tag,
83 // output:
84  Message& from,
85  Message& to)
86 {
87  typedef Size size_type;
88  size_type my_proc = comm.rank();
89  size_type nproc = comm.size();
90 
91  // -------------------------------------------------------
92  // 1) first count number of contributors to each processor
93  // -------------------------------------------------------
94  std::vector<size_type> msg_size(nproc, 0);
95  std::vector<size_type> msg_mark(nproc, 0);
96  std::vector<size_type> owner (nidx);
97  size_type send_nproc = 0;
98  {
99  size_type iproc = 0;
100  for (size_type i = 0; i < nidx; i++) {
101  for (; iproc < nproc; iproc++) {
102  if (idx[i] >= ownership[iproc] && idx[i] < ownership[iproc+1]) {
103  owner[i] = iproc;
104  msg_size [iproc]++;
105  if (!msg_mark[iproc]) {
106  msg_mark[iproc] = 1;
107  send_nproc++;
108  }
109  break;
110  }
111  }
112  check_macro (iproc != nproc, "bad stash data: idx["<<i<<"]="<<idx[i]<<" out of range [0:"<<ownership[nproc]<<"[");
113  }
114  } // end block
115  // -------------------------------------------------------
116  // 2) avoid to send message to my-proc in counting
117  // -------------------------------------------------------
118  size_type n_local = msg_size[my_proc];
119  if (n_local != 0) {
120  msg_size [my_proc] = 0;
121  msg_mark [my_proc] = 0;
122  send_nproc--;
123  }
124  // ----------------------------------------------------------------
125  // 3) compute number of messages to be send to my_proc
126  // ----------------------------------------------------------------
127  std::vector<size_type> work(nproc);
128  mpi::all_reduce (
129  comm,
130  msg_mark.begin().operator->(),
131  nproc,
132  work.begin().operator->(),
133  std::plus<size_type>());
134  size_type receive_nproc = work [my_proc];
135  // ----------------------------------------------------------------
136  // 4) compute messages max size to be send to my_proc
137  // ----------------------------------------------------------------
138  mpi::all_reduce (
139  comm,
140  msg_size.begin().operator->(),
141  nproc,
142  work.begin().operator->(),
143  mpi::maximum<size_type>());
144  size_type receive_max_size = work [my_proc];
145  // ----------------------------------------------------------------
146  // 5) post receive: exchange the buffer adresses between processes
147  // ----------------------------------------------------------------
148  std::list<std::pair<size_type,mpi::request> > receive_waits;
149  std::vector<size_type> receive_data (receive_nproc*receive_max_size);
150  for (size_type i_receive = 0; i_receive < receive_nproc; i_receive++) {
151  mpi::request i_req = comm.irecv (
152  mpi::any_source,
153  tag,
154  receive_data.begin().operator->() + i_receive*receive_max_size,
155  receive_max_size);
156  receive_waits.push_back (std::make_pair(i_receive, i_req));
157  }
158  // ---------------------------------------------------------------------------
159  // 6) compute the send indexes
160  // ---------------------------------------------------------------------------
161  // comme idx est trie, on peut faire une copie de idx dans send_data
162  // et du coup owner et send_data_ownership sont inutiles
163  std::vector<size_type> send_data (nidx);
164  std::copy (idx, idx+nidx, send_data.begin());
165  // ---------------------------------------------------------------------------
166  // 7) do send
167  // ---------------------------------------------------------------------------
168  std::list<std::pair<size_type,mpi::request> > send_waits;
169  {
170  size_type i_send = 0;
171  size_type i_start = 0;
172  for (size_type iproc = 0; iproc < nproc; iproc++) {
173  size_type i_msg_size = msg_size[iproc];
174  if (i_msg_size == 0) continue;
175  mpi::request i_req = comm.isend (
176  iproc,
177  tag,
178  send_data.begin().operator->() + i_start,
179  i_msg_size);
180  send_waits.push_back(std::make_pair(i_send,i_req));
181  i_send++;
182  i_start += i_msg_size;
183  }
184  } // end block
185  // ---------------------------------------------------------------------------
186  // 8) wait on receives
187  // ---------------------------------------------------------------------------
188  // note: for wait_all, build an iterator adapter that scan the pair.second in [index,request]
189  // and then get an iterator in the pair using iter.base(): retrive the corresponding index
190  // for computing the position in the receive.data buffer
191  typedef boost::transform_iterator<select2nd<size_t,mpi::request>, std::list<std::pair<size_t,mpi::request> >::iterator>
192  request_iterator;
193  std::vector<size_type> receive_size (receive_nproc);
194  std::vector<size_type> receive_proc (receive_nproc);
195  size_type receive_total_size = 0;
196  while (receive_waits.size() != 0) {
197  typedef size_type data_type; // exchanged data is of "size_type"
198  request_iterator iter_r_waits (receive_waits.begin(), select2nd<size_t,mpi::request>()),
199  last_r_waits (receive_waits.end(), select2nd<size_t,mpi::request>());
200  // waits on any receive...
201  std::pair<mpi::status,request_iterator> pair_status = mpi::wait_any (iter_r_waits, last_r_waits);
202  // check status
203  boost::optional<int> i_msg_size_opt = pair_status.first.count<data_type>();
204  check_macro (i_msg_size_opt, "receive wait failed");
205  int iproc = pair_status.first.source();
206  check_macro (iproc >= 0, "receive: source iproc = "<<iproc<<" < 0 !");
207  // get size of receive and number in data
208  size_type i_msg_size = (size_t)i_msg_size_opt.get();
209  std::list<std::pair<size_t,mpi::request> >::iterator i_pair_ptr = pair_status.second.base();
210  size_type i_receive = (*i_pair_ptr).first;
211  receive_proc [i_receive] = iproc;
212  receive_size [i_receive] = i_msg_size;
213  receive_total_size += i_msg_size;
214  receive_waits.erase (i_pair_ptr);
215  }
216  // ---------------------------------------------------------------------------
217  // 9) allocate the entire send(to) scatter context
218  // ---------------------------------------------------------------------------
219  to.resize (receive_total_size, receive_nproc);
220 
221  // ---------------------------------------------------------------------------
222  // 10) compute the permutation of values that gives the sorted source[] sequence
223  // ---------------------------------------------------------------------------
224  // init: perm[i] = i
225  std::vector<size_type> perm(receive_nproc);
226  copy(index_iterator<size_type>(), index_iterator<size_type>(receive_nproc), perm.begin());
228  receive_proc.begin().operator->(),
229  perm.begin().operator->(),
230  receive_nproc);
231  // ---------------------------------------------------------------------------
232  // 11) Computes the receive compresed message pattern for send(to)
233  // ---------------------------------------------------------------------------
234  size_type istart = ownership[my_proc]; // = ownership.first_index()
236  perm.begin(),
237  perm.end(),
238  receive_proc.begin(),
239  receive_size.begin(),
240  receive_data.begin(),
241  receive_max_size,
242  istart,
243  to.procs().begin(),
244  to.starts().begin(),
245  to.indices().begin());
246  // ---------------------------------------------------------------------------
247  // 12) allocate the entire receive(from) scatter context
248  // ---------------------------------------------------------------------------
249  from.resize(nidy, send_nproc);
250  // ---------------------------------------------------------------------------
251  // 13) Computes the receive compresed message pattern for receive(from)
252  // ---------------------------------------------------------------------------
253  std::vector<size_type> proc2from_proc(nproc);
255  msg_size.begin(),
256  msg_size.end(),
257  from.procs().begin(),
258  from.starts().begin(),
259  proc2from_proc.begin());
260  // ---------------------------------------------------------------------------
261  // 14) Computes the receive compresed message indices for receive(from)
262  // ---------------------------------------------------------------------------
263  // assume that indices are sorted by increasing order
264  std::vector<size_type> start(send_nproc+1);
265  copy (from.starts().begin(), from.starts().end(), start.begin());
267  owner.begin(),
268  owner.end(),
269  idy,
270  proc2from_proc.begin(),
271  my_proc,
272  idy_maxval,
273  start.begin(),
274  from.indices().begin());
275  // ---------------------------------------------------------------------------
276  // 15) wait on sends
277  // ---------------------------------------------------------------------------
278  if (send_waits.size() != 0) {
279  request_iterator iter_s_waits (send_waits.begin(), select2nd<size_type,mpi::request>()),
280  last_s_waits (send_waits.end(), select2nd<size_type,mpi::request>());
281  mpi::wait_all (iter_s_waits, last_s_waits);
282  }
283  // ---------------------------------------------------------------------------
284  // 16) Computes the receive compresed message local pattern,
285  // i.e. the only part that does not requires communication.
286  // ---------------------------------------------------------------------------
287  from.local_slots.resize(n_local);
288  to.local_slots.resize(n_local);
289  size_type ilast = ownership[my_proc+1]; // = ownership.last_index()
291  idx,
292  idx+nidx,
293  idy,
294  idy_maxval,
295  istart,
296  ilast,
297  to.local_slots.begin(),
298  to.local_slots.end(),
299  from.local_slots.begin());
300  // ---------------------------------------------------------------------------
301  // 17) Optimize local exchanges during gatter/scatter
302  // ---------------------------------------------------------------------------
303  bool has_opt = msg_local_optimize (
304  to.local_slots.begin(),
305  to.local_slots.end(),
306  from.local_slots.begin());
307 
308  if (has_opt && n_local != 0) {
309  to.local_is_copy = true;
310  to.local_copy_start = to.local_slots[0];
311  to.local_copy_length = n_local;
312  from.local_is_copy = true;
313  from.local_copy_start = from.local_slots[0];
314  from.local_copy_length = n_local;
315  }
316 }
317 //>mpi_scatter_init:
318 } // namespace rheolef
319 #endif // _RHEO_MPI_SCATTER_INIT_H
field::size_type size_type
Definition: branch.cc:430
communicator communicator_type
Definition: distributor.h:79
size_t size_type
Definition: basis_get.cc:76
check_macro(expr1.have_homogeneous_space(Xh1), "dual(expr1,expr2); expr1 should have homogeneous space. HINT: use dual(interpolate(Xh, expr1),expr2)")
This file is part of Rheolef.
void mpi_scatter_init(Size nidx, SizeRandomIterator1 idx, Size nidy, SizeRandomIterator2 idy, Size idy_maxval, SizeRandomIterator3 ownership, Tag tag, const distributor::communicator_type &comm, Message &from, Message &to)
bool msg_local_optimize(InputIterator1 to_loc_idx, InputIterator1 last_to_loc_idx, InputIterator2 from_loc_idy)
void msg_from_context_indices(InputIterator1 owner, InputIterator1 last_owner, InputIterator2 idy, InputRandomIterator proc2from_proc, Proc my_proc, Size idy_maxval, MutableRandomIterator ptr, OutputIterator from_idx)
void msg_from_context_pattern(InputIterator1 msg_size, InputIterator1 last_msg_size, OutputIterator1 from_proc, OutputIterator2 from_ptr, OutputIterator3 proc2from_proc)
void msg_to_context(InputIterator1 perm, InputIterator1 last_perm, InputRandomIterator2 r_iproc, InputRandomIterator3 r_size, InputRandomIterator4 r_idx, Size receive_max_size, Size istart, OutputIterator1 to_proc, OutputIterator2 to_ptr, OutputIterator3 to_idx)
void sort_with_permutation(RandomIterator v, SizeRandomIterator p, Size n)
void msg_local_context(InputIterator1 idx, InputIterator1 last_idx, InputIterator2 idy, Size idy_maxval, Size istart, Size ilast, OutputIterator1 to_loc_idx, OutputIterator1 last_to_loc_idx, OutputIterator2 from_loc_idy)