Arcane  v3.14.10.0
Developer documentation
MpiAdapter.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2024 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiAdapter.cc (C) 2000-2024 */
9/* */
10/* Gestionnaire de parallélisme utilisant MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing_mpi/MpiAdapter.h"
15
16#include "arccore/trace/ITraceMng.h"
17
18#include "arccore/collections/Array.h"
19
20#include "arccore/message_passing/Request.h"
21#include "arccore/message_passing/IStat.h"
22#include "arccore/message_passing/internal/SubRequestCompletionInfo.h"
23
24#include "arccore/base/IStackTraceService.h"
25#include "arccore/base/TimeoutException.h"
26#include "arccore/base/String.h"
27#include "arccore/base/NotImplementedException.h"
28#include "arccore/base/PlatformUtils.h"
29#include "arccore/base/FatalErrorException.h"
30#include "arccore/base/TraceInfo.h"
31
32#include "arccore/message_passing_mpi/MpiLock.h"
33#include "arccore/message_passing_mpi/NoMpiProfiling.h"
34#include "arccore/message_passing_mpi/MpiRequest.h"
35#include "arccore/message_passing_mpi/StandaloneMpiMessagePassingMng.h"
36
37#include <cstdint>
38
39/*---------------------------------------------------------------------------*/
40/*---------------------------------------------------------------------------*/
41
42namespace Arccore::MessagePassing::Mpi
43{
44
45class MpiAdapter::RequestSet
46: public TraceAccessor
47{
48 public:
49 struct RequestInfo
50 {
51 TraceInfo m_trace;
52 String m_stack_trace;
53 };
54 public:
55 typedef std::map<MPI_Request,RequestInfo>::iterator Iterator;
56 public:
57 explicit RequestSet(ITraceMng* tm) : TraceAccessor(tm)
58 {
59 m_trace_mng_ref = makeRef(tm);
60 if (arccoreIsCheck()){
61 m_no_check_request = false;
62 m_request_error_is_fatal = true;
63 }
64 if (Platform::getEnvironmentVariable("ARCCORE_NOREPORT_ERROR_MPIREQUEST")=="TRUE")
65 m_is_report_error_in_request = false;
66 if (Platform::getEnvironmentVariable("ARCCORE_MPIREQUEST_STACKTRACE")=="TRUE")
67 m_use_trace_full_stack = true;
68 if (Platform::getEnvironmentVariable("ARCCORE_TRACE_MPIREQUEST")=="TRUE")
69 m_trace_mpirequest = true;
70 }
71 public:
72 void addRequest(MPI_Request request)
73 {
74 if (m_no_check_request)
75 return;
76 if (m_trace_mpirequest)
77 info() << "MpiAdapter: AddRequest r=" << request;
78 _addRequest(request,TraceInfo());
79 }
80 void addRequest(MPI_Request request,const TraceInfo& ti)
81 {
82 if (m_no_check_request)
83 return;
84 if (m_trace_mpirequest)
85 info() << "MpiAdapter: AddRequest r=" << request;
86 _addRequest(request,ti);
87 }
88 void removeRequest(MPI_Request request)
89 {
90 if (m_no_check_request)
91 return;
92 if (m_trace_mpirequest)
93 info() << "MpiAdapter: RemoveRequest r=" << request;
94 _removeRequest(request);
95 }
96 void removeRequest(Iterator request_iter)
97 {
98 if (m_no_check_request)
99 return;
100 if (request_iter==m_allocated_requests.end()){
101 if (m_trace_mpirequest)
102 info() << "MpiAdapter: RemoveRequestIter null iterator";
103 return;
104 }
105 if (m_trace_mpirequest)
106 info() << "MpiAdapter: RemoveRequestIter r=" << request_iter->first;
107 m_allocated_requests.erase(request_iter);
108 }
110 Iterator findRequest(MPI_Request request)
111 {
112 if (m_no_check_request)
113 return m_allocated_requests.end();
114
115 if (_isEmptyRequest(request))
116 return m_allocated_requests.end();
117 auto ireq = m_allocated_requests.find(request);
118 if (ireq==m_allocated_requests.end()){
119 if (m_is_report_error_in_request || m_request_error_is_fatal){
120 error() << "MpiAdapter::testRequest() request not referenced "
121 << " id=" << request;
122 _checkFatalInRequest();
123 }
124 }
125 return ireq;
126 }
127 private:
131 void _addRequest(MPI_Request request,const TraceInfo& trace_info)
132 {
133 if (request==MPI_REQUEST_NULL){
134 if (m_is_report_error_in_request || m_request_error_is_fatal){
135 error() << "MpiAdapter::_addRequest() trying to add null request";
136 _checkFatalInRequest();
137 }
138 return;
139 }
140 if (_isEmptyRequest(request))
141 return;
142 ++m_total_added_request;
143 //info() << "MPI_ADAPTER:ADD REQUEST " << request;
144 auto i = m_allocated_requests.find(request);
145 if (i!=m_allocated_requests.end()){
146 if (m_is_report_error_in_request || m_request_error_is_fatal){
147 error() << "MpiAdapter::_addRequest() request already referenced "
148 << " id=" << request;
149 _checkFatalInRequest();
150 }
151 return;
152 }
153 RequestInfo rinfo;
154 rinfo.m_trace = trace_info;
155 if (m_use_trace_full_stack)
156 rinfo.m_stack_trace = Platform::getStackTrace();
157 m_allocated_requests.insert(std::make_pair(request,rinfo));
158 }
159
163 void _removeRequest(MPI_Request request)
164 {
165 //info() << "MPI_ADAPTER:REMOVE REQUEST " << request;
166 if (request==MPI_REQUEST_NULL){
167 if (m_is_report_error_in_request || m_request_error_is_fatal){
168 error() << "MpiAdapter::_removeRequest() null request (" << MPI_REQUEST_NULL << ")";
169 _checkFatalInRequest();
170 }
171 return;
172 }
173 if (_isEmptyRequest(request))
174 return;
175 auto i = m_allocated_requests.find(request);
176 if (i==m_allocated_requests.end()){
177 if (m_is_report_error_in_request || m_request_error_is_fatal){
178 error() << "MpiAdapter::_removeRequest() request not referenced "
179 << " id=" << request;
180 _checkFatalInRequest();
181 }
182 }
183 else
184 m_allocated_requests.erase(i);
185 }
186 public:
187 void _checkFatalInRequest()
188 {
189 if (m_request_error_is_fatal)
190 ARCCORE_FATAL("Error in requests management");
191 }
192 Int64 nbRequest() const { return m_allocated_requests.size(); }
193 Int64 totalAddedRequest() const { return m_total_added_request; }
194 void printRequests() const
195 {
196 info() << "PRINT REQUESTS\n";
197 for( auto& x : m_allocated_requests ){
198 info() << "Request id=" << x.first << " trace=" << x.second.m_trace
199 << " stack=" << x.second.m_stack_trace;
200 }
201 }
202 void setEmptyRequests(MPI_Request r1,MPI_Request r2)
203 {
204 m_empty_request1 = r1;
205 m_empty_request2 = r2;
206 }
207 public:
208 bool m_request_error_is_fatal = false;
209 bool m_is_report_error_in_request = true;
210 bool m_trace_mpirequest = false;
211 //! Vrai si on ne vérifie pas les requêtes
212 bool m_no_check_request = true;
213 private:
214 std::map<MPI_Request,RequestInfo> m_allocated_requests;
215 bool m_use_trace_full_stack = false;
216 MPI_Request m_empty_request1 = MPI_REQUEST_NULL;
217 MPI_Request m_empty_request2 = MPI_REQUEST_NULL;
218 Int64 m_total_added_request = 0;
219 Ref<ITraceMng> m_trace_mng_ref;
220 private:
221 bool _isEmptyRequest(MPI_Request r) const
222 {
223 return (r==m_empty_request1 || r==m_empty_request2);
224 }
225};
226
227#define ARCCORE_ADD_REQUEST(request)\
228 m_request_set->addRequest(request,A_FUNCINFO);
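// Note: A_FUNCINFO expands to the TraceInfo of the calling function, so every
// request registered through this macro can later be reported by
// RequestSet::printRequests() together with the place where it was created.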
229
230/*---------------------------------------------------------------------------*/
231/*---------------------------------------------------------------------------*/
232
233namespace
234{
235int _checkSize(Int64 i64_size)
236{
237 if (i64_size>INT32_MAX)
238 ARCCORE_FATAL("Can not convert '{0}' to type integer",i64_size);
239 return (int)i64_size;
240}
241}
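// Usage sketch (illustrative): _checkSize() is the guard used by the wrappers
// below to convert 64-bit element counts to the 'int' expected by MPI.
//   int n = _checkSize(Int64(1024));      // ok, returns 1024
//   int m = _checkSize(Int64(1) << 32);   // aborts with ARCCORE_FATAL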
242
243/*---------------------------------------------------------------------------*/
244/*---------------------------------------------------------------------------*/
245
246MpiAdapter::
247MpiAdapter(ITraceMng* trace,IStat* stat,MPI_Comm comm,
248 MpiLock* mpi_lock, IMpiProfiling* mpi_op)
249: TraceAccessor(trace)
250, m_stat(stat)
251, m_mpi_lock(mpi_lock)
252, m_mpi_prof(mpi_op)
253, m_communicator(comm)
254, m_comm_rank(0)
255, m_comm_size(0)
256, m_empty_request1(MPI_REQUEST_NULL)
257, m_empty_request2(MPI_REQUEST_NULL)
258{
259 m_request_set = new RequestSet(trace);
260
261 if (Platform::getEnvironmentVariable("ARCCORE_TRACE_MPI")=="TRUE")
262 m_is_trace = true;
263 {
264 String s = Platform::getEnvironmentVariable("ARCCORE_ALLOW_NULL_RANK_FOR_MPI_ANY_SOURCE");
265 if (s == "1" || s == "TRUE")
266 m_is_allow_null_rank_for_any_source = true;
267 if (s == "0" || s == "FALSE")
268 m_is_allow_null_rank_for_any_source = false;
269 }
270
271 ::MPI_Comm_rank(m_communicator,&m_comm_rank);
272 ::MPI_Comm_size(m_communicator,&m_comm_size);
273
274 // Par defaut, on ne fait pas de profiling MPI, on utilisera la methode set idoine pour changer
275 if (!m_mpi_prof)
276 m_mpi_prof = new NoMpiProfiling();
277
289 MPI_Irecv(m_recv_buffer_for_empty_request, 1, MPI_CHAR, MPI_PROC_NULL,
290 50505, m_communicator, &m_empty_request1);
291
292 /*
293 * A partir de la version 4 de openmpi, il semble aussi que les send avec
294 * de petits buffers génèrent toujours la même requête. Il faut donc aussi
295 * la supprimer des requêtes à tester. On poste aussi le MPI_Recv correspondant
296 * pour éviter que le MPI_ISend puisse involontairement être utilisé dans un
297 * MPI_Recv de l'utilisateur (via par exemple un MPI_Recv(MPI_ANY_TAG).
298 */
299 m_send_buffer_for_empty_request2[0] = 0;
300 MPI_Isend(m_send_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
301 50505, m_communicator, &m_empty_request2);
302
303 MPI_Recv(m_recv_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
304 50505, m_communicator, MPI_STATUS_IGNORE);
305
306 m_request_set->setEmptyRequests(m_empty_request1,m_empty_request2);
307}
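// Note: m_empty_request1/m_empty_request2 hold the request values some MPI
// implementations (e.g. OpenMPI >= 4 for small messages, see the comment
// above) reuse for trivially-completed operations. RequestSet::_isEmptyRequest()
// filters them out so they are never reported as leaked or double-added.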
308
309/*---------------------------------------------------------------------------*/
310/*---------------------------------------------------------------------------*/
311
312MpiAdapter::
313~MpiAdapter()
314{
315 if (m_empty_request1 != MPI_REQUEST_NULL)
316 MPI_Request_free(&m_empty_request1);
317 if (m_empty_request2 != MPI_REQUEST_NULL)
318 MPI_Request_free(&m_empty_request2);
319
320 delete m_request_set;
321 delete m_mpi_prof;
322}
323
324/*---------------------------------------------------------------------------*/
325/*---------------------------------------------------------------------------*/
326
332
333/*---------------------------------------------------------------------------*/
334/*---------------------------------------------------------------------------*/
335
336void MpiAdapter::
337_checkHasNoRequests()
338{
339 Int64 nb_request = m_request_set->nbRequest();
340 // On ne peut pas faire ce test dans le destructeur car il peut
341 // potentiellement lancer une exception et cela ne doit pas être fait
342 // dans un destructeur.
343 if (nb_request!=0){
344 warning() << " Pending mpi requests size=" << nb_request;
345 m_request_set->printRequests();
346 _checkFatalInRequest();
347 }
348}
349
350/*---------------------------------------------------------------------------*/
351/*---------------------------------------------------------------------------*/
352
353void MpiAdapter::
354destroy()
355{
356 _checkHasNoRequests();
357 delete this;
358}
359
360/*---------------------------------------------------------------------------*/
361/*---------------------------------------------------------------------------*/
362
363void MpiAdapter::
364setRequestErrorAreFatal(bool v)
365{
366 m_request_set->m_request_error_is_fatal = v;
367}
368bool MpiAdapter::
369isRequestErrorAreFatal() const
370{
371 return m_request_set->m_request_error_is_fatal;
372}
373
374void MpiAdapter::
375setPrintRequestError(bool v)
376{
377 m_request_set->m_is_report_error_in_request = v;
378}
379bool MpiAdapter::
380isPrintRequestError() const
381{
382 return m_request_set->m_is_report_error_in_request;
383}
384
385void MpiAdapter::
386setCheckRequest(bool v)
387{
388 m_request_set->m_no_check_request = !v;
389}
390
391bool MpiAdapter::
392isCheckRequest() const
393{
394 return !m_request_set->m_no_check_request;
395}
396
397/*---------------------------------------------------------------------------*/
398/*---------------------------------------------------------------------------*/
399
400int MpiAdapter::
401toMPISize(Int64 count)
402{
403 return _checkSize(count);
404}
405
406/*---------------------------------------------------------------------------*/
407/*---------------------------------------------------------------------------*/
408
409void MpiAdapter::
410_trace(const char* function)
411{
412 if (m_is_trace) {
413 IStackTraceService* stack_service = Platform::getStackTraceService();
414 if (stack_service)
415 info() << "MPI_TRACE: " << function << "\n" << stack_service->stackTrace().toString();
416 else
417 info() << "MPI_TRACE: " << function;
418 }
419}
420
421/*---------------------------------------------------------------------------*/
422/*---------------------------------------------------------------------------*/
423
424void MpiAdapter::
425broadcast(void* buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
426{
427 int _nb_elem = _checkSize(nb_elem);
428 _trace(MpiInfo(eMpiName::Bcast).name().localstr());
429 double begin_time = MPI_Wtime();
430 if (m_is_trace)
431 info() << "MPI_TRACE: MPI broadcast: before"
432 << " buf=" << buf
433 << " nb_elem=" << nb_elem
434 << " root=" << root
435 << " datatype=" << datatype;
436
437 m_mpi_prof->broadcast(buf, _nb_elem, datatype, root, m_communicator);
438 double end_time = MPI_Wtime();
439 double sr_time = (end_time-begin_time);
440 //TODO determiner la taille des messages
441 m_stat->add(MpiInfo(eMpiName::Bcast).name(),sr_time,0);
442}
443
444/*---------------------------------------------------------------------------*/
445/*---------------------------------------------------------------------------*/
446
447Request MpiAdapter::
448nonBlockingBroadcast(void* buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
449{
450 MPI_Request mpi_request = MPI_REQUEST_NULL;
451 int ret = -1;
452 int _nb_elem = _checkSize(nb_elem);
453 _trace(" MPI_Bcast");
454 double begin_time = MPI_Wtime();
455 ret = MPI_Ibcast(buf,_nb_elem,datatype,root,m_communicator,&mpi_request);
456 double end_time = MPI_Wtime();
457 double sr_time = (end_time-begin_time);
458 //TODO determiner la taille des messages
459 m_stat->add("IBroadcast",sr_time,0);
460 ARCCORE_ADD_REQUEST(mpi_request);
461 return buildRequest(ret,mpi_request);
462}
463
464/*---------------------------------------------------------------------------*/
465/*---------------------------------------------------------------------------*/
466
467void MpiAdapter::
468gather(const void* send_buf,void* recv_buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
469{
470 void* _sbuf = const_cast<void*>(send_buf);
471 int _nb_elem = _checkSize(nb_elem);
472 int _root = static_cast<int>(root);
473 _trace(MpiInfo(eMpiName::Gather).name().localstr());
474 double begin_time = MPI_Wtime();
475 m_mpi_prof->gather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, _root, m_communicator);
476 double end_time = MPI_Wtime();
477 double sr_time = (end_time-begin_time);
478 //TODO determiner la taille des messages
479 m_stat->add(MpiInfo(eMpiName::Gather).name(),sr_time,0);
480}
481
482/*---------------------------------------------------------------------------*/
483/*---------------------------------------------------------------------------*/
484
485Request MpiAdapter::
486nonBlockingGather(const void* send_buf,void* recv_buf,
487 Int64 nb_elem,Int32 root,MPI_Datatype datatype)
488{
489 MPI_Request mpi_request = MPI_REQUEST_NULL;
490 int ret = -1;
491 void* _sbuf = const_cast<void*>(send_buf);
492 int _nb_elem = _checkSize(nb_elem);
493 int _root = static_cast<int>(root);
494 _trace("MPI_Igather");
495 double begin_time = MPI_Wtime();
496 ret = MPI_Igather(_sbuf,_nb_elem,datatype,recv_buf,_nb_elem,datatype,_root,
497 m_communicator,&mpi_request);
498 double end_time = MPI_Wtime();
499 double sr_time = (end_time-begin_time);
500 //TODO determiner la taille des messages
501 m_stat->add("IGather",sr_time,0);
502 ARCCORE_ADD_REQUEST(mpi_request);
503 return buildRequest(ret,mpi_request);
504}
505
506/*---------------------------------------------------------------------------*/
507/*---------------------------------------------------------------------------*/
508
509void MpiAdapter::
510allGather(const void* send_buf,void* recv_buf,
511 Int64 nb_elem,MPI_Datatype datatype)
512{
513 void* _sbuf = const_cast<void*>(send_buf);
514 int _nb_elem = _checkSize(nb_elem);
515 _trace(MpiInfo(eMpiName::Allgather).name().localstr());
516 double begin_time = MPI_Wtime();
517 m_mpi_prof->allGather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, m_communicator);
518 double end_time = MPI_Wtime();
519 double sr_time = (end_time-begin_time);
520 //TODO determiner la taille des messages
521 m_stat->add(MpiInfo(eMpiName::Allgather).name(),sr_time,0);
522}
523
524/*---------------------------------------------------------------------------*/
525/*---------------------------------------------------------------------------*/
526
527Request MpiAdapter::
528nonBlockingAllGather(const void* send_buf,void* recv_buf,
529 Int64 nb_elem,MPI_Datatype datatype)
530{
531 MPI_Request mpi_request = MPI_REQUEST_NULL;
532 int ret = -1;
533 void* _sbuf = const_cast<void*>(send_buf);
534 int _nb_elem = _checkSize(nb_elem);
535 _trace("MPI_Iallgather");
536 double begin_time = MPI_Wtime();
537 ret = MPI_Iallgather(_sbuf,_nb_elem,datatype,recv_buf,_nb_elem,datatype,
538 m_communicator,&mpi_request);
539 double end_time = MPI_Wtime();
540 double sr_time = (end_time-begin_time);
541 //TODO determiner la taille des messages
542 m_stat->add("IAllGather",sr_time,0);
543 ARCCORE_ADD_REQUEST(mpi_request);
544 return buildRequest(ret,mpi_request);
545}
546
547/*---------------------------------------------------------------------------*/
548/*---------------------------------------------------------------------------*/
549
550void MpiAdapter::
551gatherVariable(const void* send_buf,void* recv_buf,const int* recv_counts,
552 const int* recv_indexes,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
553{
554 void* _sbuf = const_cast<void*>(send_buf);
555 int _nb_elem = _checkSize(nb_elem);
556 int _root = static_cast<int>(root);
557 _trace(MpiInfo(eMpiName::Gatherv).name().localstr());
558 double begin_time = MPI_Wtime();
559 m_mpi_prof->gatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, _root, m_communicator);
560 double end_time = MPI_Wtime();
561 double sr_time = (end_time-begin_time);
562 //TODO determiner la taille des messages
563 m_stat->add(MpiInfo(eMpiName::Gatherv).name().localstr(),sr_time,0);
564}
565
566/*---------------------------------------------------------------------------*/
567/*---------------------------------------------------------------------------*/
568
569void MpiAdapter::
570allGatherVariable(const void* send_buf,void* recv_buf,const int* recv_counts,
571 const int* recv_indexes,Int64 nb_elem,MPI_Datatype datatype)
572{
573 void* _sbuf = const_cast<void*>(send_buf);
574 int _nb_elem = _checkSize(nb_elem);
575 _trace(MpiInfo(eMpiName::Allgatherv).name().localstr());
576 //info() << " ALLGATHERV N=" << _nb_elem;
577 //for( int i=0; i<m_comm_size; ++i )
578 //info() << " ALLGATHERV I=" << i << " recv_count=" << recv_counts[i]
579 // << " recv_indexes=" << recv_indexes[i];
580 double begin_time = MPI_Wtime();
581 m_mpi_prof->allGatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, m_communicator);
582 double end_time = MPI_Wtime();
583 double sr_time = (end_time-begin_time);
584 //TODO determiner la taille des messages
585 m_stat->add(MpiInfo(eMpiName::Allgatherv).name().localstr(),sr_time,0);
586}
587
588/*---------------------------------------------------------------------------*/
589/*---------------------------------------------------------------------------*/
590
591void MpiAdapter::
592scatterVariable(const void* send_buf,const int* send_count,const int* send_indexes,
593 void* recv_buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
594{
595 void* _sbuf = const_cast<void*>(send_buf);
596 int* _send_count = const_cast<int*>(send_count);
597 int* _send_indexes = const_cast<int*>(send_indexes);
598 int _nb_elem = _checkSize(nb_elem);
599 _trace(MpiInfo(eMpiName::Scatterv).name().localstr());
600 double begin_time = MPI_Wtime();
601 m_mpi_prof->scatterVariable(_sbuf,
602 _send_count,
603 _send_indexes,
604 datatype,
605 recv_buf,
606 _nb_elem,
607 datatype,
608 root,
609 m_communicator);
610 double end_time = MPI_Wtime();
611 double sr_time = (end_time-begin_time);
612 //TODO determiner la taille des messages
613 m_stat->add(MpiInfo(eMpiName::Scatterv).name(),sr_time,0);
614}
615
616/*---------------------------------------------------------------------------*/
617/*---------------------------------------------------------------------------*/
618
619void MpiAdapter::
620allToAll(const void* send_buf,void* recv_buf,Integer count,MPI_Datatype datatype)
621{
622 void* _sbuf = const_cast<void*>(send_buf);
623 int icount = _checkSize(count);
624 _trace(MpiInfo(eMpiName::Alltoall).name().localstr());
625 double begin_time = MPI_Wtime();
626 m_mpi_prof->allToAll(_sbuf, icount, datatype, recv_buf, icount, datatype, m_communicator);
627 double end_time = MPI_Wtime();
628 double sr_time = (end_time-begin_time);
629 //TODO determiner la taille des messages
630 m_stat->add(MpiInfo(eMpiName::Alltoall).name().localstr(),sr_time,0);
631}
632
633/*---------------------------------------------------------------------------*/
634/*---------------------------------------------------------------------------*/
635
636Request MpiAdapter::
637nonBlockingAllToAll(const void* send_buf,void* recv_buf,Integer count,MPI_Datatype datatype)
638{
639 MPI_Request mpi_request = MPI_REQUEST_NULL;
640 int ret = -1;
641 void* _sbuf = const_cast<void*>(send_buf);
642 int icount = _checkSize(count);
643 _trace("MPI_IAlltoall");
644 double begin_time = MPI_Wtime();
645 ret = MPI_Ialltoall(_sbuf,icount,datatype,recv_buf,icount,datatype,m_communicator,&mpi_request);
646 double end_time = MPI_Wtime();
647 double sr_time = (end_time-begin_time);
648 //TODO determiner la taille des messages
649 m_stat->add("IAllToAll",sr_time,0);
650 ARCCORE_ADD_REQUEST(mpi_request);
651 return buildRequest(ret,mpi_request);
652}
653
654/*---------------------------------------------------------------------------*/
655/*---------------------------------------------------------------------------*/
656
657void MpiAdapter::
658allToAllVariable(const void* send_buf,const int* send_counts,
659 const int* send_indexes,void* recv_buf,const int* recv_counts,
660 const int* recv_indexes,MPI_Datatype datatype)
661{
662 void* _sbuf = const_cast<void*>(send_buf);
663 int* _send_counts = const_cast<int*>(send_counts);
664 int* _send_indexes = const_cast<int*>(send_indexes);
665 int* _recv_counts = const_cast<int*>(recv_counts);
666 int* _recv_indexes = const_cast<int*>(recv_indexes);
667
668 _trace(MpiInfo(eMpiName::Alltoallv).name().localstr());
669 double begin_time = MPI_Wtime();
670 m_mpi_prof->allToAllVariable(_sbuf, _send_counts, _send_indexes, datatype,
671 recv_buf, _recv_counts, _recv_indexes, datatype, m_communicator);
672 double end_time = MPI_Wtime();
673 double sr_time = (end_time-begin_time);
674 //TODO determiner la taille des messages
675 m_stat->add(MpiInfo(eMpiName::Alltoallv).name(),sr_time,0);
676}
677
678/*---------------------------------------------------------------------------*/
679/*---------------------------------------------------------------------------*/
680
681Request MpiAdapter::
682nonBlockingAllToAllVariable(const void* send_buf,const int* send_counts,
683 const int* send_indexes,void* recv_buf,const int* recv_counts,
684 const int* recv_indexes,MPI_Datatype datatype)
685{
686 MPI_Request mpi_request = MPI_REQUEST_NULL;
687 int ret = -1;
688 void* _sbuf = const_cast<void*>(send_buf);
689 int* _send_counts = const_cast<int*>(send_counts);
690 int* _send_indexes = const_cast<int*>(send_indexes);
691 int* _recv_counts = const_cast<int*>(recv_counts);
692 int* _recv_indexes = const_cast<int*>(recv_indexes);
693
694 _trace("MPI_Ialltoallv");
695 double begin_time = MPI_Wtime();
696 ret = MPI_Ialltoallv(_sbuf,_send_counts,_send_indexes,datatype,
697 recv_buf,_recv_counts,_recv_indexes,datatype,
698 m_communicator,&mpi_request);
699 double end_time = MPI_Wtime();
700 double sr_time = (end_time-begin_time);
701 //TODO determiner la taille des messages
702 m_stat->add("IAllToAll",sr_time,0);
703 ARCCORE_ADD_REQUEST(mpi_request);
704 return buildRequest(ret,mpi_request);
705}
706
707/*---------------------------------------------------------------------------*/
708/*---------------------------------------------------------------------------*/
709
710void MpiAdapter::
711barrier()
712{
713 // TODO: a priori on ne devrait pas avoir de requêtes en vol possible
714 // entre deux barrières pour éviter tout problème.
715 // _checkHasNoRequests();
716 // TODO ajouter trace correspondante pour le profiling.
717 MPI_Barrier(m_communicator);
718}
719
720/*---------------------------------------------------------------------------*/
721/*---------------------------------------------------------------------------*/
722
723Request MpiAdapter::
724nonBlockingBarrier()
725{
726 MPI_Request mpi_request = MPI_REQUEST_NULL;
727 int ret = -1;
728 ret = MPI_Ibarrier(m_communicator,&mpi_request);
729 ARCCORE_ADD_REQUEST(mpi_request);
730 return buildRequest(ret,mpi_request);
731}
732
733/*---------------------------------------------------------------------------*/
734/*---------------------------------------------------------------------------*/
735
736void MpiAdapter::
737allReduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
738{
739 void* _sbuf = const_cast<void*>(send_buf);
740 int _n = _checkSize(count);
741 double begin_time = MPI_Wtime();
742 _trace(MpiInfo(eMpiName::Allreduce).name().localstr());
743 try{
744 ++m_nb_all_reduce;
745 m_mpi_prof->allReduce(_sbuf, recv_buf, _n, datatype, op, m_communicator);
746 }
747 catch(TimeoutException& ex)
748 {
749 std::ostringstream ostr;
750 ostr << "MPI_Allreduce"
751 << " send_buf=" << send_buf
752 << " recv_buf=" << recv_buf
753 << " n=" << count
754 << " datatype=" << datatype
755 << " op=" << op
756 << " NB=" << m_nb_all_reduce;
757 ex.setAdditionalInfo(ostr.str());
758 throw;
759 }
760 double end_time = MPI_Wtime();
761 m_stat->add(MpiInfo(eMpiName::Allreduce).name(),end_time-begin_time,count);
762}
763
764/*---------------------------------------------------------------------------*/
765/*---------------------------------------------------------------------------*/
766
767Request MpiAdapter::
768nonBlockingAllReduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
769{
770 MPI_Request mpi_request = MPI_REQUEST_NULL;
771 int ret = -1;
772 void* _sbuf = const_cast<void*>(send_buf);
773 int _n = _checkSize(count);
774 double begin_time = MPI_Wtime();
775 _trace("MPI_IAllreduce");
776 ret = MPI_Iallreduce(_sbuf,recv_buf,_n,datatype,op,m_communicator,&mpi_request);
777 double end_time = MPI_Wtime();
778 m_stat->add("IReduce",end_time-begin_time,_n);
779 ARCCORE_ADD_REQUEST(mpi_request);
780 return buildRequest(ret,mpi_request);
781}
782
783/*---------------------------------------------------------------------------*/
784/*---------------------------------------------------------------------------*/
785
786void MpiAdapter::
787reduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op,Integer root)
788{
789 void* _sbuf = const_cast<void*>(send_buf);
790 int _n = _checkSize(count);
791 int _root = static_cast<int>(root);
792 double begin_time = MPI_Wtime();
793 _trace(MpiInfo(eMpiName::Reduce).name().localstr());
794 try{
795 ++m_nb_reduce;
796 m_mpi_prof->reduce(_sbuf, recv_buf, _n, datatype, op, _root, m_communicator);
797 }
798 catch(TimeoutException& ex)
799 {
800 std::ostringstream ostr;
801 ostr << "MPI_reduce"
802 << " send_buf=" << send_buf
803 << " recv_buf=" << recv_buf
804 << " n=" << count
805 << " datatype=" << datatype
806 << " op=" << op
807 << " root=" << root
808 << " NB=" << m_nb_reduce;
809 ex.setAdditionalInfo(ostr.str());
810 throw;
811 }
812
813 double end_time = MPI_Wtime();
814 m_stat->add(MpiInfo(eMpiName::Reduce).name(),end_time-begin_time,0);
815}
816
817/*---------------------------------------------------------------------------*/
818/*---------------------------------------------------------------------------*/
819
820void MpiAdapter::
821scan(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
822{
823 void* _sbuf = const_cast<void*>(send_buf);
824 int _n = _checkSize(count);
825 double begin_time = MPI_Wtime();
826 _trace(MpiInfo(eMpiName::Scan).name().localstr());
827 m_mpi_prof->scan(_sbuf, recv_buf, _n, datatype, op, m_communicator);
828 double end_time = MPI_Wtime();
829 m_stat->add(MpiInfo(eMpiName::Scan).name(),end_time-begin_time,count);
830}
831
832/*---------------------------------------------------------------------------*/
833/*---------------------------------------------------------------------------*/
834
835void MpiAdapter::
836directSendRecv(const void* send_buffer,Int64 send_buffer_size,
837 void* recv_buffer,Int64 recv_buffer_size,
838 Int32 proc,Int64 elem_size,MPI_Datatype data_type)
839{
840 void* v_send_buffer = const_cast<void*>(send_buffer);
841 MPI_Status mpi_status;
842 double begin_time = MPI_Wtime();
843 _trace(MpiInfo(eMpiName::Sendrecv).name().localstr());
844 int sbuf_size = _checkSize(send_buffer_size);
845 int rbuf_size = _checkSize(recv_buffer_size);
846 m_mpi_prof->sendRecv(v_send_buffer, sbuf_size, data_type, proc, 99,
847 recv_buffer, rbuf_size, data_type, proc, 99,
848 m_communicator, &mpi_status);
849 double end_time = MPI_Wtime();
850 Int64 send_size = send_buffer_size * elem_size;
851 Int64 recv_size = recv_buffer_size * elem_size;
852 double sr_time = (end_time-begin_time);
853
854 //debug(Trace::High) << "MPI SendRecv: send " << send_size << " recv "
855 // << recv_size << " time " << sr_time ;
856 m_stat->add(MpiInfo(eMpiName::Sendrecv).name(),sr_time,send_size+recv_size);
857}
858
859/*---------------------------------------------------------------------------*/
860/*---------------------------------------------------------------------------*/
861
862Request MpiAdapter::
863sendNonBlockingNoStat(const void* send_buffer,Int64 send_buffer_size,
864 Int32 dest_rank,MPI_Datatype data_type,int mpi_tag)
865{
866 void* v_send_buffer = const_cast<void*>(send_buffer);
867 MPI_Request mpi_request = MPI_REQUEST_NULL;
868 int sbuf_size = _checkSize(send_buffer_size);
869 int ret = 0;
870 ret = MPI_Isend(v_send_buffer, sbuf_size, data_type, dest_rank, mpi_tag, m_communicator, &mpi_request);
871 if (m_is_trace)
872 info() << " ISend ret=" << ret << " proc=" << dest_rank << " tag=" << mpi_tag << " request=" << mpi_request;
873 ARCCORE_ADD_REQUEST(mpi_request);
874 return buildRequest(ret,mpi_request);
875}
876
877/*---------------------------------------------------------------------------*/
878/*---------------------------------------------------------------------------*/
879
880Request MpiAdapter::
881directSend(const void* send_buffer,Int64 send_buffer_size,
882 Int32 proc,Int64 elem_size,MPI_Datatype data_type,
883 int mpi_tag,bool is_blocked
884 )
885{
886 void* v_send_buffer = const_cast<void*>(send_buffer);
887 MPI_Request mpi_request = MPI_REQUEST_NULL;
888
889 double begin_time = 0.0;
890 double end_time = 0.0;
891 Int64 send_size = send_buffer_size * elem_size;
892 int ret = 0;
893 if (m_is_trace)
894 info() << "MPI_TRACE: MPI Send: send before"
895 << " size=" << send_size
896 << " dest=" << proc
897 << " tag=" << mpi_tag
898 << " datatype=" << data_type
899 << " blocking " << is_blocked;
900 if (is_blocked){
901 // si m_mpi_lock n'est pas nul, il faut
902 // utiliser un MPI_ISend suivi d'une boucle
903 // active de MPI_Test pour eviter tout probleme
904 // de dead lock.
905 if (m_mpi_lock){
906 {
907 MpiLock::Section mls(m_mpi_lock);
908 begin_time = MPI_Wtime();
909 int sbuf_size = _checkSize(send_buffer_size);
910 m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator, &mpi_request);
911 }
912 int is_finished = 0;
913 MPI_Status mpi_status;
914 while (is_finished==0){
915 MpiLock::Section mls(m_mpi_lock);
916 MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
917 if (is_finished!=0){
918 m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
919 end_time = MPI_Wtime();
920 mpi_request = MPI_REQUEST_NULL;
921 }
922 }
923 }
924 else{
925 MpiLock::Section mls(m_mpi_lock);
926 begin_time = MPI_Wtime();
927 int sbuf_size = _checkSize(send_buffer_size);
928 m_mpi_prof->send(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator);
929 end_time = MPI_Wtime();
930 }
931 }
932 else{
933 {
934 MpiLock::Section mls(m_mpi_lock);
935 begin_time = MPI_Wtime();
936 int sbuf_size = _checkSize(send_buffer_size);
937 m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator, &mpi_request);
938 if (m_is_trace)
939 info() << " ISend ret=" << ret << " proc=" << proc << " tag=" << mpi_tag << " request=" << mpi_request;
940 end_time = MPI_Wtime();
941 ARCCORE_ADD_REQUEST(mpi_request);
942 }
943 if (m_is_trace){
944 info() << "MPI Send: send after"
945 << " request=" << mpi_request;
946 }
947 }
948 double sr_time = (end_time-begin_time);
949
950 debug(Trace::High) << "MPI Send: send " << send_size
951 << " time " << sr_time << " blocking " << is_blocked;
952 // TODO(FL): regarder comment faire pour profiler le Isend
953 m_stat->add(MpiInfo(eMpiName::Send).name(),end_time-begin_time,send_size);
954 return buildRequest(ret,mpi_request);
955}
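// Note on the m_mpi_lock path above: when a lock is present (typically when MPI
// is shared between threads), the blocking send is emulated with iSend() plus a
// polling loop on MPI_Request_get_status(), so the lock is released between
// probes and other threads can make progress, which avoids deadlocks.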
956
957/*---------------------------------------------------------------------------*/
958/*---------------------------------------------------------------------------*/
959
960Request MpiAdapter::
961directSendPack(const void* send_buffer,Int64 send_buffer_size,
962 Int32 proc,int mpi_tag,bool is_blocked)
963{
964 return directSend(send_buffer,send_buffer_size,proc,1,MPI_PACKED,mpi_tag,is_blocked);
965}
966
967/*---------------------------------------------------------------------------*/
968/*---------------------------------------------------------------------------*/
969
970MpiMessagePassingMng* MpiAdapter::
971commSplit(bool keep)
972{
973 MPI_Comm new_comm;
974
975 MPI_Comm_split(m_communicator, (keep) ? 1 : MPI_UNDEFINED, commRank(), &new_comm);
976 if (keep) {
977 // Fails if new_comm is MPI_COMM_NULL
978 return StandaloneMpiMessagePassingMng::create(new_comm, true);
979 }
980 return nullptr;
981}
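// Usage sketch (illustrative, 'adapter' is hypothetical):
//   MpiMessagePassingMng* sub = adapter->commSplit(my_rank < 2);
//   // ranks passing 'true' get a manager on the new communicator,
//   // the others pass MPI_UNDEFINED to MPI_Comm_split and get nullptr.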
982
983/*---------------------------------------------------------------------------*/
984/*---------------------------------------------------------------------------*/
985
997
998/*---------------------------------------------------------------------------*/
999/*---------------------------------------------------------------------------*/
1000
1001Request MpiAdapter::
1002directRecv(void* recv_buffer,Int64 recv_buffer_size,
1003 Int32 proc,Int64 elem_size,MPI_Datatype data_type,
1004 int mpi_tag,bool is_blocked)
1005{
1006 MPI_Status mpi_status;
1007 MPI_Request mpi_request = MPI_REQUEST_NULL;
1008 int ret = 0;
1009 double begin_time = 0.0;
1010 double end_time = 0.0;
1011
1012 int i_proc = 0;
1013 if (proc==A_PROC_NULL_RANK)
1014 ARCCORE_THROW(NotImplementedException,"Receive with MPI_PROC_NULL");
1015 if (proc == A_NULL_RANK && !m_is_allow_null_rank_for_any_source)
1016 ARCCORE_FATAL("Can not use A_NULL_RANK for any source. Use A_ANY_SOURCE_RANK instead");
1017 if (proc==A_NULL_RANK || proc==A_ANY_SOURCE_RANK)
1018 i_proc = MPI_ANY_SOURCE;
1019 else
1020 i_proc = static_cast<int>(proc);
1021
1022 Int64 recv_size = recv_buffer_size * elem_size;
1023 if (m_is_trace){
1024 info() << "MPI_TRACE: MPI Recv: recv before "
1025 << " size=" << recv_size
1026 << " from=" << i_proc
1027 << " tag=" << mpi_tag
1028 << " datatype=" << data_type
1029 << " blocking=" << is_blocked;
1030 }
1031 if (is_blocked){
1032 // si m_mpi_lock n'est pas nul, il faut
1033 // utiliser un MPI_IRecv suivi d'une boucle
1034 // active de MPI_Test pour eviter tout probleme
1035 // de dead lock.
1036 if (m_mpi_lock){
1037 {
1038 MpiLock::Section mls(m_mpi_lock);
1039 begin_time = MPI_Wtime();
1040 int rbuf_size = _checkSize(recv_buffer_size);
1041 m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1042 }
1043 int is_finished = 0;
1044 MPI_Status mpi_status;
1045 while (is_finished==0){
1046 MpiLock::Section mls(m_mpi_lock);
1047 MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
1048 if (is_finished!=0){
1049 end_time = MPI_Wtime();
1050 m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
1051 mpi_request = MPI_REQUEST_NULL;
1052 }
1053 }
1054 }
1055 else{
1056 MpiLock::Section mls(m_mpi_lock);
1057 begin_time = MPI_Wtime();
1058 int rbuf_size = _checkSize(recv_buffer_size);
1059 m_mpi_prof->recv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_status);
1060 end_time = MPI_Wtime();
1061 }
1062 }
1063 else{
1064 {
1065 MpiLock::Section mls(m_mpi_lock);
1066 begin_time = MPI_Wtime();
1067 int rbuf_size = _checkSize(recv_buffer_size);
1068 m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1069 end_time = MPI_Wtime();
1070 ARCCORE_ADD_REQUEST(mpi_request);
1071 }
1072 if (m_is_trace){
1073 info() << "MPI Recv: recv after "
1074 << " request=" << mpi_request;
1075 }
1076 }
1077 double sr_time = (end_time-begin_time);
1078
1079 debug(Trace::High) << "MPI Recv: recv after " << recv_size
1080 << " time " << sr_time << " blocking " << is_blocked;
1081 m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
1082 return buildRequest(ret,mpi_request);
1083}
1084
1085/*---------------------------------------------------------------------------*/
1086/*---------------------------------------------------------------------------*/
1087
1088void MpiAdapter::
1089probeRecvPack(UniqueArray<Byte>& recv_buffer,Int32 proc)
1090{
1091 double begin_time = MPI_Wtime();
1092 MPI_Status status;
1093 int recv_buffer_size = 0;
1094 _trace("MPI_Probe");
1095 m_mpi_prof->probe(proc, 101, m_communicator, &status);
1096 m_mpi_prof->getCount(&status, MPI_PACKED, &recv_buffer_size);
1097
1098 recv_buffer.resize(recv_buffer_size);
1099 m_mpi_prof->recv(recv_buffer.data(), recv_buffer_size, MPI_PACKED, proc, 101, m_communicator, &status);
1100
1101 double end_time = MPI_Wtime();
1102 Int64 recv_size = recv_buffer_size;
1103 double sr_time = (end_time-begin_time);
1104 debug(Trace::High) << "MPI probeRecvPack " << recv_size
1105 << " time " << sr_time;
1106 m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
1107}
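// Note: probeRecvPack() pairs with a packed send posted with tag 101: it probes
// the incoming message, resizes recv_buffer to the probed size in bytes
// (MPI_PACKED) and then receives it, so the caller does not need to know the
// message size in advance.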
1108
1109/*---------------------------------------------------------------------------*/
1110/*---------------------------------------------------------------------------*/
1111
1112MessageSourceInfo MpiAdapter::
1113_buildSourceInfoFromStatus(const MPI_Status& mpi_status)
1114{
1115 // Récupère la taille en octet du message.
1116 MPI_Count message_size = 0;
1117 MPI_Get_elements_x(&mpi_status,MPI_BYTE,&message_size);
1118 MessageTag tag(mpi_status.MPI_TAG);
1119 MessageRank rank(mpi_status.MPI_SOURCE);
1120 return MessageSourceInfo(rank,tag,message_size);
1121}
1122
1123/*---------------------------------------------------------------------------*/
1124/*---------------------------------------------------------------------------*/
1125
1126MessageId MpiAdapter::
1127_probeMessage(MessageRank source,MessageTag tag,bool is_blocking)
1128{
1129 MPI_Status mpi_status;
1130 int has_message = 0;
1131 MPI_Message message;
1132 int ret = 0;
1133 int mpi_source = source.value();
1134 if (source.isProcNull())
1135 ARCCORE_THROW(NotImplementedException,"Probe with MPI_PROC_NULL");
1136 if (source.isNull() && !m_is_allow_null_rank_for_any_source)
1137 ARCCORE_FATAL("Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
1138 if (source.isNull() || source.isAnySource())
1139 mpi_source = MPI_ANY_SOURCE;
1140 int mpi_tag = tag.value();
1141 if (tag.isNull())
1142 mpi_tag = MPI_ANY_TAG;
1143 if (is_blocking){
1144 ret = MPI_Mprobe(mpi_source,mpi_tag,m_communicator,&message,&mpi_status);
1145 has_message = true;
1146 }
1147 else {
1148 ret = MPI_Improbe(mpi_source, mpi_tag, m_communicator, &has_message, &message, &mpi_status);
1149 }
1150 if (ret!=0)
1151 ARCCORE_FATAL("Error during call to MPI_Mprobe r={0}",ret);
1152 MessageId ret_message;
1153 if (has_message!=0){
1154 MessageSourceInfo si(_buildSourceInfoFromStatus(mpi_status));
1155 ret_message = MessageId(si,message);
1156 }
1157 return ret_message;
1158}
1159
1160/*---------------------------------------------------------------------------*/
1161/*---------------------------------------------------------------------------*/
1162
1163MessageId MpiAdapter::
1164probeMessage(PointToPointMessageInfo message)
1165{
1166 if (!message.isValid())
1167 return MessageId();
1168
1169 // Il faut avoir initialisé le message avec un couple (rang/tag).
1170 if (!message.isRankTag())
1171 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
1172
1173 return _probeMessage(message.destinationRank(),message.tag(),message.isBlocking());
1174}
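// Usage sketch (illustrative): the MessageId returned by probeMessage() wraps
// the MPI_Message obtained from MPI_(I)mprobe and is meant to be passed to the
// directRecv(...,MessageId,...) overload below, which consumes it with a
// matched receive so the probed message is exactly the one received.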
1175
1176/*---------------------------------------------------------------------------*/
1177/*---------------------------------------------------------------------------*/
1178
1179MessageSourceInfo MpiAdapter::
1180_legacyProbeMessage(MessageRank source,MessageTag tag,bool is_blocking)
1181{
1182 MPI_Status mpi_status;
1183 int has_message = 0;
1184 int ret = 0;
1185 int mpi_source = source.value();
1186 if (source.isProcNull())
1187 ARCCORE_THROW(NotImplementedException,"Probe with MPI_PROC_NULL");
1188 if (source.isNull() && !m_is_allow_null_rank_for_any_source)
1189 ARCCORE_FATAL("Can not use MPI_Probe with null rank. Use MessageRank::anySourceRank() instead");
1190 if (source.isNull() || source.isAnySource())
1191 mpi_source = MPI_ANY_SOURCE;
1192 int mpi_tag = tag.value();
1193 if (tag.isNull())
1194 mpi_tag = MPI_ANY_TAG;
1195 if (is_blocking){
1196 ret = MPI_Probe(mpi_source,mpi_tag,m_communicator,&mpi_status);
1197 has_message = true;
1198 }
1199 else
1200 ret = MPI_Iprobe(mpi_source,mpi_tag,m_communicator,&has_message,&mpi_status);
1201 if (ret!=0)
1202 ARCCORE_FATAL("Error during call to MPI_Mprobe r={0}",ret);
1203 if (has_message!=0)
1204 return _buildSourceInfoFromStatus(mpi_status);
1205 return {};
1206}
1207
1208/*---------------------------------------------------------------------------*/
1209/*---------------------------------------------------------------------------*/
1210
1211MessageSourceInfo MpiAdapter::
1212legacyProbeMessage(PointToPointMessageInfo message)
1213{
1214 if (!message.isValid())
1215 return {};
1216
1217 // Il faut avoir initialisé le message avec un couple (rang/tag).
1218 if (!message.isRankTag())
1219 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
1220
1221 return _legacyProbeMessage(message.destinationRank(),message.tag(),message.isBlocking());
1222}
1223
1224/*---------------------------------------------------------------------------*/
1225/*---------------------------------------------------------------------------*/
1227Request MpiAdapter::
1228directRecv(void* recv_buffer,Int64 recv_buffer_size,
1229 MessageId message,MPI_Datatype data_type,
1230 bool is_blocked)
1231{
1232 MPI_Status mpi_status;
1233 MPI_Request mpi_request = MPI_REQUEST_NULL;
1235 int ret = 0;
1236 double begin_time = 0.0;
1237 double end_time = 0.0;
1238
1240 if (m_is_trace){
1241 info() << "MPI_TRACE: MPI Mrecv: recv before "
1242 << " size=" << recv_size
1243 << " from_msg=" << message
1244 << " datatype=" << data_type
1245 << " blocking=" << is_blocked;
1246 }
1247 if (is_blocked){
1248 // si m_mpi_lock n'est pas nul, il faut
1249 // utiliser un MPI_IRecv suivi d'une boucle
1250 // active de MPI_Test pour eviter tout probleme
1251 // de dead lock.
1252 if (m_mpi_lock){
1253 {
1254 MpiLock::Section mls(m_mpi_lock);
1255 begin_time = MPI_Wtime();
1256 int rbuf_size = _checkSize(recv_buffer_size);
1258 //m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1259 }
1260 int is_finished = 0;
1261 MPI_Status mpi_status;
1262 while (is_finished==0){
1263 MpiLock::Section mls(m_mpi_lock);
1264 MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
1265 if (is_finished!=0){
1266 end_time = MPI_Wtime();
1267 m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
1268 mpi_request = MPI_REQUEST_NULL;
1269 }
1270 }
1271 }
1272 else{
1273 MpiLock::Section mls(m_mpi_lock);
1274 begin_time = MPI_Wtime();
1275 int rbuf_size = _checkSize(recv_buffer_size);
1277 //m_mpi_prof->recv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_status);
1278 end_time = MPI_Wtime();
1279 }
1280 }
1281 else{
1282 {
1283 MpiLock::Section mls(m_mpi_lock);
1284 begin_time = MPI_Wtime();
1285 int rbuf_size = _checkSize(recv_buffer_size);
1286 //m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1288 //m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1289 end_time = MPI_Wtime();
1290 ARCCORE_ADD_REQUEST(mpi_request);
1291 }
1292 if (m_is_trace){
1293 info() << "MPI Recv: recv after "
1294 << " request=" << mpi_request;
1295 }
1296 }
1297 double sr_time = (end_time-begin_time);
1298
1299 debug(Trace::High) << "MPI Recv: recv after " << recv_size
1300 << " time " << sr_time << " blocking " << is_blocked;
1301 m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
1302 return buildRequest(ret,mpi_request);
1303}
1304
1305/*---------------------------------------------------------------------------*/
1306/*---------------------------------------------------------------------------*/
1307
1308Request MpiAdapter::
1309directRecvPack(void* recv_buffer,Int64 recv_buffer_size,
1310 Int32 proc,int mpi_tag,bool is_blocking)
1311{
1312 return directRecv(recv_buffer,recv_buffer_size,proc,1,MPI_PACKED,mpi_tag,is_blocking);
1313}
1314
1315/*---------------------------------------------------------------------------*/
1316/*---------------------------------------------------------------------------*/
1317
1318// FIXME: Implement direct method with MPI_STATUS_IGNORE
1319void MpiAdapter::
1320waitAllRequests(ArrayView<Request> requests)
1321{
1322 UniqueArray<bool> indexes(requests.size());
1323 UniqueArray<MPI_Status> mpi_status(requests.size());
1324 while (_waitAllRequestsMPI(requests, indexes, mpi_status)){
1325 ; // Continue tant qu'il y a des requêtes.
1326 }
1327}
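// Note: _waitAllRequestsMPI() returns true when completed requests carried
// sub-requests that generated new requests; the loop above therefore calls it
// again until no request (original or generated) remains pending.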
1328
1329/*---------------------------------------------------------------------------*/
1330/*---------------------------------------------------------------------------*/
1331
1332// FIXME: Implement direct method with MPI_STATUS_IGNORE
1333void MpiAdapter::
1334waitSomeRequests(ArrayView<Request> requests,
1335 ArrayView<bool> indexes,
1336 bool is_non_blocking)
1337{
1338 UniqueArray<MPI_Status> mpi_status(requests.size());
1339 waitSomeRequestsMPI(requests, indexes, mpi_status, is_non_blocking);
1340}
1341
1342/*---------------------------------------------------------------------------*/
1343/*---------------------------------------------------------------------------*/
1344
1345struct SubRequestInfo
1346{
1347 SubRequestInfo(Ref<ISubRequest> sr, Integer i, int source_rank, int source_tag)
1348 : sub_request(sr)
1349 , index(i)
1350 , mpi_source_rank(source_rank)
1351 , mpi_source_tag(source_tag)
1352 {}
1353
1354 Ref<ISubRequest> sub_request;
1355 Integer index = -1;
1356 int mpi_source_rank = MPI_PROC_NULL;
1357 int mpi_source_tag = 0;
1358};
1359
1360/*---------------------------------------------------------------------------*/
1361/*---------------------------------------------------------------------------*/
1362
1363bool MpiAdapter::
1364_handleEndRequests(ArrayView<Request> requests,ArrayView<bool> done_indexes,
1365 ArrayView<MPI_Status> status)
1366{
1367 UniqueArray<SubRequestInfo> new_requests;
1368 Integer size = requests.size();
1369 {
1370 MpiLock::Section mls(m_mpi_lock);
1371 for( Integer i=0; i<size; ++i ) {
1372 if (done_indexes[i]){
1373 // Attention à bien utiliser une référence sinon le reset ne
1374 // s'applique pas à la bonne variable
1375 Request& r = requests[i];
1376 // Note: la requête peut ne pas être valide (par exemple s'il s'agit)
1377 // d'une requête bloquante mais avoir tout de même une sous-requête.
1378 if (r.hasSubRequest()){
1379 if (m_is_trace)
1380 info() << "Done request with sub-request r=" << r << " mpi_r=" << r << " i=" << i
1381 << " source_rank=" << status[i].MPI_SOURCE
1382 << " source_tag=" << status[i].MPI_TAG;
1383 new_requests.add(SubRequestInfo(r.subRequest(), i, status[i].MPI_SOURCE, status[i].MPI_TAG));
1384 }
1385 if (r.isValid()){
1386 _removeRequest((MPI_Request)r);
1387 r.reset();
1388 }
1389 }
1390 }
1391 }
1392
1393 // NOTE: les appels aux sous-requêtes peuvent générer d'autres requêtes.
1394 // Il faut faire attention à ne pas utiliser les sous-requêtes avec
1395 // le verrou actif.
1396 bool has_new_request = false;
1397 if (!new_requests.empty()){
1398 // Contient le status de la \a ième requête
1399 UniqueArray<MPI_Status> old_status(size);
1400 {
1401 Integer index = 0;
1402 for( Integer i=0; i<size; ++i ){
1403 if (done_indexes[i]){
1404 old_status[i] = status[index];
1405 ++index;
1406 }
1407 }
1408 }
1409 // S'il y a des nouvelles requêtes, il faut décaler les valeurs de 'status'
1410 for( SubRequestInfo& sri : new_requests ){
1411 Integer index = sri.index;
1412 if (m_is_trace)
1413 info() << "Before handle new request index=" << index
1414 << " sri.source_rank=" << sri.mpi_source_rank
1415 << " sri.source_tag=" << sri.mpi_source_tag;
1416 SubRequestCompletionInfo completion_info(MessageRank(old_status[index].MPI_SOURCE), MessageTag(old_status[index].MPI_TAG));
1417 Request r = sri.sub_request->executeOnCompletion(completion_info);
1418 if (m_is_trace)
1419 info() << "Handle new request index=" << index << " old_r=" << requests[index] << " new_r=" << r;
1420 // S'il y a une nouvelle requête, alors elle remplace
1421 // l'ancienne et donc il faut faire comme si
1422 // la requête d'origine n'est pas terminée.
1423 if (r.isValid()){
1424 has_new_request = true;
1425 requests[index] = r;
1426 done_indexes[index] = false;
1427 }
1428 }
1429 {
1430 Integer index = 0;
1431 for( Integer i=0; i<size; ++i ){
1432 if (done_indexes[i]){
1433 status[index] = old_status[i];
1434 ++index;
1435 }
1436 }
1437 }
1438
1439 }
1440 return has_new_request;
1441}
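// Summary of the contract above: for every completed request, an attached
// ISubRequest may return a follow-up Request from executeOnCompletion(); that
// new request then replaces the old one in 'requests' and the entry is marked
// not-done again, so callers keep waiting on it.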
1442
1443/*---------------------------------------------------------------------------*/
1444/*---------------------------------------------------------------------------*/
1445
1446bool MpiAdapter::
1447_waitAllRequestsMPI(ArrayView<Request> requests,
1448 ArrayView<bool> indexes,
1449 ArrayView<MPI_Status> mpi_status)
1450{
1451 Integer size = requests.size();
1452 if (size==0)
1453 return false;
1454 //ATTENTION: Mpi modifie en retour de MPI_Waitall ce tableau
1455 UniqueArray<MPI_Request> mpi_request(size);
1456 for( Integer i=0; i<size; ++i ){
1457 mpi_request[i] = (MPI_Request)(requests[i]);
1458 }
1459 if (m_is_trace)
1460 info() << " MPI_waitall begin size=" << size;
1461 double diff_time = 0.0;
1462 if (m_mpi_lock){
1463 double begin_time = MPI_Wtime();
1464 for( Integer i=0; i<size; ++i ){
1465 MPI_Request request = (MPI_Request)(mpi_request[i]);
1466 int is_finished = 0;
1467 while (is_finished==0){
1468 MpiLock::Section mls(m_mpi_lock);
1469 m_mpi_prof->test(&request, &is_finished, (MPI_Status *) MPI_STATUS_IGNORE);
1470 }
1471 }
1472 double end_time = MPI_Wtime();
1473 diff_time = end_time - begin_time;
1474 }
1475 else{
1476 //TODO: transformer en boucle while et MPI_Testall si m_mpi_lock est non nul
1477 MpiLock::Section mls(m_mpi_lock);
1478 double begin_time = MPI_Wtime();
1479 m_mpi_prof->waitAll(size, mpi_request.data(), mpi_status.data());
1480 double end_time = MPI_Wtime();
1481 diff_time = end_time - begin_time;
1482 }
1483
1484 // Indique que chaque requête est traitée car on a fait un waitall.
1485 for( Integer i=0; i<size; ++i ){
1486 indexes[i] = true;
1487 }
1488
1489 bool has_new_request = _handleEndRequests(requests,indexes,mpi_status);
1490 if (m_is_trace)
1491 info() << " MPI_waitall end size=" << size;
1492 m_stat->add(MpiInfo(eMpiName::Waitall).name(),diff_time,size);
1493 return has_new_request;
1494}
1495
1496/*---------------------------------------------------------------------------*/
1497/*---------------------------------------------------------------------------*/
1498
1499void MpiAdapter::
1500waitSomeRequestsMPI(ArrayView<Request> requests,ArrayView<bool> indexes,
1501 ArrayView<MPI_Status> mpi_status,bool is_non_blocking)
1502{
1503 Integer size = requests.size();
1504 if (size==0)
1505 return;
1506 //TODO: utiliser des StackArray (quand ils seront disponibles...)
1507 UniqueArray<MPI_Request> mpi_request(size);
1508 UniqueArray<MPI_Request> saved_mpi_request(size);
1509 UniqueArray<int> completed_requests(size);
1510 int nb_completed_request = 0;
1511
1512 // Sauve la requete pour la desallouer dans m_allocated_requests,
1513 // car sa valeur ne sera plus valide après appel à MPI_Wait*
1514 for (Integer i = 0; i < size; ++i) {
1515 // Dans le cas où l'on appelle cette méthode plusieurs fois
1516 // avec le même tableau de requests, il se peut qu'il y ai des
1517 // requests invalides qui feront planter l'appel à MPI.
1518 if (!requests[i].isValid()) {
1519 saved_mpi_request[i] = MPI_REQUEST_NULL;
1520 }
1521 else {
1522 saved_mpi_request[i] = static_cast<MPI_Request>(requests[i]);
1523 }
1524 }
1525
1526 // N'affiche le debug que en mode bloquant ou si explicitement demandé pour
1527 // éviter trop de messages
1528 bool is_print_debug = m_is_trace || (!is_non_blocking);
1529 if (is_print_debug)
1530 debug() << "WaitRequestBegin is_non_blocking=" << is_non_blocking << " n=" << size;
1531
1532 double begin_time = MPI_Wtime();
1533
1534 try{
1535 if (is_non_blocking){
1536 _trace(MpiInfo(eMpiName::Testsome).name().localstr());
1537 {
1538 MpiLock::Section mls(m_mpi_lock);
1539 m_mpi_prof->testSome(size, saved_mpi_request.data(), &nb_completed_request,
1540 completed_requests.data(), mpi_status.data());
1541 }
1542 //If there is no active handle in the list, it returns outcount = MPI_UNDEFINED.
1543 if (nb_completed_request == MPI_UNDEFINED) // Si aucune requete n'etait valide.
1544 nb_completed_request = 0;
1545 if (is_print_debug)
1546 debug() << "WaitSomeRequestMPI: TestSome nb_completed=" << nb_completed_request;
1547 }
1548 else{
1549 _trace(MpiInfo(eMpiName::Waitsome).name().localstr());
1550 {
1551 // TODO: si le verrou existe, il faut faire une boucle de testSome() pour ne
1552 // pas bloquer.
1553 MpiLock::Section mls(m_mpi_lock);
1554 m_mpi_prof->waitSome(size, saved_mpi_request.data(), &nb_completed_request,
1555 completed_requests.data(), mpi_status.data());
1556 }
1557 // Il ne faut pas utiliser mpi_request[i] car il est modifié par Mpi
1558 // mpi_request[i] == MPI_REQUEST_NULL
1559 if (nb_completed_request == MPI_UNDEFINED) // Si aucune requete n'etait valide.
1560 nb_completed_request = 0;
1561 if (is_print_debug)
1562 debug() << "WaitSomeRequest nb_completed=" << nb_completed_request;
1563 }
1564 }
1565 catch(TimeoutException& ex)
1566 {
1567 std::ostringstream ostr;
1568 if (is_non_blocking)
1569 ostr << MpiInfo(eMpiName::Testsome).name();
1570 else
1571 ostr << MpiInfo(eMpiName::Waitsome).name();
1572 ostr << " size=" << size
1573 << " is_non_blocking=" << is_non_blocking;
1574 ex.setAdditionalInfo(ostr.str());
1575 throw;
1576 }
1577
1578 for( int z=0; z<nb_completed_request; ++z ){
1579 int index = completed_requests[z];
1580 if (is_print_debug)
1581 debug() << "Completed my_rank=" << m_comm_rank << " z=" << z
1582 << " index=" << index
1583 << " tag=" << mpi_status[z].MPI_TAG
1584 << " source=" << mpi_status[z].MPI_SOURCE;
1585
1586 indexes[index] = true;
1587 }
1588
1589 bool has_new_request = _handleEndRequests(requests,indexes,mpi_status);
1590 if (has_new_request){
1591 // Si on a de nouvelles requêtes, alors il est possible qu'aucune
1592 // requête n'ait abouti. En cas de testSome, cela n'est pas grave.
1593 // En cas de waitSome, cela signifie qu'il faut attendre à nouveau.
1594 }
1595 double end_time = MPI_Wtime();
1596 m_stat->add(MpiInfo(eMpiName::Waitsome).name(),end_time-begin_time,size);
1597}
1598
1599/*---------------------------------------------------------------------------*/
1600/*---------------------------------------------------------------------------*/
1601
1602void MpiAdapter::
1603freeRequest(Request& request)
1604{
1605 if (!request.isValid()){
1606 warning() << "MpiAdapter::freeRequest() null request r=" << (MPI_Request)request;
1607 _checkFatalInRequest();
1608 return;
1609 }
1610 {
1611 MpiLock::Section mls(m_mpi_lock);
1612
1613 auto mr = (MPI_Request)request;
1614 _removeRequest(mr);
1615 MPI_Request_free(&mr);
1616 }
1617 request.reset();
1618}
1619
1620/*---------------------------------------------------------------------------*/
1621/*---------------------------------------------------------------------------*/
1622
1623bool MpiAdapter::
1624testRequest(Request& request)
1625{
1626 // Il est autorisé par MPI de faire un test avec une requête nulle.
1627 if (!request.isValid())
1628 return true;
1629
1630 auto mr = (MPI_Request)request;
1631 int is_finished = 0;
1632
1633 {
1634 MpiLock::Section mls(m_mpi_lock);
1635
1636 // Il faut d'abord recuperer l'emplacement de la requete car si elle
1637 // est finie, elle sera automatiquement libérée par MPI lors du test
1638 // et du coup on ne pourra plus la supprimer
1639 RequestSet::Iterator request_iter = m_request_set->findRequest(mr);
1640
1641 m_mpi_prof->test(&mr, &is_finished, (MPI_Status *) MPI_STATUS_IGNORE);
1642 //info() << "** TEST REQUEST r=" << mr << " is_finished=" << is_finished;
1643 if (is_finished!=0){
1644 m_request_set->removeRequest(request_iter);
1645 if (request.hasSubRequest())
1646 ARCCORE_THROW(NotImplementedException,"SubRequest support");
1647 request.reset();
1648 return true;
1649 }
1650 }
1651
1652 return false;
1653}
1654
1655/*---------------------------------------------------------------------------*/
1656/*---------------------------------------------------------------------------*/
1661_addRequest(MPI_Request request)
1662{
1663 m_request_set->addRequest(request);
1664}
1665
1666/*---------------------------------------------------------------------------*/
1667/*---------------------------------------------------------------------------*/
1673{
1674 m_request_set->removeRequest(request);
1675}
1676
1677/*---------------------------------------------------------------------------*/
1678/*---------------------------------------------------------------------------*/
1679
1680void MpiAdapter::
1681enableDebugRequest(bool enable_debug_request)
1682{
1684 //if (!m_enable_debug_request)
1685 //info() << "WARNING: Mpi adpater debug request is disabled (multi-threading)";
1686}
1687
1688/*---------------------------------------------------------------------------*/
1689/*---------------------------------------------------------------------------*/
1690
1691void MpiAdapter::
1692_checkFatalInRequest()
1693{
1694 if (isRequestErrorAreFatal())
1695 ARCCORE_FATAL("Error in requests management");
1696}
1697
1698/*---------------------------------------------------------------------------*/
1699/*---------------------------------------------------------------------------*/
1700
1701void MpiAdapter::
1702setMpiProfiling(IMpiProfiling* mpi_profiling)
1703{
1704 m_mpi_prof = mpi_profiling;
1705}
1706
1707/*---------------------------------------------------------------------------*/
1708/*---------------------------------------------------------------------------*/
1709
1710IMpiProfiling* MpiAdapter::
1711getMpiProfiling() const
1712{
1713 return m_mpi_prof;
1714}
1715
1716/*---------------------------------------------------------------------------*/
1717/*---------------------------------------------------------------------------*/
1718
1719void MpiAdapter::
1720setProfiler(IProfiler* profiler)
1721{
1722 if (!profiler){
1723 m_mpi_prof = nullptr;
1724 return;
1725 }
1726
1727 IMpiProfiling* p = dynamic_cast<IMpiProfiling*>(profiler);
1728 if (!p)
1729 ARCCORE_FATAL("Invalid profiler. Profiler has to implement interface 'IMpiProfiling'");
1730 m_mpi_prof = p;
1731}
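// Usage sketch (illustrative, 'adapter' and 'my_prof' are hypothetical):
//   adapter->setProfiler(&my_prof);   // accepted only if my_prof implements IMpiProfiling
//   adapter->setProfiler(nullptr);    // removes the current profiler
// Any other IProfiler implementation triggers ARCCORE_FATAL.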
1732
1733/*---------------------------------------------------------------------------*/
1734/*---------------------------------------------------------------------------*/
1735
1736IProfiler* MpiAdapter::
1737profiler() const
1738{
1739 return m_mpi_prof;
1740}
1741
1742/*---------------------------------------------------------------------------*/
1743/*---------------------------------------------------------------------------*/
1744
1745} // End namespace Arccore::MessagePassing::Mpi
1746
1747/*---------------------------------------------------------------------------*/
1748/*---------------------------------------------------------------------------*/