#include "arccore/message_passing_mpi/internal/MpiAdapter.h"

#include "arccore/trace/ITraceMng.h"

#include "arccore/collections/Array.h"

#include "arccore/message_passing/Request.h"
#include "arccore/message_passing/IStat.h"
#include "arccore/message_passing/internal/SubRequestCompletionInfo.h"

#include "arccore/base/IStackTraceService.h"
#include "arccore/base/TimeoutException.h"
#include "arccore/base/String.h"
#include "arccore/base/NotImplementedException.h"
#include "arccore/base/PlatformUtils.h"
#include "arccore/base/FatalErrorException.h"
#include "arccore/base/TraceInfo.h"

#include "arccore/message_passing_mpi/StandaloneMpiMessagePassingMng.h"
#include "arccore/message_passing_mpi/internal/MpiLock.h"
#include "arccore/message_passing_mpi/internal/NoMpiProfiling.h"
#include "arccore/message_passing_mpi/internal/MpiRequest.h"
namespace Arcane::MessagePassing::Mpi
{
  typedef std::map<MPI_Request,RequestInfo>::iterator Iterator;

    m_request_error_is_fatal = true;
    m_is_report_error_in_request = false;
    m_use_trace_full_stack = true;
    m_trace_mpirequest = true;

  void addRequest(MPI_Request request)
  {
    if (m_trace_mpirequest)
      info() << "MpiAdapter: AddRequest r=" << request;
    _addRequest(request,TraceInfo());
  }

  void addRequest(MPI_Request request,const TraceInfo& ti)
  {
    if (m_trace_mpirequest)
      info() << "MpiAdapter: AddRequest r=" << request;
    _addRequest(request,ti);
  }

  void removeRequest(MPI_Request request)
  {
    if (m_trace_mpirequest)
      info() << "MpiAdapter: RemoveRequest r=" << request;
    _removeRequest(request);
  }

  void removeRequest(Iterator request_iter)
  {
    if (request_iter==m_allocated_requests.end()){
      if (m_trace_mpirequest)
        info() << "MpiAdapter: RemoveRequestIter null iterator";
      return;
    }
    if (m_trace_mpirequest)
      info() << "MpiAdapter: RemoveRequestIter r=" << request_iter->first;
    m_allocated_requests.erase(request_iter);
  }
  Iterator findRequest(MPI_Request request)
  {
    if (m_no_check_request)
      return m_allocated_requests.end();
    if (_isEmptyRequest(request))
      return m_allocated_requests.end();
    auto ireq = m_allocated_requests.find(request);
    if (ireq==m_allocated_requests.end()){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::testRequest() request not referenced "
                << " id=" << request;
        _checkFatalInRequest();
      }
    }
    return ireq;
  }
  void _addRequest(MPI_Request request,const TraceInfo& trace_info)
  {
    if (request==MPI_REQUEST_NULL){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::_addRequest() trying to add null request";
        _checkFatalInRequest();
      }
      return;
    }
    if (_isEmptyRequest(request))
      return;
    ++m_total_added_request;

    auto i = m_allocated_requests.find(request);
    if (i!=m_allocated_requests.end()){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::_addRequest() request already referenced "
                << " id=" << request;
        _checkFatalInRequest();
      }
      return;
    }
    RequestInfo rinfo;
    rinfo.m_trace = trace_info;
    if (m_use_trace_full_stack)
      rinfo.m_stack_trace = Platform::getStackTrace();
    m_allocated_requests.insert(std::make_pair(request,rinfo));
  }
  void _removeRequest(MPI_Request request)
  {
    if (request==MPI_REQUEST_NULL){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::_removeRequest() null request (" << MPI_REQUEST_NULL << ")";
        _checkFatalInRequest();
      }
      return;
    }
    if (_isEmptyRequest(request))
      return;
    auto i = m_allocated_requests.find(request);
    if (i==m_allocated_requests.end()){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::_removeRequest() request not referenced "
                << " id=" << request;
        _checkFatalInRequest();
      }
    }
    else
      m_allocated_requests.erase(i);
  }
  void _checkFatalInRequest()
  {
    if (m_request_error_is_fatal)
      ARCCORE_FATAL("Error in requests management");
  }

  Int64 nbRequest() const { return m_allocated_requests.size(); }
  Int64 totalAddedRequest() const { return m_total_added_request; }

  void printRequests() const
  {
    info() << "PRINT REQUESTS\n";
    for( auto& x : m_allocated_requests ){
      info() << "Request id=" << x.first << " trace=" << x.second.m_trace
             << " stack=" << x.second.m_stack_trace;
    }
  }
  void setEmptyRequests(MPI_Request r1,MPI_Request r2)
  {
    m_empty_request1 = r1;
    m_empty_request2 = r2;
  }

  bool m_request_error_is_fatal = false;
  bool m_is_report_error_in_request = true;
  bool m_trace_mpirequest = false;

  std::map<MPI_Request,RequestInfo> m_allocated_requests;
  bool m_use_trace_full_stack = false;
  MPI_Request m_empty_request1 = MPI_REQUEST_NULL;
  MPI_Request m_empty_request2 = MPI_REQUEST_NULL;
  Int64 m_total_added_request = 0;

  bool _isEmptyRequest(MPI_Request r) const
  {
    return (r==m_empty_request1 || r==m_empty_request2);
  }
#define ARCCORE_ADD_REQUEST(request)\
  m_request_set->addRequest(request,A_FUNCINFO);
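// ARCCORE_ADD_REQUEST is invoked after every MPI call that creates a request:
// it registers the MPI_Request in the RequestSet together with A_FUNCINFO (the
// calling function), which is what allows nbRequest()/printRequests() to report
// requests that were never completed or freed.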
int _checkSize(Int64 i64_size)
{
  if (i64_size>INT32_MAX)
    ARCCORE_FATAL("Can not convert '{0}' to type integer",i64_size);
  return (int)i64_size;
}
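// MPI count arguments are plain C 'int', so every Int64 element count coming
// from the public interface goes through _checkSize() before being passed to
// MPI. A count above INT32_MAX cannot be represented and is reported as a fatal
// error rather than being silently truncated.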
MpiAdapter::
MpiAdapter(ITraceMng* trace, IStat* stat, MPI_Comm comm,
           MpiLock* mpi_lock, IMpiProfiling* mpi_op)
: TraceAccessor(trace)
, m_mpi_lock(mpi_lock)
, m_communicator(comm)
, m_empty_request1(MPI_REQUEST_NULL)
, m_empty_request2(MPI_REQUEST_NULL)
{
  m_request_set = new RequestSet(trace);

  if (s == "1" || s == "TRUE")
    m_is_allow_null_rank_for_any_source = true;
  if (s == "0" || s == "FALSE")
    m_is_allow_null_rank_for_any_source = false;

  ::MPI_Comm_rank(m_communicator,&m_comm_rank);
  ::MPI_Comm_size(m_communicator,&m_comm_size);

  m_mpi_prof = new NoMpiProfiling();

  MPI_Irecv(m_recv_buffer_for_empty_request, 1, MPI_CHAR, MPI_PROC_NULL,
            50505, m_communicator, &m_empty_request1);

  m_send_buffer_for_empty_request2[0] = 0;
  MPI_Isend(m_send_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
            50505, m_communicator, &m_empty_request2);

  MPI_Recv(m_recv_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
           50505, m_communicator, MPI_STATUS_IGNORE);

  m_request_set->setEmptyRequests(m_empty_request1,m_empty_request2);
}
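// Note on the two "empty" requests built just above (explanatory comment based
// on the calls visible in this listing): the MPI_Irecv with source MPI_PROC_NULL
// completes immediately by definition, and the MPI_Isend to our own rank is
// matched by the blocking MPI_Recv that follows, so m_empty_request1 and
// m_empty_request2 are requests that are already (or immediately become)
// completed. They are passed to RequestSet::setEmptyRequests() so that
// _isEmptyRequest() can recognize and ignore them when requests are added,
// removed or looked up.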
  if (m_empty_request1 != MPI_REQUEST_NULL)
    MPI_Request_free(&m_empty_request1);
  if (m_empty_request2 != MPI_REQUEST_NULL)
    MPI_Request_free(&m_empty_request2);

  delete m_request_set;
Request MpiAdapter::
buildRequest(int ret,MPI_Request mpi_request)
{
  return MpiRequest(ret,this,mpi_request);
}
void MpiAdapter::
_checkHasNoRequests()
{
  Int64 nb_request = m_request_set->nbRequest();
  if (nb_request!=0){
    warning() << " Pending mpi requests size=" << nb_request;
    m_request_set->printRequests();
    _checkFatalInRequest();
  }
}

  _checkHasNoRequests();
void MpiAdapter::
setRequestErrorAreFatal(bool v)
{
  m_request_set->m_request_error_is_fatal = v;
}

bool MpiAdapter::
isRequestErrorAreFatal() const
{
  return m_request_set->m_request_error_is_fatal;
}

void MpiAdapter::
setPrintRequestError(bool v)
{
  m_request_set->m_is_report_error_in_request = v;
}

bool MpiAdapter::
isPrintRequestError() const
{
  return m_request_set->m_is_report_error_in_request;
}

void MpiAdapter::
setCheckRequest(bool v)
{
  m_request_set->m_no_check_request = !v;
}

bool MpiAdapter::
isCheckRequest() const
{
  return !m_request_set->m_no_check_request;
}

int MpiAdapter::
toMPISize(Int64 count)
{
  return _checkSize(count);
}
void MpiAdapter::
_trace(const char* function)
{
  IStackTraceService* stack_service = Platform::getStackTraceService();
  if (stack_service)
    info() << "MPI_TRACE: " << function << "\n" << stack_service->stackTrace().toString();
  else
    info() << "MPI_TRACE: " << function;
}
void MpiAdapter::
broadcast(void* buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
{
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Bcast).name().localstr());
  double begin_time = MPI_Wtime();
  if (m_is_trace)
    info() << "MPI_TRACE: MPI broadcast: before"
           << " nb_elem=" << nb_elem
           << " datatype=" << datatype;
  m_mpi_prof->broadcast(buf, _nb_elem, datatype, root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Bcast).name(),sr_time,0);
}
Request MpiAdapter::
nonBlockingBroadcast(void* buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  int _nb_elem = _checkSize(nb_elem);
  _trace(" MPI_Bcast");
  double begin_time = MPI_Wtime();
  ret = MPI_Ibcast(buf,_nb_elem,datatype,root,m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IBroadcast",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
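// The nonBlocking* collectives below all follow the shape of
// nonBlockingBroadcast(): convert the 64-bit count with _checkSize(), time the
// MPI_I* call, record the elapsed time in m_stat, register the resulting
// MPI_Request with ARCCORE_ADD_REQUEST so it is tracked until completion, and
// wrap it in an Arccore Request with buildRequest().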
void MpiAdapter::
gather(const void* send_buf,void* recv_buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace(MpiInfo(eMpiName::Gather).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->gather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, _root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Gather).name(),sr_time,0);
}
Request MpiAdapter::
nonBlockingGather(const void* send_buf,void* recv_buf,
                  Int64 nb_elem,Int32 root,MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace("MPI_Igather");
  double begin_time = MPI_Wtime();
  ret = MPI_Igather(_sbuf,_nb_elem,datatype,recv_buf,_nb_elem,datatype,_root,
                    m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IGather",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
void MpiAdapter::
allGather(const void* send_buf,void* recv_buf,
          Int64 nb_elem,MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Allgather).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allGather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Allgather).name(),sr_time,0);
}
Request MpiAdapter::
nonBlockingAllGather(const void* send_buf,void* recv_buf,
                     Int64 nb_elem,MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace("MPI_Iallgather");
  double begin_time = MPI_Wtime();
  ret = MPI_Iallgather(_sbuf,_nb_elem,datatype,recv_buf,_nb_elem,datatype,
                       m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IAllGather",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
void MpiAdapter::
gatherVariable(const void* send_buf,void* recv_buf,const int* recv_counts,
               const int* recv_indexes,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace(MpiInfo(eMpiName::Gatherv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->gatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, _root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Gatherv).name().localstr(),sr_time,0);
}
void MpiAdapter::
allGatherVariable(const void* send_buf,void* recv_buf,const int* recv_counts,
                  const int* recv_indexes,Int64 nb_elem,MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Allgatherv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allGatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Allgatherv).name().localstr(),sr_time,0);
}
void MpiAdapter::
scatterVariable(const void* send_buf,const int* send_count,const int* send_indexes,
                void* recv_buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_count = const_cast<int*>(send_count);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Scatterv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->scatterVariable(_sbuf, _send_count, _send_indexes, datatype,
                              recv_buf, _nb_elem, datatype, root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Scatterv).name(),sr_time,0);
}
void MpiAdapter::
allToAll(const void* send_buf,void* recv_buf,Integer count,MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int icount = _checkSize(count);
  _trace(MpiInfo(eMpiName::Alltoall).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allToAll(_sbuf, icount, datatype, recv_buf, icount, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Alltoall).name().localstr(),sr_time,0);
}
Request MpiAdapter::
nonBlockingAllToAll(const void* send_buf,void* recv_buf,Integer count,MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int icount = _checkSize(count);
  _trace("MPI_IAlltoall");
  double begin_time = MPI_Wtime();
  ret = MPI_Ialltoall(_sbuf,icount,datatype,recv_buf,icount,datatype,m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IAllToAll",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
void MpiAdapter::
allToAllVariable(const void* send_buf,const int* send_counts,
                 const int* send_indexes,void* recv_buf,const int* recv_counts,
                 const int* recv_indexes,MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_counts = const_cast<int*>(send_counts);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int* _recv_counts = const_cast<int*>(recv_counts);
  int* _recv_indexes = const_cast<int*>(recv_indexes);

  _trace(MpiInfo(eMpiName::Alltoallv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allToAllVariable(_sbuf, _send_counts, _send_indexes, datatype,
                               recv_buf, _recv_counts, _recv_indexes, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Alltoallv).name(),sr_time,0);
}
Request MpiAdapter::
nonBlockingAllToAllVariable(const void* send_buf,const int* send_counts,
                            const int* send_indexes,void* recv_buf,const int* recv_counts,
                            const int* recv_indexes,MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_counts = const_cast<int*>(send_counts);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int* _recv_counts = const_cast<int*>(recv_counts);
  int* _recv_indexes = const_cast<int*>(recv_indexes);

  _trace("MPI_Ialltoallv");
  double begin_time = MPI_Wtime();
  ret = MPI_Ialltoallv(_sbuf,_send_counts,_send_indexes,datatype,
                       recv_buf,_recv_counts,_recv_indexes,datatype,
                       m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IAllToAll",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
void MpiAdapter::
barrier()
{
  MPI_Barrier(m_communicator);
}

Request MpiAdapter::
nonBlockingBarrier()
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  ret = MPI_Ibarrier(m_communicator,&mpi_request);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
void MpiAdapter::
allReduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Allreduce).name().localstr());
  try{
    m_mpi_prof->allReduce(_sbuf, recv_buf, _n, datatype, op, m_communicator);
  }
  catch(TimeoutException& ex){
    std::ostringstream ostr;
    ostr << "MPI_Allreduce"
         << " send_buf=" << send_buf
         << " recv_buf=" << recv_buf
         << " datatype=" << datatype
         << " NB=" << m_nb_all_reduce;
    ex.setAdditionalInfo(ostr.str());
    throw;
  }
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Allreduce).name(),end_time-begin_time,count);
}
Request MpiAdapter::
nonBlockingAllReduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace("MPI_IAllreduce");
  ret = MPI_Iallreduce(_sbuf,recv_buf,_n,datatype,op,m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  m_stat->add("IReduce",end_time-begin_time,_n);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
void MpiAdapter::
reduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op,Integer root)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  int _root = static_cast<int>(root);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Reduce).name().localstr());
  try{
    m_mpi_prof->reduce(_sbuf, recv_buf, _n, datatype, op, _root, m_communicator);
  }
  catch(TimeoutException& ex){
    std::ostringstream ostr;
    ostr << MpiInfo(eMpiName::Reduce).name()
         << " send_buf=" << send_buf
         << " recv_buf=" << recv_buf
         << " datatype=" << datatype
         << " NB=" << m_nb_reduce;
    ex.setAdditionalInfo(ostr.str());
    throw;
  }
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Reduce).name(),end_time-begin_time,0);
}
void MpiAdapter::
scan(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Scan).name().localstr());
  m_mpi_prof->scan(_sbuf, recv_buf, _n, datatype, op, m_communicator);
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Scan).name(),end_time-begin_time,count);
}
void MpiAdapter::
directSendRecv(const void* send_buffer,Int64 send_buffer_size,
               void* recv_buffer,Int64 recv_buffer_size,
               Int32 proc,Int64 elem_size,MPI_Datatype data_type)
{
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Status mpi_status;
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Sendrecv).name().localstr());
  int sbuf_size = _checkSize(send_buffer_size);
  int rbuf_size = _checkSize(recv_buffer_size);
  m_mpi_prof->sendRecv(v_send_buffer, sbuf_size, data_type, proc, 99,
                       recv_buffer, rbuf_size, data_type, proc, 99,
                       m_communicator, &mpi_status);
  double end_time = MPI_Wtime();
  Int64 send_size = send_buffer_size * elem_size;
  Int64 recv_size = recv_buffer_size * elem_size;
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Sendrecv).name(),sr_time,send_size+recv_size);
}
Request MpiAdapter::
sendNonBlockingNoStat(const void* send_buffer,Int64 send_buffer_size,
                      Int32 dest_rank,MPI_Datatype data_type,int mpi_tag)
{
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int sbuf_size = _checkSize(send_buffer_size);
  int ret = 0;
  m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, dest_rank, mpi_tag, m_communicator, &mpi_request);
  if (m_is_trace)
    info() << " ISend ret=" << ret << " proc=" << dest_rank << " tag=" << mpi_tag << " request=" << mpi_request;
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
Request MpiAdapter::
directSend(const void* send_buffer,Int64 send_buffer_size,
           Int32 proc,Int64 elem_size,MPI_Datatype data_type,
           int mpi_tag,bool is_blocked)
{
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  double begin_time = 0.0;
  double end_time = 0.0;
  Int64 send_size = send_buffer_size * elem_size;
  if (m_is_trace){
    info() << "MPI_TRACE: MPI Send: send before"
           << " size=" << send_size
           << " tag=" << mpi_tag
           << " datatype=" << data_type
           << " blocking " << is_blocked;
  }
  if (is_blocked){
    // Blocking send while the MPI lock is in use: post an MPI_Isend and poll
    // for completion so the lock is only held for short sections.
    if (m_mpi_lock){
      {
        MpiLock::Section mls(m_mpi_lock);
        begin_time = MPI_Wtime();
        int sbuf_size = _checkSize(send_buffer_size);
        m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator, &mpi_request);
      }
      int is_finished = 0;
      MPI_Status mpi_status;
      while (is_finished==0){
        MpiLock::Section mls(m_mpi_lock);
        MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
        if (is_finished!=0){
          m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
          end_time = MPI_Wtime();
          mpi_request = MPI_REQUEST_NULL;
        }
      }
    }
    else{
      // Blocking send without the lock.
      MpiLock::Section mls(m_mpi_lock);
      begin_time = MPI_Wtime();
      int sbuf_size = _checkSize(send_buffer_size);
      m_mpi_prof->send(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator);
      end_time = MPI_Wtime();
    }
  }
  else{
    // Non-blocking send: the request is registered and returned to the caller.
    MpiLock::Section mls(m_mpi_lock);
    begin_time = MPI_Wtime();
    int sbuf_size = _checkSize(send_buffer_size);
    m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator, &mpi_request);
    if (m_is_trace)
      info() << " ISend ret=" << ret << " proc=" << proc << " tag=" << mpi_tag << " request=" << mpi_request;
    end_time = MPI_Wtime();
    ARCCORE_ADD_REQUEST(mpi_request);
  }
  if (m_is_trace){
    info() << "MPI Send: send after"
           << " request=" << mpi_request;
  }
  double sr_time = (end_time-begin_time);
  debug(Trace::High) << "MPI Send: send " << send_size
                     << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Send).name(),end_time-begin_time,send_size);
  return buildRequest(ret,mpi_request);
}
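// How directSend() chooses its path (descriptive comment; the if/else
// scaffolding around the three blocks above is partly reconstructed from an
// incomplete listing): a blocking send while an MpiLock is in use is done as
// MPI_Isend followed by a polling loop on MPI_Request_get_status, so the lock
// is only held for short sections and other threads can issue MPI calls; a
// blocking send without the lock is a plain send(); a non-blocking send posts
// iSend(), registers the request with ARCCORE_ADD_REQUEST and returns it.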
Request MpiAdapter::
directSendPack(const void* send_buffer,Int64 send_buffer_size,
               Int32 proc,int mpi_tag,bool is_blocked)
{
  return directSend(send_buffer,send_buffer_size,proc,1,MPI_PACKED,mpi_tag,is_blocked);
}
MpiMessagePassingMng* MpiAdapter::
commSplit(bool keep)
{
  MPI_Comm new_comm = MPI_COMM_NULL;
  MPI_Comm_split(m_communicator, (keep) ? 1 : MPI_UNDEFINED, commRank(), &new_comm);
  if (keep)
    return StandaloneMpiMessagePassingMng::create(new_comm, true);
  return nullptr;
}
Request MpiAdapter::
receiveNonBlockingNoStat(void* recv_buffer,Int64 recv_buffer_size,
                         Int32 source_rank,MPI_Datatype data_type,int mpi_tag)
{
  int rbuf_size = _checkSize(recv_buffer_size);
  int ret = 0;
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, source_rank, mpi_tag, m_communicator, &mpi_request);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret,mpi_request);
}
Request MpiAdapter::
directRecv(void* recv_buffer,Int64 recv_buffer_size,
           Int32 proc,Int64 elem_size,MPI_Datatype data_type,
           int mpi_tag,bool is_blocked)
{
  MPI_Status mpi_status;
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  double begin_time = 0.0;
  double end_time = 0.0;

  int i_proc = 0;
  if (proc==A_PROC_NULL_RANK)
    ARCCORE_THROW(NotImplementedException,"Receive with MPI_PROC_NULL");
  if (proc == A_NULL_RANK && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use A_NULL_RANK for any source. Use A_ANY_SOURCE_RANK instead");
  if (proc==A_NULL_RANK || proc==A_ANY_SOURCE_RANK)
    i_proc = MPI_ANY_SOURCE;
  else
    i_proc = static_cast<int>(proc);

  Int64 recv_size = recv_buffer_size * elem_size;
  if (m_is_trace){
    info() << "MPI_TRACE: MPI Recv: recv before "
           << " size=" << recv_size
           << " from=" << i_proc
           << " tag=" << mpi_tag
           << " datatype=" << data_type
           << " blocking=" << is_blocked;
  }
  if (is_blocked){
    // Blocking receive while the MPI lock is in use: post an MPI_Irecv and poll
    // for completion so the lock is only held for short sections.
    if (m_mpi_lock){
      {
        MpiLock::Section mls(m_mpi_lock);
        begin_time = MPI_Wtime();
        int rbuf_size = _checkSize(recv_buffer_size);
        m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
      }
      int is_finished = 0;
      MPI_Status mpi_status;
      while (is_finished==0){
        MpiLock::Section mls(m_mpi_lock);
        MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
        if (is_finished!=0){
          end_time = MPI_Wtime();
          m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
          mpi_request = MPI_REQUEST_NULL;
        }
      }
    }
    else{
      // Blocking receive without the lock.
      MpiLock::Section mls(m_mpi_lock);
      begin_time = MPI_Wtime();
      int rbuf_size = _checkSize(recv_buffer_size);
      m_mpi_prof->recv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_status);
      end_time = MPI_Wtime();
    }
  }
  else{
    // Non-blocking receive: the request is registered and returned to the caller.
    MpiLock::Section mls(m_mpi_lock);
    begin_time = MPI_Wtime();
    int rbuf_size = _checkSize(recv_buffer_size);
    m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
    end_time = MPI_Wtime();
    ARCCORE_ADD_REQUEST(mpi_request);
  }
  if (m_is_trace){
    info() << "MPI Recv: recv after "
           << " request=" << mpi_request;
  }
  double sr_time = (end_time-begin_time);
  debug(Trace::High) << "MPI Recv: recv after " << recv_size
                     << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
  return buildRequest(ret,mpi_request);
}
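// Rank handling in directRecv() (descriptive comment): receives from
// MPI_PROC_NULL are rejected as not implemented, A_NULL_RANK is only accepted
// as a synonym for "any source" when m_is_allow_null_rank_for_any_source was
// set in the constructor, and both A_NULL_RANK and A_ANY_SOURCE_RANK are
// translated to MPI_ANY_SOURCE before the MPI receive is issued.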
void MpiAdapter::
probeRecvPack(UniqueArray<Byte>& recv_buffer,Int32 proc)
{
  double begin_time = MPI_Wtime();
  MPI_Status status;
  int recv_buffer_size = 0;
  _trace("MPI_Probe");
  m_mpi_prof->probe(proc, 101, m_communicator, &status);
  m_mpi_prof->getCount(&status, MPI_PACKED, &recv_buffer_size);

  recv_buffer.resize(recv_buffer_size);
  m_mpi_prof->recv(recv_buffer.data(), recv_buffer_size, MPI_PACKED, proc, 101, m_communicator, &status);

  double end_time = MPI_Wtime();
  Int64 recv_size = recv_buffer_size;
  double sr_time = (end_time-begin_time);
  debug(Trace::High) << "MPI probeRecvPack " << recv_size
                     << " time " << sr_time;
  m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
}
MessageSourceInfo MpiAdapter::
_buildSourceInfoFromStatus(const MPI_Status& mpi_status)
{
  MPI_Count message_size = 0;
  MPI_Get_elements_x(&mpi_status,MPI_BYTE,&message_size);
  MessageTag tag(mpi_status.MPI_TAG);
  MessageRank rank(mpi_status.MPI_SOURCE);
  return MessageSourceInfo(rank,tag,message_size);
}
MessageId MpiAdapter::
_probeMessage(MessageRank source,MessageTag tag,bool is_blocking)
{
  MPI_Status mpi_status;
  int has_message = 0;
  MPI_Message message;
  int ret = 0;
  int mpi_source = source.value();
  if (source.isProcNull())
    ARCCORE_THROW(NotImplementedException,"Probe with MPI_PROC_NULL");
  if (source.isNull() && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
  if (source.isNull() || source.isAnySource())
    mpi_source = MPI_ANY_SOURCE;
  int mpi_tag = tag.value();
  if (tag.isNull())
    mpi_tag = MPI_ANY_TAG;
  if (is_blocking){
    ret = MPI_Mprobe(mpi_source,mpi_tag,m_communicator,&message,&mpi_status);
    has_message = true;
  }
  else
    ret = MPI_Improbe(mpi_source, mpi_tag, m_communicator, &has_message, &message, &mpi_status);
  if (ret!=0)
    ARCCORE_FATAL("Error during call to MPI_Mprobe r={0}",ret);
  MessageId ret_message;
  if (has_message!=0){
    MessageSourceInfo si(_buildSourceInfoFromStatus(mpi_status));
    ret_message = MessageId(si,message);
  }
  return ret_message;
}
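// _probeMessage() relies on matched probes: MPI_Mprobe / MPI_Improbe return an
// MPI_Message handle identifying the matched message, and that handle is stored
// in the returned MessageId. The directRecv() overload taking a MessageId later
// consumes it with MPI_Imrecv / MPI_Mrecv, which guarantees that the message
// received is exactly the one that was probed, even if several messages with
// the same rank and tag are in flight.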
MessageId MpiAdapter::
probeMessage(PointToPointMessageInfo message)
{
  if (!message.isValid())
    return MessageId();
  if (!message.isRankTag())
    ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
  return _probeMessage(message.destinationRank(),message.tag(),message.isBlocking());
}
MessageSourceInfo MpiAdapter::
_legacyProbeMessage(MessageRank source,MessageTag tag,bool is_blocking)
{
  MPI_Status mpi_status;
  int has_message = 0;
  int ret = 0;
  int mpi_source = source.value();
  if (source.isProcNull())
    ARCCORE_THROW(NotImplementedException,"Probe with MPI_PROC_NULL");
  if (source.isNull() && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use MPI_Probe with null rank. Use MessageRank::anySourceRank() instead");
  if (source.isNull() || source.isAnySource())
    mpi_source = MPI_ANY_SOURCE;
  int mpi_tag = tag.value();
  if (tag.isNull())
    mpi_tag = MPI_ANY_TAG;
  if (is_blocking){
    ret = MPI_Probe(mpi_source,mpi_tag,m_communicator,&mpi_status);
    has_message = true;
  }
  else
    ret = MPI_Iprobe(mpi_source,mpi_tag,m_communicator,&has_message,&mpi_status);
  if (ret!=0)
    ARCCORE_FATAL("Error during call to MPI_Mprobe r={0}",ret);
  return _buildSourceInfoFromStatus(mpi_status);
}
MessageSourceInfo MpiAdapter::
legacyProbeMessage(PointToPointMessageInfo message)
{
  if (!message.isValid())
    return {};
  if (!message.isRankTag())
    ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
  return _legacyProbeMessage(message.destinationRank(),message.tag(),message.isBlocking());
}
Request MpiAdapter::
directRecv(void* recv_buffer,Int64 recv_buffer_size,
           MessageId message,Int64 elem_size,MPI_Datatype data_type,
           bool is_blocked)
{
  MPI_Status mpi_status;
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  MPI_Message mpi_message = (MPI_Message)message;
  int ret = 0;
  double begin_time = 0.0;
  double end_time = 0.0;

  Int64 recv_size = recv_buffer_size * elem_size;
  if (m_is_trace){
    info() << "MPI_TRACE: MPI Mrecv: recv before "
           << " size=" << recv_size
           << " from_msg=" << message
           << " datatype=" << data_type
           << " blocking=" << is_blocked;
  }
  if (is_blocked){
    // Blocking receive while the MPI lock is in use: post an MPI_Imrecv and
    // poll for completion so the lock is only held for short sections.
    if (m_mpi_lock){
      {
        MpiLock::Section mls(m_mpi_lock);
        begin_time = MPI_Wtime();
        int rbuf_size = _checkSize(recv_buffer_size);
        MPI_Imrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_request);
      }
      int is_finished = 0;
      MPI_Status mpi_status;
      while (is_finished==0){
        MpiLock::Section mls(m_mpi_lock);
        MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
        if (is_finished!=0){
          end_time = MPI_Wtime();
          m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
          mpi_request = MPI_REQUEST_NULL;
        }
      }
    }
    else{
      // Blocking receive without the lock.
      MpiLock::Section mls(m_mpi_lock);
      begin_time = MPI_Wtime();
      int rbuf_size = _checkSize(recv_buffer_size);
      MPI_Mrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_status);
      end_time = MPI_Wtime();
    }
  }
  else{
    // Non-blocking receive: the request is registered and returned to the caller.
    MpiLock::Section mls(m_mpi_lock);
    begin_time = MPI_Wtime();
    int rbuf_size = _checkSize(recv_buffer_size);
    ret = MPI_Imrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_request);
    end_time = MPI_Wtime();
    ARCCORE_ADD_REQUEST(mpi_request);
  }
  if (m_is_trace){
    info() << "MPI Recv: recv after "
           << " request=" << mpi_request;
  }
  double sr_time = (end_time-begin_time);
  debug(Trace::High) << "MPI Recv: recv after " << recv_size
                     << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
  return buildRequest(ret,mpi_request);
}
Request MpiAdapter::
directRecvPack(void* recv_buffer,Int64 recv_buffer_size,
               Int32 proc,int mpi_tag,bool is_blocking)
{
  return directRecv(recv_buffer,recv_buffer_size,proc,1,MPI_PACKED,mpi_tag,is_blocking);
}
void MpiAdapter::
waitAllRequests(ArrayView<Request> requests)
{
  UniqueArray<bool> indexes(requests.size());
  UniqueArray<MPI_Status> mpi_status(requests.size());
  while (_waitAllRequestsMPI(requests, indexes, mpi_status)){
    // Loop again while sub-requests generate new requests.
  }
}

void MpiAdapter::
waitSomeRequests(ArrayView<Request> requests,
                 ArrayView<bool> indexes,
                 bool is_non_blocking)
{
  UniqueArray<MPI_Status> mpi_status(requests.size());
  waitSomeRequestsMPI(requests, indexes, mpi_status, is_non_blocking);
}
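// Typical use (illustrative sketch, not part of this file): the caller keeps an
// array of Request objects returned by the nonBlocking* methods plus a bool
// array of the same size; after waitSomeRequests(requests,indexes,false)
// returns, indexes[i] is true for every request that completed, and those
// entries must not be waited on again.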
struct SubRequestInfo
{
  SubRequestInfo(Ref<ISubRequest> sr,Integer i,int source_rank,int source_tag)
  : sub_request(sr)
  , index(i)
  , mpi_source_rank(source_rank)
  , mpi_source_tag(source_tag)
  {}

  Ref<ISubRequest> sub_request;
  Integer index = 0;
  int mpi_source_rank = MPI_PROC_NULL;
  int mpi_source_tag = 0;
};

bool MpiAdapter::
_handleEndRequests(ArrayView<Request> requests,ArrayView<bool> done_indexes,
                   ArrayView<MPI_Status> status)
{
  UniqueArray<SubRequestInfo> new_requests;
  Integer size = requests.size();
  {
    MpiLock::Section mls(m_mpi_lock);
    for( Integer i=0; i<size; ++i ) {
      if (done_indexes[i]){
        Request r = requests[i];
        if (r.hasSubRequest()){
          if (m_is_trace)
            info() << "Done request with sub-request r=" << r << " mpi_r=" << r << " i=" << i
                   << " source_rank=" << status[i].MPI_SOURCE
                   << " source_tag=" << status[i].MPI_TAG;
          new_requests.add(SubRequestInfo(r.subRequest(), i, status[i].MPI_SOURCE, status[i].MPI_TAG));
        }
        _removeRequest((MPI_Request)(r));
      }
    }
  }

  bool has_new_request = false;
  if (!new_requests.empty()){
    // Save the MPI_Status values: executing the sub-requests may overwrite them.
    UniqueArray<MPI_Status> old_status(size);
    {
      Integer index = 0;
      for( Integer i=0; i<size; ++i ){
        if (done_indexes[i]){
          old_status[i] = status[index];
          ++index;
        }
      }
    }
    // Execute the sub-request attached to each completed request.
    for( SubRequestInfo& sri : new_requests ){
      Integer index = sri.index;
      if (m_is_trace)
        info() << "Before handle new request index=" << index
               << " sri.source_rank=" << sri.mpi_source_rank
               << " sri.source_tag=" << sri.mpi_source_tag;
      SubRequestCompletionInfo completion_info(MessageRank(old_status[index].MPI_SOURCE), MessageTag(old_status[index].MPI_TAG));
      Request r = sri.sub_request->executeOnCompletion(completion_info);
      if (m_is_trace)
        info() << "Handle new request index=" << index << " old_r=" << requests[index] << " new_r=" << r;
      // A new request replaces the old one and the entry is no longer done.
      if (r.isValid()){
        has_new_request = true;
        requests[index] = r;
        done_indexes[index] = false;
      }
    }
    {
      Integer index = 0;
      for( Integer i=0; i<size; ++i ){
        if (done_indexes[i]){
          status[index] = old_status[i];
          ++index;
        }
      }
    }
  }
  return has_new_request;
}
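// Sub-request chaining (descriptive comment): when a completed request carries
// an ISubRequest, executeOnCompletion() is called with the source rank and tag
// taken from the saved MPI_Status and may itself return a new Request. That new
// request replaces the old one in 'requests' and its 'done_indexes' entry is
// reset to false, which is why callers loop on _waitAllRequestsMPI() /
// waitSomeRequestsMPI() until no new request is produced.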
bool MpiAdapter::
_waitAllRequestsMPI(ArrayView<Request> requests,
                    ArrayView<bool> indexes,
                    ArrayView<MPI_Status> mpi_status)
{
  Integer size = requests.size();
  if (size==0)
    return false;
  UniqueArray<MPI_Request> mpi_request(size);
  for( Integer i=0; i<size; ++i ){
    mpi_request[i] = (MPI_Request)(requests[i]);
  }
  if (m_is_trace)
    info() << " MPI_waitall begin size=" << size;
  double diff_time = 0.0;
  if (m_mpi_lock){
    // Active wait with the MPI lock: test each request in turn so the lock is
    // never held during a blocking wait.
    double begin_time = MPI_Wtime();
    for( Integer i=0; i<size; ++i ){
      MPI_Request request = (MPI_Request)(mpi_request[i]);
      int is_finished = 0;
      while (is_finished==0){
        MpiLock::Section mls(m_mpi_lock);
        m_mpi_prof->test(&request, &is_finished, (MPI_Status *) MPI_STATUS_IGNORE);
      }
    }
    double end_time = MPI_Wtime();
    diff_time = end_time - begin_time;
  }
  else{
    // Without the lock, a single waitAll() is enough.
    MpiLock::Section mls(m_mpi_lock);
    double begin_time = MPI_Wtime();
    m_mpi_prof->waitAll(size, mpi_request.data(), mpi_status.data());
    double end_time = MPI_Wtime();
    diff_time = end_time - begin_time;
  }

  // Every request is now completed.
  for( Integer i=0; i<size; ++i ){
    indexes[i] = true;
  }

  bool has_new_request = _handleEndRequests(requests,indexes,mpi_status);
  if (m_is_trace)
    info() << " MPI_waitall end size=" << size;
  m_stat->add(MpiInfo(eMpiName::Waitall).name(),diff_time,size);
  return has_new_request;
}
void MpiAdapter::
waitSomeRequestsMPI(ArrayView<Request> requests,ArrayView<bool> indexes,
                    ArrayView<MPI_Status> mpi_status,bool is_non_blocking)
{
  Integer size = requests.size();
  UniqueArray<MPI_Request> mpi_request(size);
  UniqueArray<MPI_Request> saved_mpi_request(size);
  UniqueArray<int> completed_requests(size);
  int nb_completed_request = 0;

  for (Integer i = 0; i < size; ++i) {
    // Invalid requests are replaced by MPI_REQUEST_NULL so MPI ignores them.
    if (!requests[i].isValid()) {
      saved_mpi_request[i] = MPI_REQUEST_NULL;
    }
    else {
      saved_mpi_request[i] = static_cast<MPI_Request>(requests[i]);
    }
  }

  bool is_print_debug = m_is_trace || (!is_non_blocking);
  if (is_print_debug)
    debug() << "WaitRequestBegin is_non_blocking=" << is_non_blocking << " n=" << size;

  double begin_time = MPI_Wtime();

  try{
    if (is_non_blocking){
      _trace(MpiInfo(eMpiName::Testsome).name().localstr());
      {
        MpiLock::Section mls(m_mpi_lock);
        m_mpi_prof->testSome(size, saved_mpi_request.data(), &nb_completed_request,
                             completed_requests.data(), mpi_status.data());
      }
      if (nb_completed_request == MPI_UNDEFINED)
        nb_completed_request = 0;
      if (is_print_debug)
        debug() << "WaitSomeRequestMPI: TestSome nb_completed=" << nb_completed_request;
    }
    else{
      _trace(MpiInfo(eMpiName::Waitsome).name().localstr());
      {
        MpiLock::Section mls(m_mpi_lock);
        m_mpi_prof->waitSome(size, saved_mpi_request.data(), &nb_completed_request,
                             completed_requests.data(), mpi_status.data());
      }
      if (nb_completed_request == MPI_UNDEFINED)
        nb_completed_request = 0;
      if (is_print_debug)
        debug() << "WaitSomeRequest nb_completed=" << nb_completed_request;
    }
  }
  catch(TimeoutException& ex){
    std::ostringstream ostr;
    if (is_non_blocking)
      ostr << MpiInfo(eMpiName::Testsome).name();
    else
      ostr << MpiInfo(eMpiName::Waitsome).name();
    ostr << " size=" << size
         << " is_non_blocking=" << is_non_blocking;
    ex.setAdditionalInfo(ostr.str());
    throw;
  }

  for( int z=0; z<nb_completed_request; ++z ){
    int index = completed_requests[z];
    if (is_print_debug)
      debug() << "Completed my_rank=" << m_comm_rank << " z=" << z
              << " index=" << index
              << " tag=" << mpi_status[z].MPI_TAG
              << " source=" << mpi_status[z].MPI_SOURCE;
    indexes[index] = true;
  }

  bool has_new_request = _handleEndRequests(requests,indexes,mpi_status);
  if (has_new_request){
    // ...
  }
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Waitsome).name(),end_time-begin_time,size);
}
void MpiAdapter::
freeRequest(Request& request)
{
  if (!request.isValid()){
    warning() << "MpiAdapter::freeRequest() null request r=" << (MPI_Request)request;
    _checkFatalInRequest();
    return;
  }
  {
    MpiLock::Section mls(m_mpi_lock);

    auto mr = (MPI_Request)request;
    MPI_Request_free(&mr);
  }
}

bool MpiAdapter::
testRequest(Request& request)
{
  // MPI allows testing a null request.
  if (!request.isValid())
    return true;

  auto mr = (MPI_Request)request;
  int is_finished = 0;

  {
    MpiLock::Section mls(m_mpi_lock);

    // Find the iterator first: the request handle may be modified by the test.
    RequestSet::Iterator request_iter = m_request_set->findRequest(mr);

    m_mpi_prof->test(&mr, &is_finished, (MPI_Status *) MPI_STATUS_IGNORE);
    if (is_finished!=0){
      m_request_set->removeRequest(request_iter);
      if (request.hasSubRequest())
        ARCCORE_THROW(NotImplementedException,"SubRequest support");
      return true;
    }
  }
  return false;
}
void MpiAdapter::
_addRequest(MPI_Request request)
{
  m_request_set->addRequest(request);
}

void MpiAdapter::
_removeRequest(MPI_Request request)
{
  m_request_set->removeRequest(request);
}

void MpiAdapter::
enableDebugRequest(bool enable_debug_request)
{
  m_stat->enable(enable_debug_request);
}

void MpiAdapter::
_checkFatalInRequest()
{
  if (isRequestErrorAreFatal())
    ARCCORE_FATAL("Error in requests management");
}

void MpiAdapter::
setMpiProfiling(IMpiProfiling* mpi_profiling)
{
  m_mpi_prof = mpi_profiling;
}

IMpiProfiling* MpiAdapter::
getMpiProfiling() const
{
  return m_mpi_prof;
}

void MpiAdapter::
setProfiler(IProfiler* profiler)
{
  if (!profiler){
    m_mpi_prof = nullptr;
    return;
  }
  IMpiProfiling* p = dynamic_cast<IMpiProfiling*>(profiler);
  if (!p)
    ARCCORE_FATAL("Invalid profiler. Profiler has to implemented interface 'IMpiProfiling'");
  m_mpi_prof = p;
}

IProfiler* MpiAdapter::