#include "arccore/message_passing_mpi/internal/MpiAdapter.h"

#include "arccore/trace/ITraceMng.h"

#include "arccore/collections/Array.h"

#include "arccore/message_passing/Request.h"
#include "arccore/message_passing/IStat.h"
#include "arccore/message_passing/internal/SubRequestCompletionInfo.h"

#include "arccore/base/IStackTraceService.h"
#include "arccore/base/TimeoutException.h"
#include "arccore/base/String.h"
#include "arccore/base/NotImplementedException.h"
#include "arccore/base/PlatformUtils.h"
#include "arccore/base/FatalErrorException.h"
#include "arccore/base/TraceInfo.h"

#include "arccore/message_passing_mpi/StandaloneMpiMessagePassingMng.h"
#include "arccore/message_passing_mpi/internal/MpiLock.h"
#include "arccore/message_passing_mpi/internal/NoMpiProfiling.h"
#include "arccore/message_passing_mpi/internal/MpiRequest.h"
#include "arccore/message_passing_mpi/internal/MpiMachineMemoryWindowBaseInternalCreator.h"
namespace Arcane::MessagePassing::Mpi
{
class MpiAdapter::RequestSet
: public TraceAccessor
{
 public:

  struct RequestInfo
  {
    TraceInfo m_trace;
    String m_stack_trace;
  };

 public:

  typedef std::map<MPI_Request, RequestInfo>::iterator Iterator;

 public:

  explicit RequestSet(ITraceMng* tm)
  : TraceAccessor(tm)
  {
    // Each assignment below is normally guarded by a condition
    // (checked build, or an environment variable).
    m_request_error_is_fatal = true;
    m_is_report_error_in_request = false;
    m_use_trace_full_stack = true;
    m_trace_mpirequest = true;
  }
  void addRequest(MPI_Request request)
  {
    if (m_trace_mpirequest)
      info() << "MpiAdapter: AddRequest r=" << request;
    _addRequest(request, TraceInfo());
  }
  void addRequest(MPI_Request request, const TraceInfo& ti)
  {
    if (m_trace_mpirequest)
      info() << "MpiAdapter: AddRequest r=" << request;
    _addRequest(request, ti);
  }
  void removeRequest(MPI_Request request)
  {
    if (m_trace_mpirequest)
      info() << "MpiAdapter: RemoveRequest r=" << request;
    _removeRequest(request);
  }
  void removeRequest(Iterator request_iter)
  {
    if (request_iter == m_allocated_requests.end()) {
      if (m_trace_mpirequest)
        info() << "MpiAdapter: RemoveRequestIter null iterator";
      return;
    }
    if (m_trace_mpirequest)
      info() << "MpiAdapter: RemoveRequestIter r=" << request_iter->first;
    m_allocated_requests.erase(request_iter);
  }
  //! Checks that the request is in the list.
  Iterator findRequest(MPI_Request request)
  {
    if (m_no_check_request)
      return m_allocated_requests.end();
    if (_isEmptyRequest(request))
      return m_allocated_requests.end();
    auto ireq = m_allocated_requests.find(request);
    if (ireq == m_allocated_requests.end()) {
      if (m_is_report_error_in_request || m_request_error_is_fatal) {
        error() << "MpiAdapter::testRequest() request not referenced "
                << " id=" << request;
        _checkFatalInRequest();
      }
    }
    return ireq;
  }
  void _addRequest(MPI_Request request, const TraceInfo& trace_info)
  {
    if (request == MPI_REQUEST_NULL) {
      if (m_is_report_error_in_request || m_request_error_is_fatal) {
        error() << "MpiAdapter::_addRequest() trying to add null request";
        _checkFatalInRequest();
      }
      return;
    }
    if (_isEmptyRequest(request))
      return;
    ++m_total_added_request;
    auto i = m_allocated_requests.find(request);
    if (i != m_allocated_requests.end()) {
      if (m_is_report_error_in_request || m_request_error_is_fatal) {
        error() << "MpiAdapter::_addRequest() request already referenced "
                << " id=" << request;
        _checkFatalInRequest();
      }
      return;
    }
    RequestInfo rinfo;
    rinfo.m_trace = trace_info;
    if (m_use_trace_full_stack)
      rinfo.m_stack_trace = Platform::getStackTrace();
    m_allocated_requests.insert(std::make_pair(request, rinfo));
  }
  void _removeRequest(MPI_Request request)
  {
    if (request == MPI_REQUEST_NULL) {
      if (m_is_report_error_in_request || m_request_error_is_fatal) {
        error() << "MpiAdapter::_removeRequest() null request (" << MPI_REQUEST_NULL << ")";
        _checkFatalInRequest();
      }
      return;
    }
    if (_isEmptyRequest(request))
      return;
    auto i = m_allocated_requests.find(request);
    if (i == m_allocated_requests.end()) {
      if (m_is_report_error_in_request || m_request_error_is_fatal) {
        error() << "MpiAdapter::_removeRequest() request not referenced "
                << " id=" << request;
        _checkFatalInRequest();
      }
    }
    else
      m_allocated_requests.erase(i);
  }
 public:

  void _checkFatalInRequest()
  {
    if (m_request_error_is_fatal)
      ARCCORE_FATAL("Error in requests management");
  }
  Int64 nbRequest() const { return m_allocated_requests.size(); }
  Int64 totalAddedRequest() const { return m_total_added_request; }

  void printRequests() const
  {
    info() << "PRINT REQUESTS\n";
    for ( auto& x : m_allocated_requests ) {
      info() << "Request id=" << x.first << " trace=" << x.second.m_trace
             << " stack=" << x.second.m_stack_trace;
    }
  }
  void setEmptyRequests(MPI_Request r1, MPI_Request r2)
  {
    m_empty_request1 = r1;
    m_empty_request2 = r2;
  }
 public:

  bool m_request_error_is_fatal = false;
  bool m_is_report_error_in_request = true;
  bool m_trace_mpirequest = false;
  //! True when requests are not checked.
  bool m_no_check_request = true;

 private:

  std::map<MPI_Request, RequestInfo> m_allocated_requests;
  bool m_use_trace_full_stack = false;
  MPI_Request m_empty_request1 = MPI_REQUEST_NULL;
  MPI_Request m_empty_request2 = MPI_REQUEST_NULL;
  Int64 m_total_added_request = 0;
 private:

  bool _isEmptyRequest(MPI_Request r) const
  {
    return (r == m_empty_request1 || r == m_empty_request2);
  }
};
#define ARCCORE_ADD_REQUEST(request)\
  m_request_set->addRequest(request,A_FUNCINFO);
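/*
 * Editorial sketch (not part of the original source): RequestSet is a checked
 * registry of MPI_Request handles. A minimal standalone equivalent, assuming
 * only <mpi.h>, <map> and <cstdlib>, would be:
 *
 *   std::map<MPI_Request, const char*> live_requests; // handle -> call site
 *
 *   void track(MPI_Request r, const char* where) {
 *     if (r != MPI_REQUEST_NULL && !live_requests.emplace(r, where).second)
 *       std::abort(); // same handle registered twice
 *   }
 *   void untrack(MPI_Request r) {
 *     if (live_requests.erase(r) == 0)
 *       std::abort(); // completing a handle that was never registered
 *   }
 *
 * ARCCORE_ADD_REQUEST plays the role of track(), with A_FUNCINFO recording
 * the call site for diagnostics.
 */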
int _checkSize(Int64 i64_size)
{
  if (i64_size > INT32_MAX)
    ARCCORE_FATAL("Can not convert '{0}' to type integer", i64_size);
  return (int)i64_size;
}
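/*
 * Editorial note (not part of the original source): _checkSize() exists
 * because the MPI C API takes counts as 'int' while Arccore sizes are Int64.
 * With a hypothetical 3-billion-element buffer:
 *
 *   Int64 n = 3000000000;          // > INT32_MAX
 *   int mpi_count = _checkSize(n); // raises ARCCORE_FATAL instead of
 *                                  // silently truncating the count
 */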
, m_mpi_lock(mpi_lock)
, m_empty_request2(MPI_REQUEST_NULL)
if (s == "1" || s == "TRUE")
  m_is_allow_null_rank_for_any_source = true;
if (s == "0" || s == "FALSE")
  m_is_allow_null_rank_for_any_source = false;
MPI_Irecv(m_recv_buffer_for_empty_request, 1, MPI_CHAR, MPI_PROC_NULL,
          /*...*/);

m_send_buffer_for_empty_request2[0] = 0;
MPI_Isend(m_send_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
          /*...*/);
MPI_Recv(m_recv_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
         /*...*/);
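/*
 * Editorial note (not part of the original source): these are sentinel
 * "empty" requests. Per the MPI standard, a receive from MPI_PROC_NULL
 * completes immediately yet yields a handle distinct from MPI_REQUEST_NULL,
 * and the MPI_Isend to self, matched at once by the MPI_Recv just above,
 * leaves a valid but inactive handle. RequestSet::setEmptyRequests()
 * registers both so that _isEmptyRequest() can exclude such no-op handles
 * from request tracking.
 */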
MpiAdapter::
~MpiAdapter()
{
  if (m_empty_request2 != MPI_REQUEST_NULL)
    MPI_Request_free(&m_empty_request2);
  delete m_request_set;
}
void MpiAdapter::
_checkHasNoRequests()
{
  Int64 nb_request = m_request_set->nbRequest();
  if (nb_request != 0) {
    warning() << " Pending mpi requests size=" << nb_request;
    m_request_set->printRequests();
    _checkFatalInRequest();
  }
}
void MpiAdapter::
destroy()
{
  _checkHasNoRequests();
  delete this;
}
void MpiAdapter::
setRequestErrorAreFatal(bool v)
{
  m_request_set->m_request_error_is_fatal = v;
}

bool MpiAdapter::
isRequestErrorAreFatal() const
{
  return m_request_set->m_request_error_is_fatal;
}

void MpiAdapter::
setPrintRequestError(bool v)
{
  m_request_set->m_is_report_error_in_request = v;
}

bool MpiAdapter::
isPrintRequestError() const
{
  return m_request_set->m_is_report_error_in_request;
}

void MpiAdapter::
setCheckRequest(bool v)
{
  m_request_set->m_no_check_request = !v;
}

bool MpiAdapter::
isCheckRequest() const
{
  return !m_request_set->m_no_check_request;
}
int MpiAdapter::
toMPISize(Int64 count)
{
  return _checkSize(count);
}
void MpiAdapter::
_trace(const char* function)
{
  if (m_is_trace) {
    IStackTraceService* stack_service = Platform::getStackTraceService();
    if (stack_service)
      info() << "MPI_TRACE: " << function << "\n" << stack_service->stackTrace().toString();
    else
      info() << "MPI_TRACE: " << function;
  }
}
void MpiAdapter::
broadcast(void* buf, Int64 nb_elem, Int32 root, MPI_Datatype datatype)
{
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Bcast).name().localstr());
  double begin_time = MPI_Wtime();
  if (m_is_trace)
    info() << "MPI_TRACE: MPI broadcast: before"
           << " nb_elem=" << nb_elem
           << " datatype=" << datatype;
  m_mpi_prof->broadcast(buf, _nb_elem, datatype, root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Bcast).name(), sr_time, 0);
}
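/*
 * Editorial sketch (not part of the original source): every collective in
 * this adapter follows the same wrap pattern, timing the call with
 * MPI_Wtime() and feeding the duration to IStat. Reduced to its skeleton:
 *
 *   double t0 = MPI_Wtime();
 *   MPI_Bcast(buf, count, datatype, root, comm); // any MPI call
 *   double t1 = MPI_Wtime();
 *   m_stat->add("Bcast", t1 - t0, 0); // name, elapsed seconds, message size
 *
 * MPI_Wtime() is a local wall clock, so the recorded time also includes the
 * wait for the other ranks to reach the collective.
 */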
Request MpiAdapter::
nonBlockingBroadcast(void* buf, Int64 nb_elem, Int32 root, MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  int _nb_elem = _checkSize(nb_elem);
  _trace(" MPI_Bcast");
  double begin_time = MPI_Wtime();
  ret = MPI_Ibcast(buf, _nb_elem, datatype, root, m_communicator, &mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add("IBroadcast", sr_time, 0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
void MpiAdapter::
gather(const void* send_buf, void* recv_buf, Int64 nb_elem, Int32 root, MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace(MpiInfo(eMpiName::Gather).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->gather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, _root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Gather).name(), sr_time, 0);
}
Request MpiAdapter::
nonBlockingGather(const void* send_buf, void* recv_buf,
                  Int64 nb_elem, Int32 root, MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace("MPI_Igather");
  double begin_time = MPI_Wtime();
  ret = MPI_Igather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, _root,
                    m_communicator, &mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add("IGather", sr_time, 0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
void MpiAdapter::
allGather(const void* send_buf, void* recv_buf,
          Int64 nb_elem, MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Allgather).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allGather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Allgather).name(), sr_time, 0);
}
Request MpiAdapter::
nonBlockingAllGather(const void* send_buf, void* recv_buf,
                     Int64 nb_elem, MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace("MPI_Iallgather");
  double begin_time = MPI_Wtime();
  ret = MPI_Iallgather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype,
                       m_communicator, &mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add("IAllGather", sr_time, 0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
void MpiAdapter::
gatherVariable(const void* send_buf, void* recv_buf, const int* recv_counts,
               const int* recv_indexes, Int64 nb_elem, Int32 root, MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  int _root = static_cast<int>(root);
  _trace(MpiInfo(eMpiName::Gatherv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->gatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, _root, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Gatherv).name().localstr(), sr_time, 0);
}
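/*
 * Editorial sketch (not part of the original source): recv_counts and
 * recv_indexes follow the MPI_Gatherv convention: counts[i] elements from
 * rank i land at displacement indexes[i] in recv_buf. A caller typically
 * builds them with a prefix sum:
 *
 *   std::vector<int> counts(nb_rank), displs(nb_rank);
 *   // fill counts[i] with the number of elements contributed by rank i
 *   int offset = 0;
 *   for (int i = 0; i < nb_rank; ++i) {
 *     displs[i] = offset;
 *     offset += counts[i];
 *   }
 *   // recv_buf must hold 'offset' elements on the root rank
 */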
void MpiAdapter::
allGatherVariable(const void* send_buf, void* recv_buf, const int* recv_counts,
                  const int* recv_indexes, Int64 nb_elem, MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Allgatherv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allGatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Allgatherv).name().localstr(), sr_time, 0);
}
void MpiAdapter::
scatterVariable(const void* send_buf, const int* send_count, const int* send_indexes,
                void* recv_buf, Int64 nb_elem, Int32 root, MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_count = const_cast<int*>(send_count);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int _nb_elem = _checkSize(nb_elem);
  _trace(MpiInfo(eMpiName::Scatterv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->scatterVariable(_sbuf, _send_count, _send_indexes, datatype,
                              recv_buf, _nb_elem, datatype,
                              static_cast<int>(root), m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Scatterv).name(), sr_time, 0);
}
void MpiAdapter::
allToAll(const void* send_buf, void* recv_buf, Integer count, MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int icount = _checkSize(count);
  _trace(MpiInfo(eMpiName::Alltoall).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allToAll(_sbuf, icount, datatype, recv_buf, icount, datatype, m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Alltoall).name().localstr(), sr_time, 0);
}
Request MpiAdapter::
nonBlockingAllToAll(const void* send_buf, void* recv_buf, Integer count, MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int icount = _checkSize(count);
  _trace("MPI_IAlltoall");
  double begin_time = MPI_Wtime();
  ret = MPI_Ialltoall(_sbuf, icount, datatype, recv_buf, icount, datatype,
                      m_communicator, &mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add("IAllToAll", sr_time, 0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
void MpiAdapter::
allToAllVariable(const void* send_buf, const int* send_counts,
                 const int* send_indexes, void* recv_buf, const int* recv_counts,
                 const int* recv_indexes, MPI_Datatype datatype)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_counts = const_cast<int*>(send_counts);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int* _recv_counts = const_cast<int*>(recv_counts);
  int* _recv_indexes = const_cast<int*>(recv_indexes);
  _trace(MpiInfo(eMpiName::Alltoallv).name().localstr());
  double begin_time = MPI_Wtime();
  m_mpi_prof->allToAllVariable(_sbuf, _send_counts, _send_indexes, datatype,
                               recv_buf, _recv_counts, _recv_indexes, datatype,
                               m_communicator);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Alltoallv).name(), sr_time, 0);
}
Request MpiAdapter::
nonBlockingAllToAllVariable(const void* send_buf, const int* send_counts,
                            const int* send_indexes, void* recv_buf, const int* recv_counts,
                            const int* recv_indexes, MPI_Datatype datatype)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int* _send_counts = const_cast<int*>(send_counts);
  int* _send_indexes = const_cast<int*>(send_indexes);
  int* _recv_counts = const_cast<int*>(recv_counts);
  int* _recv_indexes = const_cast<int*>(recv_indexes);
  _trace("MPI_Ialltoallv");
  double begin_time = MPI_Wtime();
  ret = MPI_Ialltoallv(_sbuf, _send_counts, _send_indexes, datatype,
                       recv_buf, _recv_counts, _recv_indexes, datatype,
                       m_communicator, &mpi_request);
  double end_time = MPI_Wtime();
  double sr_time = (end_time - begin_time);
  m_stat->add("IAllToAll", sr_time, 0);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
Request MpiAdapter::
nonBlockingBarrier()
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = MPI_Ibarrier(m_communicator, &mpi_request);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
void MpiAdapter::
allReduce(const void* send_buf, void* recv_buf, Int64 count, MPI_Datatype datatype, MPI_Op op)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Allreduce).name().localstr());
  ++m_nb_all_reduce;
  try {
    m_mpi_prof->allReduce(_sbuf, recv_buf, _n, datatype, op, m_communicator);
  }
  catch (TimeoutException& ex) {
    std::ostringstream ostr;
    ostr << "MPI_Allreduce"
         << " send_buf=" << send_buf
         << " recv_buf=" << recv_buf
         << " datatype=" << datatype
         << " NB=" << m_nb_all_reduce;
    ex.setAdditionalInfo(ostr.str());
    throw;
  }
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Allreduce).name(), end_time - begin_time, count);
}
Request MpiAdapter::
nonBlockingAllReduce(const void* send_buf, void* recv_buf, Int64 count, MPI_Datatype datatype, MPI_Op op)
{
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace("MPI_IAllreduce");
  ret = MPI_Iallreduce(_sbuf, recv_buf, _n, datatype, op, m_communicator, &mpi_request);
  double end_time = MPI_Wtime();
  m_stat->add("IReduce", end_time - begin_time, _n);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
void MpiAdapter::
reduce(const void* send_buf, void* recv_buf, Int64 count, MPI_Datatype datatype, MPI_Op op, Integer root)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  int _root = static_cast<int>(root);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Reduce).name().localstr());
  ++m_nb_reduce;
  try {
    m_mpi_prof->reduce(_sbuf, recv_buf, _n, datatype, op, _root, m_communicator);
  }
  catch (TimeoutException& ex) {
    std::ostringstream ostr;
    ostr << MpiInfo(eMpiName::Reduce).name()
         << " send_buf=" << send_buf
         << " recv_buf=" << recv_buf
         << " datatype=" << datatype
         << " NB=" << m_nb_reduce;
    ex.setAdditionalInfo(ostr.str());
    throw;
  }
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Reduce).name(), end_time - begin_time, 0);
}
void MpiAdapter::
scan(const void* send_buf, void* recv_buf, Int64 count, MPI_Datatype datatype, MPI_Op op)
{
  void* _sbuf = const_cast<void*>(send_buf);
  int _n = _checkSize(count);
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Scan).name().localstr());
  m_mpi_prof->scan(_sbuf, recv_buf, _n, datatype, op, m_communicator);
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Scan).name(), end_time - begin_time, count);
}
void MpiAdapter::
directSendRecv(const void* send_buffer, Int64 send_buffer_size,
               void* recv_buffer, Int64 recv_buffer_size,
               Int32 proc, Int64 elem_size, MPI_Datatype data_type)
{
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Status mpi_status;
  double begin_time = MPI_Wtime();
  _trace(MpiInfo(eMpiName::Sendrecv).name().localstr());
  int sbuf_size = _checkSize(send_buffer_size);
  int rbuf_size = _checkSize(recv_buffer_size);
  m_mpi_prof->sendRecv(v_send_buffer, sbuf_size, data_type, proc, 99,
                       recv_buffer, rbuf_size, data_type, proc, 99,
                       m_communicator, &mpi_status);
  double end_time = MPI_Wtime();
  Int64 send_size = send_buffer_size * elem_size;
  Int64 recv_size = recv_buffer_size * elem_size;
  double sr_time = (end_time - begin_time);
  m_stat->add(MpiInfo(eMpiName::Sendrecv).name(), sr_time, send_size + recv_size);
}
Request MpiAdapter::
sendNonBlockingNoStat(const void* send_buffer, Int64 send_buffer_size,
                      Int32 dest_rank, MPI_Datatype data_type, int mpi_tag)
{
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int sbuf_size = _checkSize(send_buffer_size);
  int ret = 0;
  m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, dest_rank, mpi_tag,
                    m_communicator, &mpi_request);
  if (m_is_trace)
    info() << " ISend ret=" << ret << " proc=" << dest_rank << " tag=" << mpi_tag << " request=" << mpi_request;
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
Request MpiAdapter::
directSend(const void* send_buffer, Int64 send_buffer_size,
           Int32 proc, Int64 elem_size, MPI_Datatype data_type,
           int mpi_tag, bool is_blocked)
{
  void* v_send_buffer = const_cast<void*>(send_buffer);
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  double begin_time = 0.0;
  double end_time = 0.0;
  Int64 send_size = send_buffer_size * elem_size;
  if (m_is_trace) {
    info() << "MPI_TRACE: MPI Send: send before"
           << " size=" << send_size
           << " tag=" << mpi_tag
           << " datatype=" << data_type
           << " blocking " << is_blocked;
  }
  if (is_blocked) {
    // With an MpiLock, a blocking send is emulated with MPI_Isend plus a
    // polling loop so the lock can be released between two tests.
    if (m_mpi_lock) {
      {
        MpiLock::Section mls(m_mpi_lock);
        begin_time = MPI_Wtime();
        int sbuf_size = _checkSize(send_buffer_size);
        m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag,
                          m_communicator, &mpi_request);
      }
      int is_finished = 0;
      MPI_Status mpi_status;
      while (is_finished == 0) {
        MpiLock::Section mls(m_mpi_lock);
        MPI_Request_get_status(mpi_request, &is_finished, &mpi_status);
        if (is_finished != 0) {
          m_mpi_prof->wait(&mpi_request, (MPI_Status*)MPI_STATUS_IGNORE);
          end_time = MPI_Wtime();
          mpi_request = MPI_REQUEST_NULL;
        }
      }
    }
    else {
      MpiLock::Section mls(m_mpi_lock);
      begin_time = MPI_Wtime();
      int sbuf_size = _checkSize(send_buffer_size);
      m_mpi_prof->send(v_send_buffer, sbuf_size, data_type, proc, mpi_tag,
                       m_communicator);
      end_time = MPI_Wtime();
    }
  }
  else {
    MpiLock::Section mls(m_mpi_lock);
    begin_time = MPI_Wtime();
    int sbuf_size = _checkSize(send_buffer_size);
    m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag,
                      m_communicator, &mpi_request);
    if (m_is_trace)
      info() << " ISend ret=" << ret << " proc=" << proc << " tag=" << mpi_tag << " request=" << mpi_request;
    end_time = MPI_Wtime();
    ARCCORE_ADD_REQUEST(mpi_request);
  }
  if (m_is_trace) {
    info() << "MPI Send: send after"
           << " request=" << mpi_request;
  }
  double sr_time = (end_time - begin_time);
  debug(Trace::High) << "Time to send message of size " << send_size
                     << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Send).name(), end_time - begin_time, send_size);
  return buildRequest(ret, mpi_request);
}
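/*
 * Editorial note (not part of the original source): the polling branch above
 * is the lock-friendly substitute for MPI_Send/MPI_Wait when several threads
 * share a non-thread-safe MPI. The lock is held only for the duration of
 * each MPI_Request_get_status() probe:
 *
 *   while (!finished) {
 *     MpiLock::Section mls(m_mpi_lock);                 // acquire
 *     MPI_Request_get_status(req, &finished, &status);
 *   }                                                   // release
 *
 * so other threads can issue their own MPI calls between two polls.
 */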
Request MpiAdapter::
directSendPack(const void* send_buffer, Int64 send_buffer_size,
               Int32 proc, int mpi_tag, bool is_blocked)
{
  return directSend(send_buffer, send_buffer_size, proc, 1, MPI_PACKED, mpi_tag, is_blocked);
}
Request MpiAdapter::
receiveNonBlockingNoStat(void* recv_buffer, Int64 recv_buffer_size,
                         Int32 source_rank, MPI_Datatype data_type, int mpi_tag)
{
  int rbuf_size = _checkSize(recv_buffer_size);
  int ret = 0;
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, source_rank, mpi_tag,
                    m_communicator, &mpi_request);
  ARCCORE_ADD_REQUEST(mpi_request);
  return buildRequest(ret, mpi_request);
}
Request MpiAdapter::
directRecv(void* recv_buffer, Int64 recv_buffer_size,
           Int32 proc, Int64 elem_size, MPI_Datatype data_type,
           int mpi_tag, bool is_blocked)
{
  MPI_Status mpi_status;
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  int ret = 0;
  double begin_time = 0.0;
  double end_time = 0.0;

  int i_proc = 0;
  if (proc == A_PROC_NULL_RANK)
    ARCCORE_THROW(NotImplementedException, "Receive with MPI_PROC_NULL");
  if (proc == A_NULL_RANK && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use A_NULL_RANK for any source. Use A_ANY_SOURCE_RANK instead");
  if (proc == A_NULL_RANK || proc == A_ANY_SOURCE_RANK)
    i_proc = MPI_ANY_SOURCE;
  else
    i_proc = static_cast<int>(proc);

  Int64 recv_size = recv_buffer_size * elem_size;
  if (m_is_trace) {
    info() << "MPI_TRACE: MPI Recv: recv before "
           << " size=" << recv_size
           << " from=" << i_proc
           << " tag=" << mpi_tag
           << " datatype=" << data_type
           << " blocking=" << is_blocked;
  }
  if (is_blocked) {
    // Same lock-friendly emulation as in directSend(): post an MPI_Irecv and
    // poll its status, releasing the lock between two tests.
    if (m_mpi_lock) {
      {
        MpiLock::Section mls(m_mpi_lock);
        begin_time = MPI_Wtime();
        int rbuf_size = _checkSize(recv_buffer_size);
        m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag,
                          m_communicator, &mpi_request);
      }
      int is_finished = 0;
      MPI_Status mpi_status;
      while (is_finished == 0) {
        MpiLock::Section mls(m_mpi_lock);
        MPI_Request_get_status(mpi_request, &is_finished, &mpi_status);
        if (is_finished != 0) {
          end_time = MPI_Wtime();
          m_mpi_prof->wait(&mpi_request, (MPI_Status*)MPI_STATUS_IGNORE);
          mpi_request = MPI_REQUEST_NULL;
        }
      }
    }
    else {
      MpiLock::Section mls(m_mpi_lock);
      begin_time = MPI_Wtime();
      int rbuf_size = _checkSize(recv_buffer_size);
      m_mpi_prof->recv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag,
                       m_communicator, &mpi_status);
      end_time = MPI_Wtime();
    }
  }
  else {
    MpiLock::Section mls(m_mpi_lock);
    begin_time = MPI_Wtime();
    int rbuf_size = _checkSize(recv_buffer_size);
    m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag,
                      m_communicator, &mpi_request);
    end_time = MPI_Wtime();
    ARCCORE_ADD_REQUEST(mpi_request);
  }
  if (m_is_trace) {
    info() << "MPI Recv: recv after "
           << " request=" << mpi_request;
  }
  double sr_time = (end_time - begin_time);
  debug(Trace::High) << "Time to receive message of size " << recv_size
                     << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Recv).name(), end_time - begin_time, recv_size);
  return buildRequest(ret, mpi_request);
}
void MpiAdapter::
probeRecvPack(UniqueArray<Byte>& recv_buffer, Int32 proc)
{
  double begin_time = MPI_Wtime();
  MPI_Status status;
  int recv_buffer_size = 0;
  _trace("MPI_Probe");
  m_mpi_prof->probe(proc, 101, m_communicator, &status);
  m_mpi_prof->getCount(&status, MPI_PACKED, &recv_buffer_size);
  recv_buffer.resize(recv_buffer_size);
  m_mpi_prof->recv(recv_buffer.data(), recv_buffer_size, MPI_PACKED, proc, 101,
                   m_communicator, &status);
  double end_time = MPI_Wtime();
  Int64 recv_size = recv_buffer_size;
  double sr_time = (end_time - begin_time);
  debug(Trace::High) << "Time to probe recv message of size " << recv_size
                     << " time " << sr_time;
  m_stat->add(MpiInfo(eMpiName::Recv).name(), end_time - begin_time, recv_size);
}
MessageSourceInfo MpiAdapter::
_buildSourceInfoFromStatus(const MPI_Status& mpi_status)
{
  // Message size in bytes.
  MPI_Count message_size = 0;
  MPI_Get_elements_x(&mpi_status, MPI_BYTE, &message_size);
  MessageTag tag(mpi_status.MPI_TAG);
  MessageRank rank(mpi_status.MPI_SOURCE);
  return MessageSourceInfo(rank, tag, message_size);
}
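/*
 * Editorial note (not part of the original source): MPI_Get_elements_x() is
 * used with MPI_BYTE to read the message size as a 64-bit MPI_Count.
 * MPI_Get_count() would return an 'int' and could overflow for messages
 * larger than 2^31-1 bytes.
 */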
MessageId MpiAdapter::
_probeMessage(MessageRank source, MessageTag tag, bool is_blocking)
{
  MPI_Status mpi_status;
  int has_message = 0;
  MPI_Message message;
  int ret = 0;
  int mpi_source = source.value();
  if (source.isProcNull())
    ARCCORE_THROW(NotImplementedException, "Probe with MPI_PROC_NULL");
  if (source.isNull() && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
  if (source.isNull() || source.isAnySource())
    mpi_source = MPI_ANY_SOURCE;
  int mpi_tag = tag.value();
  if (tag.isNull())
    mpi_tag = MPI_ANY_TAG;
  if (is_blocking) {
    ret = MPI_Mprobe(mpi_source, mpi_tag, m_communicator, &message, &mpi_status);
    has_message = 1;
  }
  else
    ret = MPI_Improbe(mpi_source, mpi_tag, m_communicator, &has_message, &message, &mpi_status);
  if (ret != 0)
    ARCCORE_FATAL("Error during call to MPI_Mprobe r={0}", ret);
  MessageId ret_message;
  if (has_message != 0) {
    MessageSourceInfo si(_buildSourceInfoFromStatus(mpi_status));
    ret_message = MessageId(si, message);
  }
  return ret_message;
}
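/*
 * Editorial sketch (not part of the original source): unlike a plain probe,
 * MPI_Mprobe/MPI_Improbe dequeue the matched message and return an
 * MPI_Message handle, which must then be received with MPI_Mrecv/MPI_Imrecv
 * (as directRecv() does below), not with a regular MPI_Recv:
 *
 *   MPI_Message msg; MPI_Status st;
 *   MPI_Mprobe(src, tag, comm, &msg, &st);    // matches and dequeues
 *   int n = 0;
 *   MPI_Get_count(&st, MPI_BYTE, &n);         // size the buffer
 *   std::vector<char> buf(n);
 *   MPI_Mrecv(buf.data(), n, MPI_BYTE, &msg, &st);
 *
 * This closes the race where another thread's receive could steal the
 * message between a classic MPI_Probe and its matching MPI_Recv.
 */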
MessageId MpiAdapter::
probeMessage(PointToPointMessageInfo message)
{
  if (!message.isValid())
    return MessageId();
  // The message must have been initialized with a (rank, tag) pair.
  if (!message.isRankTag())
    ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
  return _probeMessage(message.destinationRank(), message.tag(), message.isBlocking());
}
MessageSourceInfo MpiAdapter::
_legacyProbeMessage(MessageRank source, MessageTag tag, bool is_blocking)
{
  MPI_Status mpi_status;
  int has_message = 0;
  int ret = 0;
  int mpi_source = source.value();
  if (source.isProcNull())
    ARCCORE_THROW(NotImplementedException, "Probe with MPI_PROC_NULL");
  if (source.isNull() && !m_is_allow_null_rank_for_any_source)
    ARCCORE_FATAL("Can not use MPI_Probe with null rank. Use MessageRank::anySourceRank() instead");
  if (source.isNull() || source.isAnySource())
    mpi_source = MPI_ANY_SOURCE;
  int mpi_tag = tag.value();
  if (tag.isNull())
    mpi_tag = MPI_ANY_TAG;
  if (is_blocking) {
    ret = MPI_Probe(mpi_source, mpi_tag, m_communicator, &mpi_status);
    has_message = 1;
  }
  else
    ret = MPI_Iprobe(mpi_source, mpi_tag, m_communicator, &has_message, &mpi_status);
  if (ret != 0)
    ARCCORE_FATAL("Error during call to MPI_Probe r={0}", ret);
  if (has_message != 0)
    return _buildSourceInfoFromStatus(mpi_status);
  return {};
}
MessageSourceInfo MpiAdapter::
legacyProbeMessage(PointToPointMessageInfo message)
{
  if (!message.isValid())
    return {};
  // The message must have been initialized with a (rank, tag) pair.
  if (!message.isRankTag())
    ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
  return _legacyProbeMessage(message.destinationRank(), message.tag(), message.isBlocking());
}
Request MpiAdapter::
directRecv(void* recv_buffer, Int64 recv_buffer_size,
           MessageId message, Int64 elem_size, MPI_Datatype data_type,
           bool is_blocked)
{
  MPI_Status mpi_status;
  MPI_Request mpi_request = MPI_REQUEST_NULL;
  MPI_Message mpi_message = (MPI_Message)message;
  int ret = 0;
  double begin_time = 0.0;
  double end_time = 0.0;

  Int64 recv_size = recv_buffer_size * elem_size;
  if (m_is_trace) {
    info() << "MPI_TRACE: MPI Mrecv: recv before "
           << " size=" << recv_size
           << " from_msg=" << message
           << " datatype=" << data_type
           << " blocking=" << is_blocked;
  }
  if (is_blocked) {
    if (m_mpi_lock) {
      {
        MpiLock::Section mls(m_mpi_lock);
        begin_time = MPI_Wtime();
        int rbuf_size = _checkSize(recv_buffer_size);
        MPI_Imrecv(recv_buffer, rbuf_size, data_type, &mpi_message, &mpi_request);
      }
      int is_finished = 0;
      MPI_Status mpi_status;
      while (is_finished == 0) {
        MpiLock::Section mls(m_mpi_lock);
        MPI_Request_get_status(mpi_request, &is_finished, &mpi_status);
        if (is_finished != 0) {
          end_time = MPI_Wtime();
          m_mpi_prof->wait(&mpi_request, (MPI_Status*)MPI_STATUS_IGNORE);
          mpi_request = MPI_REQUEST_NULL;
        }
      }
    }
    else {
      MpiLock::Section mls(m_mpi_lock);
      begin_time = MPI_Wtime();
      int rbuf_size = _checkSize(recv_buffer_size);
      MPI_Mrecv(recv_buffer, rbuf_size, data_type, &mpi_message, &mpi_status);
      end_time = MPI_Wtime();
    }
  }
  else {
    MpiLock::Section mls(m_mpi_lock);
    begin_time = MPI_Wtime();
    int rbuf_size = _checkSize(recv_buffer_size);
    ret = MPI_Imrecv(recv_buffer, rbuf_size, data_type, &mpi_message, &mpi_request);
    end_time = MPI_Wtime();
    ARCCORE_ADD_REQUEST(mpi_request);
  }
  if (m_is_trace) {
    info() << "MPI Recv: recv after "
           << " request=" << mpi_request;
  }
  double sr_time = (end_time - begin_time);
  debug(Trace::High) << "Time to receive message of size " << recv_size
                     << " time " << sr_time << " blocking " << is_blocked;
  m_stat->add(MpiInfo(eMpiName::Recv).name(), end_time - begin_time, recv_size);
  return buildRequest(ret, mpi_request);
}
Request MpiAdapter::
directRecvPack(void* recv_buffer, Int64 recv_buffer_size,
               Int32 proc, int mpi_tag, bool is_blocking)
{
  return directRecv(recv_buffer, recv_buffer_size, proc, 1, MPI_PACKED, mpi_tag, is_blocking);
}
void MpiAdapter::
waitAllRequests(ArrayView<Request> requests)
{
  UniqueArray<bool> indexes(requests.size());
  UniqueArray<MPI_Status> mpi_status(requests.size());
  while (_waitAllRequestsMPI(requests, indexes, mpi_status)) {
    ; // Loop again while completed requests keep producing sub-requests.
  }
}
void MpiAdapter::
waitSomeRequests(ArrayView<Request> requests,
                 ArrayView<bool> indexes,
                 bool is_non_blocking)
{
  UniqueArray<MPI_Status> mpi_status(requests.size());
  waitSomeRequestsMPI(requests, indexes, mpi_status, is_non_blocking);
}
struct SubRequestInfo
{
  SubRequestInfo(Ref<ISubRequest> sr, Integer index_, int source_rank, int source_tag)
  : sub_request(sr)
  , index(index_)
  , mpi_source_rank(source_rank)
  , mpi_source_tag(source_tag)
  {}

  Ref<ISubRequest> sub_request;
  Integer index = -1;
  int mpi_source_rank = MPI_PROC_NULL;
  int mpi_source_tag = 0;
};
bool MpiAdapter::
_handleEndRequests(ArrayView<Request> requests, ArrayView<bool> done_indexes,
                   ArrayView<MPI_Status> status)
{
  UniqueArray<SubRequestInfo> new_requests;
  Integer size = requests.size();
  for ( Integer i = 0; i < size; ++i ) {
    if (done_indexes[i]) {
      Request& r = requests[i];
      if (r.hasSubRequest()) {
        if (m_is_trace)
          info() << "Done request with sub-request r=" << r << " mpi_r=" << r << " i=" << i
                 << " source_rank=" << status[i].MPI_SOURCE
                 << " source_tag=" << status[i].MPI_TAG;
        new_requests.add(SubRequestInfo(r.subRequest(), i, status[i].MPI_SOURCE, status[i].MPI_TAG));
      }
    }
  }

  bool has_new_request = false;
  if (!new_requests.empty()) {
    // MPI compacts the statuses of completed requests at the beginning of
    // 'status'; spread them back so they can be indexed by request number.
    UniqueArray<MPI_Status> old_status(size);
    {
      Integer index = 0;
      for ( Integer i = 0; i < size; ++i ) {
        if (done_indexes[i]) {
          old_status[i] = status[index];
          ++index;
        }
      }
    }
    // Execute the sub-request attached to each completed request.
    for ( SubRequestInfo& sri : new_requests ) {
      Integer index = sri.index;
      if (m_is_trace)
        info() << "Before handle new request index=" << index
               << " sri.source_rank=" << sri.mpi_source_rank
               << " sri.source_tag=" << sri.mpi_source_tag;
      SubRequestCompletionInfo completion_info(MessageRank(old_status[index].MPI_SOURCE), MessageTag(old_status[index].MPI_TAG));
      Request r = sri.sub_request->executeOnCompletion(completion_info);
      if (m_is_trace)
        info() << "Handle new request index=" << index << " old_r=" << requests[index] << " new_r=" << r;
      // If the sub-request generated a new request, it replaces the old one
      // and the corresponding index is no longer considered done.
      if (r.isValid()) {
        has_new_request = true;
        requests[index] = r;
        done_indexes[index] = false;
      }
    }
    // Re-compact the statuses of the requests that remain done.
    {
      Integer index = 0;
      for ( Integer i = 0; i < size; ++i ) {
        if (done_indexes[i]) {
          status[index] = old_status[i];
          ++index;
        }
      }
    }
  }
  return has_new_request;
}
bool MpiAdapter::
_waitAllRequestsMPI(ArrayView<Request> requests, ArrayView<bool> indexes,
                    ArrayView<MPI_Status> mpi_status)
{
  Integer size = requests.size();
  if (size == 0)
    return false;
  UniqueArray<MPI_Request> mpi_request(size);
  for ( Integer i = 0; i < size; ++i ) {
    mpi_request[i] = (MPI_Request)(requests[i]);
  }
  if (m_is_trace)
    info() << " MPI_waitall begin size=" << size;
  double diff_time = 0.0;
  if (m_mpi_lock) {
    // Emulate MPI_Waitall with a test loop so the lock can be released
    // between two polls.
    double begin_time = MPI_Wtime();
    for ( Integer i = 0; i < size; ++i ) {
      MPI_Request request = (MPI_Request)(mpi_request[i]);
      int is_finished = 0;
      while (is_finished == 0) {
        MpiLock::Section mls(m_mpi_lock);
        m_mpi_prof->test(&request, &is_finished, (MPI_Status*)MPI_STATUS_IGNORE);
      }
    }
    double end_time = MPI_Wtime();
    diff_time = end_time - begin_time;
  }
  else {
    MpiLock::Section mls(m_mpi_lock);
    double begin_time = MPI_Wtime();
    m_mpi_prof->waitAll(size, mpi_request.data(), mpi_status.data());
    double end_time = MPI_Wtime();
    diff_time = end_time - begin_time;
  }
  // All requests are now done.
  for ( Integer i = 0; i < size; ++i ) {
    indexes[i] = true;
  }
  bool has_new_request = _handleEndRequests(requests, indexes, mpi_status);
  if (m_is_trace)
    info() << " MPI_waitall end size=" << size;
  m_stat->add(MpiInfo(eMpiName::Waitall).name(), diff_time, size);
  return has_new_request;
}
bool MpiAdapter::
waitSomeRequestsMPI(ArrayView<Request> requests, ArrayView<bool> indexes,
                    ArrayView<MPI_Status> mpi_status, bool is_non_blocking)
{
  Integer size = requests.size();
  if (size == 0)
    return false;
  UniqueArray<MPI_Request> mpi_request(size);
  UniqueArray<MPI_Request> saved_mpi_request(size);
  UniqueArray<int> completed_requests(size);
  int nb_completed_request = 0;

  for ( Integer i = 0; i < size; ++i ) {
    // Invalid requests are turned into MPI_REQUEST_NULL so MPI ignores them.
    if (!requests[i].isValid()) {
      saved_mpi_request[i] = MPI_REQUEST_NULL;
    }
    else {
      saved_mpi_request[i] = static_cast<MPI_Request>(requests[i]);
    }
  }

  bool is_print_debug = m_is_trace || (!is_non_blocking);
  if (is_print_debug)
    debug() << "WaitRequestBegin is_non_blocking=" << is_non_blocking << " n=" << size;

  double begin_time = MPI_Wtime();

  try {
    if (is_non_blocking) {
      _trace(MpiInfo(eMpiName::Testsome).name().localstr());
      {
        MpiLock::Section mls(m_mpi_lock);
        m_mpi_prof->testSome(size, saved_mpi_request.data(), &nb_completed_request,
                             completed_requests.data(), mpi_status.data());
      }
      if (nb_completed_request == MPI_UNDEFINED)
        nb_completed_request = 0;
      if (is_print_debug)
        debug() << "WaitSomeRequestMPI: TestSome nb_completed=" << nb_completed_request;
    }
    else {
      _trace(MpiInfo(eMpiName::Waitsome).name().localstr());
      {
        MpiLock::Section mls(m_mpi_lock);
        m_mpi_prof->waitSome(size, saved_mpi_request.data(), &nb_completed_request,
                             completed_requests.data(), mpi_status.data());
      }
      if (nb_completed_request == MPI_UNDEFINED)
        nb_completed_request = 0;
      if (is_print_debug)
        debug() << "WaitSomeRequest nb_completed=" << nb_completed_request;
    }
  }
  catch (TimeoutException& ex) {
    std::ostringstream ostr;
    if (is_non_blocking)
      ostr << MpiInfo(eMpiName::Testsome).name();
    else
      ostr << MpiInfo(eMpiName::Waitsome).name();
    ostr << " size=" << size
         << " is_non_blocking=" << is_non_blocking;
    ex.setAdditionalInfo(ostr.str());
    throw;
  }

  for ( int z = 0; z < nb_completed_request; ++z ) {
    int index = completed_requests[z];
    if (is_print_debug)
      debug() << "Completed my_rank=" << m_comm_rank << " z=" << z
              << " index=" << index
              << " tag=" << mpi_status[z].MPI_TAG
              << " source=" << mpi_status[z].MPI_SOURCE;
    indexes[index] = true;
  }

  bool has_new_request = _handleEndRequests(requests, indexes, mpi_status);
  if (has_new_request) {
    // ...
  }
  double end_time = MPI_Wtime();
  m_stat->add(MpiInfo(eMpiName::Waitsome).name(), end_time - begin_time, size);
  return has_new_request;
}
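/*
 * Editorial note (not part of the original source): MPI_Testsome and
 * MPI_Waitsome set their output count to MPI_UNDEFINED when every request in
 * the list is MPI_REQUEST_NULL or inactive. Normalizing it to 0 in
 * waitSomeRequestsMPI() above lets the completion loop treat that case as
 * "nothing completed" instead of iterating with a negative count.
 */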
void MpiAdapter::
freeRequest(Request& request)
{
  if (!request.isValid()) {
    warning() << "MpiAdapter::freeRequest() null request r=" << (MPI_Request)request;
    _checkFatalInRequest();
    return;
  }
  {
    MpiLock::Section mls(m_mpi_lock);
    auto mr = (MPI_Request)request;
    _removeRequest(mr);
    MPI_Request_free(&mr);
  }
  request.reset();
}
bool MpiAdapter::
testRequest(Request& request)
{
  if (!request.isValid())
    return true;
  auto mr = (MPI_Request)request;
  int is_finished = 0;
  {
    MpiLock::Section mls(m_mpi_lock);
    // Look the request up first: once the test below completes it, the
    // handle is invalidated and could no longer be located in the set.
    RequestSet::Iterator request_iter = m_request_set->findRequest(mr);
    m_mpi_prof->test(&mr, &is_finished, (MPI_Status*)MPI_STATUS_IGNORE);
    if (is_finished != 0) {
      m_request_set->removeRequest(request_iter);
      if (request.hasSubRequest())
        ARCCORE_THROW(NotImplementedException, "SubRequest support");
      request.reset();
      return true;
    }
  }
  return false;
}
void MpiAdapter::
_addRequest(MPI_Request request)
{
  m_request_set->addRequest(request);
}

void MpiAdapter::
_removeRequest(MPI_Request request)
{
  m_request_set->removeRequest(request);
}
void MpiAdapter::
enableDebugRequest(bool enable_debug_request)
{
  m_stat->enable(enable_debug_request);
}
void MpiAdapter::
_checkFatalInRequest()
{
  if (isRequestErrorAreFatal())
    ARCCORE_FATAL("Error in requests management");
}
void MpiAdapter::
setMpiProfiling(IMpiProfiling* mpi_profiling)
{
  m_mpi_prof = mpi_profiling;
}
IMpiProfiling* MpiAdapter::
getMpiProfiling() const
{
  return m_mpi_prof;
}
void MpiAdapter::
setProfiler(IProfiler* profiler)
{
  if (!profiler) {
    m_mpi_prof = nullptr;
    return;
  }
  IMpiProfiling* p = dynamic_cast<IMpiProfiling*>(profiler);
  if (!p)
    ARCCORE_FATAL("Invalid profiler. Profiler has to implement interface 'IMpiProfiling'");
  m_mpi_prof = p;
}
MpiMachineMemoryWindowBaseInternalCreator* MpiAdapter::
windowCreator(MPI_Comm comm_machine)
{
  if (m_window_creator.isNull()) {
    Integer machine_comm_rank = 0;
    Integer machine_comm_size = 0;
    ::MPI_Comm_rank(comm_machine, &machine_comm_rank);
    ::MPI_Comm_size(comm_machine, &machine_comm_size);
    m_window_creator = makeRef(new MpiMachineMemoryWindowBaseInternalCreator(comm_machine, machine_comm_rank, machine_comm_size,
                                                                             m_communicator, m_comm_size));
  }
  return m_window_creator.get();
}

} // namespace Arcane::MessagePassing::Mpi