Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
HybridMessageQueue.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* HybridMessageQueue.cc (C) 2000-2025 */
9/* */
10/* Message queue for an MPI/Thread implementation. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/ArcanePrecomp.h"
15
16#include "arcane/utils/Array.h"
17#include "arcane/utils/FatalErrorException.h"
18#include "arcane/utils/NotImplementedException.h"
19#include "arcane/utils/NotSupportedException.h"
20#include "arcane/utils/ArgumentException.h"
21#include "arcane/utils/TraceInfo.h"
22#include "arcane/utils/ITraceMng.h"
23#include "arcane/utils/ValueConvert.h"
24
25#include "arcane/parallel/mpithread/HybridMessageQueue.h"
26#include "arcane/parallel/mpi/MpiParallelMng.h"
27
28#include "arccore/message_passing_mpi/internal/MpiAdapter.h"
29
30/*---------------------------------------------------------------------------*/
31/*---------------------------------------------------------------------------*/
32
// Macro to display debug messages.
// The message is emitted only when 'm_debug_level' is greater than or equal
// to 'needed_debug_level'. It is prefixed with "Hybrid " and the trace
// manager is flushed right after the message is written.
// The body is wrapped in 'do { ... } while (false)' so that the macro expands
// to exactly one statement: it can then be used safely (followed by ';')
// inside an unbraced 'if'/'else'.
#define TRACE_DEBUG(needed_debug_level, format_str, ...) \
  do { \
    if (m_debug_level >= needed_debug_level) { \
      info() << String::format("Hybrid " format_str, __VA_ARGS__); \
      traceMng()->flush(); \
    } \
  } while (false)
39
40/*---------------------------------------------------------------------------*/
41/*---------------------------------------------------------------------------*/
42
44{
45
46/*---------------------------------------------------------------------------*/
47/*---------------------------------------------------------------------------*/
48
49HybridMessageQueue::
50HybridMessageQueue(ISharedMemoryMessageQueue* thread_queue, MpiParallelMng* mpi_pm,
51 Int32 local_nb_rank)
52: TraceAccessor(mpi_pm->traceMng())
53, m_thread_queue(thread_queue)
54, m_mpi_parallel_mng(mpi_pm)
55, m_mpi_adapter(mpi_pm->adapter())
56, m_local_nb_rank(local_nb_rank)
57, m_rank_tag_builder(local_nb_rank)
58, m_debug_level(0)
59{
60 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCCORE_ALLOW_NULL_RANK_FOR_MPI_ANY_SOURCE", true))
61 m_is_allow_null_rank_for_any_source = v.value() != 0;
62}
63
64/*---------------------------------------------------------------------------*/
65/*---------------------------------------------------------------------------*/
66
67void HybridMessageQueue::
68_checkValidRank(MessageRank rank)
69{
70 if (rank.isNull())
71 ARCANE_THROW(ArgumentException, "null rank");
72}
73
74/*---------------------------------------------------------------------------*/
75/*---------------------------------------------------------------------------*/
76
77void HybridMessageQueue::
78_checkValidSource(const PointToPointMessageInfo& message)
79{
80 MessageRank source = message.emiterRank();
81 if (source.isNull())
82 ARCANE_THROW(ArgumentException, "null source");
83}
84
85/*---------------------------------------------------------------------------*/
86/*---------------------------------------------------------------------------*/
87
88PointToPointMessageInfo HybridMessageQueue::
89_buildSharedMemoryMessage(const PointToPointMessageInfo& message,
90 const SourceDestinationFullRankInfo& fri)
91{
92 PointToPointMessageInfo p2p_message(message);
93 p2p_message.setEmiterRank(fri.source().localRank());
94 p2p_message.setDestinationRank(fri.destination().localRank());
95 return p2p_message;
96}
97
98/*---------------------------------------------------------------------------*/
99/*---------------------------------------------------------------------------*/
100
101PointToPointMessageInfo HybridMessageQueue::
102_buildMPIMessage(const PointToPointMessageInfo& message,
103 const SourceDestinationFullRankInfo& fri)
104{
105 PointToPointMessageInfo p2p_message(message);
106 p2p_message.setEmiterRank(fri.source().mpiRank());
107 p2p_message.setDestinationRank(fri.destination().mpiRank());
108 return p2p_message;
109}
110
111/*---------------------------------------------------------------------------*/
112/*---------------------------------------------------------------------------*/
113
114void HybridMessageQueue::
115waitAll(ArrayView<Request> requests)
116{
117 // TODO: merge what is possible with waitSome.
118 Integer nb_request = requests.size();
119 UniqueArray<Request> mpi_requests;
120 UniqueArray<Request> thread_requests;
121 for (Integer i = 0; i < nb_request; ++i) {
122 Request r = requests[i];
123 if (!r.isValid())
124 continue;
125 IRequestCreator* creator = r.creator();
126 if (creator == m_mpi_adapter) {
127 mpi_requests.add(r);
128 }
129 else if (creator == m_thread_queue)
130 thread_requests.add(r);
131 else
132 ARCANE_FATAL("Invalid IRequestCreator");
133 }
134
135 if (mpi_requests.size() != 0)
136 m_mpi_adapter->waitAllRequests(mpi_requests);
137 if (thread_requests.size() != 0)
138 m_thread_queue->waitAll(thread_requests);
139
140 // We reset all requests to be able to call the Wait functions!
141 for (Request r : requests)
142 r.reset();
143}
144
145/*---------------------------------------------------------------------------*/
146/*---------------------------------------------------------------------------*/
147
148void HybridMessageQueue::
149waitSome(Int32 rank, ArrayView<Request> requests, ArrayView<bool> requests_done,
150 bool is_non_blocking)
151{
152 Integer nb_done = 0;
153 do {
154 TRACE_DEBUG(2, "Hybrid: wait some rank={0} requests n={1} nb_done={2} is_non_blocking={3}",
155 rank, requests.size(), nb_done, is_non_blocking);
156 nb_done = _testOrWaitSome(rank, requests, requests_done);
157 if (is_non_blocking || nb_done == (-1))
158 break;
159 } while (nb_done == 0);
160}
161
162/*---------------------------------------------------------------------------*/
163/*---------------------------------------------------------------------------*/
164
165Integer HybridMessageQueue::
166_testOrWaitSome(Int32 rank, ArrayView<Request> requests, ArrayView<bool> requests_done)
167{
168 Integer nb_request = requests.size();
169 TRACE_DEBUG(2, "Hybrid: wait some rank={0} requests n={1}", rank, nb_request);
170
171 // We must separate MPI requests from shared memory requests.
172 // TODO: with the notion of generalized MPI requests, it might be
173 // possible to merge this.
174 UniqueArray<Request> mpi_requests;
175 UniqueArray<Request> shm_requests;
176 // Index of requests in the global list \a requests
177 UniqueArray<Integer> mpi_requests_index;
178 UniqueArray<Integer> shm_requests_index;
179
180 Integer nb_done = 0;
181 for (Integer i = 0; i < nb_request; ++i) {
182 Request r = requests[i];
183 if (!r.isValid())
184 continue;
185 IRequestCreator* creator = r.creator();
186 if (creator == m_mpi_adapter) {
187 mpi_requests.add(r);
188 mpi_requests_index.add(i);
189 }
190 else if (creator == m_thread_queue) {
191 shm_requests.add(r);
192 shm_requests_index.add(i);
193 }
194 else
195 ARCANE_FATAL("Invalid IRequestCreator");
196 }
197
198 TRACE_DEBUG(2, "Hybrid: wait some rank={0} nb_mpi={1} nb_shm={2}",
199 rank, mpi_requests.size(), shm_requests.size());
200
201 // If there are no valid requests, there is no need to proceed.
202 // However, we must not return '0' because we must make
203 // the difference between no requests available for the 'is_non_blocking'
204 // mode
205 // and no valid requests.
206 if (mpi_requests.size() == 0 && shm_requests.size() == 0)
207 return (-1);
208
209 // Even in waitSome mode, we must use non-blocking mode because
210 // we do not know between threads and MPI which requests will be
211 // available
212
213 // The requests may have been modified if they are not finished.
214 // Therefore, they must be put back into the \a requests list. In our
215 // case, it is enough to only copy the new value into
216 // the corresponding instance of HybridMessageRequest.
217 UniqueArray<bool> mpi_done_indexes;
218 Integer nb_mpi_request = mpi_requests.size();
219
220 if (nb_mpi_request != 0) {
221 mpi_done_indexes.resize(nb_mpi_request);
222 mpi_done_indexes.fill(false);
223 m_mpi_adapter->waitSomeRequests(mpi_requests, mpi_done_indexes, true);
224 TRACE_DEBUG(2, "Hybrid: MPI wait some requests n={0} after=", nb_mpi_request, mpi_done_indexes);
225 for (Integer i = 0; i < nb_mpi_request; ++i) {
226 Integer index_in_global = mpi_requests_index[i];
227 if (mpi_done_indexes[i]) {
228 requests_done[index_in_global] = true;
229 requests[index_in_global].reset();
230 ++nb_done;
231 TRACE_DEBUG(1, "MPI rank={0} set done i={1} in_global={2}",
232 rank, i, index_in_global);
233 }
234 else
235 requests[index_in_global] = mpi_requests[i];
236 }
237 }
238
239 UniqueArray<bool> shm_done_indexes;
240 Integer nb_shm_request = shm_requests.size();
241 TRACE_DEBUG(2, "SHM wait some requests n={0}", nb_shm_request);
242 if (shm_requests.size() != 0) {
243 shm_done_indexes.resize(nb_shm_request);
244 shm_done_indexes.fill(false);
245 m_thread_queue->waitSome(rank, shm_requests, shm_done_indexes, true);
246 for (Integer i = 0; i < nb_shm_request; ++i) {
247 Integer index_in_global = shm_requests_index[i];
248 if (shm_done_indexes[i]) {
249 requests_done[index_in_global] = true;
250 requests[index_in_global].reset();
251 ++nb_done;
252 TRACE_DEBUG(1, "SHM rank={0} set done i={1} in_global={2}",
253 rank, i, index_in_global);
254 }
255 else
256 requests[index_in_global] = shm_requests[i];
257 }
258 }
259 return nb_done;
260}
261
262/*---------------------------------------------------------------------------*/
263/*---------------------------------------------------------------------------*/
264
265Request HybridMessageQueue::
266_addReceiveRankTag(const PointToPointMessageInfo& message, ReceiveBufferInfo buf_info)
267{
268 // We do not support receives with ANY_RANK because we do not know
269 // whether we need to perform a 'receive' with MPI or in shared memory.
270 // In this case, the user must instead use probe()
271 // to know what is available and send an addReceive()
272 // with a MessageId.
273 if (message.destinationRank().isNull())
274 ARCANE_THROW(NotSupportedException, "Receive with any rank. Use probe() and MessageId instead");
275
276 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
277 bool is_same_mpi_rank = fri.isSameMpiRank();
278
279 if (is_same_mpi_rank) {
280 TRACE_DEBUG(1, "** MPITMQ SHM ADD RECV S queue={0} message={1}", this, message);
281 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message, fri));
282 return m_thread_queue->addReceive(p2p_message, buf_info);
283 }
284
285 ISerializer* serializer = buf_info.serializer();
286 if (serializer) {
287 TRACE_DEBUG(1, "** MPITMQ MPI ADD RECV S queue={0} message={1}", this, message);
288 PointToPointMessageInfo p2p_message(_buildMPIMessage(message, fri));
289 p2p_message.setTag(m_rank_tag_builder.tagForReceive(MessageTag(message.tag()), fri));
290 return m_mpi_parallel_mng->receiveSerializer(serializer, p2p_message);
291 }
292 else {
293 ByteSpan buf = buf_info.memoryBuffer();
294 Int64 size = buf.size();
295
296 TRACE_DEBUG(1, "** MPITMQ THREAD ADD RECV B queue={0} message={1} size={2} same_mpi?={3}",
297 this, message, size, fri.isSameMpiRank());
298
299 //TODO: use the real MPI_Datatype
300 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
301 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(message.tag(), fri);
302 Request r = m_mpi_adapter->directRecv(buf.data(), size, fri.destination().mpiRankValue(), sizeof(char),
303 char_data_type, mpi_tag.value(), false);
304 return r;
305 }
306}
307
308/*---------------------------------------------------------------------------*/
309/*---------------------------------------------------------------------------*/
310
311Request HybridMessageQueue::
312_addReceiveMessageId(const PointToPointMessageInfo& message, ReceiveBufferInfo buf_info)
313{
314 MessageId message_id = message.messageId();
315 MessageId::SourceInfo si(message_id.sourceInfo());
316
317 if (si.rank() != message.destinationRank())
318 ARCANE_FATAL("Incohence between messsage_id rank and destination rank x1={0} x2={1}",
319 si.rank(), message.destinationRank());
320
321 TRACE_DEBUG(1, "** MPITMQ ADD_RECV (message_id) queue={0} message={1}",
322 this, message);
323
324 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
325 if (fri.isSameMpiRank()) {
326 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message, fri));
327 return m_thread_queue->addReceive(p2p_message, buf_info);
328 }
329
330 TRACE_DEBUG(1, "** MPITMQ MPI ADD RECV (message_id) queue={0} message={1}", this, message);
331
332 ISerializer* serializer = buf_info.serializer();
333 if (serializer) {
334 PointToPointMessageInfo p2p_message(_buildMPIMessage(message, fri));
335 //p2p_message.setTag(m_rank_tag_builder.tagForReceive(message.tag(),fri));
336 TRACE_DEBUG(1, "** MPI ADD RECV Serializer (message_id) message={0} p2p_message={1}",
337 message, p2p_message);
338 return m_mpi_parallel_mng->receiveSerializer(serializer, p2p_message);
339 }
340 else {
341 ByteSpan buf = buf_info.memoryBuffer();
342 Int64 size = buf.size();
343
344 // TODO: use the real MPI_Datatype
345 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
346 MessageId mpi_message(message_id);
347 MessageId::SourceInfo mpi_si(si);
348 mpi_si.setRank(fri.destination().mpiRank());
349 mpi_message.setSourceInfo(mpi_si);
350 return m_mpi_adapter->directRecv(buf.data(), size, mpi_message, sizeof(char),
351 char_data_type, false);
352 }
353}
354
355/*---------------------------------------------------------------------------*/
356/*---------------------------------------------------------------------------*/
357
358Request HybridMessageQueue::
359addReceive(const PointToPointMessageInfo& message, ReceiveBufferInfo buf)
360{
361 _checkValidSource(message);
362
363 if (!message.isValid())
364 return Request();
365
366 if (message.isRankTag())
367 return _addReceiveRankTag(message, buf);
368
369 if (message.isMessageId())
370 return _addReceiveMessageId(message, buf);
371
372 ARCANE_THROW(NotSupportedException, "Invalid message_info");
373}
374
375/*---------------------------------------------------------------------------*/
376/*---------------------------------------------------------------------------*/
377
378Request HybridMessageQueue::
379addSend(const PointToPointMessageInfo& message, SendBufferInfo buf_info)
380{
381 if (!message.isValid())
382 return Request();
383 if (message.destinationRank().isNull())
384 ARCCORE_FATAL("Null destination");
385 if (!message.isRankTag())
386 ARCCORE_FATAL("Invalid message_info for sending: message.isRankTag() is false");
387
388 SourceDestinationFullRankInfo fri = _getFullRankInfo(message);
389
390 // Same rank, so send via the shared memory queue.
391 if (fri.isSameMpiRank()) {
392 TRACE_DEBUG(1, "** MPITMQ SHM ADD SEND S queue={0} message={1}", this, message);
393 PointToPointMessageInfo p2p_message(_buildSharedMemoryMessage(message, fri));
394 return m_thread_queue->addSend(p2p_message, buf_info);
395 }
396
397 // Send via MPI
398 MessageTag mpi_tag = m_rank_tag_builder.tagForSend(message.tag(), fri);
399 const ISerializer* serializer = buf_info.serializer();
400 if (serializer) {
401 PointToPointMessageInfo p2p_message(_buildMPIMessage(message, fri));
402 p2p_message.setTag(mpi_tag);
403 TRACE_DEBUG(1, "** MPITMQ MPI ADD SEND Serializer queue={0} message={1} p2p_message={2}",
404 this, message, p2p_message);
405 return m_mpi_parallel_mng->sendSerializer(serializer, p2p_message);
406 }
407 else {
408 ByteConstSpan buf = buf_info.memoryBuffer();
409 Int64 size = buf.size();
410
411 // TODO: use m_mpi_parallel_mng but must be careful
412 // to use blocking mode
413 // TODO: use the real MPI_Datatype
414 MPI_Datatype char_data_type = MpiBuiltIn::datatype(char());
415
416 TRACE_DEBUG(1, "** MPITMQ MPI ADD SEND B queue={0} message={1} size={2} mpi_tag={3} mpi_rank={4}",
417 this, message, size, mpi_tag, fri.destination().mpiRank());
418
419 return m_mpi_adapter->directSend(buf.data(), size, fri.destination().mpiRankValue(),
420 sizeof(char), char_data_type, mpi_tag.value(), false);
421 }
422}
423
424/*---------------------------------------------------------------------------*/
425/*---------------------------------------------------------------------------*/
426
427MP::MessageId HybridMessageQueue::
428probe(const MP::PointToPointMessageInfo& message)
429{
430 TRACE_DEBUG(1, "Probe msg='{0}' queue={1} is_valid={2}",
431 message, this, message.isValid());
432
433 MessageRank orig = message.emiterRank();
434 if (orig.isNull())
435 ARCANE_THROW(ArgumentException, "null sender");
436
437 if (!message.isValid())
438 return MessageId();
439
440 // The message must be initialized with a (rank/tag) pair.
441 if (!message.isRankTag())
442 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
443
444 MessageRank dest = message.destinationRank();
445 MessageTag user_tag = message.tag();
446 bool is_blocking = message.isBlocking();
447 if (is_blocking)
448 ARCANE_THROW(NotImplementedException, "blocking probe");
449 if (user_tag.isNull())
450 ARCANE_THROW(NotImplementedException, "probe with ANY_TAG");
451 FullRankInfo orig_fri = m_rank_tag_builder.rank(orig);
452 FullRankInfo dest_fri = m_rank_tag_builder.rank(dest);
453 MessageId message_id;
454 Int32 found_dest = dest.value();
455 const bool is_any_source = dest.isNull() || dest.isAnySource();
456 if (dest.isNull() && !m_is_allow_null_rank_for_any_source)
457 ARCANE_FATAL("Can not use probe() with null rank. Use MessageRank::anySourceRank() instead");
458 if (is_any_source) {
459 // Since we don't know who we will receive from, we must
460 // test both the thread queue and via MPI.
461 MP::PointToPointMessageInfo p2p_message(message);
462 p2p_message.setEmiterRank(orig_fri.localRank());
463 message_id = m_thread_queue->probe(p2p_message);
464 if (message_id.isValid()) {
465 // We found a message in the thread list.
466 // Since we are in our thread list, the global rank is our
467 // MPI rank + the found local rank.
468 found_dest = orig_fri.mpiRankValue() * m_local_nb_rank + message_id.sourceInfo().rank().value();
469 TRACE_DEBUG(2, "Probe with null_rank (thread) orig={0} found_dest={1} tag={2}",
470 orig, found_dest, user_tag);
471 }
472 else {
473 // Search via MPI.
474 // The difficulty is that the local rank of the originating PE of the message
475 // is encoded in the tag and we do not know the originating PE.
476 // Therefore, we must test all potential tags. Their number is
477 // equal to 'm_nb_local_rank'.
478 for (Integer z = 0, zn = m_local_nb_rank; z < zn; ++z) {
479 MP::PointToPointMessageInfo mpi_message(message);
480 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag, orig_fri.localRank(), MessageRank(z));
481 mpi_message.setTag(mpi_tag);
482 TRACE_DEBUG(2, "Probe with null_rank orig={0} dest={1} tag={2}", orig, dest, mpi_tag);
483 message_id = m_mpi_adapter->probeMessage(mpi_message);
484 if (message_id.isValid()) {
485 // We found an MPI message. We must extract the local rank
486 // from the tag. The MPI rank is the one in the message.
487 MessageRank mpi_rank = message_id.sourceInfo().rank();
488 MessageTag ret_tag = message_id.sourceInfo().tag();
489 Int32 local_rank = m_rank_tag_builder.getReceiveRankFromTag(ret_tag);
490 found_dest = mpi_rank.value() * m_local_nb_rank + local_rank;
491 TRACE_DEBUG(2, "Probe null rank found mpi_rank={0} local_rank={1} tag={2}",
492 ret_tag, mpi_rank, local_rank, ret_tag);
493 break;
494 }
495 }
496 }
497 }
498 else {
499 // The rank `dest` must be converted to the rank expected by the thread queue
500 // or by MPI.
501 if (orig_fri.mpiRank() == dest_fri.mpiRank()) {
502 MP::PointToPointMessageInfo p2p_message(message);
503 p2p_message.setDestinationRank(MP::MessageRank(dest_fri.localRank()));
504 p2p_message.setEmiterRank(MessageRank(orig_fri.localRank()));
505 message_id = m_thread_queue->probe(p2p_message);
506 }
507 else {
508 MP::PointToPointMessageInfo mpi_message(message);
509 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag, orig_fri, dest_fri);
510 mpi_message.setTag(mpi_tag);
511 mpi_message.setDestinationRank(MP::MessageRank(dest_fri.mpiRank()));
512 TRACE_DEBUG(2, "Probe orig={0} dest={1} mpi_tag={2} user_tag={3}", orig, dest, mpi_tag, user_tag);
513 message_id = m_mpi_adapter->probeMessage(mpi_message);
514 }
515 }
516 if (message_id.isValid()) {
517 // The local rank returned by the previous methods must be transformed
518 // into a global rank
519 MessageId::SourceInfo si = message_id.sourceInfo();
520 si.setRank(MessageRank(found_dest));
521 message_id.setSourceInfo(si);
522 }
523 return message_id;
524}
525
526/*---------------------------------------------------------------------------*/
527/*---------------------------------------------------------------------------*/
528
529MP::MessageSourceInfo HybridMessageQueue::
530legacyProbe(const MP::PointToPointMessageInfo& message)
531{
532 TRACE_DEBUG(1, "LegacyProbe msg='{0}' queue={1} is_valid={2}",
533 message, this, message.isValid());
534
535 MessageRank orig = message.emiterRank();
536 if (orig.isNull())
537 ARCANE_THROW(ArgumentException, "null sender");
538
539 if (!message.isValid())
540 return {};
541
542 // The message must be initialized with a (rank/tag) pair.
543 if (!message.isRankTag())
544 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
545
546 const MessageRank dest = message.destinationRank();
547 const MessageTag user_tag = message.tag();
548 const bool is_blocking = message.isBlocking();
549 if (is_blocking)
550 ARCANE_THROW(NotImplementedException, "blocking probe");
551 if (user_tag.isNull())
552 ARCANE_THROW(NotImplementedException, "legacyProbe with ANY_TAG");
553 FullRankInfo orig_fri = m_rank_tag_builder.rank(orig);
554 FullRankInfo dest_fri = m_rank_tag_builder.rank(dest);
555 MP::MessageSourceInfo message_source_info;
556 Int32 found_dest = dest.value();
557 const bool is_any_source = dest.isNull() || dest.isAnySource();
558 if (dest.isNull() && !m_is_allow_null_rank_for_any_source)
559 ARCANE_FATAL("Can not use legacyProbe() with null rank. Use MessageRank::anySourceRank() instead");
560 if (is_any_source) {
561 // Since we don't know who we will receive from, we must test
562 // both the thread queue and via MPI.
563 MP::PointToPointMessageInfo p2p_message(message);
564 p2p_message.setEmiterRank(orig_fri.localRank());
565 message_source_info = m_thread_queue->legacyProbe(p2p_message);
566 if (message_source_info.isValid()) {
567 // We found a message in the thread list.
568 // Since we are in our thread list, the global rank is our
569 // MPI rank + the found local rank.
570 found_dest = orig_fri.mpiRankValue() * m_local_nb_rank + message_source_info.rank().value();
571 TRACE_DEBUG(2, "LegacyProbe with null_rank (thread) orig={0} found_dest={1} tag={2}",
572 orig, found_dest, user_tag);
573 }
574 else {
575 // Search via MPI.
576 // The difficulty is that the local rank of the originating PE of the message
577 // is encoded in the tag and we do not know the originating PE.
578 // Therefore, we must test all potential tags. Their number is
579 // equal to 'm_nb_local_rank'.
580 for (Integer z = 0, zn = m_local_nb_rank; z < zn; ++z) {
581 MP::PointToPointMessageInfo mpi_message(message);
582 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag, orig_fri.localRank(), MessageRank(z));
583 mpi_message.setTag(mpi_tag);
584 TRACE_DEBUG(2, "LegacyProbe with null_rank orig={0} dest={1} tag={2}", orig, dest, mpi_tag);
585 message_source_info = m_mpi_adapter->legacyProbeMessage(mpi_message);
586 if (message_source_info.isValid()) {
587 // We found an MPI message. We must extract the local rank
588 // from the tag. The MPI rank is the one in the message.
589 MessageRank mpi_rank = message_source_info.rank();
590 MessageTag ret_tag = message_source_info.tag();
591 Int32 local_rank = m_rank_tag_builder.getReceiveRankFromTag(ret_tag);
592 found_dest = mpi_rank.value() * m_local_nb_rank + local_rank;
593 TRACE_DEBUG(2, "LegacyProbe null rank found mpi_rank={0} local_rank={1} tag={2}",
594 ret_tag, mpi_rank, local_rank, ret_tag);
595 // Restore the original tag to be able to perform a receive with it.
596 message_source_info.setTag(user_tag);
597 break;
598 }
599 }
600 }
601 }
602 else {
603 // The rank `dest` must be converted to the rank expected by the thread queue
604 // or by MPI.
605 if (orig_fri.mpiRank() == dest_fri.mpiRank()) {
606 MP::PointToPointMessageInfo p2p_message(message);
607 p2p_message.setDestinationRank(MP::MessageRank(dest_fri.localRank()));
608 p2p_message.setEmiterRank(MessageRank(orig_fri.localRank()));
609 TRACE_DEBUG(2, "LegacyProbe SHM orig={0} dest={1} tag={2}", orig, dest, user_tag);
610 message_source_info = m_thread_queue->legacyProbe(p2p_message);
611 }
612 else {
613 MP::PointToPointMessageInfo mpi_message(message);
614 MessageTag mpi_tag = m_rank_tag_builder.tagForReceive(user_tag, orig_fri, dest_fri);
615 mpi_message.setTag(mpi_tag);
616 mpi_message.setDestinationRank(MP::MessageRank(dest_fri.mpiRank()));
617 TRACE_DEBUG(2, "LegacyProbe MPI orig={0} dest={1} mpi_tag={2} user_tag={3}", orig, dest, mpi_tag, user_tag);
618 message_source_info = m_mpi_adapter->legacyProbeMessage(mpi_message);
619 if (message_source_info.isValid()) {
620 // Restore the original tag to be able to perform a receive with it.
621 message_source_info.setTag(user_tag);
622 }
623 }
624 }
625 if (message_source_info.isValid()) {
626 // The local rank returned by the previous methods must be transformed into a global rank
627 message_source_info.setRank(MessageRank(found_dest));
628 }
629 TRACE_DEBUG(2, "LegacyProbe has matched message? = {0}", message_source_info.isValid());
630 return message_source_info;
631}
632
633/*---------------------------------------------------------------------------*/
634/*---------------------------------------------------------------------------*/
635
636std::ostream& operator<<(std::ostream& o, const FullRankInfo& fri)
637{
638 return o << "(local=" << fri.m_local_rank << ",global="
639 << fri.m_global_rank << ",mpi=" << fri.m_mpi_rank << ")";
640}
641
642/*---------------------------------------------------------------------------*/
643/*---------------------------------------------------------------------------*/
644
645} // End namespace Arcane::MessagePassing
646
647/*---------------------------------------------------------------------------*/
648/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCCORE_FATAL(...)
Macro throwing a FatalErrorException.
Declarations of types and methods used by message exchange mechanisms.
Int32 Integer
Type representing an integer.
std::int32_t Int32
Signed integer type of 32 bits.