Arcane  v4.1.1.0
Documentation développeur
Chargement...
Recherche...
Aucune correspondance
HybridMessageQueue.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2025 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridMessageQueue.cc (C) 2000-2025 */
9/* */
10/* File de messages pour une implémentation MPI/Thread. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/ArcanePrecomp.h"
15
16#include "arcane/utils/Array.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/NotImplementedException.h"
19#include "arcane/utils/NotSupportedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/TraceInfo.h"
22#include "arcane/utils/ITraceMng.h"
23#include "arcane/utils/ValueConvert.h"
24
25#include "arcane/parallel/mpithread/HybridMessageQueue.h"
26#include "arcane/parallel/mpi/MpiParallelMng.h"
27
28#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
33// Macro pour afficher des messages pour debug
34#define TRACE_DEBUG(needed_debug_level,format_str,...) \
35 if (m_debug_level>=needed_debug_level){ \
36 info() << String::format("Hybrid " format_str,__VA_ARGS__);\
37 traceMng()->flush();\
38 }
39
40/*---------------------------------------------------------------------------*/
41/*---------------------------------------------------------------------------*/
42
44{
45
46/*---------------------------------------------------------------------------*/
47/*---------------------------------------------------------------------------*/
48
49HybridMessageQueue::
50HybridMessageQueue(ISharedMemoryMessageQueue* thread_queue,MpiParallelMng* mpi_pm,
51 Int32 local_nb_rank)
52: TraceAccessor(mpi_pm->traceMng())
53, m_thread_queue(thread_queue)
54, m_mpi_parallel_mng(mpi_pm)
55, m_mpi_adapter(mpi_pm->adapter())
56, m_local_nb_rank(local_nb_rank)
57, m_rank_tag_builder(local_nb_rank)
58, m_debug_level(0)
59{
60 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCCORE_ALLOW_NULL_RANK_FOR_MPI_ANY_SOURCE", true))
61 m_is_allow_null_rank_for_any_source = v.value() != 0;
62}
63
64/*---------------------------------------------------------------------------*/
65/*---------------------------------------------------------------------------*/
66
67void HybridMessageQueue::
68_checkValidRank(MessageRank rank)
69{
70 if (rank.isNull())
71 ARCANE_THROW(ArgumentException,"null rank");
72}
73
74/*---------------------------------------------------------------------------*/
75/*---------------------------------------------------------------------------*/
76
77void HybridMessageQueue::
78_checkValidSource(const PointToPointMessageInfo& message)
79{
80 MessageRank source = message.emiterRank();
81 if (source.isNull())
82 ARCANE_THROW(ArgumentException,"null source");
83}
84
85/*---------------------------------------------------------------------------*/
86/*---------------------------------------------------------------------------*/
87
88PointToPointMessageInfo HybridMessageQueue::
89_buildSharedMemoryMessage(const PointToPointMessageInfo& message,
90 const SourceDestinationFullRankInfo& fri)
91{
92 PointToPointMessageInfo p2p_message(message);
93 p2p_message.setEmiterRank(fri.source().localRank());
94 p2p_message.setDestinationRank(fri.destination().localRank());
95 return p2p_message;
96}
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
100
101PointToPointMessageInfo HybridMessageQueue::
102_buildMPIMessage(const PointToPointMessageInfo& message,
103 const SourceDestinationFullRankInfo& fri)
104{
105 PointToPointMessageInfo p2p_message(message);
106 p2p_message.setEmiterRank(fri.source().mpiRank());
107 p2p_message.setDestinationRank(fri.destination().mpiRank());
108 return p2p_message;
109}
110
111/*---------------------------------------------------------------------------*/
112/*---------------------------------------------------------------------------*/
113
114void HybridMessageQueue::
115waitAll(ArrayView<Request> requests)
116{
117 // TODO: fusionner ce qui est possible avec waitSome.
118 Integer nb_request = requests.size();
119 UniqueArray<Request> mpi_requests;
120 UniqueArray<Request> thread_requests;
121 for( Integer i=0; i<nb_request; ++i ){
122 Request r = requests[i];
123 if (!r.isValid())
124 continue;
125 IRequestCreator* creator = r.creator();
126 if (creator==m_mpi_adapter) {
127 mpi_requests.add(r);
128 }
129 else if (creator==m_thread_queue)
130 thread_requests.add(r);
131 else
132 ARCANE_FATAL("Invalid IRequestCreator");
133 }
134
135 if (mpi_requests.size()!=0)
136 m_mpi_adapter->waitAllRequests(mpi_requests);
137 if (thread_requests.size()!=0)
138 m_thread_queue->waitAll(thread_requests);
139
140 // On remet à zero toutes les requetes pour pouvoir rappeler les fonctions Wait !
141 for( Request r : requests )
142 r.reset();
143}
144
145/*---------------------------------------------------------------------------*/
146/*---------------------------------------------------------------------------*/
147
148void HybridMessageQueue::
149waitSome(Int32 rank,ArrayView<Request> requests,ArrayView<bool> requests_done,
150 bool is_non_blocking)
151{
152 Integer nb_done = 0;
153 do{
154 TRACE_DEBUG(2,"Hybrid: wait some rank={0} requests n={1} nb_done={2} is_non_blocking={3}",
155 rank,requests.size(),nb_done,is_non_blocking);
156 nb_done = _testOrWaitSome(rank,requests,requests_done);
157 if (is_non_blocking || nb_done==(-1))
158 break;
159 } while (nb_done==0);
160}
161
162/*---------------------------------------------------------------------------*/
163/*---------------------------------------------------------------------------*/
164
165Integer HybridMessageQueue::
166_testOrWaitSome(Int32 rank,ArrayView<Request> requests,ArrayView<bool> requests_done)
167{
168 Integer nb_request = requests.size();
169 TRACE_DEBUG(2,"Hybrid: wait some rank={0} requests n={1}",rank,nb_request);
170
171 // Il faut séparer les requêtes MPI des requêtes en mémoire partagée.
172 // TODO: avec la notion de requête généralisé de MPI, il serait peut-être
173 // possible de fusionner cela.
174 UniqueArray<Request> mpi_requests;
175 UniqueArray<Request> shm_requests;
176 // Indice des requêtes dans la liste globale \a requests
177 UniqueArray<Integer> mpi_requests_index;
178 UniqueArray<Integer> shm_requests_index;
179
180 Integer nb_done = 0;
181 for( Integer i=0; i<nb_request; ++i ){
182 Request r = requests[i];
183 if (!r.isValid())
184 continue;
185 IRequestCreator* creator = r.creator();
186 if (creator==m_mpi_adapter){
187 mpi_requests.add(r);
188 mpi_requests_index.add(i);
189 }
190 else if (creator==m_thread_queue){
191 shm_requests.add(r);
192 shm_requests_index.add(i);
193 }
194 else
195 ARCANE_FATAL("Invalid IRequestCreator");
196 }
197
198 TRACE_DEBUG(2,"Hybrid: wait some rank={0} nb_mpi={1} nb_shm={2}",
199 rank,mpi_requests.size(),shm_requests.size());
200
201 // S'il n'y a aucune requête valide, inutile d'aller plus loin.
202 // Il ne faut cependant pas retourner '0' car on doit faire
203 // la différence entre aucune requête disponible pour le mode 'is_non_blocking'
204 // et aucune requête valide.
205 if (mpi_requests.size()==0 && shm_requests.size()==0)
206 return (-1);
207
208 // Même en mode waitSome, il faut utiliser le mode non bloquant car
209 // on ne sait pas entre les threads et MPI quelles seront les requêtes
210 // qui sont disponibles
211
212 // Les requêtes ont pu être modifiées si elles ne sont pas terminées.
213 // Il faut donc les remettre dans la liste \a requests. Dans notre
214 // cas il suffit uniquement de recopier la nouvelle valeur dans
215 // l'instance correspondante de HybridMessageRequest.
216 UniqueArray<bool> mpi_done_indexes;
217 Integer nb_mpi_request = mpi_requests.size();
218
219 if (nb_mpi_request!=0){
220 mpi_done_indexes.resize(nb_mpi_request);
221 mpi_done_indexes.fill(false);
222 m_mpi_adapter->waitSomeRequests(mpi_requests,mpi_done_indexes,true);
223 TRACE_DEBUG(2,"Hybrid: MPI wait some requests n={0} after=",nb_mpi_request,mpi_done_indexes);
224 for( Integer i=0; i<nb_mpi_request; ++i ){
225 Integer index_in_global = mpi_requests_index[i];
226 if (mpi_done_indexes[i]){
227 requests_done[index_in_global] = true;
228 requests[index_in_global].reset();
229 ++nb_done;
230 TRACE_DEBUG(1,"MPI rank={0} set done i={1} in_global={2}",
231 rank,i,index_in_global);
232 }
233 else
234 requests[index_in_global] = mpi_requests[i];
235 }
236 }
237
238 UniqueArray<bool> shm_done_indexes;
239 Integer nb_shm_request = shm_requests.size();
240 TRACE_DEBUG(2,"SHM wait some requests n={0}",nb_shm_request);
241 if (shm_requests.size()!=0){
242 shm_done_indexes.resize(nb_shm_request);
243 shm_done_indexes.fill(false);
244 m_thread_queue->waitSome(rank,shm_requests,shm_done_indexes,true);
245 for( Integer i=0; i<nb_shm_request; ++i ){
246 Integer index_in_global = shm_requests_index[i];
247 if (shm_done_indexes[i]){
248 requests_done[index_in_global] = true;
249 requests[index_in_global].reset();
250 ++nb_done;
251 TRACE_DEBUG(1,"SHM rank={0} set done i={1} in_global={2}",
252 rank,i,index_in_global);
253 }
254 else
255 requests[index_in_global] = shm_requests[i];
256 }
257 }
258 return nb_done;
259}
260
261/*---------------------------------------------------------------------------*/
262/*---------------------------------------------------------------------------*/
263
264Request HybridMessageQueue::
265_addReceiveRankTag(const PointToPointMessageInfo& message,ReceiveBufferInfo buf_info)
266{
267 // On ne supporte pas les réceptions avec ANY_RANK car on ne sait pas
268 // s'il faut faire un 'receive' avec MPI ou en mémoire partagée.
269 // Dans ce cas, l'utilisateur doit plutôt utiliser probe()
270 // pour savoir ce qui est disponible et envoyer faire un addReceive()
271 // avec un MessageId.
272 if (message.destinationRank().isNull())
273 ARCANE_THROW(NotSupportedException,"Receive with any rank. Use probe() and MessageId instead");
274
275 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
276 bool is_same_mpi_rank = fri.isSameMpiRank();
277
278 if (is_same_mpi_rank){
279 TRACE_DEBUG(1,"** MPITMQ SHM ADD RECV S queue={0} message={1}",this,message);
280 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message,fri));
281 return m_thread_queue->addReceive(p2p_message,buf_info);
282 }
283
284 ISerializer* serializer = buf_info.serializer();
285 if (serializer){
286 TRACE_DEBUG(1,"** MPITMQ MPI ADD RECV S queue={0} message={1}",this,message);
287 PointToPointMessageInfo p2p_message(_buildMPIMessage(message,fri));
288 p2p_message.setTag(m_rank_tag_builder.tagForReceive(MessageTag(message.tag()),fri));
289 return m_mpi_parallel_mng->receiveSerializer(serializer,p2p_message);
290 }
291 else{
292 ByteSpan buf = buf_info.memoryBuffer();
293 Int64 size = buf.size();
294
295 TRACE_DEBUG(1,"** MPITMQ THREAD ADD RECV B queue={0} message={1} size={2} same_mpi?={3}",
296 this,message,size,fri.isSameMpiRank());
297
298 //TODO: utiliser le vrai MPI_Datatype
299 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
300 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(message.tag(),fri);
301 Request r = m_mpi_adapter->directRecv(buf.data(),size,fri.destination().mpiRankValue(),sizeof(char),
302 char_data_type,mpi_tag.value(),false);
303 return r;
304 }
305}
306
307/*---------------------------------------------------------------------------*/
308/*---------------------------------------------------------------------------*/
309
310Request HybridMessageQueue::
311_addReceiveMessageId(const PointToPointMessageInfo& message,ReceiveBufferInfo buf_info)
312{
313 MessageId message_id = message.messageId();
314 MessageId::SourceInfo si(message_id.sourceInfo());
315
316 if (si.rank()!=message.destinationRank())
317 ARCANE_FATAL("Incohence between messsage_id rank and destination rank x1={0} x2={1}",
318 si.rank(),message.destinationRank());
319
320 TRACE_DEBUG(1,"** MPITMQ ADD_RECV (message_id) queue={0} message={1}",
321 this,message);
322
323 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
324 if (fri.isSameMpiRank()){
325 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message,fri));
326 return m_thread_queue->addReceive(p2p_message,buf_info);
327 }
328
329 TRACE_DEBUG(1,"** MPITMQ MPI ADD RECV (message_id) queue={0} message={1}",this,message);
330
331 ISerializer* serializer = buf_info.serializer();
332 if (serializer){
333 PointToPointMessageInfo p2p_message(_buildMPIMessage(message,fri));
334 //p2p_message.setTag(m_rank_tag_builder.tagForReceive(message.tag(),fri));
335 TRACE_DEBUG(1,"** MPI ADD RECV Serializer (message_id) message={0} p2p_message={1}",
336 message,p2p_message);
337 return m_mpi_parallel_mng->receiveSerializer(serializer,p2p_message);
338 }
339 else{
340 ByteSpan buf = buf_info.memoryBuffer();
341 Int64 size = buf.size();
342
343 // TODO: utiliser le vrai MPI_Datatype
344 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
345 MessageId mpi_message(message_id);
346 MessageId::SourceInfo mpi_si(si);
347 mpi_si.setRank(fri.destination().mpiRank());
348 mpi_message.setSourceInfo(mpi_si);
349 return m_mpi_adapter->directRecv(buf.data(),size,mpi_message,sizeof(char),
350 char_data_type,false);
351 }
352}
353
354/*---------------------------------------------------------------------------*/
355/*---------------------------------------------------------------------------*/
356
357Request HybridMessageQueue::
358addReceive(const PointToPointMessageInfo& message,ReceiveBufferInfo buf)
359{
360 _checkValidSource(message);
361
362 if (!message.isValid())
363 return Request();
364
365 if (message.isRankTag())
366 return _addReceiveRankTag(message,buf);
367
368 if (message.isMessageId())
369 return _addReceiveMessageId(message,buf);
370
371 ARCANE_THROW(NotSupportedException,"Invalid message_info");
372}
373
374/*---------------------------------------------------------------------------*/
375/*---------------------------------------------------------------------------*/
376
377Request HybridMessageQueue::
378addSend(const PointToPointMessageInfo& message,SendBufferInfo buf_info)
379{
380 if (!message.isValid())
381 return Request();
382 if (message.destinationRank().isNull())
383 ARCCORE_FATAL("Null destination");
384 if (!message.isRankTag())
385 ARCCORE_FATAL("Invalid message_info for sending: message.isRankTag() is false");
386
387 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
388
389 // Même rang donc envoie via la file en mémoire partagée.
390 if (fri.isSameMpiRank()){
391 TRACE_DEBUG(1,"** MPITMQ SHM ADD SEND S queue={0} message={1}",this,message);
392 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message,fri));
393 return m_thread_queue->addSend(p2p_message,buf_info);
394 }
395
396 // Envoie via MPI
397 MessageTag mpi_tag = m_rank_tag_builder.tagForSend(message.tag(),fri);
398 const ISerializer* serializer = buf_info.serializer();
399 if (serializer){
400 PointToPointMessageInfo p2p_message(_buildMPIMessage(message,fri));
401 p2p_message.setTag(mpi_tag);
402 TRACE_DEBUG(1,"** MPITMQ MPI ADD SEND Serializer queue={0} message={1} p2p_message={2}",
403 this,message,p2p_message);
404 return m_mpi_parallel_mng->sendSerializer(serializer,p2p_message);
405 }
406 else{
407 ByteConstSpan buf = buf_info.memoryBuffer();
408 Int64 size = buf.size();
409
410 // TODO: utiliser m_mpi_parallel_mng mais il faut faire attention
411 // d'utiliser le mode bloquant
412 // TODO: utiliser le vrai MPI_Datatype
413 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
414
415 TRACE_DEBUG(1,"** MPITMQ MPI ADD SEND B queue={0} message={1} size={2} mpi_tag={3} mpi_rank={4}",
416 this,message,size,mpi_tag,fri.destination().mpiRank());
417
418 return m_mpi_adapter->directSend(buf.data(),size,fri.destination().mpiRankValue(),
419 sizeof(char),char_data_type,mpi_tag.value(),false);
420 }
421}
422
423/*---------------------------------------------------------------------------*/
424/*---------------------------------------------------------------------------*/
425
426MP::MessageId HybridMessageQueue::
427probe(const MP::PointToPointMessageInfo& message)
428{
429 TRACE_DEBUG(1,"Probe msg='{0}' queue={1} is_valid={2}",
430 message,this,message.isValid());
431
432 MessageRank orig = message.emiterRank();
433 if (orig.isNull())
434 ARCANE_THROW(ArgumentException,"null sender");
435
436 if (!message.isValid())
437 return MessageId();
438
439 // Il faut avoir initialisé le message avec un couple (rang/tag).
440 if (!message.isRankTag())
441 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
442
443 MessageRank dest = message.destinationRank();
444 MessageTag user_tag = message.tag();
445 bool is_blocking = message.isBlocking();
446 if (is_blocking)
447 ARCANE_THROW(NotImplementedException,"blocking probe");
448 if (user_tag.isNull())
449 ARCANE_THROW(NotImplementedException,"probe with ANY_TAG");
450 FullRankInfo orig_fri = m_rank_tag_builder.rank(orig);
451 FullRankInfo dest_fri = m_rank_tag_builder.rank(dest);
452 MessageId message_id;
453 Int32 found_dest = dest.value();
454 const bool is_any_source = dest.isNull() || dest.isAnySource();
455 if (dest.isNull() && !m_is_allow_null_rank_for_any_source)
456 ARCANE_FATAL("Can not use probe() with null rank. Use MessageRank::anySourceRank() instead");
457 if (is_any_source) {
458 // Comme on ne sait pas de qui on va recevoir, il faut tester à la
459 // fois la file de thread et via MPI.
460 MP::PointToPointMessageInfo p2p_message(message);
461 p2p_message.setEmiterRank(orig_fri.localRank());
462 message_id = m_thread_queue->probe(p2p_message);
463 if (message_id.isValid()){
464 // On a trouvé un message dans la liste de thread.
465 // Comme on est dans notre liste de thread, le
466 // rang global est notre rang MPI + le rang local trouvé.
467 found_dest = orig_fri.mpiRankValue()*m_local_nb_rank + message_id.sourceInfo().rank().value();
468 TRACE_DEBUG(2,"Probe with null_rank (thread) orig={0} found_dest={1} tag={2}",
469 orig,found_dest,user_tag);
470 }
471 else{
472 // Recherche via MPI.
473 // La difficulté est que le rang local du PE originaire du message
474 // est codé dans le tag et qu'on ne connait pas le PE originaire.
475 // Il faut donc tester tous les tag potentiels. Leur nombre est
476 // égal à 'm_nb_local_rank'.
477 for( Integer z=0, zn=m_local_nb_rank; z<zn; ++z ){
478 MP::PointToPointMessageInfo mpi_message(message);
479 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag,orig_fri.localRank(),MessageRank(z));
480 mpi_message.setTag(mpi_tag);
481 TRACE_DEBUG(2,"Probe with null_rank orig={0} dest={1} tag={2}",orig,dest,mpi_tag);
482 message_id = m_mpi_adapter->probeMessage(mpi_message);
483 if (message_id.isValid()){
484 // On a trouvé un message MPI. Il faut extraire du tag le
485 // rang local. Le rang MPI est celui dans le message.
486 MessageRank mpi_rank = message_id.sourceInfo().rank();
487 MessageTag ret_tag = message_id.sourceInfo().tag();
488 Int32 local_rank = m_rank_tag_builder.getReceiveRankFromTag(ret_tag);
489 found_dest = mpi_rank.value()*m_local_nb_rank + local_rank;
490 TRACE_DEBUG(2,"Probe null rank found mpi_rank={0} local_rank={1} tag={2}",
491 ret_tag,mpi_rank,local_rank,ret_tag);
492 break;
493 }
494 }
495 }
496 }
497 else{
498 // Il faut convertir le rang `dest` en le rang attendu par la file de thread
499 // ou par MPI.
500 if (orig_fri.mpiRank()==dest_fri.mpiRank()){
501 MP::PointToPointMessageInfo p2p_message(message);
502 p2p_message.setDestinationRank(MP::MessageRank(dest_fri.localRank()));
503 p2p_message.setEmiterRank(MessageRank(orig_fri.localRank()));
504 message_id = m_thread_queue->probe(p2p_message);
505 }
506 else{
507 MP::PointToPointMessageInfo mpi_message(message);
508 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag,orig_fri,dest_fri);
509 mpi_message.setTag(mpi_tag);
510 mpi_message.setDestinationRank(MP::MessageRank(dest_fri.mpiRank()));
511 TRACE_DEBUG(2,"Probe orig={0} dest={1} mpi_tag={2} user_tag={3}",orig,dest,mpi_tag,user_tag);
512 message_id = m_mpi_adapter->probeMessage(mpi_message);
513 }
514 }
515 if (message_id.isValid()){
516 // Il faut transformer le rang local retourné par les méthodes précédentes
517 // en un rang global
518 MessageId::SourceInfo si = message_id.sourceInfo();
519 si.setRank(MessageRank(found_dest));
520 message_id.setSourceInfo(si);
521 }
522 return message_id;
523}
524
525/*---------------------------------------------------------------------------*/
526/*---------------------------------------------------------------------------*/
527
528MP::MessageSourceInfo HybridMessageQueue::
529legacyProbe(const MP::PointToPointMessageInfo& message)
530{
531 TRACE_DEBUG(1,"LegacyProbe msg='{0}' queue={1} is_valid={2}",
532 message,this,message.isValid());
533
534 MessageRank orig = message.emiterRank();
535 if (orig.isNull())
536 ARCANE_THROW(ArgumentException,"null sender");
537
538 if (!message.isValid())
539 return {};
540
541 // Il faut avoir initialisé le message avec un couple (rang/tag).
542 if (!message.isRankTag())
543 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
544
545 const MessageRank dest = message.destinationRank();
546 const MessageTag user_tag = message.tag();
547 const bool is_blocking = message.isBlocking();
548 if (is_blocking)
549 ARCANE_THROW(NotImplementedException,"blocking probe");
550 if (user_tag.isNull())
551 ARCANE_THROW(NotImplementedException,"legacyProbe with ANY_TAG");
552 FullRankInfo orig_fri = m_rank_tag_builder.rank(orig);
553 FullRankInfo dest_fri = m_rank_tag_builder.rank(dest);
554 MP::MessageSourceInfo message_source_info;
555 Int32 found_dest = dest.value();
556 const bool is_any_source = dest.isNull() || dest.isAnySource();
557 if (dest.isNull() && !m_is_allow_null_rank_for_any_source)
558 ARCANE_FATAL("Can not use legacyProbe() with null rank. Use MessageRank::anySourceRank() instead");
559 if (is_any_source) {
560 // Comme on ne sait pas de qui on va recevoir, il faut tester à la
561 // fois la file de thread et via MPI.
562 MP::PointToPointMessageInfo p2p_message(message);
563 p2p_message.setEmiterRank(orig_fri.localRank());
564 message_source_info = m_thread_queue->legacyProbe(p2p_message);
565 if (message_source_info.isValid()){
566 // On a trouvé un message dans la liste de thread.
567 // Comme on est dans notre liste de thread, le
568 // rang global est notre rang MPI + le rang local trouvé.
569 found_dest = orig_fri.mpiRankValue()*m_local_nb_rank + message_source_info.rank().value();
570 TRACE_DEBUG(2,"LegacyProbe with null_rank (thread) orig={0} found_dest={1} tag={2}",
571 orig,found_dest,user_tag);
572 }
573 else{
574 // Recherche via MPI.
575 // La difficulté est que le rang local du PE originaire du message
576 // est codé dans le tag et qu'on ne connait pas le PE originaire.
577 // Il faut donc tester tous les tag potentiels. Leur nombre est
578 // égal à 'm_nb_local_rank'.
579 for( Integer z=0, zn=m_local_nb_rank; z<zn; ++z ){
580 MP::PointToPointMessageInfo mpi_message(message);
581 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag,orig_fri.localRank(),MessageRank(z));
582 mpi_message.setTag(mpi_tag);
583 TRACE_DEBUG(2,"LegacyProbe with null_rank orig={0} dest={1} tag={2}",orig,dest,mpi_tag);
584 message_source_info = m_mpi_adapter->legacyProbeMessage(mpi_message);
585 if (message_source_info.isValid()){
586 // On a trouvé un message MPI. Il faut extraire du tag le
587 // rang local. Le rang MPI est celui dans le message.
588 MessageRank mpi_rank = message_source_info.rank();
589 MessageTag ret_tag = message_source_info.tag();
590 Int32 local_rank = m_rank_tag_builder.getReceiveRankFromTag(ret_tag);
591 found_dest = mpi_rank.value()*m_local_nb_rank + local_rank;
592 TRACE_DEBUG(2,"LegacyProbe null rank found mpi_rank={0} local_rank={1} tag={2}",
593 ret_tag,mpi_rank,local_rank,ret_tag);
594 // Remet le tag d'origine pour pouvoir faire un receive avec.
595 message_source_info.setTag(user_tag);
596 break;
597 }
598 }
599 }
600 }
601 else{
602 // Il faut convertir le rang `dest` en le rang attendu par la file de thread
603 // ou par MPI.
604 if (orig_fri.mpiRank()==dest_fri.mpiRank()){
605 MP::PointToPointMessageInfo p2p_message(message);
606 p2p_message.setDestinationRank(MP::MessageRank(dest_fri.localRank()));
607 p2p_message.setEmiterRank(MessageRank(orig_fri.localRank()));
608 TRACE_DEBUG(2,"LegacyProbe SHM orig={0} dest={1} tag={2}",orig,dest,user_tag);
609 message_source_info = m_thread_queue->legacyProbe(p2p_message);
610 }
611 else{
612 MP::PointToPointMessageInfo mpi_message(message);
613 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag,orig_fri,dest_fri);
614 mpi_message.setTag(mpi_tag);
615 mpi_message.setDestinationRank(MP::MessageRank(dest_fri.mpiRank()));
616 TRACE_DEBUG(2,"LegacyProbe MPI orig={0} dest={1} mpi_tag={2} user_tag={3}",orig,dest,mpi_tag,user_tag);
617 message_source_info = m_mpi_adapter->legacyProbeMessage(mpi_message);
618 if (message_source_info.isValid()){
619 // Remet le tag d'origine pour pouvoir faire un receive avec.
620 message_source_info.setTag(user_tag);
621 }
622 }
623 }
624 if (message_source_info.isValid()){
625 // Il faut transformer le rang local retourné par les méthodes précédentes
626 // en un rang global
627 message_source_info.setRank(MessageRank(found_dest));
628 }
629 TRACE_DEBUG(2,"LegacyProbe has matched message? = {0}",message_source_info.isValid());
630 return message_source_info;
631}
632
633/*---------------------------------------------------------------------------*/
634/*---------------------------------------------------------------------------*/
635
636std::ostream& operator<<(std::ostream& o,const FullRankInfo& fri)
637{
638 return o << "(local=" << fri.m_local_rank << ",global="
639 << fri.m_global_rank << ",mpi=" << fri.m_mpi_rank << ")";
640}
641
642/*---------------------------------------------------------------------------*/
643/*---------------------------------------------------------------------------*/
644
645} // End namespace Arcane::MessagePassing
646
647/*---------------------------------------------------------------------------*/
648/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formattage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
#define ARCCORE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Interface d'une file de messages avec les threads.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Int32 Integer
Type représentant un entier.
std::int32_t Int32
Type entier signé sur 32 bits.