Tpetra parallel linear algebra  Version of the Day
Tpetra_Distributor.cpp
1 // ***********************************************************************
2 //
3 // Tpetra: Templated Linear Algebra Services Package
4 // Copyright (2008) Sandia Corporation
5 //
6 // Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
7 // the U.S. Government retains certain rights in this software.
8 //
9 // Redistribution and use in source and binary forms, with or without
10 // modification, are permitted provided that the following conditions are
11 // met:
12 //
13 // 1. Redistributions of source code must retain the above copyright
14 // notice, this list of conditions and the following disclaimer.
15 //
16 // 2. Redistributions in binary form must reproduce the above copyright
17 // notice, this list of conditions and the following disclaimer in the
18 // documentation and/or other materials provided with the distribution.
19 //
20 // 3. Neither the name of the Corporation nor the names of the
21 // contributors may be used to endorse or promote products derived from
22 // this software without specific prior written permission.
23 //
24 // THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
25 // EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27 // PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
28 // CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29 // EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30 // PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31 // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32 // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33 // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34 // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35 //
36 // ************************************************************************
37 // @HEADER
38 
39 #include "Tpetra_Distributor.hpp"
42 #include "Tpetra_Util.hpp"
43 #include "Tpetra_Details_makeValidVerboseStream.hpp"
44 #include "Teuchos_StandardParameterEntryValidators.hpp"
45 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
46 #include <numeric>
47 
48 namespace Tpetra {
49  namespace Details {
50  std::string
52  {
53  if (sendType == DISTRIBUTOR_ISEND) {
54  return "Isend";
55  }
56  else if (sendType == DISTRIBUTOR_RSEND) {
57  return "Rsend";
58  }
59  else if (sendType == DISTRIBUTOR_SEND) {
60  return "Send";
61  }
62  else if (sendType == DISTRIBUTOR_SSEND) {
63  return "Ssend";
64  }
65  else {
66  TEUCHOS_TEST_FOR_EXCEPTION(true, std::invalid_argument, "Invalid "
67  "EDistributorSendType enum value " << sendType << ".");
68  }
69  }
70 
71  std::string
73  {
74  switch (how) {
75  case Details::DISTRIBUTOR_NOT_INITIALIZED:
76  return "Not initialized yet";
77  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
78  return "By createFromSends";
79  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
80  return "By createFromRecvs";
81  case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
82  return "By createFromSendsAndRecvs";
83  case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
84  return "By createReverseDistributor";
85  case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
86  return "By copy constructor";
87  default:
88  return "INVALID";
89  }
90  }
91  } // namespace Details
92 
93  Teuchos::Array<std::string>
95  {
96  Teuchos::Array<std::string> sendTypes;
97  sendTypes.push_back ("Isend");
98  sendTypes.push_back ("Rsend");
99  sendTypes.push_back ("Send");
100  sendTypes.push_back ("Ssend");
101  return sendTypes;
102  }
103 
104  // We set default values of Distributor's Boolean parameters here,
105  // in this one place. That way, if we want to change the default
106  // value of a parameter, we don't have to search the whole file to
107  // ensure a consistent setting.
namespace {
  // "Use distinct tags" parameter: on unless the user says otherwise.
  constexpr bool useDistinctTags_default {true};
  // "Barrier between receives and sends" parameter: off by default.
  constexpr bool barrierBetween_default {false};
  // "Debug" parameter: off by default.
  constexpr bool tpetraDistributorDebugDefault {false};
} // namespace (anonymous)
116 
117  int Distributor::getTag (const int pathTag) const {
118  return useDistinctTags_ ? pathTag : comm_->getTag ();
119  }
120 
121 
122 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
123  void Distributor::makeTimers () {
124  timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (
125  "Tpetra::Distributor: doWaits");
126 
127  timer_doPosts3TA_ = Teuchos::TimeMonitor::getNewTimer (
128  "Tpetra::Distributor: doPosts(3) TA");
129  timer_doPosts4TA_ = Teuchos::TimeMonitor::getNewTimer (
130  "Tpetra::Distributor: doPosts(4) TA");
131 
132  timer_doPosts3TA_recvs_ = Teuchos::TimeMonitor::getNewTimer (
133  "Tpetra::Distributor: doPosts(3): recvs TA");
134  timer_doPosts4TA_recvs_ = Teuchos::TimeMonitor::getNewTimer (
135  "Tpetra::Distributor: doPosts(4): recvs TA");
136 
137  timer_doPosts3TA_barrier_ = Teuchos::TimeMonitor::getNewTimer (
138  "Tpetra::Distributor: doPosts(3): barrier TA");
139  timer_doPosts4TA_barrier_ = Teuchos::TimeMonitor::getNewTimer (
140  "Tpetra::Distributor: doPosts(4): barrier TA");
141 
142  timer_doPosts3TA_sends_ = Teuchos::TimeMonitor::getNewTimer (
143  "Tpetra::Distributor: doPosts(3): sends TA");
144  timer_doPosts4TA_sends_ = Teuchos::TimeMonitor::getNewTimer (
145  "Tpetra::Distributor: doPosts(4): sends TA");
146  timer_doPosts3TA_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
147  "Tpetra::Distributor: doPosts(3): sends TA SLOW");
148  timer_doPosts4TA_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
149  "Tpetra::Distributor: doPosts(4): sends TA SLOW");
150  timer_doPosts3TA_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
151  "Tpetra::Distributor: doPosts(3): sends TA FAST");
152  timer_doPosts4TA_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
153  "Tpetra::Distributor: doPosts(4): sends TA FAST");
154 
155  timer_doPosts3KV_ = Teuchos::TimeMonitor::getNewTimer (
156  "Tpetra::Distributor: doPosts(3) KV");
157  timer_doPosts4KV_ = Teuchos::TimeMonitor::getNewTimer (
158  "Tpetra::Distributor: doPosts(4) KV");
159 
160  timer_doPosts3KV_recvs_ = Teuchos::TimeMonitor::getNewTimer (
161  "Tpetra::Distributor: doPosts(3): recvs KV");
162  timer_doPosts4KV_recvs_ = Teuchos::TimeMonitor::getNewTimer (
163  "Tpetra::Distributor: doPosts(4): recvs KV");
164 
165  timer_doPosts3KV_barrier_ = Teuchos::TimeMonitor::getNewTimer (
166  "Tpetra::Distributor: doPosts(3): barrier KV");
167  timer_doPosts4KV_barrier_ = Teuchos::TimeMonitor::getNewTimer (
168  "Tpetra::Distributor: doPosts(4): barrier KV");
169 
170  timer_doPosts3KV_sends_ = Teuchos::TimeMonitor::getNewTimer (
171  "Tpetra::Distributor: doPosts(3): sends KV");
172  timer_doPosts4KV_sends_ = Teuchos::TimeMonitor::getNewTimer (
173  "Tpetra::Distributor: doPosts(4): sends KV");
174  timer_doPosts3KV_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
175  "Tpetra::Distributor: doPosts(3): sends KV SLOW");
176  timer_doPosts4KV_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
177  "Tpetra::Distributor: doPosts(4): sends KV SLOW");
178  timer_doPosts3KV_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
179  "Tpetra::Distributor: doPosts(3): sends KV FAST");
180  timer_doPosts4KV_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
181  "Tpetra::Distributor: doPosts(4): sends KV FAST");
182  }
183 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
184 
186  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
187  const Teuchos::RCP<Teuchos::FancyOStream>& /* out */,
188  const Teuchos::RCP<Teuchos::ParameterList>& plist)
189  : comm_ (comm)
190  , howInitialized_ (Details::DISTRIBUTOR_NOT_INITIALIZED)
191  , sendType_ (Details::DISTRIBUTOR_SEND)
192  , barrierBetween_ (barrierBetween_default)
193  , selfMessage_ (false)
194  , numSends_ (0)
195  , maxSendLength_ (0)
196  , numReceives_ (0)
197  , totalReceiveLength_ (0)
198  , lastRoundBytesSend_ (0)
199  , lastRoundBytesRecv_ (0)
200  , useDistinctTags_ (useDistinctTags_default)
201  {
202  this->setParameterList(plist);
203 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
204  makeTimers ();
205 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
206  }
207 
209  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm)
210  : Distributor (comm, Teuchos::null, Teuchos::null)
211  {}
212 
214  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
215  const Teuchos::RCP<Teuchos::FancyOStream>& out)
216  : Distributor (comm, out, Teuchos::null)
217  {}
218 
220  Distributor (const Teuchos::RCP<const Teuchos::Comm<int> >& comm,
221  const Teuchos::RCP<Teuchos::ParameterList>& plist)
222  : Distributor (comm, Teuchos::null, plist)
223  {}
224 
226  Distributor (const Distributor& distributor)
227  : comm_ (distributor.comm_)
228  , howInitialized_ (Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
229  , sendType_ (distributor.sendType_)
230  , barrierBetween_ (distributor.barrierBetween_)
231  , verbose_ (distributor.verbose_)
232  , selfMessage_ (distributor.selfMessage_)
233  , numSends_ (distributor.numSends_)
234  , procsTo_ (distributor.procsTo_)
235  , startsTo_ (distributor.startsTo_)
236  , lengthsTo_ (distributor.lengthsTo_)
237  , maxSendLength_ (distributor.maxSendLength_)
238  , indicesTo_ (distributor.indicesTo_)
239  , numReceives_ (distributor.numReceives_)
240  , totalReceiveLength_ (distributor.totalReceiveLength_)
241  , lengthsFrom_ (distributor.lengthsFrom_)
242  , procsFrom_ (distributor.procsFrom_)
243  , startsFrom_ (distributor.startsFrom_)
244  , indicesFrom_ (distributor.indicesFrom_)
245  , reverseDistributor_ (distributor.reverseDistributor_)
246  , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
247  , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
248  , useDistinctTags_ (distributor.useDistinctTags_)
249  {
250  using Teuchos::ParameterList;
251  using Teuchos::RCP;
252  using Teuchos::rcp;
253 
254  RCP<const ParameterList> rhsList = distributor.getParameterList ();
255  RCP<ParameterList> newList = rhsList.is_null () ? Teuchos::null :
256  Teuchos::parameterList (*rhsList);
257  this->setParameterList (newList);
258 
259 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
260  makeTimers ();
261 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
262  }
263 
265  using Teuchos::ParameterList;
266  using Teuchos::parameterList;
267  using Teuchos::RCP;
268 
269  std::swap (comm_, rhs.comm_);
270  std::swap (howInitialized_, rhs.howInitialized_);
271  std::swap (sendType_, rhs.sendType_);
272  std::swap (barrierBetween_, rhs.barrierBetween_);
273  std::swap (verbose_, rhs.verbose_);
274  std::swap (selfMessage_, rhs.selfMessage_);
275  std::swap (numSends_, rhs.numSends_);
276  std::swap (procsTo_, rhs.procsTo_);
277  std::swap (startsTo_, rhs.startsTo_);
278  std::swap (lengthsTo_, rhs.lengthsTo_);
279  std::swap (maxSendLength_, rhs.maxSendLength_);
280  std::swap (indicesTo_, rhs.indicesTo_);
281  std::swap (numReceives_, rhs.numReceives_);
282  std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
283  std::swap (lengthsFrom_, rhs.lengthsFrom_);
284  std::swap (procsFrom_, rhs.procsFrom_);
285  std::swap (startsFrom_, rhs.startsFrom_);
286  std::swap (indicesFrom_, rhs.indicesFrom_);
287  std::swap (reverseDistributor_, rhs.reverseDistributor_);
288  std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
289  std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
290  std::swap (useDistinctTags_, rhs.useDistinctTags_);
291 
292  // Swap parameter lists. If they are the same object, make a deep
293  // copy first, so that modifying one won't modify the other one.
294  RCP<ParameterList> lhsList = this->getNonconstParameterList ();
295  RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
296  if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
297  rhsList = parameterList (*rhsList);
298  }
299  if (! rhsList.is_null ()) {
300  this->setMyParamList (rhsList);
301  }
302  if (! lhsList.is_null ()) {
303  rhs.setMyParamList (lhsList);
304  }
305 
306  // We don't need to swap timers, because all instances of
307  // Distributor use the same timers.
308  }
309 
310  bool
311  Distributor::getVerbose()
312  {
313  return Details::Behavior::verbose("Distributor") ||
314  Details::Behavior::verbose("Tpetra::Distributor");
315  }
316 
317  std::unique_ptr<std::string>
318  Distributor::
319  createPrefix(const char methodName[]) const
320  {
321  return Details::createPrefix(
322  comm_.getRawPtr(), "Distributor", methodName);
323  }
324 
325  void
327  setParameterList (const Teuchos::RCP<Teuchos::ParameterList>& plist)
328  {
329  using ::Tpetra::Details::Behavior;
330  using Teuchos::FancyOStream;
331  using Teuchos::getIntegralValue;
332  using Teuchos::includesVerbLevel;
333  using Teuchos::ParameterList;
334  using Teuchos::parameterList;
335  using Teuchos::RCP;
336  using std::endl;
337 
338  if (! plist.is_null()) {
339  RCP<const ParameterList> validParams = getValidParameters ();
340  plist->validateParametersAndSetDefaults (*validParams);
341 
342  const bool barrierBetween =
343  plist->get<bool> ("Barrier between receives and sends");
344  const Details::EDistributorSendType sendType =
345  getIntegralValue<Details::EDistributorSendType> (*plist, "Send type");
346  const bool useDistinctTags = plist->get<bool> ("Use distinct tags");
347  {
348  // mfh 03 May 2016: We keep this option only for backwards
349  // compatibility, but it must always be true. See discussion of
350  // Github Issue #227.
351  const bool enable_cuda_rdma =
352  plist->get<bool> ("Enable MPI CUDA RDMA support");
353  TEUCHOS_TEST_FOR_EXCEPTION
354  (! enable_cuda_rdma, std::invalid_argument, "Tpetra::Distributor::"
355  "setParameterList: " << "You specified \"Enable MPI CUDA RDMA "
356  "support\" = false. This is no longer valid. You don't need to "
357  "specify this option any more; Tpetra assumes it is always true. "
358  "This is a very light assumption on the MPI implementation, and in "
359  "fact does not actually involve hardware or system RDMA support. "
360  "Tpetra just assumes that the MPI implementation can tell whether a "
361  "pointer points to host memory or CUDA device memory.");
362  }
363 
364  // We check this property explicitly, since we haven't yet learned
365  // how to make a validator that can cross-check properties.
366  // Later, turn this into a validator so that it can be embedded in
367  // the valid ParameterList and used in Optika.
368  TEUCHOS_TEST_FOR_EXCEPTION
369  (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
370  std::invalid_argument, "Tpetra::Distributor::setParameterList: " << endl
371  << "You specified \"Send type\"=\"Rsend\", but turned off the barrier "
372  "between receives and sends." << endl << "This is invalid; you must "
373  "include the barrier if you use ready sends." << endl << "Ready sends "
374  "require that their corresponding receives have already been posted, "
375  "and the only way to guarantee that in general is with a barrier.");
376 
377  // Now that we've validated the input list, save the results.
378  sendType_ = sendType;
379  barrierBetween_ = barrierBetween;
380  useDistinctTags_ = useDistinctTags;
381 
382  // ParameterListAcceptor semantics require pointer identity of the
383  // sublist passed to setParameterList(), so we save the pointer.
384  this->setMyParamList (plist);
385  }
386  }
387 
388  Teuchos::RCP<const Teuchos::ParameterList>
390  {
391  using Teuchos::Array;
392  using Teuchos::ParameterList;
393  using Teuchos::parameterList;
394  using Teuchos::RCP;
395  using Teuchos::setStringToIntegralParameter;
396 
397  const bool barrierBetween = barrierBetween_default;
398  const bool useDistinctTags = useDistinctTags_default;
399  const bool debug = tpetraDistributorDebugDefault;
400 
401  Array<std::string> sendTypes = distributorSendTypes ();
402  const std::string defaultSendType ("Send");
403  Array<Details::EDistributorSendType> sendTypeEnums;
404  sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
405  sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
406  sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
407  sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
408 
409  RCP<ParameterList> plist = parameterList ("Tpetra::Distributor");
410  plist->set ("Barrier between receives and sends", barrierBetween,
411  "Whether to execute a barrier between receives and sends in do"
412  "[Reverse]Posts(). Required for correctness when \"Send type\""
413  "=\"Rsend\", otherwise correct but not recommended.");
414  setStringToIntegralParameter<Details::EDistributorSendType> ("Send type",
415  defaultSendType, "When using MPI, the variant of send to use in "
416  "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
417  plist->set ("Use distinct tags", useDistinctTags, "Whether to use distinct "
418  "MPI message tags for different code paths. Highly recommended"
419  " to avoid message collisions.");
420  plist->set ("Debug", debug, "Whether to print copious debugging output on "
421  "all processes.");
422  plist->set ("Timer Label","","Label for Time Monitor output");
423  plist->set ("Enable MPI CUDA RDMA support", true, "Assume that MPI can "
424  "tell whether a pointer points to host memory or CUDA device "
425  "memory. You don't need to specify this option any more; "
426  "Tpetra assumes it is always true. This is a very light "
427  "assumption on the MPI implementation, and in fact does not "
428  "actually involve hardware or system RDMA support.");
429 
430  // mfh 24 Dec 2015: Tpetra no longer inherits from
431  // Teuchos::VerboseObject, so it doesn't need the "VerboseObject"
432  // sublist. However, we retain the "VerboseObject" sublist
433  // anyway, for backwards compatibility (otherwise the above
434  // validation would fail with an invalid parameter name, should
435  // the user still want to provide this list).
436  Teuchos::setupVerboseObjectSublist (&*plist);
437  return Teuchos::rcp_const_cast<const ParameterList> (plist);
438  }
439 
440 
442  { return totalReceiveLength_; }
443 
445  { return numReceives_; }
446 
448  { return selfMessage_; }
449 
451  { return numSends_; }
452 
454  { return maxSendLength_; }
455 
456  Teuchos::ArrayView<const int> Distributor::getProcsFrom() const
457  { return procsFrom_; }
458 
459  Teuchos::ArrayView<const size_t> Distributor::getLengthsFrom() const
460  { return lengthsFrom_; }
461 
462  Teuchos::ArrayView<const int> Distributor::getProcsTo() const
463  { return procsTo_; }
464 
465  Teuchos::ArrayView<const size_t> Distributor::getLengthsTo() const
466  { return lengthsTo_; }
467 
468  Teuchos::RCP<Distributor>
469  Distributor::getReverse(bool create) const {
470  if (reverseDistributor_.is_null () && create) {
471  createReverseDistributor ();
472  }
473  TEUCHOS_TEST_FOR_EXCEPTION
474  (reverseDistributor_.is_null () && create, std::logic_error, "The reverse "
475  "Distributor is null after createReverseDistributor returned. "
476  "Please report this bug to the Tpetra developers.");
477  return reverseDistributor_;
478  }
479 
480 
481  void
482  Distributor::createReverseDistributor() const
483  {
484  reverseDistributor_ = Teuchos::rcp(new Distributor(comm_));
485  reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
486  reverseDistributor_->sendType_ = sendType_;
487  reverseDistributor_->barrierBetween_ = barrierBetween_;
488  reverseDistributor_->verbose_ = verbose_;
489 
490  // The total length of all the sends of this Distributor. We
491  // calculate it because it's the total length of all the receives
492  // of the reverse Distributor.
493  size_t totalSendLength =
494  std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), 0);
495 
496  // The maximum length of any of the receives of this Distributor.
497  // We calculate it because it's the maximum length of any of the
498  // sends of the reverse Distributor.
499  size_t maxReceiveLength = 0;
500  const int myProcID = comm_->getRank();
501  for (size_t i=0; i < numReceives_; ++i) {
502  if (procsFrom_[i] != myProcID) {
503  // Don't count receives for messages sent by myself to myself.
504  if (lengthsFrom_[i] > maxReceiveLength) {
505  maxReceiveLength = lengthsFrom_[i];
506  }
507  }
508  }
509 
510  // Initialize all of reverseDistributor's data members. This
511  // mainly just involves flipping "send" and "receive," or the
512  // equivalent "to" and "from."
513 
514  reverseDistributor_->selfMessage_ = selfMessage_;
515  reverseDistributor_->numSends_ = numReceives_;
516  reverseDistributor_->procsTo_ = procsFrom_;
517  reverseDistributor_->startsTo_ = startsFrom_;
518  reverseDistributor_->lengthsTo_ = lengthsFrom_;
519  reverseDistributor_->maxSendLength_ = maxReceiveLength;
520  reverseDistributor_->indicesTo_ = indicesFrom_;
521  reverseDistributor_->numReceives_ = numSends_;
522  reverseDistributor_->totalReceiveLength_ = totalSendLength;
523  reverseDistributor_->lengthsFrom_ = lengthsTo_;
524  reverseDistributor_->procsFrom_ = procsTo_;
525  reverseDistributor_->startsFrom_ = startsTo_;
526  reverseDistributor_->indicesFrom_ = indicesTo_;
527 
528  // requests_: Allocated on demand.
529  // reverseDistributor_: See note below
530 
531  // mfh 31 Mar 2016: These are statistics, kept on calls to
532  // doPostsAndWaits or doReversePostsAndWaits. They weren't here
533  // when I started, and I didn't add them, so I don't know if they
534  // are accurate.
535  reverseDistributor_->lastRoundBytesSend_ = 0;
536  reverseDistributor_->lastRoundBytesRecv_ = 0;
537 
538  reverseDistributor_->useDistinctTags_ = useDistinctTags_;
539 
540  // I am my reverse Distributor's reverse Distributor.
541  // Thus, it would be legit to do the following:
542  //
543  // reverseDistributor_->reverseDistributor_ = Teuchos::rcp (this, false);
544  //
545  // (Note use of a "weak reference" to avoid a circular RCP
546  // dependency.) The only issue is that if users hold on to the
547  // reverse Distributor but let go of the forward one, this
548  // reference won't be valid anymore. However, the reverse
549  // Distributor is really an implementation detail of Distributor
550  // and not meant to be used directly, so we don't need to do this.
551  reverseDistributor_->reverseDistributor_ = Teuchos::null;
552  }
553 
554  void
556  {
557  using Teuchos::Array;
558  using Teuchos::CommRequest;
559  using Teuchos::FancyOStream;
560  using Teuchos::includesVerbLevel;
561  using Teuchos::is_null;
562  using Teuchos::RCP;
563  using Teuchos::waitAll;
564  using std::endl;
565 
566 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
567  Teuchos::TimeMonitor timeMon (*timer_doWaits_);
568 #endif // HAVE_TPETRA_DISTRIBUTOR_TIMINGS
569 
570  const bool debug = Details::Behavior::debug("Distributor");
571 
572  std::unique_ptr<std::string> prefix;
573  if (verbose_) {
574  prefix = createPrefix("doWaits");
575  std::ostringstream os;
576  os << *prefix << "Start: requests_.size(): "
577  << requests_.size() << endl;
578  std::cerr << os.str();
579  }
580 
581  if (requests_.size() > 0) {
582  waitAll(*comm_, requests_());
583 
584  if (debug) {
585  // Make sure that waitAll() nulled out all the requests.
586  for (auto it = requests_.begin(); it != requests_.end(); ++it) {
587  TEUCHOS_TEST_FOR_EXCEPTION
588  (! is_null(*it), std::runtime_error,
589  "Tpetra::Distributor::doWaits: Communication requests "
590  "should all be null aftr calling Teuchos::waitAll on "
591  "them, but at least one request is not null.");
592  }
593  }
594  // Restore the invariant that requests_.size() is the number of
595  // outstanding nonblocking communication requests.
596  requests_.resize (0);
597  }
598 
599  if (debug) {
600  const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
601  int globalSizeNonzero = 0;
602  Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
603  localSizeNonzero,
604  Teuchos::outArg (globalSizeNonzero));
605  TEUCHOS_TEST_FOR_EXCEPTION(
606  globalSizeNonzero != 0, std::runtime_error,
607  "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
608  "a nonzero number of outstanding posts. There should be none at this "
609  "point. Please report this bug to the Tpetra developers.");
610  }
611 
612  if (verbose_) {
613  std::ostringstream os;
614  os << *prefix << "Done" << endl;
615  std::cerr << os.str();
616  }
617  }
618 
620  // call doWaits() on the reverse Distributor, if it exists
621  if (! reverseDistributor_.is_null()) {
622  reverseDistributor_->doWaits();
623  }
624  }
625 
626  std::string Distributor::description () const {
627  std::ostringstream out;
628 
629  out << "\"Tpetra::Distributor\": {";
630  const std::string label = this->getObjectLabel ();
631  if (label != "") {
632  out << "Label: " << label << ", ";
633  }
634  out << "How initialized: "
636  << ", Parameters: {"
637  << "Send type: "
638  << DistributorSendTypeEnumToString (sendType_)
639  << ", Barrier between receives and sends: "
640  << (barrierBetween_ ? "true" : "false")
641  << ", Use distinct tags: "
642  << (useDistinctTags_ ? "true" : "false")
643  << ", Debug: " << (verbose_ ? "true" : "false")
644  << "}}";
645  return out.str ();
646  }
647 
648  std::string
649  Distributor::
650  localDescribeToString (const Teuchos::EVerbosityLevel vl) const
651  {
652  using Teuchos::toString;
653  using Teuchos::VERB_HIGH;
654  using Teuchos::VERB_EXTREME;
655  using std::endl;
656 
657  // This preserves current behavior of Distributor.
658  if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
659  return std::string ();
660  }
661 
662  auto outStringP = Teuchos::rcp (new std::ostringstream ());
663  auto outp = Teuchos::getFancyOStream (outStringP); // returns RCP
664  Teuchos::FancyOStream& out = *outp;
665 
666  const int myRank = comm_->getRank ();
667  const int numProcs = comm_->getSize ();
668  out << "Process " << myRank << " of " << numProcs << ":" << endl;
669  Teuchos::OSTab tab1 (out);
670 
671  out << "selfMessage: " << hasSelfMessage () << endl;
672  out << "numSends: " << getNumSends () << endl;
673  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
674  out << "procsTo: " << toString (procsTo_) << endl;
675  out << "lengthsTo: " << toString (lengthsTo_) << endl;
676  out << "maxSendLength: " << getMaxSendLength () << endl;
677  }
678  if (vl == VERB_EXTREME) {
679  out << "startsTo: " << toString (startsTo_) << endl;
680  out << "indicesTo: " << toString (indicesTo_) << endl;
681  }
682  if (vl == VERB_HIGH || vl == VERB_EXTREME) {
683  out << "numReceives: " << getNumReceives () << endl;
684  out << "totalReceiveLength: " << getTotalReceiveLength () << endl;
685  out << "lengthsFrom: " << toString (lengthsFrom_) << endl;
686  out << "startsFrom: " << toString (startsFrom_) << endl;
687  out << "procsFrom: " << toString (procsFrom_) << endl;
688  }
689 
690  out.flush (); // make sure the ostringstream got everything
691  return outStringP->str ();
692  }
693 
694  void
696  describe (Teuchos::FancyOStream& out,
697  const Teuchos::EVerbosityLevel verbLevel) const
698  {
699  using std::endl;
700  using Teuchos::VERB_DEFAULT;
701  using Teuchos::VERB_NONE;
702  using Teuchos::VERB_LOW;
703  using Teuchos::VERB_MEDIUM;
704  using Teuchos::VERB_HIGH;
705  using Teuchos::VERB_EXTREME;
706  const Teuchos::EVerbosityLevel vl =
707  (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
708 
709  if (vl == VERB_NONE) {
710  return; // don't print anything
711  }
712  // If this Distributor's Comm is null, then the the calling
713  // process does not participate in Distributor-related collective
714  // operations with the other processes. In that case, it is not
715  // even legal to call this method. The reasonable thing to do in
716  // that case is nothing.
717  if (comm_.is_null ()) {
718  return;
719  }
720  const int myRank = comm_->getRank ();
721  const int numProcs = comm_->getSize ();
722 
723  // Only Process 0 should touch the output stream, but this method
724  // in general may need to do communication. Thus, we may need to
725  // preserve the current tab level across multiple "if (myRank ==
726  // 0) { ... }" inner scopes. This is why we sometimes create
727  // OSTab instances by pointer, instead of by value. We only need
728  // to create them by pointer if the tab level must persist through
729  // multiple inner scopes.
730  Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
731 
732  if (myRank == 0) {
733  // At every verbosity level but VERB_NONE, Process 0 prints.
734  // By convention, describe() always begins with a tab before
735  // printing.
736  tab0 = Teuchos::rcp (new Teuchos::OSTab (out));
737  // We quote the class name because it contains colons.
738  // This makes the output valid YAML.
739  out << "\"Tpetra::Distributor\":" << endl;
740  tab1 = Teuchos::rcp (new Teuchos::OSTab (out));
741 
742  const std::string label = this->getObjectLabel ();
743  if (label != "") {
744  out << "Label: " << label << endl;
745  }
746  out << "Number of processes: " << numProcs << endl
747  << "How initialized: "
749  << endl;
750  {
751  out << "Parameters: " << endl;
752  Teuchos::OSTab tab2 (out);
753  out << "\"Send type\": "
754  << DistributorSendTypeEnumToString (sendType_) << endl
755  << "\"Barrier between receives and sends\": "
756  << (barrierBetween_ ? "true" : "false") << endl
757  << "\"Use distinct tags\": "
758  << (useDistinctTags_ ? "true" : "false") << endl
759  << "\"Debug\": " << (verbose_ ? "true" : "false") << endl;
760  }
761  } // if myRank == 0
762 
763  // This is collective over the Map's communicator.
764  if (vl > VERB_LOW) {
765  const std::string lclStr = this->localDescribeToString (vl);
766  Tpetra::Details::gathervPrint (out, lclStr, *comm_);
767  }
768 
769  out << "Reverse Distributor:";
770  if (reverseDistributor_.is_null ()) {
771  out << " null" << endl;
772  }
773  else {
774  out << endl;
775  reverseDistributor_->describe (out, vl);
776  }
777  }
778 
779  void
780  Distributor::
781  computeReceives ()
782  {
783  using Teuchos::Array;
784  using Teuchos::ArrayRCP;
785  using Teuchos::as;
786  using Teuchos::CommStatus;
787  using Teuchos::CommRequest;
788  using Teuchos::ireceive;
789  using Teuchos::RCP;
790  using Teuchos::rcp;
791  using Teuchos::REDUCE_SUM;
792  using Teuchos::receive;
793  using Teuchos::reduce;
794  using Teuchos::scatter;
795  using Teuchos::send;
796  using Teuchos::waitAll;
797  using std::endl;
798 
799  const int myRank = comm_->getRank();
800  const int numProcs = comm_->getSize();
801 
802  // MPI tag for nonblocking receives and blocking sends in this method.
803  const int pathTag = 2;
804  const int tag = this->getTag (pathTag);
805 
806  std::unique_ptr<std::string> prefix;
807  if (verbose_) {
808  prefix = createPrefix("computeReceives");
809  std::ostringstream os;
810  os << *prefix
811  << "selfMessage_: " << (selfMessage_ ? "true" : "false")
812  << ", pathTag: " << pathTag << ", tag: " << tag << endl;
813  std::cerr << os.str();
814  }
815 
816  // toProcsFromMe[i] == the number of messages sent by this process
817  // to process i. The data in numSends_, procsTo_, and lengthsTo_
818  // concern the contiguous sends. Therefore, each process will be
819  // listed in procsTo_ at most once, and so toProcsFromMe[i] will
820  // either be 0 or 1.
821  {
822  Array<int> toProcsFromMe (numProcs, 0);
823 #ifdef HAVE_TEUCHOS_DEBUG
824  bool counting_error = false;
825 #endif // HAVE_TEUCHOS_DEBUG
826  for (size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
827 #ifdef HAVE_TEUCHOS_DEBUG
828  if (toProcsFromMe[procsTo_[i]] != 0) {
829  counting_error = true;
830  }
831 #endif // HAVE_TEUCHOS_DEBUG
832  toProcsFromMe[procsTo_[i]] = 1;
833  }
834 #ifdef HAVE_TEUCHOS_DEBUG
835  SHARED_TEST_FOR_EXCEPTION(counting_error, std::logic_error,
836  "Tpetra::Distributor::computeReceives: There was an error on at least "
837  "one process in counting the number of messages send by that process to "
838  "the other processs. Please report this bug to the Tpetra developers.",
839  *comm_);
840 #endif // HAVE_TEUCHOS_DEBUG
841 
842  if (verbose_) {
843  std::ostringstream os;
844  os << *prefix << "Reduce & scatter" << endl;
845  std::cerr << os.str();
846  }
847 
848  // Compute the number of receives that this process needs to
849  // post. The number of receives includes any self sends (i.e.,
850  // messages sent by this process to itself).
851  //
852  // (We will use numReceives_ this below to post exactly that
853  // number of receives, with MPI_ANY_SOURCE as the sending rank.
854  // This will tell us from which processes this process expects
855  // to receive, and how many packets of data we expect to receive
856  // from each process.)
857  //
858  // toProcsFromMe[i] is the number of messages sent by this
859  // process to process i. Compute the sum (elementwise) of all
860  // the toProcsFromMe arrays on all processes in the
861  // communicator. If the array x is that sum, then if this
862  // process has rank j, x[j] is the number of messages sent
863  // to process j, that is, the number of receives on process j
864  // (including any messages sent by process j to itself).
865  //
866  // Yes, this requires storing and operating on an array of
867  // length P, where P is the number of processes in the
868  // communicator. Epetra does this too. Avoiding this O(P)
869  // memory bottleneck would require some research.
870  //
871  // mfh 09 Jan 2012, 15 Jul 2015: There are three ways to
872  // implement this O(P) memory algorithm.
873  //
874  // 1. Use MPI_Reduce and MPI_Scatter: reduce on the root
875  // process (0) from toProcsFromMe, to numRecvsOnEachProc.
876  // Then, scatter the latter, so that each process p gets
877  // numRecvsOnEachProc[p].
878  //
879  // 2. Like #1, but use MPI_Reduce_scatter instead of
880  // MPI_Reduce and MPI_Scatter. MPI_Reduce_scatter might be
881  // optimized to reduce the number of messages, but
882  // MPI_Reduce_scatter is more general than we need (it
883  // allows the equivalent of MPI_Scatterv). See Bug 6336.
884  //
885  // 3. Do an all-reduce on toProcsFromMe, and let my process
886  // (with rank myRank) get numReceives_ from
887  // toProcsFromMe[myRank]. The HPCCG miniapp uses the
888  // all-reduce method.
889  //
890  // Approaches 1 and 3 have the same critical path length.
891  // However, #3 moves more data. This is because the final
892  // result is just one integer, but #3 moves a whole array of
893  // results to all the processes. This is why we use Approach 1
894  // here.
895  //
896  // mfh 12 Apr 2013: See discussion in createFromSends() about
897  // how we could use this communication to propagate an error
898  // flag for "free" in a release build.
899 
900  const int root = 0; // rank of root process of the reduction
901  Array<int> numRecvsOnEachProc; // temp; only needed on root
902  if (myRank == root) {
903  numRecvsOnEachProc.resize (numProcs);
904  }
905  int numReceivesAsInt = 0; // output
906  reduce<int, int> (toProcsFromMe.getRawPtr (),
907  numRecvsOnEachProc.getRawPtr (),
908  numProcs, REDUCE_SUM, root, *comm_);
909  scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
910  &numReceivesAsInt, 1, root, *comm_);
911  numReceives_ = static_cast<size_t> (numReceivesAsInt);
912  }
913 
914  // Now we know numReceives_, which is this process' number of
915  // receives. Allocate the lengthsFrom_ and procsFrom_ arrays
916  // with this number of entries.
917  lengthsFrom_.assign (numReceives_, 0);
918  procsFrom_.assign (numReceives_, 0);
919 
920  //
921  // Ask (via nonblocking receive) each process from which we are
922  // receiving how many packets we should expect from it in the
923  // communication pattern.
924  //
925 
926  // At this point, numReceives_ includes any self message that
927  // there may be. At the end of this routine, we'll subtract off
928  // the self message (if there is one) from numReceives_. In this
929  // routine, we don't need to receive a message from ourselves in
930  // order to figure out our lengthsFrom_ and source process ID; we
931  // can just ask ourselves directly. Thus, the actual number of
932  // nonblocking receives we post here does not include the self
933  // message.
934  const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
935 
936  // Teuchos' wrapper for nonblocking receives requires receive
937  // buffers that it knows won't go away. This is why we use RCPs,
938  // one RCP per nonblocking receive request. They get allocated in
939  // the loop below.
940  Array<RCP<CommRequest<int> > > requests (actualNumReceives);
941  Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
942  Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
943 
944  // Teuchos::Comm treats a negative process ID as MPI_ANY_SOURCE
945  // (receive data from any process).
946 #ifdef HAVE_MPI
947  const int anySourceProc = MPI_ANY_SOURCE;
948 #else
949  const int anySourceProc = -1;
950 #endif
951 
952  if (verbose_) {
953  std::ostringstream os;
954  os << *prefix << "Post " << actualNumReceives << " irecv"
955  << (actualNumReceives != size_t (1) ? "s" : "") << endl;
956  std::cerr << os.str();
957  }
958 
959  // Post the (nonblocking) receives.
960  for (size_t i = 0; i < actualNumReceives; ++i) {
961  // Once the receive completes, we can ask the corresponding
962  // CommStatus object (output by wait()) for the sending process'
963  // ID (which we'll assign to procsFrom_[i] -- don't forget to
964  // do that!).
965  lengthsFromBuffers[i].resize (1);
966  lengthsFromBuffers[i][0] = as<size_t> (0);
967  requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
968  tag, *comm_);
969  if (verbose_) {
970  std::ostringstream os;
971  os << *prefix << "Posted any-proc irecv w/ tag " << tag << endl;
972  std::cerr << os.str();
973  }
974  }
975 
976  if (verbose_) {
977  std::ostringstream os;
978  os << *prefix << "Post " << numSends_ << " send"
979  << (numSends_ != size_t (1) ? "s" : "") << endl;
980  std::cerr << os.str();
981  }
982  // Post the sends: Tell each process to which we are sending how
983  // many packets it should expect from us in the communication
984  // pattern. We could use nonblocking sends here, as long as we do
985  // a waitAll() on all the sends and receives at once.
986  //
987  // We assume that numSends_ and selfMessage_ have already been
988  // set. The value of numSends_ (my process' number of sends) does
989  // not include any message that it might send to itself.
990  for (size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
991  if (procsTo_[i] != myRank) {
992  // Send a message to procsTo_[i], telling that process that
993  // this communication pattern will send that process
994  // lengthsTo_[i] blocks of packets.
995  const size_t* const lengthsTo_i = &lengthsTo_[i];
996  send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
997  if (verbose_) {
998  std::ostringstream os;
999  os << *prefix << "Posted send to Proc " << procsTo_[i] << " w/ tag "
1000  << tag << endl;
1001  std::cerr << os.str();
1002  }
1003  }
1004  else {
1005  // We don't need a send in the self-message case. If this
1006  // process will send a message to itself in the communication
1007  // pattern, then the last element of lengthsFrom_ and
1008  // procsFrom_ corresponds to the self-message. Of course
1009  // this process knows how long the message is, and the process
1010  // ID is its own process ID.
1011  lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1012  procsFrom_[numReceives_-1] = myRank;
1013  }
1014  }
1015 
1016  if (verbose_) {
1017  std::ostringstream os;
1018  const size_t numReq = requests.size();
1019  os << *prefix << "waitAll on " << numReq << " request"
1020  << (numReq != size_t(1) ? "s" : "") << endl;
1021  std::cerr << os.str();
1022  }
1023  //
1024  // Wait on all the receives. When they arrive, check the status
1025  // output of wait() for the receiving process ID, unpack the
1026  // request buffers into lengthsFrom_, and set procsFrom_ from the
1027  // status.
1028  //
1029  waitAll (*comm_, requests (), statuses ());
1030  for (size_t i = 0; i < actualNumReceives; ++i) {
1031  lengthsFrom_[i] = *lengthsFromBuffers[i];
1032  procsFrom_[i] = statuses[i]->getSourceRank ();
1033  }
1034 
1035  // Sort the procsFrom_ array, and apply the same permutation to
1036  // lengthsFrom_. This ensures that procsFrom_[i] and
1037  // lengthsFrom_[i] refers to the same thing.
1038  sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
1039 
1040  // Compute indicesFrom_
1041  totalReceiveLength_ =
1042  std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), 0);
1043  indicesFrom_.clear ();
1044  // NOTE (mfh 13 Feb 2019): Epetra_MpiDistributor deliberately does
1045  // _not_ fill indicesFrom_ (what it calls "indices_from_") like
1046  // this; it leaves indicesFrom_ empty. The comment there mentions
1047  // that not filling indicesFrom_ helps reverse mode correctness.
1048 #if 0
1049  indicesFrom_.reserve (totalReceiveLength_);
1050  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1051  indicesFrom_.push_back(i);
1052  }
1053 #endif // 0
1054 
1055  startsFrom_.clear ();
1056  startsFrom_.reserve (numReceives_);
1057  for (size_t i = 0, j = 0; i < numReceives_; ++i) {
1058  startsFrom_.push_back(j);
1059  j += lengthsFrom_[i];
1060  }
1061 
1062  if (selfMessage_) {
1063  --numReceives_;
1064  }
1065 
1066  if (verbose_) {
1067  std::ostringstream os;
1068  os << *prefix << "Done" << endl;
1069  std::cerr << os.str();
1070  }
1071  }
1072 
1073  size_t
1075  createFromSends (const Teuchos::ArrayView<const int>& exportProcIDs)
1076  {
1077  using Teuchos::outArg;
1078  using Teuchos::REDUCE_MAX;
1079  using Teuchos::reduceAll;
1080  using std::endl;
1081  const char rawPrefix[] = "Tpetra::Distributor::createFromSends";
1082 
1083  const size_t numExports = exportProcIDs.size();
1084  const int myProcID = comm_->getRank();
1085  const int numProcs = comm_->getSize();
1086 
1087  const bool debug = Details::Behavior::debug("Distributor");
1088  const size_t maxNumToPrint = verbose_ ?
1090  std::unique_ptr<std::string> prefix;
1091  if (verbose_) {
1092  prefix = createPrefix("createFromSends");
1093  std::ostringstream os;
1094  os << *prefix << "Start: ";
1095  Details::verbosePrintArray(os, exportProcIDs, "exportPIDs",
1096  maxNumToPrint);
1097  os << endl;
1098  std::cerr << os.str();
1099  }
1100 
1101  // exportProcIDs tells us the communication pattern for this
1102  // distributor. It dictates the way that the export data will be
1103  // interpreted in doPosts(). We want to perform at most one
1104  // send per process in doPosts; this is for two reasons:
1105  // * minimize latency / overhead in the comm routines (nice)
1106  // * match the number of receives and sends between processes
1107  // (necessary)
1108  //
1109  // Teuchos::Comm requires that the data for a send are contiguous
1110  // in a send buffer. Therefore, if the data in the send buffer
1111  // for doPosts() are not contiguous, they will need to be copied
1112  // into a contiguous buffer. The user has specified this
1113  // noncontiguous pattern and we can't do anything about it.
1114  // However, if they do not provide an efficient pattern, we will
1115  // warn them if one of the following compile-time options has been
1116  // set:
1117  // * HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS
1118  // * HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS
1119  //
1120  // If the data are contiguous, then we can post the sends in situ
1121  // (i.e., without needing to copy them into a send buffer).
1122  //
1123  // Determine contiguity. There are a number of ways to do this:
1124  // * If the export IDs are sorted, then all exports to a
1125  // particular proc must be contiguous. This is what Epetra does.
1126  // * If the export ID of the current export already has been
1127  // listed, then the previous listing should correspond to the
1128  // same export. This tests contiguity, but not sortedness.
1129  //
1130  // Both of these tests require O(n), where n is the number of
1131  // exports. However, the latter will positively identify a greater
1132  // portion of contiguous patterns. We use the latter method.
1133  //
1134  // Check to see if values are grouped by procs without gaps
1135  // If so, indices_to -> 0.
1136 
1137  // Set up data structures for quick traversal of arrays.
1138  // This contains the number of sends for each process ID.
1139  //
1140  // FIXME (mfh 20 Mar 2014) This is one of a few places in Tpetra
1141  // that create an array of length the number of processes in the
1142  // communicator (plus one). Given how this code uses this array,
1143  // it should be straightforward to replace it with a hash table or
1144  // some other more space-efficient data structure. In practice,
1145  // most of the entries of starts should be zero for a sufficiently
1146  // large process count, unless the communication pattern is dense.
1147  // Note that it's important to be able to iterate through keys (i
1148  // for which starts[i] is nonzero) in increasing order.
1149  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1150 
1151  // numActive is the number of sends that are not Null
1152  size_t numActive = 0;
1153  int needSendBuff = 0; // Boolean
1154 
1155  int badID = -1; // only used in debug mode
1156  for (size_t i = 0; i < numExports; ++i) {
1157  const int exportID = exportProcIDs[i];
1158  if (exportID >= numProcs) {
1159  badID = myProcID;
1160  break;
1161  }
1162  else if (exportID >= 0) {
1163  // exportID is a valid process ID. Increment the number of
1164  // messages this process will send to that process.
1165  ++starts[exportID];
1166 
1167  // If we're sending more than one message to process exportID,
1168  // then it is possible that the data are not contiguous.
1169  // Check by seeing if the previous process ID in the list
1170  // (exportProcIDs[i-1]) is the same. It's safe to use i-1,
1171  // because if starts[exportID] > 1, then i must be > 1 (since
1172  // the starts array was filled with zeros initially).
1173 
1174  // null entries break continuity.
1175  // e.g., [ 0, 0, 0, 1, -99, 1, 2, 2, 2] is not contiguous
1176  if (needSendBuff == 0 && starts[exportID] > 1 &&
1177  exportID != exportProcIDs[i-1]) {
1178  needSendBuff = 1;
1179  }
1180  ++numActive;
1181  }
1182  }
1183 
1184  if (debug) {
1185  // Test whether any process in the communicator got an invalid
1186  // process ID. If badID != -1 on this process, then it equals
1187  // this process' rank. The max of all badID over all processes
1188  // is the max rank which has an invalid process ID.
1189  int gbl_badID;
1190  reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1191  TEUCHOS_TEST_FOR_EXCEPTION
1192  (gbl_badID >= 0, std::runtime_error, rawPrefix << "Proc "
1193  << gbl_badID << ", perhaps among other processes, got a bad "
1194  "send process ID.");
1195  }
1196  // FIXME (mfh 12 Apr 2013, 15 Jul 2015, 13 Feb 2020) Rather than
1197  // simply ignoring this information when not in debug mode, we
1198  // should think about how to pass it along so that all the
1199  // processes find out about it. In a release build with
1200  // efficiency warnings turned off, the next collective
1201  // communication happens in computeReceives(). We could figure
1202  // out how to encode the error flag in that operation, for example
1203  // by adding an extra entry to the collective's output array that
1204  // encodes the error condition (0 on all processes if no error,
1205  // else 1 on any process with the error, so that the sum will
1206  // produce a nonzero value if any process had an error). I'll
1207  // defer this change for now and recommend instead that people
1208  // with troubles try a debug build.
1209 
1210 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1211  {
1212  int global_needSendBuff;
1213  reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1214  outArg (global_needSendBuff));
1216  global_needSendBuff != 0, std::runtime_error,
1217  "::createFromSends: Grouping export IDs together by process rank often "
1218  "improves performance.");
1219  }
1220 #endif
1221 
1222  if (verbose_) {
1223  std::ostringstream os;
1224  os << *prefix << "Detect whether I have a self message" << endl;
1225  std::cerr << os.str();
1226  }
1227 
1228  // Determine from the caller's data whether or not the current
1229  // process should send (a) message(s) to itself.
1230  if (starts[myProcID] != 0) {
1231  selfMessage_ = true;
1232  }
1233  else {
1234  selfMessage_ = false;
1235  }
1236 
1237  bool index_neq_numActive = false;
1238  bool send_neq_numSends = false;
1239  if (! needSendBuff) {
1240  if (verbose_) {
1241  std::ostringstream os;
1242  os << *prefix << "I don't need a send buffer or indicesTo_ "
1243  "(fast path)" << endl;
1244  std::cerr << os.str();
1245  }
1246  // grouped by proc, no send buffer or indicesTo_ needed
1247  numSends_ = 0;
1248  // Count total number of sends, i.e., total number of procs to
1249  // which we are sending. This includes myself, if applicable.
1250  for (int i = 0; i < numProcs; ++i) {
1251  if (starts[i]) {
1252  ++numSends_;
1253  }
1254  }
1255 
1256  // Not only do we not need these, but we must clear them, as
1257  // empty status of indicesTo is a flag used later.
1258  indicesTo_.resize(0);
1259  // Size these to numSends_; note, at the moment, numSends_
1260  // includes self sends. Set their values to zeros.
1261  procsTo_.assign(numSends_,0);
1262  startsTo_.assign(numSends_,0);
1263  lengthsTo_.assign(numSends_,0);
1264 
1265  // set startsTo to the offset for each send (i.e., each proc ID)
1266  // set procsTo to the proc ID for each send
1267  // in interpreting this code, remember that we are assuming contiguity
1268  // that is why index skips through the ranks
1269  {
1270  size_t index = 0, procIndex = 0;
1271  for (size_t i = 0; i < numSends_; ++i) {
1272  while (exportProcIDs[procIndex] < 0) {
1273  ++procIndex; // skip all negative proc IDs
1274  }
1275  startsTo_[i] = procIndex;
1276  int procID = exportProcIDs[procIndex];
1277  procsTo_[i] = procID;
1278  index += starts[procID];
1279  procIndex += starts[procID];
1280  }
1281  if (index != numActive) {
1282  index_neq_numActive = true;
1283  }
1284  }
1285  // sort the startsTo and proc IDs together, in ascending order, according
1286  // to proc IDs
1287  if (numSends_ > 0) {
1288  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1289  }
1290  // compute the maximum send length
1291  maxSendLength_ = 0;
1292  for (size_t i = 0; i < numSends_; ++i) {
1293  int procID = procsTo_[i];
1294  lengthsTo_[i] = starts[procID];
1295  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1296  maxSendLength_ = lengthsTo_[i];
1297  }
1298  }
1299  }
1300  else {
1301  if (verbose_) {
1302  std::ostringstream os;
1303  os << *prefix << "I need a send buffer & indicesTo_ "
1304  "(slow path)" << endl;
1305  std::cerr << os.str();
1306  }
1307  // not grouped by proc, need send buffer and indicesTo_
1308 
1309  // starts[i] is the number of sends to proc i
1310  // numActive equals number of sends total, \sum_i starts[i]
1311 
1312  // this loop starts at starts[1], so explicitly check starts[0]
1313  if (starts[0] == 0 ) {
1314  numSends_ = 0;
1315  }
1316  else {
1317  numSends_ = 1;
1318  }
1319  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1320  im1=starts.begin();
1321  i != starts.end(); ++i)
1322  {
1323  if (*i != 0) ++numSends_;
1324  *i += *im1;
1325  im1 = i;
1326  }
1327  // starts[i] now contains the number of exports to procs 0 through i
1328 
1329  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1330  i=starts.rbegin()+1;
1331  i != starts.rend(); ++i)
1332  {
1333  *ip1 = *i;
1334  ip1 = i;
1335  }
1336  starts[0] = 0;
1337  // starts[i] now contains the number of exports to procs 0 through
1338  // i-1, i.e., all procs before proc i
1339 
1340  indicesTo_.resize(numActive);
1341 
1342  for (size_t i = 0; i < numExports; ++i) {
1343  if (exportProcIDs[i] >= 0) {
1344  // record the offset to the sendBuffer for this export
1345  indicesTo_[starts[exportProcIDs[i]]] = i;
1346  // now increment the offset for this proc
1347  ++starts[exportProcIDs[i]];
1348  }
1349  }
1350  // our send buffer will contain the export data for each of the procs
1351  // we communicate with, in order by proc id
1352  // sendBuffer = {proc_0_data, proc_1_data, ..., proc_np-1_data}
1353  // indicesTo now maps each export to the location in our send buffer
1354  // associated with the export
1355  // data for export i located at sendBuffer[indicesTo[i]]
1356  //
1357  // starts[i] once again contains the number of exports to
1358  // procs 0 through i
1359  for (int proc = numProcs-1; proc != 0; --proc) {
1360  starts[proc] = starts[proc-1];
1361  }
1362  starts.front() = 0;
1363  starts[numProcs] = numActive;
1364  //
1365  // starts[proc] once again contains the number of exports to
1366  // procs 0 through proc-1
1367  // i.e., the start of my data in the sendBuffer
1368 
1369  // this contains invalid data at procs we don't care about, that is okay
1370  procsTo_.resize(numSends_);
1371  startsTo_.resize(numSends_);
1372  lengthsTo_.resize(numSends_);
1373 
1374  // for each group of sends/exports, record the destination proc,
1375  // the length, and the offset for this send into the
1376  // send buffer (startsTo_)
1377  maxSendLength_ = 0;
1378  size_t snd = 0;
1379  for (int proc = 0; proc < numProcs; ++proc ) {
1380  if (starts[proc+1] != starts[proc]) {
1381  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1382  startsTo_[snd] = starts[proc];
1383  // record max length for all off-proc sends
1384  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1385  maxSendLength_ = lengthsTo_[snd];
1386  }
1387  procsTo_[snd] = proc;
1388  ++snd;
1389  }
1390  }
1391  if (snd != numSends_) {
1392  send_neq_numSends = true;
1393  }
1394  }
1395  if (debug) {
1397  (index_neq_numActive, std::logic_error,
1398  rawPrefix << "logic error. Please notify the Tpetra team.", *comm_);
1400  (send_neq_numSends, std::logic_error,
1401  rawPrefix << "logic error. Please notify the Tpetra team.", *comm_);
1402  }
1403 
1404  if (selfMessage_) {
1405  if (verbose_) {
1406  std::ostringstream os;
1407  os << *prefix << "Sending self message; numSends "
1408  << numSends_ << " -> " << (numSends_ - 1) << endl;
1409  std::cerr << os.str();
1410  }
1411  --numSends_;
1412  }
1413 
1414  // Invert map to see what msgs are received and what length
1415  computeReceives();
1416 
1417  // createFromRecvs() calls createFromSends(), but will set
1418  // howInitialized_ again after calling createFromSends().
1419  howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1420 
1421  if (verbose_) {
1422  std::ostringstream os;
1423  os << *prefix << "Done; totalReceiveLength_="
1424  << totalReceiveLength_ << endl;
1425  std::cerr << os.str();
1426  }
1427  return totalReceiveLength_;
1428  }
1429 
1430  void
1432  createFromSendsAndRecvs (const Teuchos::ArrayView<const int>& exportProcIDs,
1433  const Teuchos::ArrayView<const int>& remoteProcIDs)
1434  {
1435  std::unique_ptr<std::string> prefix;
1436  if (verbose_) {
1437  prefix = createPrefix("createFromSendsAndRecvs");
1438  std::ostringstream os;
1439  os << *prefix << "Start" << std::endl;
1440  std::cerr << os.str();
1441  }
1442 
1443  // note the exportProcIDs and remoteProcIDs _must_ be a list that has
1444  // an entry for each GID. If the export/remoteProcIDs is taken from
1445  // the getProcs{From|To} lists that are extracted from a previous distributor,
1446  // it will generate a wrong answer, because those lists have a unique entry
1447  // for each processor id. A version of this with lengthsTo and lengthsFrom
1448  // should be made.
1449 
1450  howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1451 
1452 
1453  int myProcID = comm_->getRank ();
1454  int numProcs = comm_->getSize();
1455 
1456  const size_t numExportIDs = exportProcIDs.size();
1457  Teuchos::Array<size_t> starts (numProcs + 1, 0);
1458 
1459  size_t numActive = 0;
1460  int needSendBuff = 0; // Boolean
1461 
1462  for(size_t i = 0; i < numExportIDs; i++ )
1463  {
1464  if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1465  needSendBuff = 1;
1466  if( exportProcIDs[i] >= 0 )
1467  {
1468  ++starts[ exportProcIDs[i] ];
1469  ++numActive;
1470  }
1471  }
1472 
1473  selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1474 
1475  numSends_ = 0;
1476 
1477  if( needSendBuff ) //grouped by processor, no send buffer or indicesTo_ needed
1478  {
1479  if (starts[0] == 0 ) {
1480  numSends_ = 0;
1481  }
1482  else {
1483  numSends_ = 1;
1484  }
1485  for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1486  im1=starts.begin();
1487  i != starts.end(); ++i)
1488  {
1489  if (*i != 0) ++numSends_;
1490  *i += *im1;
1491  im1 = i;
1492  }
1493  // starts[i] now contains the number of exports to procs 0 through i
1494 
1495  for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1496  i=starts.rbegin()+1;
1497  i != starts.rend(); ++i)
1498  {
1499  *ip1 = *i;
1500  ip1 = i;
1501  }
1502  starts[0] = 0;
1503  // starts[i] now contains the number of exports to procs 0 through
1504  // i-1, i.e., all procs before proc i
1505 
1506  indicesTo_.resize(numActive);
1507 
1508  for (size_t i = 0; i < numExportIDs; ++i) {
1509  if (exportProcIDs[i] >= 0) {
1510  // record the offset to the sendBuffer for this export
1511  indicesTo_[starts[exportProcIDs[i]]] = i;
1512  // now increment the offset for this proc
1513  ++starts[exportProcIDs[i]];
1514  }
1515  }
1516  for (int proc = numProcs-1; proc != 0; --proc) {
1517  starts[proc] = starts[proc-1];
1518  }
1519  starts.front() = 0;
1520  starts[numProcs] = numActive;
1521  procsTo_.resize(numSends_);
1522  startsTo_.resize(numSends_);
1523  lengthsTo_.resize(numSends_);
1524  maxSendLength_ = 0;
1525  size_t snd = 0;
1526  for (int proc = 0; proc < numProcs; ++proc ) {
1527  if (starts[proc+1] != starts[proc]) {
1528  lengthsTo_[snd] = starts[proc+1] - starts[proc];
1529  startsTo_[snd] = starts[proc];
1530  // record max length for all off-proc sends
1531  if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1532  maxSendLength_ = lengthsTo_[snd];
1533  }
1534  procsTo_[snd] = proc;
1535  ++snd;
1536  }
1537  }
1538  }
1539  else {
1540  // grouped by proc, no send buffer or indicesTo_ needed
1541  numSends_ = 0;
1542  // Count total number of sends, i.e., total number of procs to
1543  // which we are sending. This includes myself, if applicable.
1544  for (int i = 0; i < numProcs; ++i) {
1545  if (starts[i]) {
1546  ++numSends_;
1547  }
1548  }
1549 
1550  // Not only do we not need these, but we must clear them, as
1551  // empty status of indicesTo is a flag used later.
1552  indicesTo_.resize(0);
1553  // Size these to numSends_; note, at the moment, numSends_
1554  // includes self sends. Set their values to zeros.
1555  procsTo_.assign(numSends_,0);
1556  startsTo_.assign(numSends_,0);
1557  lengthsTo_.assign(numSends_,0);
1558 
1559  // set startsTo to the offset for each send (i.e., each proc ID)
1560  // set procsTo to the proc ID for each send
1561  // in interpreting this code, remember that we are assuming contiguity
1562  // that is why index skips through the ranks
1563  {
1564  size_t index = 0, procIndex = 0;
1565  for (size_t i = 0; i < numSends_; ++i) {
1566  while (exportProcIDs[procIndex] < 0) {
1567  ++procIndex; // skip all negative proc IDs
1568  }
1569  startsTo_[i] = procIndex;
1570  int procID = exportProcIDs[procIndex];
1571  procsTo_[i] = procID;
1572  index += starts[procID];
1573  procIndex += starts[procID];
1574  }
1575  }
1576  // sort the startsTo and proc IDs together, in ascending order, according
1577  // to proc IDs
1578  if (numSends_ > 0) {
1579  sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1580  }
1581  // compute the maximum send length
1582  maxSendLength_ = 0;
1583  for (size_t i = 0; i < numSends_; ++i) {
1584  int procID = procsTo_[i];
1585  lengthsTo_[i] = starts[procID];
1586  if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1587  maxSendLength_ = lengthsTo_[i];
1588  }
1589  }
1590  }
1591 
1592 
1593  numSends_ -= selfMessage_;
1594  std::vector<int> recv_list;
1595  recv_list.reserve(numSends_); //reserve an initial guess for size needed
1596 
1597  int last_pid=-2;
1598  for(int i=0; i<remoteProcIDs.size(); i++) {
1599  if(remoteProcIDs[i]>last_pid) {
1600  recv_list.push_back(remoteProcIDs[i]);
1601  last_pid = remoteProcIDs[i];
1602  }
1603  else if (remoteProcIDs[i]<last_pid)
1604  throw std::runtime_error("Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1605  }
1606  numReceives_ = recv_list.size();
1607  if(numReceives_) {
1608  procsFrom_.assign(numReceives_,0);
1609  lengthsFrom_.assign(numReceives_,0);
1610  indicesFrom_.assign(numReceives_,0);
1611  startsFrom_.assign(numReceives_,0);
1612  }
1613  for(size_t i=0,j=0; i<numReceives_; ++i) {
1614  int jlast=j;
1615  procsFrom_[i] = recv_list[i];
1616  startsFrom_[i] = j;
1617  for( ; j<(size_t)remoteProcIDs.size() &&
1618  remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1619  lengthsFrom_[i] = j-jlast;
1620  }
1621  totalReceiveLength_ = remoteProcIDs.size();
1622  indicesFrom_.clear ();
1623  // NOTE (mfh 13 Feb 2019): Epetra_MpiDistributor deliberately does
1624  // _not_ fill indicesFrom_ (what it calls "indices_from_") like
1625  // this; it leaves indicesFrom_ empty. The comment there mentions
1626  // that not filling indicesFrom_ helps reverse mode correctness.
1627 #if 0
1628  indicesFrom_.reserve (totalReceiveLength_);
1629  for (size_t i = 0; i < totalReceiveLength_; ++i) {
1630  indicesFrom_.push_back(i);
1631  }
1632 #endif // 0
1633  numReceives_-=selfMessage_;
1634 
1635  if (verbose_) {
1636  std::ostringstream os;
1637  os << *prefix << "Done" << std::endl;
1638  std::cerr << os.str();
1639  }
1640  }
1641 
1642 } // namespace Tpetra
Declaration of Tpetra::Details::Behavior, a class that describes Tpetra's behavior.
Declaration of a function that prints strings from each process.
Stand-alone utility functions and macros.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
static bool debug()
Whether Tpetra is in debug mode.
static bool verbose()
Whether Tpetra is in verbose mode.
static size_t verbosePrintCountThreshold()
Number of entries below which arrays, lists, etc. will be printed in debug mode.
Sets up and executes a communication plan for a Tpetra DistObject.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
Teuchos::RCP< Distributor > getReverse(bool create=true) const
A reverse communication plan Distributor.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
size_t getNumReceives() const
The number of processes from which we will receive data.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.
std::string description() const
Return a one-line description of this object.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which to receive.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
size_t getNumSends() const
The number of processes to which we will send data.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
Implementation details of Tpetra.
void verbosePrintArray(std::ostream &out, const ArrayType &x, const char name[], const size_t maxNumToPrint)
Print min(x.size(), maxNumToPrint) entries of x.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
EDistributorSendType
The type of MPI send that Distributor should use.
std::unique_ptr< std::string > createPrefix(const int myRank, const char prefix[])
Create string prefix for each line of verbose output.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Namespace Tpetra contains the class and methods constituting the Tpetra library.
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.