Arcane  v3.16.0.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridMessageQueue.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridMessageQueue.cc (C) 2000-2025 */
9/* */
10/* File de messages pour une implémentation MPI/Thread. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/ArcanePrecomp.h"
15
16#include "arcane/utils/Array.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/NotImplementedException.h"
19#include "arcane/utils/NotSupportedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/TraceInfo.h"
22#include "arcane/utils/ITraceMng.h"
23#include "arcane/utils/ValueConvert.h"
24
25#include "arcane/parallel/mpithread/HybridMessageQueue.h"
26#include "arcane/parallel/mpi/MpiParallelMng.h"
27
28#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33// Macro pour afficher des messages pour debug
34#define TRACE_DEBUG(needed_debug_level,format_str,...) \
35 if (m_debug_level>=needed_debug_level){ \
36 info() << String::format("Hybrid " format_str,__VA_ARGS__);\
37 traceMng()->flush();\
38 }
39
40/*---------------------------------------------------------------------------*/
41/*---------------------------------------------------------------------------*/
42
44{
45
46/*---------------------------------------------------------------------------*/
47/*---------------------------------------------------------------------------*/
48
49HybridMessageQueue::
50HybridMessageQueue(ISharedMemoryMessageQueue* thread_queue,MpiParallelMng* mpi_pm,
51 Int32 local_nb_rank)
52: TraceAccessor(mpi_pm->traceMng())
53, m_thread_queue(thread_queue)
54, m_mpi_parallel_mng(mpi_pm)
55, m_mpi_adapter(mpi_pm->adapter())
56, m_local_nb_rank(local_nb_rank)
57, m_rank_tag_builder(local_nb_rank)
58, m_debug_level(0)
59{
60 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCCORE_ALLOW_NULL_RANK_FOR_MPI_ANY_SOURCE", true))
61 m_is_allow_null_rank_for_any_source = v.value() != 0;
62}
63
64/*---------------------------------------------------------------------------*/
65/*---------------------------------------------------------------------------*/
66
67void HybridMessageQueue::
68_checkValidRank(MessageRank rank)
69{
70 if (rank.isNull())
71 ARCANE_THROW(ArgumentException,"null rank");
72}
73
74/*---------------------------------------------------------------------------*/
75/*---------------------------------------------------------------------------*/
76
77void HybridMessageQueue::
78_checkValidSource(const PointToPointMessageInfo& message)
79{
80 MessageRank source = message.emiterRank();
81 if (source.isNull())
82 ARCANE_THROW(ArgumentException,"null source");
83}
84
85/*---------------------------------------------------------------------------*/
86/*---------------------------------------------------------------------------*/
87
88PointToPointMessageInfo HybridMessageQueue::
89_buildSharedMemoryMessage(const PointToPointMessageInfo& message,
90 const SourceDestinationFullRankInfo& fri)
91{
92 PointToPointMessageInfo p2p_message(message);
93 p2p_message.setEmiterRank(fri.source().localRank());
94 p2p_message.setDestinationRank(fri.destination().localRank());
95 return p2p_message;
96}
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
100
101PointToPointMessageInfo HybridMessageQueue::
102_buildMPIMessage(const PointToPointMessageInfo& message,
103 const SourceDestinationFullRankInfo& fri)
104{
105 PointToPointMessageInfo p2p_message(message);
106 p2p_message.setEmiterRank(fri.source().mpiRank());
107 p2p_message.setDestinationRank(fri.destination().mpiRank());
108 return p2p_message;
109}
110
111/*---------------------------------------------------------------------------*/
112/*---------------------------------------------------------------------------*/
113
114void HybridMessageQueue::
115waitAll(ArrayView<Request> requests)
116{
117 // TODO: fusionner ce qui est possible avec waitSome.
118 Integer nb_request = requests.size();
119 UniqueArray<Request> mpi_requests;
120 UniqueArray<Request> thread_requests;
121 for( Integer i=0; i<nb_request; ++i ){
122 Request r = requests[i];
123 if (!r.isValid())
124 continue;
125 IRequestCreator* creator = r.creator();
126 if (creator==m_mpi_adapter) {
127 mpi_requests.add(r);
128 }
129 else if (creator==m_thread_queue)
130 thread_requests.add(r);
131 else
132 ARCANE_FATAL("Invalid IRequestCreator");
133 }
134
135 if (mpi_requests.size()!=0)
136 m_mpi_adapter->waitAllRequests(mpi_requests);
137 if (thread_requests.size()!=0)
138 m_thread_queue->waitAll(thread_requests);
139
140 // On remet à zero toutes les requetes pour pouvoir rappeler les fonctions Wait !
141 for( Request r : requests )
142 r.reset();
143}
144
145/*---------------------------------------------------------------------------*/
146/*---------------------------------------------------------------------------*/
147
148void HybridMessageQueue::
149waitSome(Int32 rank,ArrayView<Request> requests,ArrayView<bool> requests_done,
150 bool is_non_blocking)
151{
152 Integer nb_done = 0;
153 do{
154 TRACE_DEBUG(2,"Hybrid: wait some rank={0} requests n={1} nb_done={2} is_non_blocking={3}",
155 rank,requests.size(),nb_done,is_non_blocking);
156 nb_done = _testOrWaitSome(rank,requests,requests_done);
157 if (is_non_blocking || nb_done==(-1))
158 break;
159 } while (nb_done==0);
160}
161
162/*---------------------------------------------------------------------------*/
163/*---------------------------------------------------------------------------*/
164
165Integer HybridMessageQueue::
166_testOrWaitSome(Int32 rank,ArrayView<Request> requests,ArrayView<bool> requests_done)
167{
168 Integer nb_request = requests.size();
169 TRACE_DEBUG(2,"Hybrid: wait some rank={0} requests n={1}",rank,nb_request);
170
171 // Il faut séparer les requêtes MPI des requêtes en mémoire partagée.
172 // TODO: avec la notion de requête généralisé de MPI, il serait peut-être
173 // possible de fusionner cela.
174 UniqueArray<Request> mpi_requests;
175 UniqueArray<Request> shm_requests;
176 // Indice des requêtes dans la liste globale \a requests
177 UniqueArray<Integer> mpi_requests_index;
178 UniqueArray<Integer> shm_requests_index;
179
180 Integer nb_done = 0;
181 for( Integer i=0; i<nb_request; ++i ){
182 Request r = requests[i];
183 if (!r.isValid())
184 continue;
185 IRequestCreator* creator = r.creator();
186 if (creator==m_mpi_adapter){
187 mpi_requests.add(r);
188 mpi_requests_index.add(i);
189 }
190 else if (creator==m_thread_queue){
191 shm_requests.add(r);
192 shm_requests_index.add(i);
193 }
194 else
195 ARCANE_FATAL("Invalid IRequestCreator");
196 }
197
198 TRACE_DEBUG(2,"Hybrid: wait some rank={0} nb_mpi={1} nb_shm={2}",
199 rank,mpi_requests.size(),shm_requests.size());
200
201 // S'il n'y a aucune requête valide, inutile d'aller plus loin.
202 // Il ne faut cependant pas retourner '0' car on doit faire
203 // la différence entre aucune requête disponible pour le mode 'is_non_blocking'
204 // et aucune requête valide.
205 if (mpi_requests.size()==0 && shm_requests.size()==0)
206 return (-1);
207
208 // Même en mode waitSome, il faut utiliser le mode non bloquant car
209 // on ne sait pas entre les threads et MPI quelles seront les requêtes
210 // qui sont disponibles
211
212 // Les requêtes ont pu être modifiées si elles ne sont pas terminées.
213 // Il faut donc les remettre dans la liste \a requests. Dans notre
214 // cas il suffit uniquement de recopier la nouvelle valeur dans
215 // l'instance correspondante de HybridMessageRequest.
216 UniqueArray<bool> mpi_done_indexes;
217 Integer nb_mpi_request = mpi_requests.size();
218
219 if (nb_mpi_request!=0){
220 mpi_done_indexes.resize(nb_mpi_request);
221 mpi_done_indexes.fill(false);
222 m_mpi_adapter->waitSomeRequests(mpi_requests,mpi_done_indexes,true);
223 TRACE_DEBUG(2,"Hybrid: MPI wait some requests n={0} after=",nb_mpi_request,mpi_done_indexes);
224 for( Integer i=0; i<nb_mpi_request; ++i ){
225 Integer index_in_global = mpi_requests_index[i];
226 if (mpi_done_indexes[i]){
227 requests_done[index_in_global] = true;
228 requests[index_in_global].reset();
229 ++nb_done;
230 TRACE_DEBUG(1,"MPI rank={0} set done i={1} in_global={2}",
231 rank,i,index_in_global);
232 }
233 else
234 requests[index_in_global] = mpi_requests[i];
235 }
236 }
237
238 UniqueArray<bool> shm_done_indexes;
239 Integer nb_shm_request = shm_requests.size();
240 TRACE_DEBUG(2,"SHM wait some requests n={0}",nb_shm_request);
241 if (shm_requests.size()!=0){
242 shm_done_indexes.resize(nb_shm_request);
243 shm_done_indexes.fill(false);
244 m_thread_queue->waitSome(rank,shm_requests,shm_done_indexes,true);
245 for( Integer i=0; i<nb_shm_request; ++i ){
246 Integer index_in_global = shm_requests_index[i];
247 if (shm_done_indexes[i]){
248 requests_done[index_in_global] = true;
249 requests[index_in_global].reset();
250 ++nb_done;
251 TRACE_DEBUG(1,"SHM rank={0} set done i={1} in_global={2}",
252 rank,i,index_in_global);
253 }
254 else
255 requests[index_in_global] = shm_requests[i];
256 }
257 }
258 return nb_done;
259}
260
261/*---------------------------------------------------------------------------*/
262/*---------------------------------------------------------------------------*/
263
264Request HybridMessageQueue::
265_addReceiveRankTag(const PointToPointMessageInfo& message,ReceiveBufferInfo buf_info)
266{
267 // On ne supporte pas les réceptions avec ANY_RANK car on ne sait pas
268 // s'il faut faire un 'receive' avec MPI ou en mémoire partagée.
269 // Dans ce cas, l'utilisateur doit plutôt utiliser probe()
270 // pour savoir ce qui est disponible et envoyer faire un addReceive()
271 // avec un MessageId.
272 if (message.destinationRank().isNull())
273 ARCANE_THROW(NotSupportedException,"Receive with any rank. Use probe() and MessageId instead");
274
275 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
276 bool is_same_mpi_rank = fri.isSameMpiRank();
277
278 if (is_same_mpi_rank){
279 TRACE_DEBUG(1,"** MPITMQ SHM ADD RECV S queue={0} message={1}",this,message);
280 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message,fri));
281 return m_thread_queue->addReceive(p2p_message,buf_info);
282 }
283
284 ISerializer* serializer = buf_info.serializer();
285 if (serializer){
286 TRACE_DEBUG(1,"** MPITMQ MPI ADD RECV S queue={0} message={1}",this,message);
287 PointToPointMessageInfo p2p_message(_buildMPIMessage(message,fri));
288 p2p_message.setTag(m_rank_tag_builder.tagForReceive(MessageTag(message.tag()),fri));
289 return m_mpi_parallel_mng->receiveSerializer(serializer,p2p_message);
290 }
291 else{
292 ByteSpan buf = buf_info.memoryBuffer();
293 Int64 size = buf.size();
294
295 TRACE_DEBUG(1,"** MPITMQ THREAD ADD RECV B queue={0} message={1} size={2} same_mpi?={3}",
296 this,message,size,fri.isSameMpiRank());
297
298 //TODO: utiliser le vrai MPI_Datatype
299 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
300 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(message.tag(),fri);
301 Request r = m_mpi_adapter->directRecv(buf.data(),size,fri.destination().mpiRankValue(),sizeof(char),
302 char_data_type,mpi_tag.value(),false);
303 return r;
304 }
305}
306
307/*---------------------------------------------------------------------------*/
308/*---------------------------------------------------------------------------*/
309
310Request HybridMessageQueue::
311_addReceiveMessageId(const PointToPointMessageInfo& message,ReceiveBufferInfo buf_info)
312{
313 MessageId message_id = message.messageId();
314 MessageId::SourceInfo si(message_id.sourceInfo());
315
316 if (si.rank()!=message.destinationRank())
317 ARCANE_FATAL("Incohence between messsage_id rank and destination rank x1={0} x2={1}",
318 si.rank(),message.destinationRank());
319
320 TRACE_DEBUG(1,"** MPITMQ ADD_RECV (message_id) queue={0} message={1}",
321 this,message);
322
323 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
324 if (fri.isSameMpiRank()){
325 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message,fri));
326 return m_thread_queue->addReceive(p2p_message,buf_info);
327 }
328
329 TRACE_DEBUG(1,"** MPITMQ MPI ADD RECV (message_id) queue={0} message={1}",this,message);
330
331 ISerializer* serializer = buf_info.serializer();
332 if (serializer){
333 PointToPointMessageInfo p2p_message(_buildMPIMessage(message,fri));
334 //p2p_message.setTag(m_rank_tag_builder.tagForReceive(message.tag(),fri));
335 TRACE_DEBUG(1,"** MPI ADD RECV Serializer (message_id) message={0} p2p_message={1}",
336 message,p2p_message);
337 return m_mpi_parallel_mng->receiveSerializer(serializer,p2p_message);
338 }
339 else{
340 ByteSpan buf = buf_info.memoryBuffer();
341 Int64 size = buf.size();
342
343 // TODO: utiliser le vrai MPI_Datatype
344 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
345 MessageId mpi_message(message_id);
346 MessageId::SourceInfo mpi_si(si);
347 mpi_si.setRank(fri.destination().mpiRank());
348 mpi_message.setSourceInfo(mpi_si);
349 return m_mpi_adapter->directRecv(buf.data(),size,mpi_message,sizeof(char),
350 char_data_type,false);
351 }
352}
353
354/*---------------------------------------------------------------------------*/
355/*---------------------------------------------------------------------------*/
356
357Request HybridMessageQueue::
358addReceive(const PointToPointMessageInfo& message,ReceiveBufferInfo buf)
359{
360 _checkValidSource(message);
361
362 if (!message.isValid())
363 return Request();
364
365 if (message.isRankTag())
366 return _addReceiveRankTag(message,buf);
367
368 if (message.isMessageId())
369 return _addReceiveMessageId(message,buf);
370
371 ARCANE_THROW(NotSupportedException,"Invalid message_info");
372}
373
374/*---------------------------------------------------------------------------*/
375/*---------------------------------------------------------------------------*/
376
377Request HybridMessageQueue::
378addSend(const PointToPointMessageInfo& message,SendBufferInfo buf_info)
379{
380 if (!message.isValid())
381 return Request();
382 if (message.destinationRank().isNull())
383 ARCCORE_FATAL("Null destination");
384 if (!message.isRankTag())
385 ARCCORE_FATAL("Invalid message_info for sending: message.isRankTag() is false");
386
387 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
388
389 // Même rang donc envoie via la file en mémoire partagée.
390 if (fri.isSameMpiRank()){
391 TRACE_DEBUG(1,"** MPITMQ SHM ADD SEND S queue={0} message={1}",this,message);
392 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message,fri));
393 return m_thread_queue->addSend(p2p_message,buf_info);
394 }
395
396 // Envoie via MPI
397 MessageTag mpi_tag = m_rank_tag_builder.tagForSend(message.tag(),fri);
398 const ISerializer* serializer = buf_info.serializer();
399 if (serializer){
400 PointToPointMessageInfo p2p_message(_buildMPIMessage(message,fri));
401 p2p_message.setTag(mpi_tag);
402 TRACE_DEBUG(1,"** MPITMQ MPI ADD SEND Serializer queue={0} message={1} p2p_message={2}",
403 this,message,p2p_message);
404 return m_mpi_parallel_mng->sendSerializer(serializer,p2p_message);
405 }
406 else{
407 ByteConstSpan buf = buf_info.memoryBuffer();
408 Int64 size = buf.size();
409
410 // TODO: utiliser m_mpi_parallel_mng mais il faut faire attention
411 // d'utiliser le mode bloquant
412 // TODO: utiliser le vrai MPI_Datatype
413 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
414
415 TRACE_DEBUG(1,"** MPITMQ MPI ADD SEND B queue={0} message={1} size={2} mpi_tag={3} mpi_rank={4}",
416 this,message,size,mpi_tag,fri.destination().mpiRank());
417
418 return m_mpi_adapter->directSend(buf.data(),size,fri.destination().mpiRankValue(),
419 sizeof(char),char_data_type,mpi_tag.value(),false);
420 }
421}
422
423/*---------------------------------------------------------------------------*/
424/*---------------------------------------------------------------------------*/
425
426MP::MessageId HybridMessageQueue::
427probe(const MP::PointToPointMessageInfo& message)
428{
429 TRACE_DEBUG(1,"Probe msg='{0}' queue={1} is_valid={2}",
430 message,this,message.isValid());
431
432 MessageRank orig = message.emiterRank();
433 if (orig.isNull())
434 ARCANE_THROW(ArgumentException,"null sender");
435
436 if (!message.isValid())
437 return MessageId();
438
439 // Il faut avoir initialisé le message avec un couple (rang/tag).
440 if (!message.isRankTag())
441 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
442
443 MessageRank dest = message.destinationRank();
444 MessageTag user_tag = message.tag();
445 bool is_blocking = message.isBlocking();
446 if (is_blocking)
447 ARCANE_THROW(NotImplementedException,"blocking probe");
448 if (user_tag.isNull())
449 ARCANE_THROW(NotImplementedException,"probe with ANY_TAG");
450 FullRankInfo orig_fri = m_rank_tag_builder.rank(orig);
451 FullRankInfo dest_fri = m_rank_tag_builder.rank(dest);
452 MessageId message_id;
453 Int32 found_dest = dest.value();
454 const bool is_any_source = dest.isNull() || dest.isAnySource();
455 if (dest.isNull() && !m_is_allow_null_rank_for_any_source)
456 ARCANE_FATAL("Can not use probe() with null rank. Use MessageRank::anySourceRank() instead");
457 if (is_any_source) {
458 // Comme on ne sait pas de qui on va recevoir, il faut tester à la
459 // fois la file de thread et via MPI.
460 MP::PointToPointMessageInfo p2p_message(message);
461 p2p_message.setEmiterRank(orig_fri.localRank());
462 message_id = m_thread_queue->probe(p2p_message);
463 if (message_id.isValid()){
464 // On a trouvé un message dans la liste de thread.
465 // Comme on est dans notre liste de thread, le
466 // rang global est notre rang MPI + le rang local trouvé.
467 found_dest = orig_fri.mpiRankValue()*m_local_nb_rank + message_id.sourceInfo().rank().value();
468 TRACE_DEBUG(2,"Probe with null_rank (thread) orig={0} found_dest={1} tag={2}",
469 orig,found_dest,user_tag);
470 }
471 else{
472 // Recherche via MPI.
473 // La difficulté est que le rang local du PE originaire du message
474 // est codé dans le tag et qu'on ne connait pas le PE originaire.
475 // Il faut donc tester tous les tag potentiels. Leur nombre est
476 // égal à 'm_nb_local_rank'.
477 for( Integer z=0, zn=m_local_nb_rank; z<zn; ++z ){
478 MP::PointToPointMessageInfo mpi_message(message);
479 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag,orig_fri.localRank(),MessageRank(z));
480 mpi_message.setTag(mpi_tag);
481 TRACE_DEBUG(2,"Probe with null_rank orig={0} dest={1} tag={2}",orig,dest,mpi_tag);
482 message_id = m_mpi_adapter->probeMessage(mpi_message);
483 if (message_id.isValid()){
484 // On a trouvé un message MPI. Il faut extraire du tag le
485 // rang local. Le rang MPI est celui dans le message.
486 MessageRank mpi_rank = message_id.sourceInfo().rank();
487 MessageTag ret_tag = message_id.sourceInfo().tag();
488 Int32 local_rank = m_rank_tag_builder.getReceiveRankFromTag(ret_tag);
489 found_dest = mpi_rank.value()*m_local_nb_rank + local_rank;
490 TRACE_DEBUG(2,"Probe null rank found mpi_rank={0} local_rank={1} tag={2}",
491 ret_tag,mpi_rank,local_rank,ret_tag);
492 break;
493 }
494 }
495 }
496 }
497 else{
498 // Il faut convertir le rang `dest` en le rang attendu par la file de thread
499 // ou par MPI.
500 if (orig_fri.mpiRank()==dest_fri.mpiRank()){
501 MP::PointToPointMessageInfo p2p_message(message);
502 p2p_message.setDestinationRank(MP::MessageRank(dest_fri.localRank()));
503 p2p_message.setEmiterRank(MessageRank(orig_fri.localRank()));
504 message_id = m_thread_queue->probe(p2p_message);
505 }
506 else{
507 MP::PointToPointMessageInfo mpi_message(message);
508 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag,orig_fri,dest_fri);
509 mpi_message.setTag(mpi_tag);
510 mpi_message.setDestinationRank(MP::MessageRank(dest_fri.mpiRank()));
511 TRACE_DEBUG(2,"Probe orig={0} dest={1} mpi_tag={2} user_tag={3}",orig,dest,mpi_tag,user_tag);
512 message_id = m_mpi_adapter->probeMessage(mpi_message);
513 }
514 }
515 if (message_id.isValid()){
516 // Il faut transformer le rang local retourné par les méthodes précédentes
517 // en un rang global
518 MessageId::SourceInfo si = message_id.sourceInfo();
519 si.setRank(MessageRank(found_dest));
520 message_id.setSourceInfo(si);
521 }
522 return message_id;
523}
524
525/*---------------------------------------------------------------------------*/
526/*---------------------------------------------------------------------------*/
527
528MP::MessageSourceInfo HybridMessageQueue::
529legacyProbe(const MP::PointToPointMessageInfo& message)
530{
531 TRACE_DEBUG(1,"LegacyProbe msg='{0}' queue={1} is_valid={2}",
532 message,this,message.isValid());
533
534 MessageRank orig = message.emiterRank();
535 if (orig.isNull())
536 ARCANE_THROW(ArgumentException,"null sender");
537
538 if (!message.isValid())
539 return {};
540
541 // Il faut avoir initialisé le message avec un couple (rang/tag).
542 if (!message.isRankTag())
543 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
544
545 const MessageRank dest = message.destinationRank();
546 const MessageTag user_tag = message.tag();
547 const bool is_blocking = message.isBlocking();
548 if (is_blocking)
549 ARCANE_THROW(NotImplementedException,"blocking probe");
550 if (user_tag.isNull())
551 ARCANE_THROW(NotImplementedException,"legacyProbe with ANY_TAG");
552 FullRankInfo orig_fri = m_rank_tag_builder.rank(orig);
553 FullRankInfo dest_fri = m_rank_tag_builder.rank(dest);
554 MP::MessageSourceInfo message_source_info;
555 Int32 found_dest = dest.value();
556 const bool is_any_source = dest.isNull() || dest.isAnySource();
557 if (dest.isNull() && !m_is_allow_null_rank_for_any_source)
558 ARCANE_FATAL("Can not use legacyProbe() with null rank. Use MessageRank::anySourceRank() instead");
559 if (is_any_source) {
560 // Comme on ne sait pas de qui on va recevoir, il faut tester à la
561 // fois la file de thread et via MPI.
562 MP::PointToPointMessageInfo p2p_message(message);
563 p2p_message.setEmiterRank(orig_fri.localRank());
564 message_source_info = m_thread_queue->legacyProbe(p2p_message);
565 if (message_source_info.isValid()){
566 // On a trouvé un message dans la liste de thread.
567 // Comme on est dans notre liste de thread, le
568 // rang global est notre rang MPI + le rang local trouvé.
569 found_dest = orig_fri.mpiRankValue()*m_local_nb_rank + message_source_info.rank().value();
570 TRACE_DEBUG(2,"LegacyProbe with null_rank (thread) orig={0} found_dest={1} tag={2}",
571 orig,found_dest,user_tag);
572 }
573 else{
574 // Recherche via MPI.
575 // La difficulté est que le rang local du PE originaire du message
576 // est codé dans le tag et qu'on ne connait pas le PE originaire.
577 // Il faut donc tester tous les tag potentiels. Leur nombre est
578 // égal à 'm_nb_local_rank'.
579 for( Integer z=0, zn=m_local_nb_rank; z<zn; ++z ){
580 MP::PointToPointMessageInfo mpi_message(message);
581 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag,orig_fri.localRank(),MessageRank(z));
582 mpi_message.setTag(mpi_tag);
583 TRACE_DEBUG(2,"LegacyProbe with null_rank orig={0} dest={1} tag={2}",orig,dest,mpi_tag);
584 message_source_info = m_mpi_adapter->legacyProbeMessage(mpi_message);
585 if (message_source_info.isValid()){
586 // On a trouvé un message MPI. Il faut extraire du tag le
587 // rang local. Le rang MPI est celui dans le message.
588 MessageRank mpi_rank = message_source_info.rank();
589 MessageTag ret_tag = message_source_info.tag();
590 Int32 local_rank = m_rank_tag_builder.getReceiveRankFromTag(ret_tag);
591 found_dest = mpi_rank.value()*m_local_nb_rank + local_rank;
592 TRACE_DEBUG(2,"LegacyProbe null rank found mpi_rank={0} local_rank={1} tag={2}",
593 ret_tag,mpi_rank,local_rank,ret_tag);
594 // Remet le tag d'origine pour pouvoir faire un receive avec.
595 message_source_info.setTag(user_tag);
596 break;
597 }
598 }
599 }
600 }
601 else{
602 // Il faut convertir le rang `dest` en le rang attendu par la file de thread
603 // ou par MPI.
604 if (orig_fri.mpiRank()==dest_fri.mpiRank()){
605 MP::PointToPointMessageInfo p2p_message(message);
606 p2p_message.setDestinationRank(MP::MessageRank(dest_fri.localRank()));
607 p2p_message.setEmiterRank(MessageRank(orig_fri.localRank()));
608 TRACE_DEBUG(2,"LegacyProbe SHM orig={0} dest={1} tag={2}",orig,dest,user_tag);
609 message_source_info = m_thread_queue->legacyProbe(p2p_message);
610 }
611 else{
612 MP::PointToPointMessageInfo mpi_message(message);
613 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag,orig_fri,dest_fri);
614 mpi_message.setTag(mpi_tag);
615 mpi_message.setDestinationRank(MP::MessageRank(dest_fri.mpiRank()));
616 TRACE_DEBUG(2,"LegacyProbe MPI orig={0} dest={1} mpi_tag={2} user_tag={3}",orig,dest,mpi_tag,user_tag);
617 message_source_info = m_mpi_adapter->legacyProbeMessage(mpi_message);
618 if (message_source_info.isValid()){
619 // Remet le tag d'origine pour pouvoir faire un receive avec.
620 message_source_info.setTag(user_tag);
621 }
622 }
623 }
624 if (message_source_info.isValid()){
625 // Il faut transformer le rang local retourné par les méthodes précédentes
626 // en un rang global
627 message_source_info.setRank(MessageRank(found_dest));
628 }
629 TRACE_DEBUG(2,"LegacyProbe has matched message? = {0}",message_source_info.isValid());
630 return message_source_info;
631}
632
633/*---------------------------------------------------------------------------*/
634/*---------------------------------------------------------------------------*/
635
636std::ostream& operator<<(std::ostream& o,const FullRankInfo& fri)
637{
638 return o << "(local=" << fri.m_local_rank << ",global="
639 << fri.m_global_rank << ",mpi=" << fri.m_mpi_rank << ")";
640}
641
642/*---------------------------------------------------------------------------*/
643/*---------------------------------------------------------------------------*/
644
645} // End namespace Arcane::MessagePassing
646
647/*---------------------------------------------------------------------------*/
648/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Interface d'une file de messages avec les threads.
std::ostream & operator<<(std::ostream &o, eExecutionPolicy exec_policy)
Affiche le nom de la politique d'exécution.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Definition Parallel.h:50
Int32 Integer
Type représentant un entier.
std::int32_t Int32
Type entier signé sur 32 bits.