39 #include "Tpetra_Distributor.hpp"
43 #include "Tpetra_Details_makeValidVerboseStream.hpp"
44 #include "Teuchos_StandardParameterEntryValidators.hpp"
45 #include "Teuchos_VerboseObjectParameterListHelpers.hpp"
// Dispatch on the send-type enumerator, one arm per known value
// (ISEND, RSEND, SEND, SSEND). The statements inside each arm are not
// part of this excerpt.
53 if (sendType == DISTRIBUTOR_ISEND) {
56 else if (sendType == DISTRIBUTOR_RSEND) {
59 else if (sendType == DISTRIBUTOR_SEND) {
62 else if (sendType == DISTRIBUTOR_SSEND) {
// The test condition is the literal `true`, so reaching this statement
// always raises std::invalid_argument; the message embeds the offending
// enum value.
66 TEUCHOS_TEST_FOR_EXCEPTION(
true, std::invalid_argument,
"Invalid "
67 "EDistributorSendType enum value " << sendType <<
".");
// Map each Details::DISTRIBUTOR_* initialization-state enumerator to a
// short human-readable phrase. The enclosing switch header and any
// default arm are not part of this excerpt.
75 case Details::DISTRIBUTOR_NOT_INITIALIZED:
76 return "Not initialized yet";
77 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS:
78 return "By createFromSends";
79 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_RECVS:
80 return "By createFromRecvs";
81 case Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS:
82 return "By createFromSendsAndRecvs";
83 case Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE:
84 return "By createReverseDistributor";
85 case Details::DISTRIBUTOR_INITIALIZED_BY_COPY:
86 return "By copy constructor";
// Build the list of send-type names as strings, in the order
// "Isend", "Rsend", "Send", "Ssend".
// NOTE(review): this order presumably has to stay aligned with the
// enumerator list (ISEND, RSEND, SEND, SSEND) that is paired with it for
// the "Send type" parameter validator -- confirm before reordering.
93 Teuchos::Array<std::string>
96 Teuchos::Array<std::string> sendTypes;
97 sendTypes.push_back (
"Isend");
98 sendTypes.push_back (
"Rsend");
99 sendTypes.push_back (
"Send");
100 sendTypes.push_back (
"Ssend");
// Default for the "Debug" entry of the valid-parameter list.
110 const bool tpetraDistributorDebugDefault =
false;
// Default for "Barrier between receives and sends"; also the initial
// value of the barrierBetween_ member in the constructor.
112 const bool barrierBetween_default =
false;
// Default for "Use distinct tags"; also the initial value of the
// useDistinctTags_ member in the constructor.
114 const bool useDistinctTags_default =
true;
// Choose the message tag for a code path.
//
// pathTag: the tag associated with the calling code path.
// Returns pathTag when useDistinctTags_ is set; otherwise returns the
// tag reported by the communicator (comm_->getTag ()).
117 int Distributor::getTag (
const int pathTag)
const {
118 return useDistinctTags_ ? pathTag : comm_->getTag ();
122 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
123 void Distributor::makeTimers () {
124 timer_doWaits_ = Teuchos::TimeMonitor::getNewTimer (
125 "Tpetra::Distributor: doWaits");
127 timer_doPosts3TA_ = Teuchos::TimeMonitor::getNewTimer (
128 "Tpetra::Distributor: doPosts(3) TA");
129 timer_doPosts4TA_ = Teuchos::TimeMonitor::getNewTimer (
130 "Tpetra::Distributor: doPosts(4) TA");
132 timer_doPosts3TA_recvs_ = Teuchos::TimeMonitor::getNewTimer (
133 "Tpetra::Distributor: doPosts(3): recvs TA");
134 timer_doPosts4TA_recvs_ = Teuchos::TimeMonitor::getNewTimer (
135 "Tpetra::Distributor: doPosts(4): recvs TA");
137 timer_doPosts3TA_barrier_ = Teuchos::TimeMonitor::getNewTimer (
138 "Tpetra::Distributor: doPosts(3): barrier TA");
139 timer_doPosts4TA_barrier_ = Teuchos::TimeMonitor::getNewTimer (
140 "Tpetra::Distributor: doPosts(4): barrier TA");
142 timer_doPosts3TA_sends_ = Teuchos::TimeMonitor::getNewTimer (
143 "Tpetra::Distributor: doPosts(3): sends TA");
144 timer_doPosts4TA_sends_ = Teuchos::TimeMonitor::getNewTimer (
145 "Tpetra::Distributor: doPosts(4): sends TA");
146 timer_doPosts3TA_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
147 "Tpetra::Distributor: doPosts(3): sends TA SLOW");
148 timer_doPosts4TA_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
149 "Tpetra::Distributor: doPosts(4): sends TA SLOW");
150 timer_doPosts3TA_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
151 "Tpetra::Distributor: doPosts(3): sends TA FAST");
152 timer_doPosts4TA_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
153 "Tpetra::Distributor: doPosts(4): sends TA FAST");
155 timer_doPosts3KV_ = Teuchos::TimeMonitor::getNewTimer (
156 "Tpetra::Distributor: doPosts(3) KV");
157 timer_doPosts4KV_ = Teuchos::TimeMonitor::getNewTimer (
158 "Tpetra::Distributor: doPosts(4) KV");
160 timer_doPosts3KV_recvs_ = Teuchos::TimeMonitor::getNewTimer (
161 "Tpetra::Distributor: doPosts(3): recvs KV");
162 timer_doPosts4KV_recvs_ = Teuchos::TimeMonitor::getNewTimer (
163 "Tpetra::Distributor: doPosts(4): recvs KV");
165 timer_doPosts3KV_barrier_ = Teuchos::TimeMonitor::getNewTimer (
166 "Tpetra::Distributor: doPosts(3): barrier KV");
167 timer_doPosts4KV_barrier_ = Teuchos::TimeMonitor::getNewTimer (
168 "Tpetra::Distributor: doPosts(4): barrier KV");
170 timer_doPosts3KV_sends_ = Teuchos::TimeMonitor::getNewTimer (
171 "Tpetra::Distributor: doPosts(3): sends KV");
172 timer_doPosts4KV_sends_ = Teuchos::TimeMonitor::getNewTimer (
173 "Tpetra::Distributor: doPosts(4): sends KV");
174 timer_doPosts3KV_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
175 "Tpetra::Distributor: doPosts(3): sends KV SLOW");
176 timer_doPosts4KV_sends_slow_ = Teuchos::TimeMonitor::getNewTimer (
177 "Tpetra::Distributor: doPosts(4): sends KV SLOW");
178 timer_doPosts3KV_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
179 "Tpetra::Distributor: doPosts(3): sends KV FAST");
180 timer_doPosts4KV_sends_fast_ = Teuchos::TimeMonitor::getNewTimer (
181 "Tpetra::Distributor: doPosts(4): sends KV FAST");
186 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
187 const Teuchos::RCP<Teuchos::FancyOStream>& ,
188 const Teuchos::RCP<Teuchos::ParameterList>& plist)
190 , howInitialized_ (
Details::DISTRIBUTOR_NOT_INITIALIZED)
191 , sendType_ (
Details::DISTRIBUTOR_SEND)
192 , barrierBetween_ (barrierBetween_default)
193 , selfMessage_ (false)
197 , totalReceiveLength_ (0)
198 , lastRoundBytesSend_ (0)
199 , lastRoundBytesRecv_ (0)
200 , useDistinctTags_ (useDistinctTags_default)
203 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
209 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm)
214 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
215 const Teuchos::RCP<Teuchos::FancyOStream>& out)
220 Distributor (
const Teuchos::RCP<
const Teuchos::Comm<int> >& comm,
221 const Teuchos::RCP<Teuchos::ParameterList>& plist)
227 : comm_ (distributor.comm_)
228 , howInitialized_ (
Details::DISTRIBUTOR_INITIALIZED_BY_COPY)
229 , sendType_ (distributor.sendType_)
230 , barrierBetween_ (distributor.barrierBetween_)
231 , verbose_ (distributor.verbose_)
232 , selfMessage_ (distributor.selfMessage_)
233 , numSends_ (distributor.numSends_)
234 , procsTo_ (distributor.procsTo_)
235 , startsTo_ (distributor.startsTo_)
236 , lengthsTo_ (distributor.lengthsTo_)
237 , maxSendLength_ (distributor.maxSendLength_)
238 , indicesTo_ (distributor.indicesTo_)
239 , numReceives_ (distributor.numReceives_)
240 , totalReceiveLength_ (distributor.totalReceiveLength_)
241 , lengthsFrom_ (distributor.lengthsFrom_)
242 , procsFrom_ (distributor.procsFrom_)
243 , startsFrom_ (distributor.startsFrom_)
244 , indicesFrom_ (distributor.indicesFrom_)
245 , reverseDistributor_ (distributor.reverseDistributor_)
246 , lastRoundBytesSend_ (distributor.lastRoundBytesSend_)
247 , lastRoundBytesRecv_ (distributor.lastRoundBytesRecv_)
248 , useDistinctTags_ (distributor.useDistinctTags_)
250 using Teuchos::ParameterList;
254 RCP<const ParameterList> rhsList = distributor.getParameterList ();
255 RCP<ParameterList> newList = rhsList.is_null () ? Teuchos::null :
256 Teuchos::parameterList (*rhsList);
259 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
265 using Teuchos::ParameterList;
266 using Teuchos::parameterList;
269 std::swap (comm_, rhs.comm_);
270 std::swap (howInitialized_, rhs.howInitialized_);
271 std::swap (sendType_, rhs.sendType_);
272 std::swap (barrierBetween_, rhs.barrierBetween_);
273 std::swap (verbose_, rhs.verbose_);
274 std::swap (selfMessage_, rhs.selfMessage_);
275 std::swap (numSends_, rhs.numSends_);
276 std::swap (procsTo_, rhs.procsTo_);
277 std::swap (startsTo_, rhs.startsTo_);
278 std::swap (lengthsTo_, rhs.lengthsTo_);
279 std::swap (maxSendLength_, rhs.maxSendLength_);
280 std::swap (indicesTo_, rhs.indicesTo_);
281 std::swap (numReceives_, rhs.numReceives_);
282 std::swap (totalReceiveLength_, rhs.totalReceiveLength_);
283 std::swap (lengthsFrom_, rhs.lengthsFrom_);
284 std::swap (procsFrom_, rhs.procsFrom_);
285 std::swap (startsFrom_, rhs.startsFrom_);
286 std::swap (indicesFrom_, rhs.indicesFrom_);
287 std::swap (reverseDistributor_, rhs.reverseDistributor_);
288 std::swap (lastRoundBytesSend_, rhs.lastRoundBytesSend_);
289 std::swap (lastRoundBytesRecv_, rhs.lastRoundBytesRecv_);
290 std::swap (useDistinctTags_, rhs.useDistinctTags_);
294 RCP<ParameterList> lhsList = this->getNonconstParameterList ();
295 RCP<ParameterList> rhsList = rhs.getNonconstParameterList ();
296 if (lhsList.getRawPtr () == rhsList.getRawPtr () && ! rhsList.is_null ()) {
297 rhsList = parameterList (*rhsList);
299 if (! rhsList.is_null ()) {
300 this->setMyParamList (rhsList);
302 if (! lhsList.is_null ()) {
303 rhs.setMyParamList (lhsList);
311 Distributor::getVerbose()
317 std::unique_ptr<std::string>
319 createPrefix(
const char methodName[])
const
322 comm_.getRawPtr(),
"Distributor", methodName);
329 using ::Tpetra::Details::Behavior;
330 using Teuchos::FancyOStream;
331 using Teuchos::getIntegralValue;
332 using Teuchos::includesVerbLevel;
333 using Teuchos::ParameterList;
334 using Teuchos::parameterList;
338 if (! plist.is_null()) {
340 plist->validateParametersAndSetDefaults (*validParams);
342 const bool barrierBetween =
343 plist->get<
bool> (
"Barrier between receives and sends");
345 getIntegralValue<Details::EDistributorSendType> (*plist,
"Send type");
346 const bool useDistinctTags = plist->get<
bool> (
"Use distinct tags");
351 const bool enable_cuda_rdma =
352 plist->get<
bool> (
"Enable MPI CUDA RDMA support");
353 TEUCHOS_TEST_FOR_EXCEPTION
354 (! enable_cuda_rdma, std::invalid_argument,
"Tpetra::Distributor::"
355 "setParameterList: " <<
"You specified \"Enable MPI CUDA RDMA "
356 "support\" = false. This is no longer valid. You don't need to "
357 "specify this option any more; Tpetra assumes it is always true. "
358 "This is a very light assumption on the MPI implementation, and in "
359 "fact does not actually involve hardware or system RDMA support. "
360 "Tpetra just assumes that the MPI implementation can tell whether a "
361 "pointer points to host memory or CUDA device memory.");
368 TEUCHOS_TEST_FOR_EXCEPTION
369 (! barrierBetween && sendType == Details::DISTRIBUTOR_RSEND,
370 std::invalid_argument,
"Tpetra::Distributor::setParameterList: " << endl
371 <<
"You specified \"Send type\"=\"Rsend\", but turned off the barrier "
372 "between receives and sends." << endl <<
"This is invalid; you must "
373 "include the barrier if you use ready sends." << endl <<
"Ready sends "
374 "require that their corresponding receives have already been posted, "
375 "and the only way to guarantee that in general is with a barrier.");
378 sendType_ = sendType;
379 barrierBetween_ = barrierBetween;
380 useDistinctTags_ = useDistinctTags;
384 this->setMyParamList (plist);
388 Teuchos::RCP<const Teuchos::ParameterList>
391 using Teuchos::Array;
392 using Teuchos::ParameterList;
393 using Teuchos::parameterList;
395 using Teuchos::setStringToIntegralParameter;
397 const bool barrierBetween = barrierBetween_default;
398 const bool useDistinctTags = useDistinctTags_default;
399 const bool debug = tpetraDistributorDebugDefault;
402 const std::string defaultSendType (
"Send");
403 Array<Details::EDistributorSendType> sendTypeEnums;
404 sendTypeEnums.push_back (Details::DISTRIBUTOR_ISEND);
405 sendTypeEnums.push_back (Details::DISTRIBUTOR_RSEND);
406 sendTypeEnums.push_back (Details::DISTRIBUTOR_SEND);
407 sendTypeEnums.push_back (Details::DISTRIBUTOR_SSEND);
409 RCP<ParameterList> plist = parameterList (
"Tpetra::Distributor");
410 plist->set (
"Barrier between receives and sends", barrierBetween,
411 "Whether to execute a barrier between receives and sends in do"
412 "[Reverse]Posts(). Required for correctness when \"Send type\""
413 "=\"Rsend\", otherwise correct but not recommended.");
414 setStringToIntegralParameter<Details::EDistributorSendType> (
"Send type",
415 defaultSendType,
"When using MPI, the variant of send to use in "
416 "do[Reverse]Posts()", sendTypes(), sendTypeEnums(), plist.getRawPtr());
417 plist->set (
"Use distinct tags", useDistinctTags,
"Whether to use distinct "
418 "MPI message tags for different code paths. Highly recommended"
419 " to avoid message collisions.");
420 plist->set (
"Debug", debug,
"Whether to print copious debugging output on "
422 plist->set (
"Timer Label",
"",
"Label for Time Monitor output");
423 plist->set (
"Enable MPI CUDA RDMA support",
true,
"Assume that MPI can "
424 "tell whether a pointer points to host memory or CUDA device "
425 "memory. You don't need to specify this option any more; "
426 "Tpetra assumes it is always true. This is a very light "
427 "assumption on the MPI implementation, and in fact does not "
428 "actually involve hardware or system RDMA support.");
436 Teuchos::setupVerboseObjectSublist (&*plist);
437 return Teuchos::rcp_const_cast<const ParameterList> (plist);
442 {
return totalReceiveLength_; }
445 {
return numReceives_; }
448 {
return selfMessage_; }
451 {
return numSends_; }
454 {
return maxSendLength_; }
457 {
return procsFrom_; }
460 {
return lengthsFrom_; }
466 {
return lengthsTo_; }
// Return the reverse Distributor. If none exists yet and `create` is
// true, build it first via createReverseDistributor(). If `create` is
// false and none exists, the (null) reverseDistributor_ is returned as is.
468 Teuchos::RCP<Distributor>
470 if (reverseDistributor_.is_null () && create) {
471 createReverseDistributor ();
// Sanity check: creation was requested, so the reverse Distributor must
// exist by now; a null here is reported as std::logic_error.
473 TEUCHOS_TEST_FOR_EXCEPTION
474 (reverseDistributor_.is_null () && create, std::logic_error,
"The reverse "
475 "Distributor is null after createReverseDistributor returned. "
476 "Please report this bug to the Tpetra developers.");
477 return reverseDistributor_;
// Build reverseDistributor_: a Distributor on the same communicator whose
// send-side data is this object's receive-side data and vice versa.
482 Distributor::createReverseDistributor()
const
484 reverseDistributor_ = Teuchos::rcp(
new Distributor(comm_));
// Record how the new object was set up and carry over the communication
// options of this Distributor.
485 reverseDistributor_->howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_REVERSE;
486 reverseDistributor_->sendType_ = sendType_;
487 reverseDistributor_->barrierBetween_ = barrierBetween_;
488 reverseDistributor_->verbose_ = verbose_;
// Total number of values this process sends; it becomes the reverse
// Distributor's totalReceiveLength_. The accumulator is seeded with a
// size_t so the sum is carried in size_t: seeding with the int literal 0
// would make std::accumulate sum in int and truncate/overflow for large
// totals.
493 size_t totalSendLength =
494 std::accumulate (lengthsTo_.begin(), lengthsTo_.end(), size_t (0));
// Largest single incoming message from another process (messages from
// this process itself are excluded); it becomes the reverse
// Distributor's maxSendLength_.
499 size_t maxReceiveLength = 0;
500 const int myProcID = comm_->getRank();
501 for (
size_t i=0; i < numReceives_; ++i) {
502 if (procsFrom_[i] != myProcID) {
504 if (lengthsFrom_[i] > maxReceiveLength) {
505 maxReceiveLength = lengthsFrom_[i];
// Swap roles: every "To" field of the reverse is filled from the matching
// "From" field of this object, and every "From" field from the matching
// "To" field.
514 reverseDistributor_->selfMessage_ = selfMessage_;
515 reverseDistributor_->numSends_ = numReceives_;
516 reverseDistributor_->procsTo_ = procsFrom_;
517 reverseDistributor_->startsTo_ = startsFrom_;
518 reverseDistributor_->lengthsTo_ = lengthsFrom_;
519 reverseDistributor_->maxSendLength_ = maxReceiveLength;
520 reverseDistributor_->indicesTo_ = indicesFrom_;
521 reverseDistributor_->numReceives_ = numSends_;
522 reverseDistributor_->totalReceiveLength_ = totalSendLength;
523 reverseDistributor_->lengthsFrom_ = lengthsTo_;
524 reverseDistributor_->procsFrom_ = procsTo_;
525 reverseDistributor_->startsFrom_ = startsTo_;
526 reverseDistributor_->indicesFrom_ = indicesTo_;
// The new object has not communicated yet, so its byte counters start
// at zero.
535 reverseDistributor_->lastRoundBytesSend_ = 0;
536 reverseDistributor_->lastRoundBytesRecv_ = 0;
538 reverseDistributor_->useDistinctTags_ = useDistinctTags_;
// The reverse Distributor's own reverse is left unset.
551 reverseDistributor_->reverseDistributor_ = Teuchos::null;
// Body of doWaits: wait for every outstanding communication request held
// in requests_, verify that none remain, and then do the same for the
// reverse Distributor if one exists. The function header and several
// guarding conditions are not part of this excerpt.
557 using Teuchos::Array;
558 using Teuchos::CommRequest;
559 using Teuchos::FancyOStream;
560 using Teuchos::includesVerbLevel;
561 using Teuchos::is_null;
563 using Teuchos::waitAll;
// When timings are compiled in, time this call under timer_doWaits_.
566 #ifdef HAVE_TPETRA_DISTRIBUTOR_TIMINGS
567 Teuchos::TimeMonitor timeMon (*timer_doWaits_);
// Diagnostic output: the message is assembled in a local ostringstream
// and handed to std::cerr in a single write.
572 std::unique_ptr<std::string> prefix;
574 prefix = createPrefix(
"doWaits");
575 std::ostringstream os;
576 os << *prefix <<
"Start: requests_.size(): "
577 << requests_.size() << endl;
578 std::cerr << os.str();
// Only call waitAll when there is something to wait on.
581 if (requests_.size() > 0) {
582 waitAll(*comm_, requests_());
// Every request is expected to be null once waitAll has returned; a
// non-null entry is reported as std::runtime_error.
586 for (
auto it = requests_.begin(); it != requests_.end(); ++it) {
587 TEUCHOS_TEST_FOR_EXCEPTION
588 (! is_null(*it), std::runtime_error,
589 "Tpetra::Distributor::doWaits: Communication requests "
590 "should all be null aftr calling Teuchos::waitAll on "
591 "them, but at least one request is not null.");
// Drop the (now completed) requests.
596 requests_.resize (0);
// Cross-process check: each process contributes 1 if it still holds
// requests, the values are combined with REDUCE_MAX, and a nonzero
// result is reported as std::runtime_error.
600 const int localSizeNonzero = (requests_.size () != 0) ? 1 : 0;
601 int globalSizeNonzero = 0;
602 Teuchos::reduceAll<int, int> (*comm_, Teuchos::REDUCE_MAX,
604 Teuchos::outArg (globalSizeNonzero));
605 TEUCHOS_TEST_FOR_EXCEPTION(
606 globalSizeNonzero != 0, std::runtime_error,
607 "Tpetra::Distributor::doWaits: After waitAll, at least one process has "
608 "a nonzero number of outstanding posts. There should be none at this "
609 "point. Please report this bug to the Tpetra developers.");
613 std::ostringstream os;
614 os << *prefix <<
"Done" << endl;
615 std::cerr << os.str();
// The reverse Distributor may have outstanding requests of its own, so
// wait on it too when it exists.
621 if (! reverseDistributor_.is_null()) {
622 reverseDistributor_->doWaits();
627 std::ostringstream out;
629 out <<
"\"Tpetra::Distributor\": {";
630 const std::string label = this->getObjectLabel ();
632 out <<
"Label: " << label <<
", ";
634 out <<
"How initialized: "
638 << DistributorSendTypeEnumToString (sendType_)
639 <<
", Barrier between receives and sends: "
640 << (barrierBetween_ ?
"true" :
"false")
641 <<
", Use distinct tags: "
642 << (useDistinctTags_ ?
"true" :
"false")
643 <<
", Debug: " << (verbose_ ?
"true" :
"false")
650 localDescribeToString (
const Teuchos::EVerbosityLevel vl)
const
652 using Teuchos::toString;
653 using Teuchos::VERB_HIGH;
654 using Teuchos::VERB_EXTREME;
658 if (vl <= Teuchos::VERB_LOW || comm_.is_null ()) {
659 return std::string ();
662 auto outStringP = Teuchos::rcp (
new std::ostringstream ());
663 auto outp = Teuchos::getFancyOStream (outStringP);
664 Teuchos::FancyOStream& out = *outp;
666 const int myRank = comm_->getRank ();
667 const int numProcs = comm_->getSize ();
668 out <<
"Process " << myRank <<
" of " << numProcs <<
":" << endl;
669 Teuchos::OSTab tab1 (out);
673 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
674 out <<
"procsTo: " << toString (procsTo_) << endl;
675 out <<
"lengthsTo: " << toString (lengthsTo_) << endl;
678 if (vl == VERB_EXTREME) {
679 out <<
"startsTo: " << toString (startsTo_) << endl;
680 out <<
"indicesTo: " << toString (indicesTo_) << endl;
682 if (vl == VERB_HIGH || vl == VERB_EXTREME) {
685 out <<
"lengthsFrom: " << toString (lengthsFrom_) << endl;
686 out <<
"startsFrom: " << toString (startsFrom_) << endl;
687 out <<
"procsFrom: " << toString (procsFrom_) << endl;
691 return outStringP->str ();
696 describe (Teuchos::FancyOStream& out,
697 const Teuchos::EVerbosityLevel verbLevel)
const
700 using Teuchos::VERB_DEFAULT;
701 using Teuchos::VERB_NONE;
702 using Teuchos::VERB_LOW;
703 using Teuchos::VERB_MEDIUM;
704 using Teuchos::VERB_HIGH;
705 using Teuchos::VERB_EXTREME;
706 const Teuchos::EVerbosityLevel vl =
707 (verbLevel == VERB_DEFAULT) ? VERB_LOW : verbLevel;
709 if (vl == VERB_NONE) {
717 if (comm_.is_null ()) {
720 const int myRank = comm_->getRank ();
721 const int numProcs = comm_->getSize ();
730 Teuchos::RCP<Teuchos::OSTab> tab0, tab1;
736 tab0 = Teuchos::rcp (
new Teuchos::OSTab (out));
739 out <<
"\"Tpetra::Distributor\":" << endl;
740 tab1 = Teuchos::rcp (
new Teuchos::OSTab (out));
742 const std::string label = this->getObjectLabel ();
744 out <<
"Label: " << label << endl;
746 out <<
"Number of processes: " << numProcs << endl
747 <<
"How initialized: "
751 out <<
"Parameters: " << endl;
752 Teuchos::OSTab tab2 (out);
753 out <<
"\"Send type\": "
754 << DistributorSendTypeEnumToString (sendType_) << endl
755 <<
"\"Barrier between receives and sends\": "
756 << (barrierBetween_ ?
"true" :
"false") << endl
757 <<
"\"Use distinct tags\": "
758 << (useDistinctTags_ ?
"true" :
"false") << endl
759 <<
"\"Debug\": " << (verbose_ ?
"true" :
"false") << endl;
765 const std::string lclStr = this->localDescribeToString (vl);
769 out <<
"Reverse Distributor:";
770 if (reverseDistributor_.is_null ()) {
771 out <<
" null" << endl;
775 reverseDistributor_->describe (out, vl);
783 using Teuchos::Array;
784 using Teuchos::ArrayRCP;
786 using Teuchos::CommStatus;
787 using Teuchos::CommRequest;
788 using Teuchos::ireceive;
791 using Teuchos::REDUCE_SUM;
792 using Teuchos::receive;
793 using Teuchos::reduce;
794 using Teuchos::scatter;
796 using Teuchos::waitAll;
799 const int myRank = comm_->getRank();
800 const int numProcs = comm_->getSize();
803 const int pathTag = 2;
804 const int tag = this->getTag (pathTag);
806 std::unique_ptr<std::string> prefix;
808 prefix = createPrefix(
"computeReceives");
809 std::ostringstream os;
811 <<
"selfMessage_: " << (selfMessage_ ?
"true" :
"false")
812 <<
", pathTag: " << pathTag <<
", tag: " << tag << endl;
813 std::cerr << os.str();
822 Array<int> toProcsFromMe (numProcs, 0);
823 #ifdef HAVE_TEUCHOS_DEBUG
824 bool counting_error =
false;
826 for (
size_t i = 0; i < (numSends_ + (selfMessage_ ? 1 : 0)); ++i) {
827 #ifdef HAVE_TEUCHOS_DEBUG
828 if (toProcsFromMe[procsTo_[i]] != 0) {
829 counting_error =
true;
832 toProcsFromMe[procsTo_[i]] = 1;
834 #ifdef HAVE_TEUCHOS_DEBUG
836 "Tpetra::Distributor::computeReceives: There was an error on at least "
837 "one process in counting the number of messages send by that process to "
838 "the other processs. Please report this bug to the Tpetra developers.",
843 std::ostringstream os;
844 os << *prefix <<
"Reduce & scatter" << endl;
845 std::cerr << os.str();
901 Array<int> numRecvsOnEachProc;
902 if (myRank == root) {
903 numRecvsOnEachProc.resize (numProcs);
905 int numReceivesAsInt = 0;
906 reduce<int, int> (toProcsFromMe.getRawPtr (),
907 numRecvsOnEachProc.getRawPtr (),
908 numProcs, REDUCE_SUM, root, *comm_);
909 scatter<int, int> (numRecvsOnEachProc.getRawPtr (), 1,
910 &numReceivesAsInt, 1, root, *comm_);
911 numReceives_ =
static_cast<size_t> (numReceivesAsInt);
917 lengthsFrom_.assign (numReceives_, 0);
918 procsFrom_.assign (numReceives_, 0);
934 const size_t actualNumReceives = numReceives_ - (selfMessage_ ? 1 : 0);
940 Array<RCP<CommRequest<int> > > requests (actualNumReceives);
941 Array<ArrayRCP<size_t> > lengthsFromBuffers (actualNumReceives);
942 Array<RCP<CommStatus<int> > > statuses (actualNumReceives);
947 const int anySourceProc = MPI_ANY_SOURCE;
949 const int anySourceProc = -1;
953 std::ostringstream os;
954 os << *prefix <<
"Post " << actualNumReceives <<
" irecv"
955 << (actualNumReceives != size_t (1) ?
"s" :
"") << endl;
956 std::cerr << os.str();
960 for (
size_t i = 0; i < actualNumReceives; ++i) {
965 lengthsFromBuffers[i].resize (1);
966 lengthsFromBuffers[i][0] = as<size_t> (0);
967 requests[i] = ireceive<int, size_t> (lengthsFromBuffers[i], anySourceProc,
970 std::ostringstream os;
971 os << *prefix <<
"Posted any-proc irecv w/ tag " << tag << endl;
972 std::cerr << os.str();
977 std::ostringstream os;
978 os << *prefix <<
"Post " << numSends_ <<
" send"
979 << (numSends_ != size_t (1) ?
"s" :
"") << endl;
980 std::cerr << os.str();
990 for (
size_t i = 0; i < numSends_ + (selfMessage_ ? 1 : 0); ++i) {
991 if (procsTo_[i] != myRank) {
995 const size_t*
const lengthsTo_i = &lengthsTo_[i];
996 send<int, size_t> (lengthsTo_i, 1, as<int> (procsTo_[i]), tag, *comm_);
998 std::ostringstream os;
999 os << *prefix <<
"Posted send to Proc " << procsTo_[i] <<
" w/ tag "
1001 std::cerr << os.str();
1011 lengthsFrom_[numReceives_-1] = lengthsTo_[i];
1012 procsFrom_[numReceives_-1] = myRank;
1017 std::ostringstream os;
1018 const size_t numReq = requests.size();
1019 os << *prefix <<
"waitAll on " << numReq <<
" request"
1020 << (numReq != size_t(1) ?
"s" :
"") << endl;
1021 std::cerr << os.str();
1029 waitAll (*comm_, requests (), statuses ());
1030 for (
size_t i = 0; i < actualNumReceives; ++i) {
1031 lengthsFrom_[i] = *lengthsFromBuffers[i];
1032 procsFrom_[i] = statuses[i]->getSourceRank ();
1038 sort2 (procsFrom_.begin(), procsFrom_.end(), lengthsFrom_.begin());
// Total number of values this process will receive: the sum of the
// per-process receive lengths. The accumulator is seeded with a size_t so
// the sum is carried in size_t; seeding with the int literal 0 would make
// std::accumulate sum in int and truncate/overflow for large totals.
1041 totalReceiveLength_ =
1042 std::accumulate (lengthsFrom_.begin (), lengthsFrom_.end (), size_t (0));
1043 indicesFrom_.clear ();
1049 indicesFrom_.reserve (totalReceiveLength_);
1050 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1051 indicesFrom_.push_back(i);
1055 startsFrom_.clear ();
1056 startsFrom_.reserve (numReceives_);
1057 for (
size_t i = 0, j = 0; i < numReceives_; ++i) {
1058 startsFrom_.push_back(j);
1059 j += lengthsFrom_[i];
1067 std::ostringstream os;
1068 os << *prefix <<
"Done" << endl;
1069 std::cerr << os.str();
1077 using Teuchos::outArg;
1078 using Teuchos::REDUCE_MAX;
1079 using Teuchos::reduceAll;
1081 const char rawPrefix[] =
"Tpetra::Distributor::createFromSends";
1083 const size_t numExports = exportProcIDs.size();
1084 const int myProcID = comm_->getRank();
1085 const int numProcs = comm_->getSize();
1088 const size_t maxNumToPrint = verbose_ ?
1090 std::unique_ptr<std::string> prefix;
1092 prefix = createPrefix(
"createFromSends");
1093 std::ostringstream os;
1094 os << *prefix <<
"Start: ";
1098 std::cerr << os.str();
1149 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1152 size_t numActive = 0;
1153 int needSendBuff = 0;
1156 for (
size_t i = 0; i < numExports; ++i) {
1157 const int exportID = exportProcIDs[i];
1158 if (exportID >= numProcs) {
1162 else if (exportID >= 0) {
1176 if (needSendBuff == 0 && starts[exportID] > 1 &&
1177 exportID != exportProcIDs[i-1]) {
1190 reduceAll<int, int> (*comm_, REDUCE_MAX, badID, outArg (gbl_badID));
1191 TEUCHOS_TEST_FOR_EXCEPTION
1192 (gbl_badID >= 0, std::runtime_error, rawPrefix <<
"Proc "
1193 << gbl_badID <<
", perhaps among other processes, got a bad "
1194 "send process ID.");
1210 #if defined(HAVE_TPETRA_THROW_EFFICIENCY_WARNINGS) || defined(HAVE_TPETRA_PRINT_EFFICIENCY_WARNINGS)
1212 int global_needSendBuff;
1213 reduceAll<int, int> (*comm_, REDUCE_MAX, needSendBuff,
1214 outArg (global_needSendBuff));
1216 global_needSendBuff != 0, std::runtime_error,
1217 "::createFromSends: Grouping export IDs together by process rank often "
1218 "improves performance.");
1223 std::ostringstream os;
1224 os << *prefix <<
"Detect whether I have a self message" << endl;
1225 std::cerr << os.str();
1230 if (starts[myProcID] != 0) {
1231 selfMessage_ =
true;
1234 selfMessage_ =
false;
1237 bool index_neq_numActive =
false;
1238 bool send_neq_numSends =
false;
1239 if (! needSendBuff) {
1241 std::ostringstream os;
1242 os << *prefix <<
"I don't need a send buffer or indicesTo_ "
1243 "(fast path)" << endl;
1244 std::cerr << os.str();
1250 for (
int i = 0; i < numProcs; ++i) {
1258 indicesTo_.resize(0);
1261 procsTo_.assign(numSends_,0);
1262 startsTo_.assign(numSends_,0);
1263 lengthsTo_.assign(numSends_,0);
1270 size_t index = 0, procIndex = 0;
1271 for (
size_t i = 0; i < numSends_; ++i) {
1272 while (exportProcIDs[procIndex] < 0) {
1275 startsTo_[i] = procIndex;
1276 int procID = exportProcIDs[procIndex];
1277 procsTo_[i] = procID;
1278 index += starts[procID];
1279 procIndex += starts[procID];
1281 if (index != numActive) {
1282 index_neq_numActive =
true;
1287 if (numSends_ > 0) {
1288 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1292 for (
size_t i = 0; i < numSends_; ++i) {
1293 int procID = procsTo_[i];
1294 lengthsTo_[i] = starts[procID];
1295 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1296 maxSendLength_ = lengthsTo_[i];
1302 std::ostringstream os;
1303 os << *prefix <<
"I need a send buffer & indicesTo_ "
1304 "(slow path)" << endl;
1305 std::cerr << os.str();
1313 if (starts[0] == 0 ) {
1319 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1321 i != starts.end(); ++i)
1323 if (*i != 0) ++numSends_;
1329 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1330 i=starts.rbegin()+1;
1331 i != starts.rend(); ++i)
1340 indicesTo_.resize(numActive);
1342 for (
size_t i = 0; i < numExports; ++i) {
1343 if (exportProcIDs[i] >= 0) {
1345 indicesTo_[starts[exportProcIDs[i]]] = i;
1347 ++starts[exportProcIDs[i]];
1359 for (
int proc = numProcs-1; proc != 0; --proc) {
1360 starts[proc] = starts[proc-1];
1363 starts[numProcs] = numActive;
1370 procsTo_.resize(numSends_);
1371 startsTo_.resize(numSends_);
1372 lengthsTo_.resize(numSends_);
1379 for (
int proc = 0; proc < numProcs; ++proc ) {
1380 if (starts[proc+1] != starts[proc]) {
1381 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1382 startsTo_[snd] = starts[proc];
1384 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1385 maxSendLength_ = lengthsTo_[snd];
1387 procsTo_[snd] = proc;
1391 if (snd != numSends_) {
1392 send_neq_numSends =
true;
1397 (index_neq_numActive, std::logic_error,
1398 rawPrefix <<
"logic error. Please notify the Tpetra team.", *comm_);
1400 (send_neq_numSends, std::logic_error,
1401 rawPrefix <<
"logic error. Please notify the Tpetra team.", *comm_);
1406 std::ostringstream os;
1407 os << *prefix <<
"Sending self message; numSends "
1408 << numSends_ <<
" -> " << (numSends_ - 1) << endl;
1409 std::cerr << os.str();
1419 howInitialized_ = Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS;
1422 std::ostringstream os;
1423 os << *prefix <<
"Done; totalReceiveLength_="
1424 << totalReceiveLength_ << endl;
1425 std::cerr << os.str();
1427 return totalReceiveLength_;
1433 const Teuchos::ArrayView<const int>& remoteProcIDs)
1435 std::unique_ptr<std::string> prefix;
1437 prefix = createPrefix(
"createFromSendsAndRecvs");
1438 std::ostringstream os;
1439 os << *prefix <<
"Start" << std::endl;
1440 std::cerr << os.str();
1450 howInitialized_ = Tpetra::Details::DISTRIBUTOR_INITIALIZED_BY_CREATE_FROM_SENDS_N_RECVS;
1453 int myProcID = comm_->getRank ();
1454 int numProcs = comm_->getSize();
1456 const size_t numExportIDs = exportProcIDs.size();
1457 Teuchos::Array<size_t> starts (numProcs + 1, 0);
1459 size_t numActive = 0;
1460 int needSendBuff = 0;
1462 for(
size_t i = 0; i < numExportIDs; i++ )
1464 if( needSendBuff==0 && i && (exportProcIDs[i] < exportProcIDs[i-1]) )
1466 if( exportProcIDs[i] >= 0 )
1468 ++starts[ exportProcIDs[i] ];
1473 selfMessage_ = ( starts[myProcID] != 0 ) ? 1 : 0;
1479 if (starts[0] == 0 ) {
1485 for (Teuchos::Array<size_t>::iterator i=starts.begin()+1,
1487 i != starts.end(); ++i)
1489 if (*i != 0) ++numSends_;
1495 for (Teuchos::Array<size_t>::reverse_iterator ip1=starts.rbegin(),
1496 i=starts.rbegin()+1;
1497 i != starts.rend(); ++i)
1506 indicesTo_.resize(numActive);
1508 for (
size_t i = 0; i < numExportIDs; ++i) {
1509 if (exportProcIDs[i] >= 0) {
1511 indicesTo_[starts[exportProcIDs[i]]] = i;
1513 ++starts[exportProcIDs[i]];
1516 for (
int proc = numProcs-1; proc != 0; --proc) {
1517 starts[proc] = starts[proc-1];
1520 starts[numProcs] = numActive;
1521 procsTo_.resize(numSends_);
1522 startsTo_.resize(numSends_);
1523 lengthsTo_.resize(numSends_);
1526 for (
int proc = 0; proc < numProcs; ++proc ) {
1527 if (starts[proc+1] != starts[proc]) {
1528 lengthsTo_[snd] = starts[proc+1] - starts[proc];
1529 startsTo_[snd] = starts[proc];
1531 if ((proc != myProcID) && (lengthsTo_[snd] > maxSendLength_)) {
1532 maxSendLength_ = lengthsTo_[snd];
1534 procsTo_[snd] = proc;
1544 for (
int i = 0; i < numProcs; ++i) {
1552 indicesTo_.resize(0);
1555 procsTo_.assign(numSends_,0);
1556 startsTo_.assign(numSends_,0);
1557 lengthsTo_.assign(numSends_,0);
1564 size_t index = 0, procIndex = 0;
1565 for (
size_t i = 0; i < numSends_; ++i) {
1566 while (exportProcIDs[procIndex] < 0) {
1569 startsTo_[i] = procIndex;
1570 int procID = exportProcIDs[procIndex];
1571 procsTo_[i] = procID;
1572 index += starts[procID];
1573 procIndex += starts[procID];
1578 if (numSends_ > 0) {
1579 sort2(procsTo_.begin(), procsTo_.end(), startsTo_.begin());
1583 for (
size_t i = 0; i < numSends_; ++i) {
1584 int procID = procsTo_[i];
1585 lengthsTo_[i] = starts[procID];
1586 if ((procID != myProcID) && (lengthsTo_[i] > maxSendLength_)) {
1587 maxSendLength_ = lengthsTo_[i];
1593 numSends_ -= selfMessage_;
1594 std::vector<int> recv_list;
1595 recv_list.reserve(numSends_);
1598 for(
int i=0; i<remoteProcIDs.size(); i++) {
1599 if(remoteProcIDs[i]>last_pid) {
1600 recv_list.push_back(remoteProcIDs[i]);
1601 last_pid = remoteProcIDs[i];
1603 else if (remoteProcIDs[i]<last_pid)
1604 throw std::runtime_error(
"Tpetra::Distributor:::createFromSendsAndRecvs expected RemotePIDs to be in sorted order");
1606 numReceives_ = recv_list.size();
1608 procsFrom_.assign(numReceives_,0);
1609 lengthsFrom_.assign(numReceives_,0);
1610 indicesFrom_.assign(numReceives_,0);
1611 startsFrom_.assign(numReceives_,0);
1613 for(
size_t i=0,j=0; i<numReceives_; ++i) {
1615 procsFrom_[i] = recv_list[i];
1617 for( ; j<(size_t)remoteProcIDs.size() &&
1618 remoteProcIDs[jlast]==remoteProcIDs[j] ; j++){;}
1619 lengthsFrom_[i] = j-jlast;
1621 totalReceiveLength_ = remoteProcIDs.size();
1622 indicesFrom_.clear ();
1628 indicesFrom_.reserve (totalReceiveLength_);
1629 for (
size_t i = 0; i < totalReceiveLength_; ++i) {
1630 indicesFrom_.push_back(i);
1633 numReceives_-=selfMessage_;
1636 std::ostringstream os;
1637 os << *prefix <<
"Done" << std::endl;
1638 std::cerr << os.str();
Declaration of Tpetra::Details::Behavior, a class that describes Tpetra's behavior.
Declaration of a function that prints strings from each process.
Stand-alone utility functions and macros.
#define TPETRA_EFFICIENCY_WARNING(throw_exception_test, Exception, msg)
Print or throw an efficiency warning.
#define SHARED_TEST_FOR_EXCEPTION(throw_exception_test, Exception, msg, comm)
Test for exception, with reduction over the given communicator.
static bool debug()
Whether Tpetra is in debug mode.
static bool verbose()
Whether Tpetra is in verbose mode.
static size_t verbosePrintCountThreshold()
Number of entries below which arrays, lists, etc. will be printed in debug mode.
Sets up and executes a communication plan for a Tpetra DistObject.
size_t getMaxSendLength() const
Maximum number of values this process will send to another single process.
Teuchos::RCP< Distributor > getReverse(bool create=true) const
A reverse communication plan Distributor.
Teuchos::ArrayView< const int > getProcsTo() const
Ranks of the processes to which this process will send values.
size_t getNumReceives() const
The number of processes from which we will receive data.
void setParameterList(const Teuchos::RCP< Teuchos::ParameterList > &plist)
Set Distributor parameters.
size_t getTotalReceiveLength() const
Total number of values this process will receive from other processes.
bool hasSelfMessage() const
Whether the calling process will send or receive messages to itself.
void swap(Distributor &rhs)
Swap the contents of rhs with those of *this.
Teuchos::ArrayView< const size_t > getLengthsTo() const
Number of values this process will send to each process.
Teuchos::ArrayView< const int > getProcsFrom() const
Ranks of the processes sending values to this process.
Distributor(const Teuchos::RCP< const Teuchos::Comm< int > > &comm)
Construct using the specified communicator and default parameters.
std::string description() const
Return a one-line description of this object.
size_t createFromSends(const Teuchos::ArrayView< const int > &exportProcIDs)
Set up Distributor using list of process ranks to which this process will send.
void createFromSendsAndRecvs(const Teuchos::ArrayView< const int > &exportProcIDs, const Teuchos::ArrayView< const int > &remoteProcIDs)
Set up Distributor using list of process ranks to which to send, and list of process ranks from which to receive.
Teuchos::RCP< const Teuchos::ParameterList > getValidParameters() const
List of valid Distributor parameters.
Teuchos::ArrayView< const size_t > getLengthsFrom() const
Number of values this process will receive from each process.
size_t getNumSends() const
The number of processes to which we will send data.
void describe(Teuchos::FancyOStream &out, const Teuchos::EVerbosityLevel verbLevel=Teuchos::Describable::verbLevel_default) const
Describe this object in a human-readable way to the given output stream.
Implementation details of Tpetra.
void verbosePrintArray(std::ostream &out, const ArrayType &x, const char name[], const size_t maxNumToPrint)
Print min(x.size(), maxNumToPrint) entries of x.
std::string DistributorSendTypeEnumToString(EDistributorSendType sendType)
Convert an EDistributorSendType enum value to a string.
EDistributorSendType
The type of MPI send that Distributor should use.
std::unique_ptr< std::string > createPrefix(const int myRank, const char prefix[])
Create string prefix for each line of verbose output.
EDistributorHowInitialized
Enum indicating how and whether a Distributor was initialized.
std::string DistributorHowInitializedEnumToString(EDistributorHowInitialized how)
Convert an EDistributorHowInitialized enum value to a string.
void gathervPrint(std::ostream &out, const std::string &s, const Teuchos::Comm< int > &comm)
On Process 0 in the given communicator, print strings from each process in that communicator, in rank order.
Namespace Tpetra contains the class and methods constituting the Tpetra library.
Teuchos::Array< std::string > distributorSendTypes()
Valid values for Distributor's "Send type" parameter.
void sort2(const IT1 &first1, const IT1 &last1, const IT2 &first2)
Sort the first array, and apply the resulting permutation to the second array.