#include "arccore/message_passing_mpi/MpiAdapter.h"

#include "arccore/trace/ITraceMng.h"

#include "arccore/collections/Array.h"

#include "arccore/message_passing/Request.h"
#include "arccore/message_passing/IStat.h"
#include "arccore/message_passing/internal/SubRequestCompletionInfo.h"

#include "arccore/base/IStackTraceService.h"
#include "arccore/base/TimeoutException.h"
#include "arccore/base/String.h"
#include "arccore/base/NotImplementedException.h"
#include "arccore/base/PlatformUtils.h"
#include "arccore/base/FatalErrorException.h"
#include "arccore/base/TraceInfo.h"

#include "arccore/message_passing_mpi/MpiLock.h"
#include "arccore/message_passing_mpi/NoMpiProfiling.h"
#include "arccore/message_passing_mpi/MpiRequest.h"
#include "arccore/message_passing_mpi/StandaloneMpiMessagePassingMng.h"
namespace Arccore::MessagePassing::Mpi

  typedef std::map<MPI_Request,RequestInfo>::iterator Iterator;

  m_trace_mng_ref = makeRef(tm);
  m_request_error_is_fatal = true;
  m_is_report_error_in_request = false;
  m_use_trace_full_stack = true;
  m_trace_mpirequest = true;

  void addRequest(MPI_Request request)
    if (m_trace_mpirequest)
      info() << "MpiAdapter: AddRequest r=" << request;

  void addRequest(MPI_Request request, const TraceInfo& ti)
    if (m_trace_mpirequest)
      info() << "MpiAdapter: AddRequest r=" << request;
    _addRequest(request,ti);

  void removeRequest(MPI_Request request)
    if (m_trace_mpirequest)
      info() << "MpiAdapter: RemoveRequest r=" << request;
    _removeRequest(request);

  void removeRequest(Iterator request_iter)
    if (request_iter==m_allocated_requests.end()){
      if (m_trace_mpirequest)
        info() << "MpiAdapter: RemoveRequestIter null iterator";
    if (m_trace_mpirequest)
      info() << "MpiAdapter: RemoveRequestIter r=" << request_iter->first;
    m_allocated_requests.erase(request_iter);

  return m_allocated_requests.end();
  if (_isEmptyRequest(request))
    return m_allocated_requests.end();
  auto ireq = m_allocated_requests.find(request);
  if (ireq==m_allocated_requests.end()){
    if (m_is_report_error_in_request || m_request_error_is_fatal){
      error() << "MpiAdapter::testRequest() request not referenced " << " id=" << request;
      _checkFatalInRequest();

  void _addRequest(MPI_Request request, const TraceInfo& trace_info)
    if (request==MPI_REQUEST_NULL){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::_addRequest() trying to add null request";
        _checkFatalInRequest();
    if (_isEmptyRequest(request))
    ++m_total_added_request;
    auto i = m_allocated_requests.find(request);
    if (i!=m_allocated_requests.end()){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::_addRequest() request already referenced " << " id=" << request;
        _checkFatalInRequest();
    rinfo.m_trace = trace_info;
    if (m_use_trace_full_stack)
    m_allocated_requests.insert(std::make_pair(request,rinfo));

  void _removeRequest(MPI_Request request)
    if (request==MPI_REQUEST_NULL){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::_removeRequest() null request (" << MPI_REQUEST_NULL << ")";
        _checkFatalInRequest();
    if (_isEmptyRequest(request))
    auto i = m_allocated_requests.find(request);
    if (i==m_allocated_requests.end()){
      if (m_is_report_error_in_request || m_request_error_is_fatal){
        error() << "MpiAdapter::_removeRequest() request not referenced " << " id=" << request;
        _checkFatalInRequest();
    m_allocated_requests.erase(i);

  void _checkFatalInRequest()
    if (m_request_error_is_fatal)
      ARCCORE_FATAL("Error in requests management");

  Int64 nbRequest() const { return m_allocated_requests.size(); }
  Int64 totalAddedRequest() const { return m_total_added_request; }

  void printRequests() const
    info() << "PRINT REQUESTS\n";
    for( auto& x : m_allocated_requests ){
      info() << "Request id=" << x.first << " trace=" << x.second.m_trace
             << " stack=" << x.second.m_stack_trace;

  void setEmptyRequests(MPI_Request r1,MPI_Request r2)
    m_empty_request1 = r1;
    m_empty_request2 = r2;

  bool m_request_error_is_fatal = false;
  bool m_is_report_error_in_request = true;
  bool m_trace_mpirequest = false;

  std::map<MPI_Request,RequestInfo> m_allocated_requests;
  bool m_use_trace_full_stack = false;
  MPI_Request m_empty_request1 = MPI_REQUEST_NULL;
  MPI_Request m_empty_request2 = MPI_REQUEST_NULL;
  Int64 m_total_added_request = 0;
  Ref<ITraceMng> m_trace_mng_ref;

  bool _isEmptyRequest(MPI_Request r) const
    return (r==m_empty_request1 || r==m_empty_request2);

#define ARCCORE_ADD_REQUEST(request)\
  m_request_set->addRequest(request,A_FUNCINFO);
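// The macro above funnels every request created by the adapter into the
// RequestSet registry together with the caller's TraceInfo (A_FUNCINFO), so
// that pending or forgotten requests can be reported and printed later
// (see printRequests() and the pending-request warning further down).
// A reduced sketch of the same bookkeeping idea, assuming only the standard
// library; 'RequestRegistry' and 'REGISTER_REQUEST' are illustrative names,
// not part of the Arccore API:

#include <map>
#include <string>
#include <cstddef>
#include <mpi.h>

struct RequestRegistry
{
  // Maps each live MPI_Request to the place that created it.
  std::map<MPI_Request, std::string> requests;
  void add(MPI_Request r, const std::string& where) { requests[r] = where; }
  void remove(MPI_Request r) { requests.erase(r); }
  std::size_t pendingCount() const { return requests.size(); }
};

// Register a request together with the calling function's name.
#define REGISTER_REQUEST(registry, request) (registry).add((request), __func__)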
int _checkSize(Int64 i64_size)
{
  if (i64_size>INT32_MAX)
    ARCCORE_FATAL("Can not convert '{0}' to type integer",i64_size);
  return (int)i64_size;
}
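// _checkSize() exists because MPI-3 count arguments are plain 'int', while the
// Arccore API carries 64-bit element counts. A minimal standalone sketch of
// the same narrowing check, assuming only the standard library; the helper
// name 'checked_mpi_count' is illustrative:

#include <cstdint>
#include <stdexcept>

int checked_mpi_count(std::int64_t n)
{
  // Reject anything that does not fit in an MPI 'int' count argument.
  if (n < 0 || n > INT32_MAX)
    throw std::range_error("count does not fit in an MPI 'int' argument");
  return static_cast<int>(n);
}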
, m_mpi_lock(mpi_lock)
, m_communicator(comm)
, m_empty_request1(MPI_REQUEST_NULL)
, m_empty_request2(MPI_REQUEST_NULL)

  if (s == "1" || s == "TRUE")
    m_is_allow_null_rank_for_any_source = true;
  if (s == "0" || s == "FALSE")
    m_is_allow_null_rank_for_any_source = false;

  ::MPI_Comm_rank(m_communicator,&m_comm_rank);
  ::MPI_Comm_size(m_communicator,&m_comm_size);

  MPI_Irecv(m_recv_buffer_for_empty_request, 1, MPI_CHAR, MPI_PROC_NULL,
            50505, m_communicator, &m_empty_request1);

  m_send_buffer_for_empty_request2[0] = 0;
  MPI_Isend(m_send_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
            50505, m_communicator, &m_empty_request2);
  MPI_Recv(m_recv_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
           50505, m_communicator, MPI_STATUS_IGNORE);

  m_request_set->setEmptyRequests(m_empty_request1,m_empty_request2);
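// The two requests built above appear to serve as reusable "already
// completed" placeholders: a receive from MPI_PROC_NULL completes immediately
// by definition, and the self-send is matched right away by the blocking
// MPI_Recv, so testing either request always succeeds and the RequestSet can
// skip them via _isEmptyRequest(). A standalone sketch of the first trick,
// assuming only <mpi.h>; the helper name is illustrative:

#include <mpi.h>

// Returns a request that is already complete. 'buf' must stay alive as long
// as the request may still be tested or waited on.
MPI_Request make_always_completed_request(MPI_Comm comm, char* buf)
{
  MPI_Request request = MPI_REQUEST_NULL;
  // A nonblocking receive from MPI_PROC_NULL finishes immediately with an
  // empty status, so MPI_Test/MPI_Wait on it never block.
  MPI_Irecv(buf, 1, MPI_CHAR, MPI_PROC_NULL, /*tag*/ 50505, comm, &request);
  return request;
}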
  if (m_empty_request1 != MPI_REQUEST_NULL)
    MPI_Request_free(&m_empty_request1);
  if (m_empty_request2 != MPI_REQUEST_NULL)
    MPI_Request_free(&m_empty_request2);

  delete m_request_set;

  Int64 nb_request = m_request_set->nbRequest();
  warning() << " Pending mpi requests size=" << nb_request;
  m_request_set->printRequests();
  _checkFatalInRequest();

  _checkHasNoRequests();

  m_request_set->m_request_error_is_fatal = v;

isRequestErrorAreFatal() const
  return m_request_set->m_request_error_is_fatal;

  m_request_set->m_is_report_error_in_request = v;

isPrintRequestError() const
  return m_request_set->m_is_report_error_in_request;

isCheckRequest() const

toMPISize(Int64 count)
  return _checkSize(count);

_trace(const char* function)
  info() << "MPI_TRACE: " << function;
broadcast(void* buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Bcast).name().localstr());
  double begin_time = MPI_Wtime();
  info() << "MPI_TRACE: MPI broadcast: before"
         << " nb_elem=" << nb_elem
         << " datatype=" << datatype;
  m_mpi_prof->broadcast(buf, _nb_elem, datatype, root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Bcast).name(),sr_time,0);

nonBlockingBroadcast(void* buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int _nb_elem = _checkSize(nb_elem);
  _trace(" MPI_Bcast");
  double begin_time = MPI_Wtime();
  ret = MPI_Ibcast(buf,_nb_elem,datatype,root,m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IBroadcast",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);
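// nonBlockingBroadcast() wraps MPI_Ibcast and registers the resulting request
// with ARCCORE_ADD_REQUEST before handing it back to the caller. A standalone
// sketch of the underlying MPI-3 pattern, assuming only <mpi.h>; the function
// name is illustrative:

#include <mpi.h>

void broadcast_then_wait(double* values, int nb_elem, int root, MPI_Comm comm)
{
  MPI_Request request = MPI_REQUEST_NULL;
  // Start the collective; it progresses in the background.
  MPI_Ibcast(values, nb_elem, MPI_DOUBLE, root, comm, &request);
  // ... independent work can overlap with the broadcast here ...
  MPI_Wait(&request, MPI_STATUS_IGNORE);
}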
gather(const void* send_buf, void* recv_buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace(MpiInfo(eMpiName::Gather).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->gather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, _root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Gather).name(),sr_time,0);

nonBlockingGather(const void* send_buf, void* recv_buf,
                  Int64 nb_elem,Int32 root,MPI_Datatype datatype)
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace("MPI_Igather");
  double begin_time = MPI_Wtime();
  ret = MPI_Igather(_sbuf,_nb_elem,datatype,recv_buf,_nb_elem,datatype,_root,
                    m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IGather",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);

allGather(const void* send_buf, void* recv_buf,
          Int64 nb_elem,MPI_Datatype datatype)
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Allgather).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allGather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Allgather).name(),sr_time,0);

nonBlockingAllGather(const void* send_buf, void* recv_buf,
                     Int64 nb_elem,MPI_Datatype datatype)
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace("MPI_Iallgather");
  double begin_time = MPI_Wtime();
  ret = MPI_Iallgather(_sbuf,_nb_elem,datatype,recv_buf,_nb_elem,datatype,
                       m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IAllGather",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);
gatherVariable(const void* send_buf, void* recv_buf, const int* recv_counts,
               const int* recv_indexes,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace(MpiInfo(eMpiName::Gatherv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->gatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, _root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Gatherv).name().localstr(),sr_time,0);

allGatherVariable(const void* send_buf, void* recv_buf, const int* recv_counts,
                  const int* recv_indexes,Int64 nb_elem,MPI_Datatype datatype)
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Allgatherv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allGatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Allgatherv).name().localstr(),sr_time,0);

scatterVariable(const void* send_buf, const int* send_count, const int* send_indexes,
                void* recv_buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_count = const_cast<int*>(send_count);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Scatterv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->scatterVariable(_sbuf,
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Scatterv).name(),sr_time,0);
allToAll(const void* send_buf, void* recv_buf, Integer count,MPI_Datatype datatype)
  void* _sbuf = const_cast<void*>(send_buf);
  int icount = _checkSize(count);
  _trace(MpiInfo(eMpiName::Alltoall).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allToAll(_sbuf, icount, datatype, recv_buf, icount, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Alltoall).name().localstr(),sr_time,0);

nonBlockingAllToAll(const void* send_buf, void* recv_buf, Integer count,MPI_Datatype datatype)
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  void* _sbuf = const_cast<void*>(send_buf);
  int icount = _checkSize(count);
  _trace("MPI_IAlltoall");
  double begin_time = MPI_Wtime();
  ret = MPI_Ialltoall(_sbuf,icount,datatype,recv_buf,icount,datatype,m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IAllToAll",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);

allToAllVariable(const void* send_buf, const int* send_counts,
                 const int* send_indexes, void* recv_buf, const int* recv_counts,
                 const int* recv_indexes,MPI_Datatype datatype)
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_counts = const_cast<int*>(send_counts);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int* _recv_counts = const_cast<int*>(recv_counts);
  int* _recv_indexes = const_cast<int*>(recv_indexes);
  _trace(MpiInfo(eMpiName::Alltoallv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allToAllVariable(_sbuf, _send_counts, _send_indexes, datatype,
                               recv_buf, _recv_counts, _recv_indexes, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Alltoallv).name(),sr_time,0);

nonBlockingAllToAllVariable(const void* send_buf, const int* send_counts,
                            const int* send_indexes, void* recv_buf, const int* recv_counts,
                            const int* recv_indexes,MPI_Datatype datatype)
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_counts = const_cast<int*>(send_counts);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int* _recv_counts = const_cast<int*>(recv_counts);
  int* _recv_indexes = const_cast<int*>(recv_indexes);
  _trace("MPI_Ialltoallv");
  double begin_time = MPI_Wtime();
  ret = MPI_Ialltoallv(_sbuf,_send_counts,_send_indexes,datatype,
                       recv_buf,_recv_counts,_recv_indexes,datatype,
                       m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time-begin_time);
  m_stat->add("IAllToAll",sr_time,0);
  ARCCORE_ADD_REQUEST(mpi_request);
  MPI_Barrier(m_communicator);

  MPI_Request mpi_request = MPI_REQUEST_NULL;
  ret = MPI_Ibarrier(m_communicator,&mpi_request);
  ARCCORE_ADD_REQUEST(mpi_request);
allReduce(const void* send_buf, void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Allreduce).name().localstr());
  m_mpi_prof->allReduce(_sbuf, recv_buf, _n, datatype, op, m_communicator);
  catch(TimeoutException& ex)
    std::ostringstream ostr;
    ostr << "MPI_Allreduce"
         << " send_buf=" << send_buf
         << " recv_buf=" << recv_buf
         << " datatype=" << datatype
         << " NB=" << m_nb_all_reduce;
    ex.setAdditionalInfo(ostr.str());
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Allreduce).name(),end_time-begin_time,count);

nonBlockingAllReduce(const void* send_buf, void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace("MPI_IAllreduce");
  ret = MPI_Iallreduce(_sbuf,recv_buf,_n,datatype,op,m_communicator,&mpi_request);
  double end_time = MPI_Wtime();
  m_stat->add("IReduce",end_time-begin_time,_n);
  ARCCORE_ADD_REQUEST(mpi_request);

reduce(const void* send_buf, void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op, Integer root)
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  int _root = static_cast<int>(root);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Reduce).name().localstr());
  m_mpi_prof->reduce(_sbuf, recv_buf, _n, datatype, op, _root, m_communicator);
  catch(TimeoutException& ex)
    std::ostringstream ostr;
         << " send_buf=" << send_buf
         << " recv_buf=" << recv_buf
         << " datatype=" << datatype
         << " NB=" << m_nb_reduce;
    ex.setAdditionalInfo(ostr.str());
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Reduce).name(),end_time-begin_time,0);

scan(const void* send_buf, void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Scan).name().localstr());
  m_mpi_prof->scan(_sbuf, recv_buf, _n, datatype, op, m_communicator);
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Scan).name(),end_time-begin_time,count);
directSendRecv(const void* send_buffer,Int64 send_buffer_size,
               void* recv_buffer,Int64 recv_buffer_size,
               Int32 proc,Int64 elem_size,MPI_Datatype data_type)
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Status mpi_status;
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Sendrecv).name().localstr());
  int sbuf_size = _checkSize(send_buffer_size);
  int rbuf_size = _checkSize(recv_buffer_size);
  m_mpi_prof->sendRecv(v_send_buffer, sbuf_size, data_type, proc, 99,
                       recv_buffer, rbuf_size, data_type, proc, 99,
                       m_communicator, &mpi_status);
  double end_time = MPI_Wtime();
  Int64 send_size = send_buffer_size * elem_size;
  Int64 recv_size = recv_buffer_size * elem_size;
  double sr_time = (end_time-begin_time);
  m_stat->add(MpiInfo(eMpiName::Sendrecv).name(),sr_time,send_size+recv_size);
sendNonBlockingNoStat(const void* send_buffer, Int64 send_buffer_size,
                      Int32 dest_rank, MPI_Datatype data_type, int mpi_tag)
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int sbuf_size = _checkSize(send_buffer_size);
  m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, dest_rank, mpi_tag, m_communicator, &mpi_request);
  info() << " ISend ret=" << ret << " proc=" << dest_rank << " tag=" << mpi_tag << " request=" << mpi_request;
  ARCCORE_ADD_REQUEST(mpi_request);
directSend(const void* send_buffer, Int64 send_buffer_size,
           Int32 proc, Int64 elem_size, MPI_Datatype data_type,
           int mpi_tag, bool is_blocked)
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  double begin_time = 0.0;
  double end_time = 0.0;
  Int64 send_size = send_buffer_size * elem_size;
  info() << "MPI_TRACE: MPI Send: send before"
         << " size=" << send_size
         << " tag=" << mpi_tag
         << " datatype=" << data_type
         << " blocking " << is_blocked;

  begin_time = MPI_Wtime();
  int sbuf_size = _checkSize(send_buffer_size);
  m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator, &mpi_request);
  MPI_Status mpi_status;
  while (is_finished==0){
    MpiLock::Section mls(m_mpi_lock);
    MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
  m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
  end_time = MPI_Wtime();
  mpi_request = MPI_REQUEST_NULL;

  MpiLock::Section mls(m_mpi_lock);
  begin_time = MPI_Wtime();
  int sbuf_size = _checkSize(send_buffer_size);
  m_mpi_prof->send(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator);
  end_time = MPI_Wtime();

  MpiLock::Section mls(m_mpi_lock);
  begin_time = MPI_Wtime();
  int sbuf_size = _checkSize(send_buffer_size);
  m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator, &mpi_request);
  info() << " ISend ret=" << ret << " proc=" << proc << " tag=" << mpi_tag << " request=" << mpi_request;
  end_time = MPI_Wtime();
  ARCCORE_ADD_REQUEST(mpi_request);

  info() << "MPI Send: send after"
         << " request=" << mpi_request;

  double sr_time = (end_time-begin_time);
         << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Send).name(),end_time-begin_time,send_size);
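// In the blocking path above, when an MpiLock is in use the adapter does not
// sit inside MPI_Send: it posts an MPI_Isend and then polls
// MPI_Request_get_status, taking the lock only around each poll so other
// threads can interleave their own MPI calls. A reduced sketch of that
// polling idea, assuming a std::mutex stands in for MpiLock; the function
// name is illustrative:

#include <mpi.h>
#include <mutex>

void locked_blocking_send(const void* buf, int n, MPI_Datatype dt,
                          int dest, int tag, MPI_Comm comm, std::mutex& mpi_mutex)
{
  MPI_Request req = MPI_REQUEST_NULL;
  {
    std::lock_guard<std::mutex> lg(mpi_mutex);
    // const_cast only for pre-MPI-3 bindings whose send buffer is non-const.
    MPI_Isend(const_cast<void*>(buf), n, dt, dest, tag, comm, &req);
  }
  int done = 0;
  MPI_Status st;
  while (!done) {
    std::lock_guard<std::mutex> lg(mpi_mutex);   // hold the lock only per poll
    MPI_Request_get_status(req, &done, &st);
  }
  std::lock_guard<std::mutex> lg(mpi_mutex);
  MPI_Wait(&req, MPI_STATUS_IGNORE);             // release the completed request
}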
directSendPack(const void* send_buffer,Int64 send_buffer_size,
               Int32 proc, int mpi_tag, bool is_blocked)
  return directSend(send_buffer,send_buffer_size,proc,1,MPI_PACKED,mpi_tag,is_blocked);

MpiMessagePassingMng* MpiAdapter::
  MPI_Comm_split(m_communicator, (keep) ? 1 : MPI_UNDEFINED, commRank(), &new_comm);

receiveNonBlockingNoStat(void* recv_buffer, Int64 recv_buffer_size,
                         Int32 source_rank, MPI_Datatype data_type, int mpi_tag)
  int rbuf_size = _checkSize(recv_buffer_size);
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, source_rank, mpi_tag, m_communicator, &mpi_request);
  ARCCORE_ADD_REQUEST(mpi_request);
directRecv(void* recv_buffer, Int64 recv_buffer_size,
           Int32 proc, Int64 elem_size, MPI_Datatype data_type,
           int mpi_tag, bool is_blocked)
  MPI_Status mpi_status;
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  double begin_time = 0.0;
  double end_time = 0.0;
  if (proc==A_PROC_NULL_RANK)
  if (proc == A_NULL_RANK && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use A_NULL_RANK for any source. Use A_ANY_SOURCE_RANK instead");
  if (proc==A_NULL_RANK || proc==A_ANY_SOURCE_RANK)
    i_proc = MPI_ANY_SOURCE;
  i_proc = static_cast<int>(proc);
  Int64 recv_size = recv_buffer_size * elem_size;
  info() << "MPI_TRACE: MPI Recv: recv before "
         << " size=" << recv_size
         << " from=" << i_proc
         << " tag=" << mpi_tag
         << " datatype=" << data_type
         << " blocking=" << is_blocked;

  MpiLock::Section mls(m_mpi_lock);
  begin_time = MPI_Wtime();
  int rbuf_size = _checkSize(recv_buffer_size);
  m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
  int is_finished = 0;
  MPI_Status mpi_status;
  while (is_finished==0){
    MpiLock::Section mls(m_mpi_lock);
    MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
    if (is_finished!=0){
      end_time = MPI_Wtime();
      m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
      mpi_request = MPI_REQUEST_NULL;

  MpiLock::Section mls(m_mpi_lock);
  begin_time = MPI_Wtime();
  int rbuf_size = _checkSize(recv_buffer_size);
  m_mpi_prof->recv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_status);
  end_time = MPI_Wtime();

  MpiLock::Section mls(m_mpi_lock);
  begin_time = MPI_Wtime();
  int rbuf_size = _checkSize(recv_buffer_size);
  m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
  end_time = MPI_Wtime();
  ARCCORE_ADD_REQUEST(mpi_request);

  info() << "MPI Recv: recv after "
         << " request=" << mpi_request;

  double sr_time = (end_time-begin_time);
         << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
probeRecvPack(UniqueArray<Byte>& recv_buffer,Int32 proc)
  double begin_time = MPI_Wtime();
  int recv_buffer_size = 0;
  _trace("MPI_Probe");
  m_mpi_prof->probe(proc, 101, m_communicator, &status);
  m_mpi_prof->getCount(&status, MPI_PACKED, &recv_buffer_size);
  recv_buffer.resize(recv_buffer_size);
  m_mpi_prof->recv(recv_buffer.data(), recv_buffer_size, MPI_PACKED, proc, 101, m_communicator, &status);
  double end_time = MPI_Wtime();
  Int64 recv_size = recv_buffer_size;
  double sr_time = (end_time-begin_time);
         << " time " << sr_time;
  m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
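// probeRecvPack() sizes the receive buffer from a probe before posting the
// actual receive. The same probe / get-count / recv pattern in plain MPI,
// assuming only <mpi.h> and <vector>; the function name is illustrative:

#include <mpi.h>
#include <vector>

std::vector<char> recv_packed(int source, int tag, MPI_Comm comm)
{
  MPI_Status status;
  MPI_Probe(source, tag, comm, &status);          // block until a matching message exists
  int nb_bytes = 0;
  MPI_Get_count(&status, MPI_PACKED, &nb_bytes);  // ask how large it is
  std::vector<char> buffer(nb_bytes);
  MPI_Recv(buffer.data(), nb_bytes, MPI_PACKED, source, tag, comm, &status);
  return buffer;
}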
MessageSourceInfo MpiAdapter::
_buildSourceInfoFromStatus(const MPI_Status& mpi_status)
  MPI_Count message_size = 0;
  MPI_Get_elements_x(&mpi_status,MPI_BYTE,&message_size);
  MessageTag tag(mpi_status.MPI_TAG);
  MessageRank rank(mpi_status.MPI_SOURCE);
  return MessageSourceInfo(rank,tag,message_size);

MessageId MpiAdapter::
_probeMessage(MessageRank source,MessageTag tag, bool is_blocking)
  MPI_Status mpi_status;
  int has_message = 0;
  MPI_Message message;
  int mpi_source = source.value();
  if (source.isProcNull())
    ARCCORE_THROW(NotImplementedException, "Probe with MPI_PROC_NULL");
  if (source.isNull() && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
  if (source.isNull() || source.isAnySource())
    mpi_source = MPI_ANY_SOURCE;
  int mpi_tag = tag.value();
    mpi_tag = MPI_ANY_TAG;
  ret = MPI_Mprobe(mpi_source,mpi_tag,m_communicator,&message,&mpi_status);
  ret = MPI_Improbe(mpi_source, mpi_tag, m_communicator, &has_message, &message, &mpi_status);
    ARCCORE_FATAL("Error during call to MPI_Mprobe r={0}",ret);
  MessageId ret_message;
  if (has_message!=0){
    MessageSourceInfo si(_buildSourceInfoFromStatus(mpi_status));
    ret_message = MessageId(si,message);
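// _probeMessage() relies on the MPI-3 matched-probe API: MPI_Mprobe/MPI_Improbe
// hand back an MPI_Message handle, and the matching receive is then done with
// MPI_Mrecv, which avoids the matching race a separate MPI_Probe/MPI_Recv pair
// can have when several threads receive on the same communicator. A standalone
// sketch, assuming only <mpi.h> and <vector>; the function name is illustrative:

#include <mpi.h>
#include <vector>

bool try_matched_recv(MPI_Comm comm, std::vector<int>& out)
{
  int has_message = 0;
  MPI_Message message;
  MPI_Status status;
  MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &has_message, &message, &status);
  if (!has_message)
    return false;                       // nothing pending right now
  int count = 0;
  MPI_Get_count(&status, MPI_INT, &count);
  out.resize(count);
  // Receive exactly the message that was matched by the probe above.
  MPI_Mrecv(out.data(), count, MPI_INT, &message, MPI_STATUS_IGNORE);
  return true;
}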
MessageId MpiAdapter::
probeMessage(PointToPointMessageInfo message)
  if (!message.isValid())
  if (!message.isRankTag())
    ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
  return _probeMessage(message.destinationRank(),message.tag(),message.isBlocking());

MessageSourceInfo MpiAdapter::
_legacyProbeMessage(MessageRank source,MessageTag tag, bool is_blocking)
  MPI_Status mpi_status;
  int has_message = 0;
  int mpi_source = source.value();
  if (source.isProcNull())
    ARCCORE_THROW(NotImplementedException, "Probe with MPI_PROC_NULL");
  if (source.isNull() && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use MPI_Probe with null rank. Use MessageRank::anySourceRank() instead");
  if (source.isNull() || source.isAnySource())
    mpi_source = MPI_ANY_SOURCE;
  int mpi_tag = tag.value();
    mpi_tag = MPI_ANY_TAG;
  ret = MPI_Probe(mpi_source,mpi_tag,m_communicator,&mpi_status);
  ret = MPI_Iprobe(mpi_source,mpi_tag,m_communicator,&has_message,&mpi_status);
    ARCCORE_FATAL("Error during call to MPI_Probe/MPI_Iprobe r={0}",ret);
  return _buildSourceInfoFromStatus(mpi_status);

MessageSourceInfo MpiAdapter::
legacyProbeMessage(PointToPointMessageInfo message)
  if (!message.isValid())
  if (!message.isRankTag())
    ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
  return _legacyProbeMessage(message.destinationRank(),message.tag(),message.isBlocking());
directRecv(void* recv_buffer, Int64 recv_buffer_size,
  MPI_Status mpi_status;
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  MPI_Message mpi_message = (MPI_Message)message;
  double begin_time = 0.0;
  double end_time = 0.0;
  Int64 recv_size = recv_buffer_size * elem_size;
  info() << "MPI_TRACE: MPI Mrecv: recv before "
         << " size=" << recv_size
         << " from_msg=" << message
         << " datatype=" << data_type
         << " blocking=" << is_blocked;

  begin_time = MPI_Wtime();
  int rbuf_size = _checkSize(recv_buffer_size);
  MPI_Imrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_request);
  int is_finished = 0;
  MPI_Status mpi_status;
  while (is_finished==0){
    MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
    if (is_finished!=0){
      end_time = MPI_Wtime();
      m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
      mpi_request = MPI_REQUEST_NULL;

  begin_time = MPI_Wtime();
  int rbuf_size = _checkSize(recv_buffer_size);
  MPI_Mrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_status);
  end_time = MPI_Wtime();

  begin_time = MPI_Wtime();
  int rbuf_size = _checkSize(recv_buffer_size);
  ret = MPI_Imrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_request);
  end_time = MPI_Wtime();
  ARCCORE_ADD_REQUEST(mpi_request);

  info() << "MPI Recv: recv after "
         << " request=" << mpi_request;

  double sr_time = (end_time-begin_time);
         << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);

directRecvPack(void* recv_buffer, Int64 recv_buffer_size,
               Int32 proc, int mpi_tag, bool is_blocking)
  return directRecv(recv_buffer,recv_buffer_size,proc,1,MPI_PACKED,mpi_tag,is_blocking);
  while (_waitAllRequestsMPI(requests, indexes, mpi_status)){

waitSomeRequests(ArrayView<Request> requests,
                 ArrayView<bool> indexes,
                 bool is_non_blocking)
  UniqueArray<MPI_Status> mpi_status(requests.size());
  waitSomeRequestsMPI(requests, indexes, mpi_status, is_non_blocking);

  , mpi_source_rank(source_rank)
  , mpi_source_tag(source_tag)

  Ref<ISubRequest> sub_request;
  int mpi_source_rank = MPI_PROC_NULL;
  int mpi_source_tag = 0;

  for( Integer i=0; i<size; ++i ) {
    if (done_indexes[i]){
      if (r.hasSubRequest()){
        info() << "Done request with sub-request r=" << r << " mpi_r=" << r << " i=" << i
               << " source_rank=" << status[i].MPI_SOURCE
               << " source_tag=" << status[i].MPI_TAG;
        new_requests.add(SubRequestInfo(r.subRequest(), i, status[i].MPI_SOURCE, status[i].MPI_TAG));
      _removeRequest((MPI_Request)(r));

  bool has_new_request = false;
  if (!new_requests.empty()){
    UniqueArray<MPI_Status> old_status(size);
    for( Integer i=0; i<size; ++i ){
      if (done_indexes[i]){
        old_status[i] = status[index];
    for( SubRequestInfo& sri : new_requests ){
      info() << "Before handle new request index=" << index
             << " sri.source_rank=" << sri.mpi_source_rank
             << " sri.source_tag=" << sri.mpi_source_tag;
      SubRequestCompletionInfo completion_info(MessageRank(old_status[index].MPI_SOURCE), MessageTag(old_status[index].MPI_TAG));
      Request r = sri.sub_request->executeOnCompletion(completion_info);
      info() << "Handle new request index=" << index << " old_r=" << requests[index] << " new_r=" << r;
      has_new_request = true;
      requests[index] = r;
      done_indexes[index] = false;
    for( Integer i=0; i<size; ++i ){
      if (done_indexes[i]){
        status[index] = old_status[i];
  return has_new_request;
_waitAllRequestsMPI(ArrayView<Request> requests,
                    ArrayView<bool> indexes,
                    ArrayView<MPI_Status> mpi_status)
  UniqueArray<MPI_Request> mpi_request(size);
  for( Integer i=0; i<size; ++i ){
    mpi_request[i] = (MPI_Request)(requests[i]);
  info() << " MPI_waitall begin size=" << size;
  double diff_time = 0.0;
  double begin_time = MPI_Wtime();
  for( Integer i=0; i<size; ++i ){
    MPI_Request request = (MPI_Request)(mpi_request[i]);
    int is_finished = 0;
    while (is_finished==0){
      MpiLock::Section mls(m_mpi_lock);
      m_mpi_prof->test(&request, &is_finished, (MPI_Status *) MPI_STATUS_IGNORE);
  double end_time = MPI_Wtime();
  diff_time = end_time - begin_time;

  MpiLock::Section mls(m_mpi_lock);
  double begin_time = MPI_Wtime();
  m_mpi_prof->waitAll(size, mpi_request.data(), mpi_status.data());
  double end_time = MPI_Wtime();
  diff_time = end_time - begin_time;

  for( Integer i=0; i<size; ++i ){
  bool has_new_request = _handleEndRequests(requests,indexes,mpi_status);
  info() << " MPI_waitall end size=" << size;
  m_stat->add(MpiInfo(eMpiName::Waitall).name(),diff_time,size);
  return has_new_request;
waitSomeRequestsMPI(ArrayView<Request> requests,ArrayView<bool> indexes,
                    ArrayView<MPI_Status> mpi_status, bool is_non_blocking)
  Integer size = requests.size();
  UniqueArray<MPI_Request> mpi_request(size);
  UniqueArray<MPI_Request> saved_mpi_request(size);
  UniqueArray<int> completed_requests(size);
  int nb_completed_request = 0;
  for( Integer i = 0; i < size; ++i ) {
    if (!requests[i].isValid()) {
      saved_mpi_request[i] = MPI_REQUEST_NULL;
    saved_mpi_request[i] = static_cast<MPI_Request>(requests[i]);
  bool is_print_debug = m_is_trace || (!is_non_blocking);
  debug() << "WaitRequestBegin is_non_blocking=" << is_non_blocking << " n=" << size;
  double begin_time = MPI_Wtime();
  if (is_non_blocking){
    _trace(MpiInfo(eMpiName::Testsome).name().localstr());
    MpiLock::Section mls(m_mpi_lock);
    m_mpi_prof->testSome(size, saved_mpi_request.data(), &nb_completed_request,
                         completed_requests.data(), mpi_status.data());
    if (nb_completed_request == MPI_UNDEFINED)
      nb_completed_request = 0;
    debug() << "WaitSomeRequestMPI: TestSome nb_completed=" << nb_completed_request;
  _trace(MpiInfo(eMpiName::Waitsome).name().localstr());
    MpiLock::Section mls(m_mpi_lock);
    m_mpi_prof->waitSome(size, saved_mpi_request.data(), &nb_completed_request,
                         completed_requests.data(), mpi_status.data());
  if (nb_completed_request == MPI_UNDEFINED)
    nb_completed_request = 0;
  debug() << "WaitSomeRequest nb_completed=" << nb_completed_request;
  catch(TimeoutException& ex)
    std::ostringstream ostr;
    if (is_non_blocking)
      ostr << MpiInfo(eMpiName::Testsome).name();
    ostr << MpiInfo(eMpiName::Waitsome).name();
    ostr << " size=" << size
         << " is_non_blocking=" << is_non_blocking;
    ex.setAdditionalInfo(ostr.str());
  for( int z=0; z<nb_completed_request; ++z ){
    int index = completed_requests[z];
    debug() << "Completed my_rank=" << m_comm_rank << " z=" << z
            << " index=" << index
            << " tag=" << mpi_status[z].MPI_TAG
            << " source=" << mpi_status[z].MPI_SOURCE;
    indexes[index] = true;
  bool has_new_request = _handleEndRequests(requests,indexes,mpi_status);
  if (has_new_request){
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Waitsome).name(),end_time-begin_time,size);
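// waitSomeRequestsMPI() maps the completion indices returned by
// MPI_Waitsome/MPI_Testsome back onto the caller's boolean 'indexes' view and
// treats MPI_UNDEFINED as "nothing completed". The bare MPI pattern, assuming
// only <mpi.h> and <vector>; the function name is illustrative:

#include <mpi.h>
#include <vector>

void wait_some(std::vector<MPI_Request>& requests, std::vector<bool>& done)
{
  int n = static_cast<int>(requests.size());
  std::vector<int> completed(n);
  std::vector<MPI_Status> statuses(n);
  int nb_completed = 0;
  MPI_Waitsome(n, requests.data(), &nb_completed, completed.data(), statuses.data());
  if (nb_completed == MPI_UNDEFINED)   // no active request was left in the array
    nb_completed = 0;
  for (int z = 0; z < nb_completed; ++z)
    done[completed[z]] = true;         // mark which caller-side slots finished
}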
freeRequest(Request& request)
  if (!request.isValid()){
    warning() << "MpiAdapter::freeRequest() null request r=" << (MPI_Request)request;
    _checkFatalInRequest();
  MpiLock::Section mls(m_mpi_lock);
  auto mr = (MPI_Request)request;
  MPI_Request_free(&mr);

testRequest(Request& request)
  if (!request.isValid())
  auto mr = (MPI_Request)request;
  int is_finished = 0;
  MpiLock::Section mls(m_mpi_lock);
  RequestSet::Iterator request_iter = m_request_set->findRequest(mr);
  m_mpi_prof->test(&mr, &is_finished, (MPI_Status *) MPI_STATUS_IGNORE);
  if (is_finished!=0){
    m_request_set->removeRequest(request_iter);
    if (request.hasSubRequest())
      ARCCORE_THROW(NotImplementedException, "SubRequest support");

_addRequest(MPI_Request request)
  m_request_set->addRequest(request);

_removeRequest(MPI_Request request)
  m_request_set->removeRequest(request);

enableDebugRequest(bool enable_debug_request)
  m_stat->enable(enable_debug_request);

_checkFatalInRequest()
  if (isRequestErrorAreFatal())
    ARCCORE_FATAL("Error in requests management");

setMpiProfiling(IMpiProfiling* mpi_profiling)
  m_mpi_prof = mpi_profiling;

IMpiProfiling* MpiAdapter::
getMpiProfiling() const

setProfiler(IProfiler* profiler)
  m_mpi_prof = nullptr;
  IMpiProfiling* p = dynamic_cast<IMpiProfiling*>(profiler);
    ARCCORE_FATAL("Invalid profiler. Profiler has to implement interface 'IMpiProfiling'");

IProfiler* MpiAdapter::