Arcane  v3.16.0.0
User documentation
MpiAdapter.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* MpiAdapter.cc (C) 2000-2025 */
9/* */
10/* Parallelism manager using MPI. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
15
16#include "arccore/trace/ITraceMng.h"
17
18#include "arccore/collections/Array.h"
19
20#include "arccore/message_passing/Request.h"
21#include "arccore/message_passing/IStat.h"
22#include "arccore/message_passing/internal/SubRequestCompletionInfo.h"
23
24#include "arccore/base/IStackTraceService.h"
25#include "arccore/base/TimeoutException.h"
26#include "arccore/base/String.h"
27#include "arccore/base/NotImplementedException.h"
28#include "arccore/base/PlatformUtils.h"
29#include "arccore/base/FatalErrorException.h"
30#include "arccore/base/TraceInfo.h"
31
32#include "arccore/message_passing_mpi/StandaloneMpiMessagePassingMng.h"
33#include "arccore/message_passing_mpi/internal/MpiLock.h"
34#include "arccore/message_passing_mpi/internal/NoMpiProfiling.h"
35#include "arccore/message_passing_mpi/internal/MpiRequest.h"
36
37#include <cstdint>
38
39/*---------------------------------------------------------------------------*/
40/*---------------------------------------------------------------------------*/
41
42namespace Arcane::MessagePassing::Mpi
43{
44
45/*---------------------------------------------------------------------------*/
46/*---------------------------------------------------------------------------*/
47
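/*!
 * \brief Tracks the MPI requests created by MpiAdapter.
 *
 * Every non-blocking call below registers its MPI_Request here (see the
 * ARCCORE_ADD_REQUEST macro) and removes it once completed or freed, so that
 * pending, duplicated or unknown requests can be detected and reported.
 */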
48class MpiAdapter::RequestSet
49: public TraceAccessor
50{
51 public:
52 struct RequestInfo
53 {
54 TraceInfo m_trace;
55 String m_stack_trace;
56 };
57 public:
58 typedef std::map<MPI_Request,RequestInfo>::iterator Iterator;
59 public:
60
61 explicit RequestSet(ITraceMng* tm)
62 : TraceAccessor(tm)
63 {
64 m_trace_mng_ref = makeRef(tm);
65 if (arccoreIsCheck()){
66 m_no_check_request = false;
67 m_request_error_is_fatal = true;
68 }
69 if (Platform::getEnvironmentVariable("ARCCORE_NOREPORT_ERROR_MPIREQUEST")=="TRUE")
70 m_is_report_error_in_request = false;
71 if (Platform::getEnvironmentVariable("ARCCORE_MPIREQUEST_STACKTRACE")=="TRUE")
72 m_use_trace_full_stack = true;
73 if (Platform::getEnvironmentVariable("ARCCORE_TRACE_MPIREQUEST")=="TRUE")
74 m_trace_mpirequest = true;
75 }
76 public:
77 void addRequest(MPI_Request request)
78 {
79 if (m_no_check_request)
80 return;
81 if (m_trace_mpirequest)
82 info() << "MpiAdapter: AddRequest r=" << request;
83 _addRequest(request,TraceInfo());
84 }
85 void addRequest(MPI_Request request,const TraceInfo& ti)
86 {
87 if (m_no_check_request)
88 return;
89 if (m_trace_mpirequest)
90 info() << "MpiAdapter: AddRequest r=" << request;
91 _addRequest(request,ti);
92 }
93 void removeRequest(MPI_Request request)
94 {
95 if (m_no_check_request)
96 return;
97 if (m_trace_mpirequest)
98 info() << "MpiAdapter: RemoveRequest r=" << request;
99 _removeRequest(request);
100 }
101 void removeRequest(Iterator request_iter)
102 {
103 if (m_no_check_request)
104 return;
105 if (request_iter==m_allocated_requests.end()){
106 if (m_trace_mpirequest)
107 info() << "MpiAdapter: RemoveRequestIter null iterator";
108 return;
109 }
110 if (m_trace_mpirequest)
111 info() << "MpiAdapter: RemoveRequestIter r=" << request_iter->first;
112 m_allocated_requests.erase(request_iter);
113 }
114 //! Checks that the request is in the list
115 Iterator findRequest(MPI_Request request)
116 {
117 if (m_no_check_request)
118 return m_allocated_requests.end();
119
120 if (_isEmptyRequest(request))
121 return m_allocated_requests.end();
122 auto ireq = m_allocated_requests.find(request);
123 if (ireq==m_allocated_requests.end()){
124 if (m_is_report_error_in_request || m_request_error_is_fatal){
125 error() << "MpiAdapter::testRequest() request not referenced "
126 << " id=" << request;
127 _checkFatalInRequest();
128 }
129 }
130 return ireq;
131 }
132 private:
133 /*!
134 * \warning This function must be called with the mpi_lock lock held.
135 */
136 void _addRequest(MPI_Request request,const TraceInfo& trace_info)
137 {
138 if (request==MPI_REQUEST_NULL){
139 if (m_is_report_error_in_request || m_request_error_is_fatal){
140 error() << "MpiAdapter::_addRequest() trying to add null request";
141 _checkFatalInRequest();
142 }
143 return;
144 }
145 if (_isEmptyRequest(request))
146 return;
147 ++m_total_added_request;
148 //info() << "MPI_ADAPTER:ADD REQUEST " << request;
149 auto i = m_allocated_requests.find(request);
150 if (i!=m_allocated_requests.end()){
151 if (m_is_report_error_in_request || m_request_error_is_fatal){
152 error() << "MpiAdapter::_addRequest() request already referenced "
153 << " id=" << request;
154 _checkFatalInRequest();
155 }
156 return;
157 }
158 RequestInfo rinfo;
159 rinfo.m_trace = trace_info;
160 if (m_use_trace_full_stack)
161 rinfo.m_stack_trace = Platform::getStackTrace();
162 m_allocated_requests.insert(std::make_pair(request,rinfo));
163 }
164
165 /*!
166 * \warning This function must be called with the mpi_lock lock held.
167 */
168 void _removeRequest(MPI_Request request)
169 {
170 //info() << "MPI_ADAPTER:REMOVE REQUEST " << request;
171 if (request==MPI_REQUEST_NULL){
172 if (m_is_report_error_in_request || m_request_error_is_fatal){
173 error() << "MpiAdapter::_removeRequest() null request (" << MPI_REQUEST_NULL << ")";
174 _checkFatalInRequest();
175 }
176 return;
177 }
178 if (_isEmptyRequest(request))
179 return;
180 auto i = m_allocated_requests.find(request);
181 if (i==m_allocated_requests.end()){
182 if (m_is_report_error_in_request || m_request_error_is_fatal){
183 error() << "MpiAdapter::_removeRequest() request not referenced "
184 << " id=" << request;
185 _checkFatalInRequest();
186 }
187 }
188 else
189 m_allocated_requests.erase(i);
190 }
191 public:
192 void _checkFatalInRequest()
193 {
194 if (m_request_error_is_fatal)
195 ARCCORE_FATAL("Error in requests management");
196 }
197 Int64 nbRequest() const { return m_allocated_requests.size(); }
198 Int64 totalAddedRequest() const { return m_total_added_request; }
199 void printRequests() const
200 {
201 info() << "PRINT REQUESTS\n";
202 for( auto& x : m_allocated_requests ){
203 info() << "Request id=" << x.first << " trace=" << x.second.m_trace
204 << " stack=" << x.second.m_stack_trace;
205 }
206 }
207 void setEmptyRequests(MPI_Request r1,MPI_Request r2)
208 {
209 m_empty_request1 = r1;
210 m_empty_request2 = r2;
211 }
212 public:
213 bool m_request_error_is_fatal = false;
214 bool m_is_report_error_in_request = true;
215 bool m_trace_mpirequest = false;
216 //! True if requests are not checked
217 bool m_no_check_request = true;
218 private:
219 std::map<MPI_Request,RequestInfo> m_allocated_requests;
220 bool m_use_trace_full_stack = false;
221 MPI_Request m_empty_request1 = MPI_REQUEST_NULL;
222 MPI_Request m_empty_request2 = MPI_REQUEST_NULL;
223 Int64 m_total_added_request = 0;
224 Ref<ITraceMng> m_trace_mng_ref;
225 private:
226 bool _isEmptyRequest(MPI_Request r) const
227 {
228 return (r==m_empty_request1 || r==m_empty_request2);
229 }
230};
231
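// Registers 'request' in m_request_set together with the calling function's
// trace information (A_FUNCINFO), so a request that is never completed can be
// traced back to the MPI call that created it.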
232#define ARCCORE_ADD_REQUEST(request)\
233 m_request_set->addRequest(request,A_FUNCINFO);
234
235/*---------------------------------------------------------------------------*/
236/*---------------------------------------------------------------------------*/
237
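// Helper used by the collective and point-to-point wrappers below:
// MPI count/size arguments are plain C 'int', so 64-bit sizes are narrowed
// here and anything larger than INT32_MAX triggers a fatal error.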
238namespace
239{
240int _checkSize(Int64 i64_size)
241{
242 if (i64_size>INT32_MAX)
243 ARCCORE_FATAL("Can not convert '{0}' to type integer",i64_size);
244 return (int)i64_size;
245}
246}
247
248/*---------------------------------------------------------------------------*/
249/*---------------------------------------------------------------------------*/
250
251MpiAdapter::
252MpiAdapter(ITraceMng* trace,IStat* stat,MPI_Comm comm,
253 MpiLock* mpi_lock, IMpiProfiling* mpi_op)
254: TraceAccessor(trace)
255, m_stat(stat)
256, m_mpi_lock(mpi_lock)
257, m_mpi_prof(mpi_op)
258, m_communicator(comm)
259, m_comm_rank(0)
260, m_comm_size(0)
261, m_empty_request1(MPI_REQUEST_NULL)
262, m_empty_request2(MPI_REQUEST_NULL)
263{
264 m_request_set = new RequestSet(trace);
265
266 if (Platform::getEnvironmentVariable("ARCCORE_TRACE_MPI")=="TRUE")
267 m_is_trace = true;
268 {
269 String s = Platform::getEnvironmentVariable("ARCCORE_ALLOW_NULL_RANK_FOR_MPI_ANY_SOURCE");
270 if (s == "1" || s == "TRUE")
271 m_is_allow_null_rank_for_any_source = true;
272 if (s == "0" || s == "FALSE")
273 m_is_allow_null_rank_for_any_source = false;
274 }
275
276 ::MPI_Comm_rank(m_communicator,&m_comm_rank);
277 ::MPI_Comm_size(m_communicator,&m_comm_size);
278
279 // By default, no MPI profiling is done; use the appropriate setter method to change this
280 if (!m_mpi_prof)
281 m_mpi_prof = new NoMpiProfiling();
282
283 /*!
284 * This type of request is used by OpenMPI starting from
285 * version 1.8 (to be checked for 1.6, knowing that 1.4 and 1.5
286 * do not have it). This request behaves somewhat like MPI_REQUEST_NULL
287 * and may be returned several times. It must therefore
288 * not be put into m_allocated_requests.
289 * The address of this empty request cannot be accessed directly,
290 * but the OpenMPI 1.8 implementation returns this request when an IRecv
291 * is posted with a MPI_PROC_NULL source. The value is therefore retrieved
292 * this way.
293 */
294 MPI_Irecv(m_recv_buffer_for_empty_request, 1, MPI_CHAR, MPI_PROC_NULL,
295 50505, m_communicator, &m_empty_request1);
296
297 /*
298 * Starting with OpenMPI version 4, it also seems that sends with
299 * small buffers always generate the same request. It must therefore also
300 * be removed from the requests to test. The corresponding MPI_Recv is also
301 * posted so that the MPI_ISend cannot accidentally be matched by a user
302 * MPI_Recv (for example via a MPI_Recv(MPI_ANY_TAG)).
303 */
304 m_send_buffer_for_empty_request2[0] = 0;
305 MPI_Isend(m_send_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
306 50505, m_communicator, &m_empty_request2);
307
308 MPI_Recv(m_recv_buffer_for_empty_request2, 1, MPI_CHAR, m_comm_rank,
309 50505, m_communicator, MPI_STATUS_IGNORE);
310
311 m_request_set->setEmptyRequests(m_empty_request1,m_empty_request2);
312}
313
314/*---------------------------------------------------------------------------*/
315/*---------------------------------------------------------------------------*/
316
317MpiAdapter::
318~MpiAdapter()
319{
320 if (m_empty_request1 != MPI_REQUEST_NULL)
321 MPI_Request_free(&m_empty_request1);
322 if (m_empty_request2 != MPI_REQUEST_NULL)
323 MPI_Request_free(&m_empty_request2);
324
325 delete m_request_set;
326 delete m_mpi_prof;
327}
328
329/*---------------------------------------------------------------------------*/
330/*---------------------------------------------------------------------------*/
331
332Request MpiAdapter::
333buildRequest(int ret,MPI_Request mpi_request)
334{
335 return MpiRequest(ret,this,mpi_request);
336}
337
338/*---------------------------------------------------------------------------*/
339/*---------------------------------------------------------------------------*/
340
341void MpiAdapter::
342_checkHasNoRequests()
343{
344 Int64 nb_request = m_request_set->nbRequest();
345 // This check cannot be done in the destructor because it may
346 // potentially throw an exception, and that must not be done
347 // in a destructor.
348 if (nb_request!=0){
349 warning() << " Pending mpi requests size=" << nb_request;
350 m_request_set->printRequests();
351 _checkFatalInRequest();
352 }
353}
354
355/*---------------------------------------------------------------------------*/
356/*---------------------------------------------------------------------------*/
357
358void MpiAdapter::
359destroy()
360{
361 _checkHasNoRequests();
362 delete this;
363}
364
365/*---------------------------------------------------------------------------*/
366/*---------------------------------------------------------------------------*/
367
368void MpiAdapter::
369setRequestErrorAreFatal(bool v)
370{
371 m_request_set->m_request_error_is_fatal = v;
372}
373bool MpiAdapter::
374isRequestErrorAreFatal() const
375{
376 return m_request_set->m_request_error_is_fatal;
377}
378
379void MpiAdapter::
380setPrintRequestError(bool v)
381{
382 m_request_set->m_is_report_error_in_request = v;
383}
384bool MpiAdapter::
385isPrintRequestError() const
386{
387 return m_request_set->m_is_report_error_in_request;
388}
389
390void MpiAdapter::
391setCheckRequest(bool v)
392{
393 m_request_set->m_no_check_request = !v;
394}
395
396bool MpiAdapter::
397isCheckRequest() const
398{
399 return !m_request_set->m_no_check_request;
400}
401
402/*---------------------------------------------------------------------------*/
403/*---------------------------------------------------------------------------*/
404
405int MpiAdapter::
406toMPISize(Int64 count)
407{
408 return _checkSize(count);
409}
410
411/*---------------------------------------------------------------------------*/
412/*---------------------------------------------------------------------------*/
413
414void MpiAdapter::
415_trace(const char* function)
416{
417 if (m_is_trace) {
418 IStackTraceService* stack_service = Platform::getStackTraceService();
419 if (stack_service)
420 info() << "MPI_TRACE: " << function << "\n" << stack_service->stackTrace().toString();
421 else
422 info() << "MPI_TRACE: " << function;
423 }
424}
425
426/*---------------------------------------------------------------------------*/
427/*---------------------------------------------------------------------------*/
428
429void MpiAdapter::
430broadcast(void* buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
431{
432 int _nb_elem = _checkSize(nb_elem);
433 _trace(MpiInfo(eMpiName::Bcast).name().localstr());
434 double begin_time = MPI_Wtime();
435 if (m_is_trace)
436 info() << "MPI_TRACE: MPI broadcast: before"
437 << " buf=" << buf
438 << " nb_elem=" << nb_elem
439 << " root=" << root
440 << " datatype=" << datatype;
441
442 m_mpi_prof->broadcast(buf, _nb_elem, datatype, root, m_communicator);
443 double end_time = MPI_Wtime();
444 double sr_time = (end_time-begin_time);
445 //TODO determine the message size
446 m_stat->add(MpiInfo(eMpiName::Bcast).name(),sr_time,0);
447}
448
449/*---------------------------------------------------------------------------*/
450/*---------------------------------------------------------------------------*/
451
452Request MpiAdapter::
453nonBlockingBroadcast(void* buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
454{
455 MPI_Request mpi_request = MPI_REQUEST_NULL;
456 int ret = -1;
457 int _nb_elem = _checkSize(nb_elem);
458 _trace(" MPI_Bcast");
459 double begin_time = MPI_Wtime();
460 ret = MPI_Ibcast(buf,_nb_elem,datatype,root,m_communicator,&mpi_request);
461 double end_time = MPI_Wtime();
462 double sr_time = (end_time-begin_time);
463 //TODO determine the message size
464 m_stat->add("IBroadcast",sr_time,0);
465 ARCCORE_ADD_REQUEST(mpi_request);
466 return buildRequest(ret,mpi_request);
467}
468
469/*---------------------------------------------------------------------------*/
470/*---------------------------------------------------------------------------*/
471
472void MpiAdapter::
473gather(const void* send_buf,void* recv_buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
474{
475 void* _sbuf = const_cast<void*>(send_buf);
476 int _nb_elem = _checkSize(nb_elem);
477 int _root = static_cast<int>(root);
478 _trace(MpiInfo(eMpiName::Gather).name().localstr());
479 double begin_time = MPI_Wtime();
480 m_mpi_prof->gather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, _root, m_communicator);
481 double end_time = MPI_Wtime();
482 double sr_time = (end_time-begin_time);
483 //TODO determine the message size
484 m_stat->add(MpiInfo(eMpiName::Gather).name(),sr_time,0);
485}
486
487/*---------------------------------------------------------------------------*/
488/*---------------------------------------------------------------------------*/
489
490Request MpiAdapter::
491nonBlockingGather(const void* send_buf,void* recv_buf,
492 Int64 nb_elem,Int32 root,MPI_Datatype datatype)
493{
494 MPI_Request mpi_request = MPI_REQUEST_NULL;
495 int ret = -1;
496 void* _sbuf = const_cast<void*>(send_buf);
497 int _nb_elem = _checkSize(nb_elem);
498 int _root = static_cast<int>(root);
499 _trace("MPI_Igather");
500 double begin_time = MPI_Wtime();
501 ret = MPI_Igather(_sbuf,_nb_elem,datatype,recv_buf,_nb_elem,datatype,_root,
502 m_communicator,&mpi_request);
503 double end_time = MPI_Wtime();
504 double sr_time = (end_time-begin_time);
505 //TODO determine the message size
506 m_stat->add("IGather",sr_time,0);
507 ARCCORE_ADD_REQUEST(mpi_request);
508 return buildRequest(ret,mpi_request);
509}
510
511/*---------------------------------------------------------------------------*/
512/*---------------------------------------------------------------------------*/
513
514void MpiAdapter::
515allGather(const void* send_buf,void* recv_buf,
516 Int64 nb_elem,MPI_Datatype datatype)
517{
518 void* _sbuf = const_cast<void*>(send_buf);
519 int _nb_elem = _checkSize(nb_elem);
520 _trace(MpiInfo(eMpiName::Allgather).name().localstr());
521 double begin_time = MPI_Wtime();
522 m_mpi_prof->allGather(_sbuf, _nb_elem, datatype, recv_buf, _nb_elem, datatype, m_communicator);
523 double end_time = MPI_Wtime();
524 double sr_time = (end_time-begin_time);
525 //TODO determine the message size
526 m_stat->add(MpiInfo(eMpiName::Allgather).name(),sr_time,0);
527}
528
529/*---------------------------------------------------------------------------*/
530/*---------------------------------------------------------------------------*/
531
532Request MpiAdapter::
533nonBlockingAllGather(const void* send_buf,void* recv_buf,
534 Int64 nb_elem,MPI_Datatype datatype)
535{
536 MPI_Request mpi_request = MPI_REQUEST_NULL;
537 int ret = -1;
538 void* _sbuf = const_cast<void*>(send_buf);
539 int _nb_elem = _checkSize(nb_elem);
540 _trace("MPI_Iallgather");
541 double begin_time = MPI_Wtime();
542 ret = MPI_Iallgather(_sbuf,_nb_elem,datatype,recv_buf,_nb_elem,datatype,
543 m_communicator,&mpi_request);
544 double end_time = MPI_Wtime();
545 double sr_time = (end_time-begin_time);
546 //TODO determine the message size
547 m_stat->add("IAllGather",sr_time,0);
548 ARCCORE_ADD_REQUEST(mpi_request);
549 return buildRequest(ret,mpi_request);
550}
551
552/*---------------------------------------------------------------------------*/
553/*---------------------------------------------------------------------------*/
554
555void MpiAdapter::
556gatherVariable(const void* send_buf,void* recv_buf,const int* recv_counts,
557 const int* recv_indexes,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
558{
559 void* _sbuf = const_cast<void*>(send_buf);
560 int _nb_elem = _checkSize(nb_elem);
561 int _root = static_cast<int>(root);
562 _trace(MpiInfo(eMpiName::Gatherv).name().localstr());
563 double begin_time = MPI_Wtime();
564 m_mpi_prof->gatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, _root, m_communicator);
565 double end_time = MPI_Wtime();
566 double sr_time = (end_time-begin_time);
567 //TODO determine the message size
568 m_stat->add(MpiInfo(eMpiName::Gatherv).name().localstr(),sr_time,0);
569}
570
571/*---------------------------------------------------------------------------*/
572/*---------------------------------------------------------------------------*/
573
574void MpiAdapter::
575allGatherVariable(const void* send_buf,void* recv_buf,const int* recv_counts,
576 const int* recv_indexes,Int64 nb_elem,MPI_Datatype datatype)
577{
578 void* _sbuf = const_cast<void*>(send_buf);
579 int _nb_elem = _checkSize(nb_elem);
580 _trace(MpiInfo(eMpiName::Allgatherv).name().localstr());
581 //info() << " ALLGATHERV N=" << _nb_elem;
582 //for( int i=0; i<m_comm_size; ++i )
583 //info() << " ALLGATHERV I=" << i << " recv_count=" << recv_counts[i]
584 // << " recv_indexes=" << recv_indexes[i];
585 double begin_time = MPI_Wtime();
586 m_mpi_prof->allGatherVariable(_sbuf, _nb_elem, datatype, recv_buf, recv_counts, recv_indexes, datatype, m_communicator);
587 double end_time = MPI_Wtime();
588 double sr_time = (end_time-begin_time);
589 //TODO determine the message size
590 m_stat->add(MpiInfo(eMpiName::Allgatherv).name().localstr(),sr_time,0);
591}
592
593/*---------------------------------------------------------------------------*/
594/*---------------------------------------------------------------------------*/
595
596void MpiAdapter::
597scatterVariable(const void* send_buf,const int* send_count,const int* send_indexes,
598 void* recv_buf,Int64 nb_elem,Int32 root,MPI_Datatype datatype)
599{
600 void* _sbuf = const_cast<void*>(send_buf);
601 int* _send_count = const_cast<int*>(send_count);
602 int* _send_indexes = const_cast<int*>(send_indexes);
603 int _nb_elem = _checkSize(nb_elem);
604 _trace(MpiInfo(eMpiName::Scatterv).name().localstr());
605 double begin_time = MPI_Wtime();
606 m_mpi_prof->scatterVariable(_sbuf,
607 _send_count,
608 _send_indexes,
609 datatype,
610 recv_buf,
611 _nb_elem,
612 datatype,
613 root,
614 m_communicator);
615 double end_time = MPI_Wtime();
616 double sr_time = (end_time-begin_time);
617 //TODO determine the message size
618 m_stat->add(MpiInfo(eMpiName::Scatterv).name(),sr_time,0);
619}
620
621/*---------------------------------------------------------------------------*/
622/*---------------------------------------------------------------------------*/
623
624void MpiAdapter::
625allToAll(const void* send_buf,void* recv_buf,Integer count,MPI_Datatype datatype)
626{
627 void* _sbuf = const_cast<void*>(send_buf);
628 int icount = _checkSize(count);
629 _trace(MpiInfo(eMpiName::Alltoall).name().localstr());
630 double begin_time = MPI_Wtime();
631 m_mpi_prof->allToAll(_sbuf, icount, datatype, recv_buf, icount, datatype, m_communicator);
632 double end_time = MPI_Wtime();
633 double sr_time = (end_time-begin_time);
634 //TODO determine the message size
635 m_stat->add(MpiInfo(eMpiName::Alltoall).name().localstr(),sr_time,0);
636}
637
638/*---------------------------------------------------------------------------*/
639/*---------------------------------------------------------------------------*/
640
641Request MpiAdapter::
642nonBlockingAllToAll(const void* send_buf,void* recv_buf,Integer count,MPI_Datatype datatype)
643{
644 MPI_Request mpi_request = MPI_REQUEST_NULL;
645 int ret = -1;
646 void* _sbuf = const_cast<void*>(send_buf);
647 int icount = _checkSize(count);
648 _trace("MPI_IAlltoall");
649 double begin_time = MPI_Wtime();
650 ret = MPI_Ialltoall(_sbuf,icount,datatype,recv_buf,icount,datatype,m_communicator,&mpi_request);
651 double end_time = MPI_Wtime();
652 double sr_time = (end_time-begin_time);
653 //TODO determine the message size
654 m_stat->add("IAllToAll",sr_time,0);
655 ARCCORE_ADD_REQUEST(mpi_request);
656 return buildRequest(ret,mpi_request);
657}
658
659/*---------------------------------------------------------------------------*/
660/*---------------------------------------------------------------------------*/
661
662void MpiAdapter::
663allToAllVariable(const void* send_buf,const int* send_counts,
664 const int* send_indexes,void* recv_buf,const int* recv_counts,
665 const int* recv_indexes,MPI_Datatype datatype)
666{
667 void* _sbuf = const_cast<void*>(send_buf);
668 int* _send_counts = const_cast<int*>(send_counts);
669 int* _send_indexes = const_cast<int*>(send_indexes);
670 int* _recv_counts = const_cast<int*>(recv_counts);
671 int* _recv_indexes = const_cast<int*>(recv_indexes);
672
673 _trace(MpiInfo(eMpiName::Alltoallv).name().localstr());
674 double begin_time = MPI_Wtime();
675 m_mpi_prof->allToAllVariable(_sbuf, _send_counts, _send_indexes, datatype,
676 recv_buf, _recv_counts, _recv_indexes, datatype, m_communicator);
677 double end_time = MPI_Wtime();
678 double sr_time = (end_time-begin_time);
679 //TODO determine the message size
680 m_stat->add(MpiInfo(eMpiName::Alltoallv).name(),sr_time,0);
681}
682
683/*---------------------------------------------------------------------------*/
684/*---------------------------------------------------------------------------*/
685
686Request MpiAdapter::
687nonBlockingAllToAllVariable(const void* send_buf,const int* send_counts,
688 const int* send_indexes,void* recv_buf,const int* recv_counts,
689 const int* recv_indexes,MPI_Datatype datatype)
690{
691 MPI_Request mpi_request = MPI_REQUEST_NULL;
692 int ret = -1;
693 void* _sbuf = const_cast<void*>(send_buf);
694 int* _send_counts = const_cast<int*>(send_counts);
695 int* _send_indexes = const_cast<int*>(send_indexes);
696 int* _recv_counts = const_cast<int*>(recv_counts);
697 int* _recv_indexes = const_cast<int*>(recv_indexes);
698
699 _trace("MPI_Ialltoallv");
700 double begin_time = MPI_Wtime();
701 ret = MPI_Ialltoallv(_sbuf,_send_counts,_send_indexes,datatype,
702 recv_buf,_recv_counts,_recv_indexes,datatype,
703 m_communicator,&mpi_request);
704 double end_time = MPI_Wtime();
705 double sr_time = (end_time-begin_time);
706 //TODO determine the message size
707 m_stat->add("IAllToAll",sr_time,0);
708 ARCCORE_ADD_REQUEST(mpi_request);
709 return buildRequest(ret,mpi_request);
710}
711
712/*---------------------------------------------------------------------------*/
713/*---------------------------------------------------------------------------*/
714
715void MpiAdapter::
716barrier()
717{
718 // TODO: in principle there should be no requests possibly in flight
719 // between two barriers, to avoid any problem.
720 // _checkHasNoRequests();
721 // TODO: add the corresponding trace for profiling.
722 MPI_Barrier(m_communicator);
723}
724
725/*---------------------------------------------------------------------------*/
726/*---------------------------------------------------------------------------*/
727
728Request MpiAdapter::
729nonBlockingBarrier()
730{
731 MPI_Request mpi_request = MPI_REQUEST_NULL;
732 int ret = -1;
733 ret = MPI_Ibarrier(m_communicator,&mpi_request);
734 ARCCORE_ADD_REQUEST(mpi_request);
735 return buildRequest(ret,mpi_request);
736}
737
738/*---------------------------------------------------------------------------*/
739/*---------------------------------------------------------------------------*/
740
741void MpiAdapter::
742allReduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
743{
744 void* _sbuf = const_cast<void*>(send_buf);
745 int _n = _checkSize(count);
746 double begin_time = MPI_Wtime();
747 _trace(MpiInfo(eMpiName::Allreduce).name().localstr());
748 try{
749 ++m_nb_all_reduce;
750 m_mpi_prof->allReduce(_sbuf, recv_buf, _n, datatype, op, m_communicator);
751 }
752 catch(TimeoutException& ex)
753 {
754 std::ostringstream ostr;
755 ostr << "MPI_Allreduce"
756 << " send_buf=" << send_buf
757 << " recv_buf=" << recv_buf
758 << " n=" << count
759 << " datatype=" << datatype
760 << " op=" << op
761 << " NB=" << m_nb_all_reduce;
762 ex.setAdditionalInfo(ostr.str());
763 throw;
764 }
765 double end_time = MPI_Wtime();
766 m_stat->add(MpiInfo(eMpiName::Allreduce).name(),end_time-begin_time,count);
767}
768
769/*---------------------------------------------------------------------------*/
770/*---------------------------------------------------------------------------*/
771
772Request MpiAdapter::
773nonBlockingAllReduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
774{
775 MPI_Request mpi_request = MPI_REQUEST_NULL;
776 int ret = -1;
777 void* _sbuf = const_cast<void*>(send_buf);
778 int _n = _checkSize(count);
779 double begin_time = MPI_Wtime();
780 _trace("MPI_IAllreduce");
781 ret = MPI_Iallreduce(_sbuf,recv_buf,_n,datatype,op,m_communicator,&mpi_request);
782 double end_time = MPI_Wtime();
783 m_stat->add("IReduce",end_time-begin_time,_n);
784 ARCCORE_ADD_REQUEST(mpi_request);
785 return buildRequest(ret,mpi_request);
786}
787
788/*---------------------------------------------------------------------------*/
789/*---------------------------------------------------------------------------*/
790
791void MpiAdapter::
792reduce(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op,Integer root)
793{
794 void* _sbuf = const_cast<void*>(send_buf);
795 int _n = _checkSize(count);
796 int _root = static_cast<int>(root);
797 double begin_time = MPI_Wtime();
798 _trace(MpiInfo(eMpiName::Reduce).name().localstr());
799 try{
800 ++m_nb_reduce;
801 m_mpi_prof->reduce(_sbuf, recv_buf, _n, datatype, op, _root, m_communicator);
802 }
803 catch(TimeoutException& ex)
804 {
805 std::ostringstream ostr;
806 ostr << "MPI_reduce"
807 << " send_buf=" << send_buf
808 << " recv_buf=" << recv_buf
809 << " n=" << count
810 << " datatype=" << datatype
811 << " op=" << op
812 << " root=" << root
813 << " NB=" << m_nb_reduce;
814 ex.setAdditionalInfo(ostr.str());
815 throw;
816 }
817
818 double end_time = MPI_Wtime();
819 m_stat->add(MpiInfo(eMpiName::Reduce).name(),end_time-begin_time,0);
820}
821
822/*---------------------------------------------------------------------------*/
823/*---------------------------------------------------------------------------*/
824
825void MpiAdapter::
826scan(const void* send_buf,void* recv_buf,Int64 count,MPI_Datatype datatype,MPI_Op op)
827{
828 void* _sbuf = const_cast<void*>(send_buf);
829 int _n = _checkSize(count);
830 double begin_time = MPI_Wtime();
831 _trace(MpiInfo(eMpiName::Scan).name().localstr());
832 m_mpi_prof->scan(_sbuf, recv_buf, _n, datatype, op, m_communicator);
833 double end_time = MPI_Wtime();
834 m_stat->add(MpiInfo(eMpiName::Scan).name(),end_time-begin_time,count);
835}
836
837/*---------------------------------------------------------------------------*/
838/*---------------------------------------------------------------------------*/
839
840void MpiAdapter::
841directSendRecv(const void* send_buffer,Int64 send_buffer_size,
842 void* recv_buffer,Int64 recv_buffer_size,
843 Int32 proc,Int64 elem_size,MPI_Datatype data_type)
844{
845 void* v_send_buffer = const_cast<void*>(send_buffer);
846 MPI_Status mpi_status;
847 double begin_time = MPI_Wtime();
848 _trace(MpiInfo(eMpiName::Sendrecv).name().localstr());
849 int sbuf_size = _checkSize(send_buffer_size);
850 int rbuf_size = _checkSize(recv_buffer_size);
851 m_mpi_prof->sendRecv(v_send_buffer, sbuf_size, data_type, proc, 99,
852 recv_buffer, rbuf_size, data_type, proc, 99,
853 m_communicator, &mpi_status);
854 double end_time = MPI_Wtime();
855 Int64 send_size = send_buffer_size * elem_size;
856 Int64 recv_size = recv_buffer_size * elem_size;
857 double sr_time = (end_time-begin_time);
858
859 //debug(Trace::High) << "MPI SendRecv: send " << send_size << " recv "
860 // << recv_size << " time " << sr_time ;
861 m_stat->add(MpiInfo(eMpiName::Sendrecv).name(),sr_time,send_size+recv_size);
862}
863
864/*---------------------------------------------------------------------------*/
865/*---------------------------------------------------------------------------*/
866
867Request MpiAdapter::
868sendNonBlockingNoStat(const void* send_buffer,Int64 send_buffer_size,
869 Int32 dest_rank,MPI_Datatype data_type,int mpi_tag)
870{
871 void* v_send_buffer = const_cast<void*>(send_buffer);
872 MPI_Request mpi_request = MPI_REQUEST_NULL;
873 int sbuf_size = _checkSize(send_buffer_size);
874 int ret = 0;
875 m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, dest_rank, mpi_tag, m_communicator, &mpi_request);
876 if (m_is_trace)
877 info() << " ISend ret=" << ret << " proc=" << dest_rank << " tag=" << mpi_tag << " request=" << mpi_request;
878 ARCCORE_ADD_REQUEST(mpi_request);
879 return buildRequest(ret,mpi_request);
880}
881
882/*---------------------------------------------------------------------------*/
883/*---------------------------------------------------------------------------*/
884
885Request MpiAdapter::
886directSend(const void* send_buffer,Int64 send_buffer_size,
887 Int32 proc,Int64 elem_size,MPI_Datatype data_type,
888 int mpi_tag,bool is_blocked
889 )
890{
891 void* v_send_buffer = const_cast<void*>(send_buffer);
892 MPI_Request mpi_request = MPI_REQUEST_NULL;
893
894 double begin_time = 0.0;
895 double end_time = 0.0;
896 Int64 send_size = send_buffer_size * elem_size;
897 int ret = 0;
898 if (m_is_trace)
899 info() << "MPI_TRACE: MPI Send: send before"
900 << " size=" << send_size
901 << " dest=" << proc
902 << " tag=" << mpi_tag
903 << " datatype=" << data_type
904 << " blocking " << is_blocked;
905 if (is_blocked){
906 // if m_mpi_lock is not null, an MPI_ISend followed by an
907 // active polling loop on MPI_Test must be used
908 // in order to avoid any
909 // deadlock problem.
910 if (m_mpi_lock){
911 {
912 MpiLock::Section mls(m_mpi_lock);
913 begin_time = MPI_Wtime();
914 int sbuf_size = _checkSize(send_buffer_size);
915 m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator, &mpi_request);
916 }
917 int is_finished = 0;
918 MPI_Status mpi_status;
919 while (is_finished==0){
920 MpiLock::Section mls(m_mpi_lock);
921 MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
922 if (is_finished!=0){
923 m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
924 end_time = MPI_Wtime();
925 mpi_request = MPI_REQUEST_NULL;
926 }
927 }
928 }
929 else{
930 MpiLock::Section mls(m_mpi_lock);
931 begin_time = MPI_Wtime();
932 int sbuf_size = _checkSize(send_buffer_size);
933 m_mpi_prof->send(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator);
934 end_time = MPI_Wtime();
935 }
936 }
937 else{
938 {
939 MpiLock::Section mls(m_mpi_lock);
940 begin_time = MPI_Wtime();
941 int sbuf_size = _checkSize(send_buffer_size);
942 m_mpi_prof->iSend(v_send_buffer, sbuf_size, data_type, proc, mpi_tag, m_communicator, &mpi_request);
943 if (m_is_trace)
944 info() << " ISend ret=" << ret << " proc=" << proc << " tag=" << mpi_tag << " request=" << mpi_request;
945 end_time = MPI_Wtime();
946 ARCCORE_ADD_REQUEST(mpi_request);
947 }
948 if (m_is_trace){
949 info() << "MPI Send: send after"
950 << " request=" << mpi_request;
951 }
952 }
953 double sr_time = (end_time-begin_time);
954
955 debug(Trace::High) << "MPI Send: send " << send_size
956 << " time " << sr_time << " blocking " << is_blocked;
957 // TODO(FL): look at how to profile the Isend
958 m_stat->add(MpiInfo(eMpiName::Send).name(),end_time-begin_time,send_size);
959 return buildRequest(ret,mpi_request);
960}
961
962/*---------------------------------------------------------------------------*/
963/*---------------------------------------------------------------------------*/
964
965Request MpiAdapter::
966directSendPack(const void* send_buffer,Int64 send_buffer_size,
967 Int32 proc,int mpi_tag,bool is_blocked)
968{
969 return directSend(send_buffer,send_buffer_size,proc,1,MPI_PACKED,mpi_tag,is_blocked);
970}
971
972/*---------------------------------------------------------------------------*/
973/*---------------------------------------------------------------------------*/
974
975MpiMessagePassingMng* MpiAdapter::
976commSplit(bool keep)
977{
978 MPI_Comm new_comm;
979
980 MPI_Comm_split(m_communicator, (keep) ? 1 : MPI_UNDEFINED, commRank(), &new_comm);
981 if (keep) {
982 // Failed if new_comm is MPI_COMM_NULL
983 return StandaloneMpiMessagePassingMng::create(new_comm, true);
984 }
985 return nullptr;
986}
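// Note: ranks calling commSplit(false) pass MPI_UNDEFINED to MPI_Comm_split,
// are not placed in any new communicator and therefore receive a null manager.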
987
988/*---------------------------------------------------------------------------*/
989/*---------------------------------------------------------------------------*/
990
991Request MpiAdapter::
992receiveNonBlockingNoStat(void* recv_buffer,Int64 recv_buffer_size,
993 Int32 source_rank,MPI_Datatype data_type,int mpi_tag)
994{
995 int rbuf_size = _checkSize(recv_buffer_size);
996 int ret = 0;
997 MPI_Request mpi_request = MPI_REQUEST_NULL;
998 m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, source_rank, mpi_tag, m_communicator, &mpi_request);
999 ARCCORE_ADD_REQUEST(mpi_request);
1000 return buildRequest(ret,mpi_request);
1001}
1002
1003/*---------------------------------------------------------------------------*/
1004/*---------------------------------------------------------------------------*/
1005
1006Request MpiAdapter::
1007directRecv(void* recv_buffer,Int64 recv_buffer_size,
1008 Int32 proc,Int64 elem_size,MPI_Datatype data_type,
1009 int mpi_tag,bool is_blocked)
1010{
1011 MPI_Status mpi_status;
1012 MPI_Request mpi_request = MPI_REQUEST_NULL;
1013 int ret = 0;
1014 double begin_time = 0.0;
1015 double end_time = 0.0;
1016
1017 int i_proc = 0;
1018 if (proc==A_PROC_NULL_RANK)
1019 ARCCORE_THROW(NotImplementedException,"Receive with MPI_PROC_NULL");
1020 if (proc == A_NULL_RANK && !m_is_allow_null_rank_for_any_source)
1021 ARCCORE_FATAL("Can not use A_NULL_RANK for any source. Use A_ANY_SOURCE_RANK instead");
1022 if (proc==A_NULL_RANK || proc==A_ANY_SOURCE_RANK)
1023 i_proc = MPI_ANY_SOURCE;
1024 else
1025 i_proc = static_cast<int>(proc);
1026
1027 Int64 recv_size = recv_buffer_size * elem_size;
1028 if (m_is_trace){
1029 info() << "MPI_TRACE: MPI Recv: recv before "
1030 << " size=" << recv_size
1031 << " from=" << i_proc
1032 << " tag=" << mpi_tag
1033 << " datatype=" << data_type
1034 << " blocking=" << is_blocked;
1035 }
1036 if (is_blocked){
1037 // if m_mpi_lock is not null, an MPI_IRecv followed by an
1038 // active polling loop on MPI_Test must be used
1039 // in order to avoid any
1040 // deadlock problem.
1041 if (m_mpi_lock){
1042 {
1043 MpiLock::Section mls(m_mpi_lock);
1044 begin_time = MPI_Wtime();
1045 int rbuf_size = _checkSize(recv_buffer_size);
1046 m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1047 }
1048 int is_finished = 0;
1049 MPI_Status mpi_status;
1050 while (is_finished==0){
1051 MpiLock::Section mls(m_mpi_lock);
1052 MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
1053 if (is_finished!=0){
1054 end_time = MPI_Wtime();
1055 m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
1056 mpi_request = MPI_REQUEST_NULL;
1057 }
1058 }
1059 }
1060 else{
1061 MpiLock::Section mls(m_mpi_lock);
1062 begin_time = MPI_Wtime();
1063 int rbuf_size = _checkSize(recv_buffer_size);
1064 m_mpi_prof->recv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_status);
1065 end_time = MPI_Wtime();
1066 }
1067 }
1068 else{
1069 {
1070 MpiLock::Section mls(m_mpi_lock);
1071 begin_time = MPI_Wtime();
1072 int rbuf_size = _checkSize(recv_buffer_size);
1073 m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1074 end_time = MPI_Wtime();
1075 ARCCORE_ADD_REQUEST(mpi_request);
1076 }
1077 if (m_is_trace){
1078 info() << "MPI Recv: recv after "
1079 << " request=" << mpi_request;
1080 }
1081 }
1082 double sr_time = (end_time-begin_time);
1083
1084 debug(Trace::High) << "MPI Recv: recv after " << recv_size
1085 << " time " << sr_time << " blocking " << is_blocked;
1086 m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
1087 return buildRequest(ret,mpi_request);
1088}
1089
1090/*---------------------------------------------------------------------------*/
1091/*---------------------------------------------------------------------------*/
1092
1093void MpiAdapter::
1094probeRecvPack(UniqueArray<Byte>& recv_buffer,Int32 proc)
1095{
1096 double begin_time = MPI_Wtime();
1097 MPI_Status status;
1098 int recv_buffer_size = 0;
1099 _trace("MPI_Probe");
1100 m_mpi_prof->probe(proc, 101, m_communicator, &status);
1101 m_mpi_prof->getCount(&status, MPI_PACKED, &recv_buffer_size);
1102
1103 recv_buffer.resize(recv_buffer_size);
1104 m_mpi_prof->recv(recv_buffer.data(), recv_buffer_size, MPI_PACKED, proc, 101, m_communicator, &status);
1105
1106 double end_time = MPI_Wtime();
1107 Int64 recv_size = recv_buffer_size;
1108 double sr_time = (end_time-begin_time);
1109 debug(Trace::High) << "MPI probeRecvPack " << recv_size
1110 << " time " << sr_time;
1111 m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
1112}
1113
1114/*---------------------------------------------------------------------------*/
1115/*---------------------------------------------------------------------------*/
1116
1117MessageSourceInfo MpiAdapter::
1118_buildSourceInfoFromStatus(const MPI_Status& mpi_status)
1119{
1120 // Retrieves the message size in bytes.
1121 MPI_Count message_size = 0;
1122 MPI_Get_elements_x(&mpi_status,MPI_BYTE,&message_size);
1123 MessageTag tag(mpi_status.MPI_TAG);
1124 MessageRank rank(mpi_status.MPI_SOURCE);
1125 return MessageSourceInfo(rank,tag,message_size);
1126}
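// Since MPI_Get_elements_x() is called with MPI_BYTE, the size stored in the
// returned MessageSourceInfo is always expressed in bytes, whatever datatype
// the probed message was sent with.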
1127
1128/*---------------------------------------------------------------------------*/
1129/*---------------------------------------------------------------------------*/
1130
1131MessageId MpiAdapter::
1132_probeMessage(MessageRank source,MessageTag tag,bool is_blocking)
1133{
1134 MPI_Status mpi_status;
1135 int has_message = 0;
1136 MPI_Message message;
1137 int ret = 0;
1138 int mpi_source = source.value();
1139 if (source.isProcNull())
1140 ARCCORE_THROW(NotImplementedException,"Probe with MPI_PROC_NULL");
1141 if (source.isNull() && !m_is_allow_null_rank_for_any_source)
1142 ARCCORE_FATAL("Can not use MPI_Mprobe with null rank. Use MessageRank::anySourceRank() instead");
1143 if (source.isNull() || source.isAnySource())
1144 mpi_source = MPI_ANY_SOURCE;
1145 int mpi_tag = tag.value();
1146 if (tag.isNull())
1147 mpi_tag = MPI_ANY_TAG;
1148 if (is_blocking){
1149 ret = MPI_Mprobe(mpi_source,mpi_tag,m_communicator,&message,&mpi_status);
1150 has_message = true;
1151 }
1152 else {
1153 ret = MPI_Improbe(mpi_source, mpi_tag, m_communicator, &has_message, &message, &mpi_status);
1154 }
1155 if (ret!=0)
1156 ARCCORE_FATAL("Error during call to MPI_Mprobe r={0}",ret);
1157 MessageId ret_message;
1158 if (has_message!=0){
1159 MessageSourceInfo si(_buildSourceInfoFromStatus(mpi_status));
1160 ret_message = MessageId(si,message);
1161 }
1162 return ret_message;
1163}
1164
1165/*---------------------------------------------------------------------------*/
1166/*---------------------------------------------------------------------------*/
1167
1168MessageId MpiAdapter::
1169probeMessage(PointToPointMessageInfo message)
1170{
1171 if (!message.isValid())
1172 return MessageId();
1173
1174 // The message must have been initialized with a (rank/tag) pair.
1175 if (!message.isRankTag())
1176 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
1177
1178 return _probeMessage(message.destinationRank(),message.tag(),message.isBlocking());
1179}
1180
1181/*---------------------------------------------------------------------------*/
1182/*---------------------------------------------------------------------------*/
1183
1184MessageSourceInfo MpiAdapter::
1185_legacyProbeMessage(MessageRank source,MessageTag tag,bool is_blocking)
1186{
1187 MPI_Status mpi_status;
1188 int has_message = 0;
1189 int ret = 0;
1190 int mpi_source = source.value();
1191 if (source.isProcNull())
1192 ARCCORE_THROW(NotImplementedException,"Probe with MPI_PROC_NULL");
1193 if (source.isNull() && !m_is_allow_null_rank_for_any_source)
1194 ARCCORE_FATAL("Can not use MPI_Probe with null rank. Use MessageRank::anySourceRank() instead");
1195 if (source.isNull() || source.isAnySource())
1196 mpi_source = MPI_ANY_SOURCE;
1197 int mpi_tag = tag.value();
1198 if (tag.isNull())
1199 mpi_tag = MPI_ANY_TAG;
1200 if (is_blocking){
1201 ret = MPI_Probe(mpi_source,mpi_tag,m_communicator,&mpi_status);
1202 has_message = true;
1203 }
1204 else
1205 ret = MPI_Iprobe(mpi_source,mpi_tag,m_communicator,&has_message,&mpi_status);
1206 if (ret!=0)
1207 ARCCORE_FATAL("Error during call to MPI_Probe r={0}",ret);
1208 if (has_message!=0)
1209 return _buildSourceInfoFromStatus(mpi_status);
1210 return {};
1211}
1212
1213/*---------------------------------------------------------------------------*/
1214/*---------------------------------------------------------------------------*/
1215
1216MessageSourceInfo MpiAdapter::
1217legacyProbeMessage(PointToPointMessageInfo message)
1218{
1219 if (!message.isValid())
1220 return {};
1221
1222 // The message must have been initialized with a (rank/tag) pair.
1223 if (!message.isRankTag())
1224 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
1225
1226 return _legacyProbeMessage(message.destinationRank(),message.tag(),message.isBlocking());
1227}
1228
1229/*---------------------------------------------------------------------------*/
1230/*---------------------------------------------------------------------------*/
1231//! Reception via MPI_Mrecv() or MPI_Imrecv()
1232Request MpiAdapter::
1233directRecv(void* recv_buffer,Int64 recv_buffer_size,
1234 MessageId message,Int64 elem_size,MPI_Datatype data_type,
1235 bool is_blocked)
1236{
1237 MPI_Status mpi_status;
1238 MPI_Request mpi_request = MPI_REQUEST_NULL;
1239 MPI_Message mpi_message = (MPI_Message)message;
1240 int ret = 0;
1241 double begin_time = 0.0;
1242 double end_time = 0.0;
1243
1244 Int64 recv_size = recv_buffer_size * elem_size;
1245 if (m_is_trace){
1246 info() << "MPI_TRACE: MPI Mrecv: recv before "
1247 << " size=" << recv_size
1248 << " from_msg=" << message
1249 << " datatype=" << data_type
1250 << " blocking=" << is_blocked;
1251 }
1252 if (is_blocked){
1253 // if m_mpi_lock is not null, an MPI_IRecv followed by an
1254 // active polling loop on MPI_Test must be used
1255 // in order to avoid any
1256 // deadlock problem.
1257 if (m_mpi_lock){
1258 {
1259 MpiLock::Section mls(m_mpi_lock);
1260 begin_time = MPI_Wtime();
1261 int rbuf_size = _checkSize(recv_buffer_size);
1262 MPI_Imrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_request);
1263 //m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1264 }
1265 int is_finished = 0;
1266 MPI_Status mpi_status;
1267 while (is_finished==0){
1268 MpiLock::Section mls(m_mpi_lock);
1269 MPI_Request_get_status(mpi_request,&is_finished,&mpi_status);
1270 if (is_finished!=0){
1271 end_time = MPI_Wtime();
1272 m_mpi_prof->wait(&mpi_request, (MPI_Status *) MPI_STATUS_IGNORE);
1273 mpi_request = MPI_REQUEST_NULL;
1274 }
1275 }
1276 }
1277 else{
1278 MpiLock::Section mls(m_mpi_lock);
1279 begin_time = MPI_Wtime();
1280 int rbuf_size = _checkSize(recv_buffer_size);
1281 MPI_Mrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_status);
1282 //m_mpi_prof->recv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_status);
1283 end_time = MPI_Wtime();
1284 }
1285 }
1286 else{
1287 {
1288 MpiLock::Section mls(m_mpi_lock);
1289 begin_time = MPI_Wtime();
1290 int rbuf_size = _checkSize(recv_buffer_size);
1291 //m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1292 ret = MPI_Imrecv(recv_buffer,rbuf_size,data_type,&mpi_message,&mpi_request);
1293 //m_mpi_prof->iRecv(recv_buffer, rbuf_size, data_type, i_proc, mpi_tag, m_communicator, &mpi_request);
1294 end_time = MPI_Wtime();
1295 ARCCORE_ADD_REQUEST(mpi_request);
1296 }
1297 if (m_is_trace){
1298 info() << "MPI Recv: recv after "
1299 << " request=" << mpi_request;
1300 }
1301 }
1302 double sr_time = (end_time-begin_time);
1303
1304 debug(Trace::High) << "MPI Recv: recv after " << recv_size
1305 << " time " << sr_time << " blocking " << is_blocked;
1306 m_stat->add(MpiInfo(eMpiName::Recv).name(),end_time-begin_time,recv_size);
1307 return buildRequest(ret,mpi_request);
1308}
1309
1310/*---------------------------------------------------------------------------*/
1311/*---------------------------------------------------------------------------*/
1312
1313Request MpiAdapter::
1314directRecvPack(void* recv_buffer,Int64 recv_buffer_size,
1315 Int32 proc,int mpi_tag,bool is_blocking)
1316{
1317 return directRecv(recv_buffer,recv_buffer_size,proc,1,MPI_PACKED,mpi_tag,is_blocking);
1318}
1319
1320/*---------------------------------------------------------------------------*/
1321/*---------------------------------------------------------------------------*/
1322
1323// FIXME: Implement direct method with MPI_STATUS_IGNORE
1324void MpiAdapter::
1325waitAllRequests(ArrayView<Request> requests)
1326{
1327 UniqueArray<bool> indexes(requests.size());
1328 UniqueArray<MPI_Status> mpi_status(requests.size());
1329 while (_waitAllRequestsMPI(requests, indexes, mpi_status)){
1330 ; // Continue as long as there are requests.
1331 }
1332}
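// Illustrative usage sketch (not code from this file; it only follows the
// signatures above and assumes UniqueArray::view() and a valid 'adapter'):
//
//   UniqueArray<Request> reqs;
//   reqs.add(adapter->sendNonBlockingNoStat(send_buf, n, dest_rank, MPI_BYTE, tag));
//   reqs.add(adapter->receiveNonBlockingNoStat(recv_buf, n, source_rank, MPI_BYTE, tag));
//   adapter->waitAllRequests(reqs.view()); // loops while sub-requests create new requests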
1333
1334/*---------------------------------------------------------------------------*/
1335/*---------------------------------------------------------------------------*/
1336
1337// FIXME: Implement direct method with MPI_STATUS_IGNORE
1338void MpiAdapter::
1339waitSomeRequests(ArrayView<Request> requests,
1340 ArrayView<bool> indexes,
1341 bool is_non_blocking)
1342{
1343 UniqueArray<MPI_Status> mpi_status(requests.size());
1344 waitSomeRequestsMPI(requests, indexes, mpi_status, is_non_blocking);
1345}
1346
1347/*---------------------------------------------------------------------------*/
1348/*---------------------------------------------------------------------------*/
1349
1350struct SubRequestInfo
1351{
1352 SubRequestInfo(Ref<ISubRequest> sr, Integer i, int source_rank, int source_tag)
1353 : sub_request(sr)
1354 , index(i)
1355 , mpi_source_rank(source_rank)
1356 , mpi_source_tag(source_tag)
1357 {}
1358
1359 Ref<ISubRequest> sub_request;
1360 Integer index = -1;
1361 int mpi_source_rank = MPI_PROC_NULL;
1362 int mpi_source_tag = 0;
1363};
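// One SubRequestInfo is recorded per completed request that carries a
// sub-request: it keeps the request's index and the MPI source rank/tag of
// its status so executeOnCompletion() can run after the MPI lock has been
// released (see _handleEndRequests() below).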
1364
1365/*---------------------------------------------------------------------------*/
1366/*---------------------------------------------------------------------------*/
1367
1368bool MpiAdapter::
1369_handleEndRequests(ArrayView<Request> requests,ArrayView<bool> done_indexes,
1370 ArrayView<MPI_Status> status)
1371{
1372 UniqueArray<SubRequestInfo> new_requests;
1373 Integer size = requests.size();
1374 {
1375 MpiLock::Section mls(m_mpi_lock);
1376 for( Integer i=0; i<size; ++i ) {
1377 if (done_indexes[i]){
1378 // Be careful to use a reference here, otherwise the reset is not
1379 // applied to the right variable
1380 Request& r = requests[i];
1381 // Note: the request may not be valid (for example if it is
1382 // a blocking request) while still having a sub-request.
1383 if (r.hasSubRequest()){
1384 if (m_is_trace)
1385 info() << "Done request with sub-request r=" << r << " mpi_r=" << r << " i=" << i
1386 << " source_rank=" << status[i].MPI_SOURCE
1387 << " source_tag=" << status[i].MPI_TAG;
1388 new_requests.add(SubRequestInfo(r.subRequest(), i, status[i].MPI_SOURCE, status[i].MPI_TAG));
1389 }
1390 if (r.isValid()){
1391 _removeRequest((MPI_Request)(r));
1392 r.reset();
1393 }
1394 }
1395 }
1396 }
1397
1398 // NOTE: calls to the sub-requests may generate other requests.
1399 // Care must be taken not to use the sub-requests while
1400 // the lock is held.
1401 bool has_new_request = false;
1402 if (!new_requests.empty()){
1403 // Contains the status of the i-th request
1404 UniqueArray<MPI_Status> old_status(size);
1405 {
1406 Integer index = 0;
1407 for( Integer i=0; i<size; ++i ){
1408 if (done_indexes[i]){
1409 old_status[i] = status[index];
1410 ++index;
1411 }
1412 }
1413 }
1414 // If there are new requests, the 'status' values must be shifted
1415 for( SubRequestInfo& sri : new_requests ){
1416 Integer index = sri.index;
1417 if (m_is_trace)
1418 info() << "Before handle new request index=" << index
1419 << " sri.source_rank=" << sri.mpi_source_rank
1420 << " sri.source_tag=" << sri.mpi_source_tag;
1421 SubRequestCompletionInfo completion_info(MessageRank(old_status[index].MPI_SOURCE), MessageTag(old_status[index].MPI_TAG));
1422 Request r = sri.sub_request->executeOnCompletion(completion_info);
1423 if (m_is_trace)
1424 info() << "Handle new request index=" << index << " old_r=" << requests[index] << " new_r=" << r;
1425 // If there is a new request, it replaces
1426 // the old one, and so we must behave as if
1427 // the original request were not finished.
1428 if (r.isValid()){
1429 has_new_request = true;
1430 requests[index] = r;
1431 done_indexes[index] = false;
1432 }
1433 }
1434 {
1435 Integer index = 0;
1436 for( Integer i=0; i<size; ++i ){
1437 if (done_indexes[i]){
1438 status[index] = old_status[i];
1439 ++index;
1440 }
1441 }
1442 }
1443
1444 }
1445 return has_new_request;
1446}
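// The boolean returned above tells _waitAllRequestsMPI() and
// waitSomeRequestsMPI() that a sub-request produced a replacement request,
// i.e. that at least one entry of 'requests' is active again and must be
// waited on once more.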
1447
1448/*---------------------------------------------------------------------------*/
1449/*---------------------------------------------------------------------------*/
1450
1451bool MpiAdapter::
1452_waitAllRequestsMPI(ArrayView<Request> requests,
1453 ArrayView<bool> indexes,
1454 ArrayView<MPI_Status> mpi_status)
1455{
1456 Integer size = requests.size();
1457 if (size==0)
1458 return false;
1459 //WARNING: MPI modifies this array on return from MPI_Waitall
1460 UniqueArray<MPI_Request> mpi_request(size);
1461 for( Integer i=0; i<size; ++i ){
1462 mpi_request[i] = (MPI_Request)(requests[i]);
1463 }
1464 if (m_is_trace)
1465 info() << " MPI_waitall begin size=" << size;
1466 double diff_time = 0.0;
1467 if (m_mpi_lock){
1468 double begin_time = MPI_Wtime();
1469 for( Integer i=0; i<size; ++i ){
1470 MPI_Request request = (MPI_Request)(mpi_request[i]);
1471 int is_finished = 0;
1472 while (is_finished==0){
1473 MpiLock::Section mls(m_mpi_lock);
1474 m_mpi_prof->test(&request, &is_finished, (MPI_Status *) MPI_STATUS_IGNORE);
1475 }
1476 }
1477 double end_time = MPI_Wtime();
1478 diff_time = end_time - begin_time;
1479 }
1480 else{
1481 //TODO: turn this into a while loop using MPI_Testall if m_mpi_lock is not null
1482 MpiLock::Section mls(m_mpi_lock);
1483 double begin_time = MPI_Wtime();
1484 m_mpi_prof->waitAll(size, mpi_request.data(), mpi_status.data());
1485 double end_time = MPI_Wtime();
1486 diff_time = end_time - begin_time;
1487 }
1488
1489 // Mark every request as processed since a waitall was done.
1490 for( Integer i=0; i<size; ++i ){
1491 indexes[i] = true;
1492 }
1493
1494 bool has_new_request = _handleEndRequests(requests,indexes,mpi_status);
1495 if (m_is_trace)
1496 info() << " MPI_waitall end size=" << size;
1497 m_stat->add(MpiInfo(eMpiName::Waitall).name(),diff_time,size);
1498 return has_new_request;
1499}
1500
1501/*---------------------------------------------------------------------------*/
1502/*---------------------------------------------------------------------------*/
1503
1504void MpiAdapter::
1505waitSomeRequestsMPI(ArrayView<Request> requests,ArrayView<bool> indexes,
1506 ArrayView<MPI_Status> mpi_status,bool is_non_blocking)
1507{
1508 Integer size = requests.size();
1509 if (size==0)
1510 return;
1511 //TODO: use StackArray (when they become available...)
1512 UniqueArray<MPI_Request> mpi_request(size);
1513 UniqueArray<MPI_Request> saved_mpi_request(size);
1514 UniqueArray<int> completed_requests(size);
1515 int nb_completed_request = 0;
1516
1517 // Save the request so it can be deallocated from m_allocated_requests,
1518 // because its value will no longer be valid after the call to MPI_Wait*
1519 for (Integer i = 0; i < size; ++i) {
1520 // When this method is called several times with
1521 // the same array of requests, there may be
1522 // invalid requests that would make the MPI call crash.
1523 if (!requests[i].isValid()) {
1524 saved_mpi_request[i] = MPI_REQUEST_NULL;
1525 }
1526 else {
1527 saved_mpi_request[i] = static_cast<MPI_Request>(requests[i]);
1528 }
1529 }
1530
1531 // Only print debug output in blocking mode or when explicitly requested,
1532 // to avoid too many messages
1533 bool is_print_debug = m_is_trace || (!is_non_blocking);
1534 if (is_print_debug)
1535 debug() << "WaitRequestBegin is_non_blocking=" << is_non_blocking << " n=" << size;
1536
1537 double begin_time = MPI_Wtime();
1538
1539 try{
1540 if (is_non_blocking){
1541 _trace(MpiInfo(eMpiName::Testsome).name().localstr());
1542 {
1543 MpiLock::Section mls(m_mpi_lock);
1544 m_mpi_prof->testSome(size, saved_mpi_request.data(), &nb_completed_request,
1545 completed_requests.data(), mpi_status.data());
1546 }
1547 //If there is no active handle in the list, it returns outcount = MPI_UNDEFINED.
1548      if (nb_completed_request == MPI_UNDEFINED) // If no request was valid.
1549 nb_completed_request = 0;
1550 if (is_print_debug)
1551 debug() << "WaitSomeRequestMPI: TestSome nb_completed=" << nb_completed_request;
1552 }
1553 else{
1554 _trace(MpiInfo(eMpiName::Waitsome).name().localstr());
1555 {
1556        // TODO: if the lock exists, loop on testSome() instead so as not
1557        // to block.
1558 MpiLock::Section mls(m_mpi_lock);
1559 m_mpi_prof->waitSome(size, saved_mpi_request.data(), &nb_completed_request,
1560 completed_requests.data(), mpi_status.data());
1561 }
1562      // Do not use mpi_request[i] here because MPI modifies it:
1563      // mpi_request[i] == MPI_REQUEST_NULL
1564      if (nb_completed_request == MPI_UNDEFINED) // If no request was valid.
1565 nb_completed_request = 0;
1566 if (is_print_debug)
1567 debug() << "WaitSomeRequest nb_completed=" << nb_completed_request;
1568 }
1569 }
1570 catch(TimeoutException& ex)
1571 {
1572 std::ostringstream ostr;
1573 if (is_non_blocking)
1574 ostr << MpiInfo(eMpiName::Testsome).name();
1575 else
1576 ostr << MpiInfo(eMpiName::Waitsome).name();
1577 ostr << " size=" << size
1578 << " is_non_blocking=" << is_non_blocking;
1579 ex.setAdditionalInfo(ostr.str());
1580 throw;
1581 }
1582
1583 for( int z=0; z<nb_completed_request; ++z ){
1584 int index = completed_requests[z];
1585 if (is_print_debug)
1586 debug() << "Completed my_rank=" << m_comm_rank << " z=" << z
1587 << " index=" << index
1588 << " tag=" << mpi_status[z].MPI_TAG
1589 << " source=" << mpi_status[z].MPI_SOURCE;
1590
1591 indexes[index] = true;
1592 }
1593
1594 bool has_new_request = _handleEndRequests(requests,indexes,mpi_status);
1595 if (has_new_request){
1596    // If there are new requests, it is possible that no request has
1597    // completed. With testSome this does not matter; with waitSome it
1598    // means we have to wait again.
1599 }
1600 double end_time = MPI_Wtime();
1601 m_stat->add(MpiInfo(eMpiName::Waitsome).name(),end_time-begin_time,size);
1602}
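// --- Editorial sketch (not part of the original MpiAdapter.cc) --------------
// waitSomeRequestsMPI() maps the completion list returned by MPI_Testsome or
// MPI_Waitsome back onto the caller's boolean 'indexes' array, treating an
// outcount of MPI_UNDEFINED (no active handle in the list) as "nothing
// completed". The helper below shows that mapping in isolation; the name
// 'markCompletedRequests' and its parameters are assumptions.
static void markCompletedRequests(int nb_completed,
                                  ConstArrayView<int> completed_requests,
                                  ArrayView<bool> indexes)
{
  // MPI returns MPI_UNDEFINED when none of the handles was active.
  if (nb_completed == MPI_UNDEFINED)
    nb_completed = 0;
  for (int z = 0; z < nb_completed; ++z)
    indexes[completed_requests[z]] = true;
}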
1603
1604/*---------------------------------------------------------------------------*/
1605/*---------------------------------------------------------------------------*/
1606
1607void MpiAdapter::
1608freeRequest(Request& request)
1609{
1610 if (!request.isValid()){
1611 warning() << "MpiAdapter::freeRequest() null request r=" << (MPI_Request)request;
1612 _checkFatalInRequest();
1613 return;
1614 }
1615 {
1616 MpiLock::Section mls(m_mpi_lock);
1617
1618 auto mr = (MPI_Request)request;
1619 _removeRequest(mr);
1620 MPI_Request_free(&mr);
1621 }
1622 request.reset();
1623}
1624
1625/*---------------------------------------------------------------------------*/
1626/*---------------------------------------------------------------------------*/
1627
1628bool MpiAdapter::
1629testRequest(Request& request)
1630{
1631  // MPI allows testing a null request.
1632 if (!request.isValid())
1633 return true;
1634
1635 auto mr = (MPI_Request)request;
1636 int is_finished = 0;
1637
1638 {
1639 MpiLock::Section mls(m_mpi_lock);
1640
1641    // First retrieve the request's slot: if the request has completed,
1642    // MPI frees it automatically during the test and it could no
1643    // longer be removed afterwards
1644 RequestSet::Iterator request_iter = m_request_set->findRequest(mr);
1645
1646 m_mpi_prof->test(&mr, &is_finished, (MPI_Status *) MPI_STATUS_IGNORE);
1647 //info() << "** TEST REQUEST r=" << mr << " is_finished=" << is_finished;
1648 if (is_finished!=0){
1649 m_request_set->removeRequest(request_iter);
1650 if (request.hasSubRequest())
1651 ARCCORE_THROW(NotImplementedException,"SubRequest support");
1652 request.reset();
1653 return true;
1654 }
1655 }
1656
1657 return false;
1658}
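// --- Editorial sketch (not part of the original MpiAdapter.cc) --------------
// testRequest() returns true either when the request is already invalid or
// when MPI reports it as completed, in which case the request is removed from
// the tracking set and reset. A caller may therefore poll it until
// completion, as in the hypothetical helper below; the name 'pollUntilDone'
// and the 'adapter' parameter are assumptions.
static void pollUntilDone(MpiAdapter* adapter, Request& request)
{
  // Each call performs at most one MPI_Test; other work could be done
  // between two polls.
  while (!adapter->testRequest(request)) {
  }
}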
1659
1660/*---------------------------------------------------------------------------*/
1661/*---------------------------------------------------------------------------*/
1662/*!
1663 * \warning This function must be called while holding the mpi_lock lock.
1664 */
1665void MpiAdapter::
1666_addRequest(MPI_Request request)
1667{
1668 m_request_set->addRequest(request);
1669}
1670
1671/*---------------------------------------------------------------------------*/
1672/*---------------------------------------------------------------------------*/
1673/*!
1674 * \warning This function must be called while holding the mpi_lock lock.
1675 */
1676void MpiAdapter::
1677_removeRequest(MPI_Request request)
1678{
1679 m_request_set->removeRequest(request);
1680}
1681
1682/*---------------------------------------------------------------------------*/
1683/*---------------------------------------------------------------------------*/
1684
1685void MpiAdapter::
1686enableDebugRequest(bool enable_debug_request)
1687{
1688 m_stat->enable(enable_debug_request);
1689 //if (!m_enable_debug_request)
1690  //info() << "WARNING: Mpi adapter debug request is disabled (multi-threading)";
1691}
1692
1693/*---------------------------------------------------------------------------*/
1694/*---------------------------------------------------------------------------*/
1695
1696void MpiAdapter::
1697_checkFatalInRequest()
1698{
1699 if (isRequestErrorAreFatal())
1700 ARCCORE_FATAL("Error in requests management");
1701}
1702
1703/*---------------------------------------------------------------------------*/
1704/*---------------------------------------------------------------------------*/
1705
1706void MpiAdapter::
1707setMpiProfiling(IMpiProfiling* mpi_profiling)
1708{
1709 m_mpi_prof = mpi_profiling;
1710}
1711
1712/*---------------------------------------------------------------------------*/
1713/*---------------------------------------------------------------------------*/
1714
1715IMpiProfiling* MpiAdapter::
1716getMpiProfiling() const
1717{
1718 return m_mpi_prof;
1719}
1720
1721/*---------------------------------------------------------------------------*/
1722/*---------------------------------------------------------------------------*/
1723
1724void MpiAdapter::
1725setProfiler(IProfiler* profiler)
1726{
1727 if (!profiler){
1728 m_mpi_prof = nullptr;
1729 return;
1730 }
1731
1732 IMpiProfiling* p = dynamic_cast<IMpiProfiling*>(profiler);
1733 if (!p)
1734    ARCCORE_FATAL("Invalid profiler. Profiler has to implement interface 'IMpiProfiling'");
1735 m_mpi_prof = p;
1736}
1737
1738/*---------------------------------------------------------------------------*/
1739/*---------------------------------------------------------------------------*/
1740
1741IProfiler* MpiAdapter::
1742profiler() const
1743{
1744 return m_mpi_prof;
1745}
1746
1747/*---------------------------------------------------------------------------*/
1748/*---------------------------------------------------------------------------*/
1749
1750} // End namespace Arcane::MessagePassing::Mpi
1751
1752/*---------------------------------------------------------------------------*/
1753/*---------------------------------------------------------------------------*/