Arcane  4.1.12.0
Developer documentation
Loading...
Searching...
No Matches
SharedMemoryMessageQueue.cc
1// -*- tab-width: 2; indent-tabs-mode: nil; coding: utf-8-with-signature -*-
2//-----------------------------------------------------------------------------
3// Copyright 2000-2026 CEA (www.cea.fr) IFPEN (www.ifpenergiesnouvelles.com)
4// See the top-level COPYRIGHT file for details.
5// SPDX-License-Identifier: Apache-2.0
6//-----------------------------------------------------------------------------
7/*---------------------------------------------------------------------------*/
8/* SharedMemoryMessageQueue.cc (C) 2000-2025 */
9/* */
10/* Implementation of a shared memory message queue. */
11/*---------------------------------------------------------------------------*/
12/*---------------------------------------------------------------------------*/
13
14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/NotImplementedException.h"
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/ArgumentException.h"
18#include "arcane/utils/ITraceMng.h"
19#include "arcane/utils/ValueConvert.h"
20
21#include "arccore/common/internal/MemoryResourceMng.h"
22
23#include "arcane/parallel/thread/SharedMemoryMessageQueue.h"
24#include "arcane/parallel/thread/IAsyncQueue.h"
25
26#include "arcane/core/ISerializeMessage.h"
27
// Macro to display messages for debugging.
//
// Emits a formatted message on the trace manager of the enclosing class, but
// only when 'm_is_debug' is true and 'm_trace_mng' is non-null.
//
// The body is wrapped in 'do { ... } while (false)' so that the macro expands
// to exactly one statement. 'TRACE_DEBUG(...);' can then be used as the body
// of an unbraced 'if'/'else' without breaking the syntax or silently
// capturing the caller's 'else' branch.
#define TRACE_DEBUG(format_str, ...) \
  do { \
    if (m_is_debug && m_trace_mng) { \
      m_trace_mng->info() << String::format(format_str, __VA_ARGS__); \
      m_trace_mng->flush(); \
    } \
  } while (false)
34
35/*---------------------------------------------------------------------------*/
36/*---------------------------------------------------------------------------*/
37
39{
40
41/*---------------------------------------------------------------------------*/
42/*---------------------------------------------------------------------------*/
43
55{
56 SendBufferInfo send_info = sender->sendBufferInfo();
57 ReceiveBufferInfo receive_info = this->receiveBufferInfo();
58
59 const ISerializer* send_serializer = send_info.serializer();
60 ISerializer* receive_serializer = receive_info.serializer();
61 if (receive_serializer) {
62 if (!send_serializer)
63 ARCANE_FATAL("No send serializer for receive serializer");
64 receive_serializer->copy(send_serializer);
65 return;
66 }
67
68 ByteConstSpan send_span = send_info.memoryBuffer();
69 ByteSpan receive_span = receive_info.memoryBuffer();
70 Int64 send_size = send_span.size();
71 Int64 receive_size = receive_span.size();
72 if (send_size > receive_size)
73 ARCANE_FATAL("Not enough memory for receiving message receive={0} send={1}",
74 receive_size, send_size);
75
77}
78
79/*---------------------------------------------------------------------------*/
80/*---------------------------------------------------------------------------*/
81
82void SharedMemoryMessageRequest::
83destroy()
84{
85 if (m_is_destroyed)
86 ARCANE_FATAL("Request already destroyed");
87 m_is_destroyed = true;
88 // Comment out for debug.
89 delete this;
90}
91
92/*---------------------------------------------------------------------------*/
93/*---------------------------------------------------------------------------*/
94
99class RequestAsyncQueue
100{
101 public:
102
103 RequestAsyncQueue()
104 : m_async_queue(IAsyncQueue::createQueue())
105 {}
106 ~RequestAsyncQueue()
107 {
108 delete m_async_queue;
109 }
110
111 public:
112
113 void push(SharedMemoryMessageRequest* v)
114 {
115 m_async_queue->push(v);
116 }
118 {
119 return reinterpret_cast<SharedMemoryMessageRequest*>(m_async_queue->pop());
120 }
122 {
123 return reinterpret_cast<SharedMemoryMessageRequest*>(m_async_queue->tryPop());
124 }
125
126 private:
127
128 IAsyncQueue* m_async_queue;
129};
130
131/*---------------------------------------------------------------------------*/
132/*---------------------------------------------------------------------------*/
133
134/*---------------------------------------------------------------------------*/
135/*---------------------------------------------------------------------------*/
136
148{
149 static MessageTag SERIALIZER_TAG() { return MessageTag(125); }
150
151 public:
152
153 SubQueue(SharedMemoryMessageQueue* master_queue, MessageRank rank);
154
155 public:
156
157 MessageRank rank() const { return m_rank; }
158 void setTraceMng(ITraceMng* tm) { m_trace_mng = tm; }
159 void wait(SharedMemoryMessageRequest* tmr);
160 void testRequest(SharedMemoryMessageRequest* tmr);
161 void waitRequestAvailable();
162 void checkRequestAvailable();
163 void waitSome(ArrayView<Request> requests, ArrayView<bool> requests_done, bool is_non_blocking);
164 MessageId probe(const PointToPointMessageInfo& message);
165 MessageSourceInfo legacyProbe(const PointToPointMessageInfo& message);
166
167 public:
168
170 addReceive(Int64 request_id, const PointToPointMessageInfo& message, ReceiveBufferInfo buf);
172 addSend(Int64 request_id, const PointToPointMessageInfo& message, SendBufferInfo buf);
173
174 private:
175
176 SharedMemoryMessageQueue* m_master_queue = nullptr;
177 MessageRank m_rank;
181 RequestAsyncQueue m_async_message_queue;
182 ITraceMng* m_trace_mng = nullptr;
183 bool m_is_debug = false;
184 bool m_is_allow_null_rank_for_any_source = true;
185
186 private:
187
188 void _removeRequest(SharedMemoryMessageRequest* tmr, Array<SharedMemoryMessageRequest*>& requests);
189 bool _checkSendDone(SharedMemoryMessageRequest* tmr_send);
190 bool _checkRecvDone(SharedMemoryMessageRequest* tmr_recv);
191 void _checkRequestDone(SharedMemoryMessageRequest* tmr);
194 _getMatchingSendRequest(MessageRank recv_dest, MessageRank recv_orig, MessageTag tag);
195 void _testOrWaitRequestAvailable(bool is_blocking);
197 _createReceiveRequest(Int64 request_id, MessageRank dest, MessageTag tag,
198 ReceiveBufferInfo receive_buffer);
200 _createSendRequest(Int64 request_id, MessageRank orig, MessageTag tag,
201 SendBufferInfo send_buffer);
202};
203
204/*---------------------------------------------------------------------------*/
205/*---------------------------------------------------------------------------*/
206
207SharedMemoryMessageQueue::SubQueue::
208SubQueue(SharedMemoryMessageQueue* master_queue, MessageRank rank)
209: m_master_queue(master_queue)
210, m_rank(rank)
211{
212 if (auto v = Convert::Type<Int32>::tryParseFromEnvironment("ARCCORE_ALLOW_NULL_RANK_FOR_MPI_ANY_SOURCE", true))
213 m_is_allow_null_rank_for_any_source = v.value() != 0;
214}
215
216/*---------------------------------------------------------------------------*/
217/*---------------------------------------------------------------------------*/
218
219/*---------------------------------------------------------------------------*/
220/*---------------------------------------------------------------------------*/
221
222SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
223_createReceiveRequest(Int64 request_id, MessageRank dest,
224 MessageTag tag, ReceiveBufferInfo receive_buffer)
225{
226 auto* tmr = new SharedMemoryMessageRequest(this, request_id, m_rank, dest, tag, receive_buffer);
227 return tmr;
228}
229
230/*---------------------------------------------------------------------------*/
231/*---------------------------------------------------------------------------*/
232
233SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
234_createSendRequest(Int64 request_id, MessageRank orig,
235 MessageTag tag, SendBufferInfo send_buffer)
236{
237 SubQueue* queue = m_master_queue->_getSubQueue(orig);
238 auto* tmr = new SharedMemoryMessageRequest(queue, request_id, orig, m_rank, tag, send_buffer);
239 return tmr;
240}
241
242/*---------------------------------------------------------------------------*/
243/*---------------------------------------------------------------------------*/
244
245SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
246addReceive(Int64 request_id, const PointToPointMessageInfo& message, ReceiveBufferInfo buf)
247{
248 SharedMemoryMessageRequest* tmr = nullptr;
249 if (message.isRankTag()) {
250 MessageTag tag = message.tag();
251 MessageRank dest = message.destinationRank();
252 tmr = _createReceiveRequest(request_id, dest, tag, buf);
253 TRACE_DEBUG("** ADD RECV queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
254 this, request_id, m_rank, dest, tag, tmr, buf.memoryBuffer().size(), buf.serializer());
255 }
256 else if (message.isMessageId()) {
257 MessageId message_id = message.messageId();
258 MessageId::SourceInfo si = message_id.sourceInfo();
259 MessageRank dest = si.rank();
260 MessageTag tag = si.tag();
261
262 // We know the 'send' request that matches this one. We must therefore
263 // position it in \a tmr now to ensure that the correct one is used.
264 // To do this, search for the send corresponding to the ID of our request
265 Int64 req_id = (size_t)message_id;
266 SharedMemoryMessageRequest* send_request = nullptr;
267 for (Integer i = 0, n = m_send_requests.size(); i < n; ++i)
268 if (m_send_requests[i]->id() == req_id) {
269 send_request = m_send_requests[i];
270 break;
271 }
272 if (!send_request)
273 ARCANE_FATAL("Can not find matching send request from MessageId");
274
275 tmr = _createReceiveRequest(request_id, dest, tag, buf);
276 TRACE_DEBUG("** ADD RECV FromMessageId queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
277 this, request_id, m_rank, dest, tag, tmr, buf.memoryBuffer().size(), buf.serializer());
278 tmr->setMatchingSendRequest(send_request);
279 }
280 else
281 ARCANE_THROW(NotSupportedException, "Invalid 'MessageInfo'");
282
283 m_recv_requests.add(tmr);
284 return tmr;
285}
286
287/*---------------------------------------------------------------------------*/
288/*---------------------------------------------------------------------------*/
289
290SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
291addSend(Int64 request_id, const PointToPointMessageInfo& message, SendBufferInfo buf)
292{
293 MessageTag tag = message.tag();
294 if (tag.isNull())
295 ARCANE_THROW(ArgumentException, "null tag");
296 MessageRank orig = message.emiterRank();
297 auto* tmr = _createSendRequest(request_id, orig, tag, buf);
298 m_async_message_queue.push(tmr);
299 TRACE_DEBUG("** ADD SEND queue={0} ORIG={1} DEST={2} tag={3} size={4} tmr={5} serializer={6}",
300 this, orig, m_rank, tag, buf.memoryBuffer().size(), tmr, buf.serializer());
301 return tmr;
302}
303
304/*---------------------------------------------------------------------------*/
305/*---------------------------------------------------------------------------*/
306
307void SharedMemoryMessageQueue::SubQueue::
308_checkRequestDone(SharedMemoryMessageRequest* tmr)
309{
310 if (tmr->isDone())
311 ARCANE_FATAL("Can not check already done request");
312
313 if (tmr->isRecv())
314 _checkRecvDone(tmr);
315 else
316 _checkSendDone(tmr);
317}
318
319/*---------------------------------------------------------------------------*/
320/*---------------------------------------------------------------------------*/
321
329{
330 if (tmr->isDone()) {
331 if (tmr->isRecv())
332 _removeRequest(tmr, m_recv_requests);
333 else
334 _removeRequest(tmr, m_done_requests);
335 }
336}
337
338/*---------------------------------------------------------------------------*/
339/*---------------------------------------------------------------------------*/
340
341void SharedMemoryMessageQueue::SubQueue::
342_testOrWaitRequestAvailable(bool is_blocking)
343{
344 SharedMemoryMessageRequest* sq = nullptr;
345 if (is_blocking)
346 sq = m_async_message_queue.pop();
347 else
348 sq = m_async_message_queue.tryPop();
349 if (sq) {
350 if (sq->orig() == m_rank)
351 m_done_requests.add(sq);
352 else
353 m_send_requests.add(sq);
354 }
355}
356
357/*---------------------------------------------------------------------------*/
358/*---------------------------------------------------------------------------*/
359
360void SharedMemoryMessageQueue::SubQueue::
361waitRequestAvailable()
362{
363 // Blocks until a message is received.
364 _testOrWaitRequestAvailable(true);
365}
366
367/*---------------------------------------------------------------------------*/
368/*---------------------------------------------------------------------------*/
369
370void SharedMemoryMessageQueue::SubQueue::
371checkRequestAvailable()
372{
373 _testOrWaitRequestAvailable(false);
374}
375
376/*---------------------------------------------------------------------------*/
377/*---------------------------------------------------------------------------*/
378
379void SharedMemoryMessageQueue::SubQueue::
380wait(SharedMemoryMessageRequest* tmr)
381{
382 if (tmr->queue() != this)
383 ARCANE_FATAL("Bad queue");
384 TRACE_DEBUG("**** WAIT MESSAGE tmr={0} rank={1} recv?={2} dest={3}"
385 " nb_send={4} nb_done={5}",
386 tmr, m_rank, tmr->isRecv(),
387 tmr->dest(), m_send_requests.size(), m_done_requests.size());
388 while (!tmr->isDone()) {
389 _checkRequestDone(tmr);
390
391 if (!tmr->isDone()) {
392 // Blocks until a message is received.
393 waitRequestAvailable();
394 }
395 }
396
397 _cleanupRequestIfDone(tmr);
398}
399
400/*---------------------------------------------------------------------------*/
401/*---------------------------------------------------------------------------*/
402
403void SharedMemoryMessageQueue::SubQueue::
404testRequest(SharedMemoryMessageRequest* tmr)
405{
406 if (tmr->queue() != this)
407 ARCANE_FATAL("Bad queue");
408
409 _checkRequestDone(tmr);
410 _cleanupRequestIfDone(tmr);
411}
412
413/*---------------------------------------------------------------------------*/
414/*---------------------------------------------------------------------------*/
415
416void SharedMemoryMessageQueue::SubQueue::
417_removeRequest(SharedMemoryMessageRequest* tmr, Array<SharedMemoryMessageRequest*>& requests)
418{
419 for (Integer i = 0, n = requests.size(); i < n; ++i) {
420 SharedMemoryMessageRequest* tmr2 = requests[i];
421 if (tmr == tmr2) {
422 requests.remove(i);
423 return;
424 }
425 }
426 ARCANE_FATAL("Can not remove request");
427}
428
429/*---------------------------------------------------------------------------*/
430/*---------------------------------------------------------------------------*/
431
432bool SharedMemoryMessageQueue::SubQueue::
433_checkSendDone(SharedMemoryMessageRequest* tmr_send)
434{
435 for (SharedMemoryMessageRequest* tmr : m_done_requests) {
436 if (tmr == tmr_send) {
437 tmr_send->setDone(true);
438 return true;
439 }
440 }
441 return false;
442}
443
444/*---------------------------------------------------------------------------*/
445/*---------------------------------------------------------------------------*/
446
447/*
448 * \brief Checks if a send message matches the request \a tmr_recv.
449 *
450 * Checks if a request in the list of sent messages matches
451 * \a tmr_recv. This is the case if the source/destination pair is suitable
452 * or if the destination is A_NULL_RANK (which corresponds to MPI_ANY_SOURCE
453 * in the MPI case).
454 * It is important to process requests in order of arrival to comply with the MPI standard.
455 *
456 * \retval true if a matching request was found.
457 * \retval false otherwise.
458 */
459SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
460_getMatchingSendRequest(MessageRank recv_dest, MessageRank recv_orig, MessageTag tag)
461{
462 bool is_any_tag = tag.isNull();
463 bool is_any_dest = recv_dest.isNull() || recv_dest.isAnySource();
464 if (recv_dest.isNull() && !m_is_allow_null_rank_for_any_source)
465 ARCANE_FATAL("Can not use probe() with null rank. Use MessageRank::anySourceRank() instead");
466 for (Integer j = 0, n = m_send_requests.size(); j < n; ++j) {
467 SharedMemoryMessageRequest* tmr_send = m_send_requests[j];
468 TRACE_DEBUG("CHECK RECV DONE id={7} tmr_send={0} recv_dest={1}"
469 " recv_orig={2} send_dest={3} send_orig={4} request={5}/{6}\n",
470 tmr_send, recv_dest, recv_orig,
471 tmr_send->dest(), tmr_send->orig(), j, n, m_rank);
472 if (recv_orig == tmr_send->dest()) {
473 bool is_rank_ok = (recv_dest == tmr_send->orig()) || (is_any_dest && m_rank == recv_orig);
474 bool is_tag_ok = (is_any_tag || tmr_send->tag() == tag);
475 if (is_rank_ok && is_tag_ok) {
476 return tmr_send;
477 }
478 }
479 }
480 return nullptr;
481}
482
483/*---------------------------------------------------------------------------*/
484/*---------------------------------------------------------------------------*/
485
486bool SharedMemoryMessageQueue::SubQueue::
487_checkRecvDone(SharedMemoryMessageRequest* tmr_recv)
488{
489 // Checks if the send is already associated with our request. This is the case
490 // if we used a probe() call.
491 auto* tmr_send = tmr_recv->matchingSendRequest();
492 if (!tmr_send)
493 tmr_send = _getMatchingSendRequest(tmr_recv->dest(), tmr_recv->orig(), tmr_recv->tag());
494 if (tmr_send) {
495 tmr_recv->setSource(tmr_send->orig());
496 tmr_recv->copyFromSender(tmr_send);
497 tmr_recv->setDone(true);
498 tmr_send->queue()->m_async_message_queue.push(tmr_send);
499 _removeRequest(tmr_send, m_send_requests);
500 return true;
501 }
502 return false;
503}
504
505/*---------------------------------------------------------------------------*/
506/*---------------------------------------------------------------------------*/
507
508void SharedMemoryMessageQueue::SubQueue::
509waitSome(ArrayView<Request> requests, ArrayView<bool> requests_done,
510 bool is_non_blocking)
511{
512 Integer nb_request = requests.size();
513 requests_done.fill(false);
514 bool one_request_done = false;
515 TRACE_DEBUG("** WAIT SOME REQUEST rank={0} nb_request={1}\n", m_rank, nb_request);
516 while (!one_request_done) {
517 if (is_non_blocking)
518 checkRequestAvailable();
519 bool has_valid_request = false;
520 for (Integer i = 0; i < nb_request; ++i) {
521 Request request = requests[i];
522 if (!request.isValid())
523 continue;
524 SharedMemoryMessageRequest* tmr = requests[i];
525 if (!tmr)
526 continue;
527 this->testRequest(tmr);
528 if (tmr->isDone()) {
529 one_request_done = true;
530 requests_done[i] = true;
531 tmr->destroy();
532 if (requests[i].hasSubRequest())
533 ARCANE_THROW(NotImplementedException, "handling of sub requests");
534 requests[i].reset();
535 }
536 has_valid_request = true;
537 }
538 // If at least one request is completed, exit the loop.
539 if (one_request_done)
540 break;
541 // If not, no request succeeded. We block until there is
542 // a message in the queue unless there are no valid requests
543 // (This is possible if all requests in the list are null requests).
544 if (!has_valid_request)
545 break;
546 // In non-blocking mode, we already tested at the beginning of the loop if
547 // a request is available. If we are here, we can exit directly
548 // otherwise we will block.
549 if (is_non_blocking)
550 break;
551 TRACE_DEBUG("** WAIT REQUEST AVAILABLE rank={0}", m_rank);
552 waitRequestAvailable();
553 }
554}
555
556/*---------------------------------------------------------------------------*/
557/*---------------------------------------------------------------------------*/
558
559MP::MessageId SharedMemoryMessageQueue::SubQueue::
560probe(const MP::PointToPointMessageInfo& message)
561{
562 TRACE_DEBUG("Probe rank={0} nb_send={1} nb_receive={2} queue={3} is_valid={4}",
563 m_rank, m_send_requests.size(), m_recv_requests.size(), this, message.isValid());
564 if (!message.isValid())
565 return MessageId();
566
567 // The message must be initialized with a (rank/tag) pair.
568 if (!message.isRankTag())
569 ARCCORE_FATAL("Invalid message_info: message.isRankTag() is false");
570
571 MessageRank rank = message.destinationRank();
572 MessageTag tag = message.tag();
573 bool is_blocking = message.isBlocking();
574 if (is_blocking)
575 ARCANE_THROW(NotImplementedException, "blocking probe");
576
577 // TODO: look into adding anti-loop safety
578 // TODO: we should check that if this
579 // method is called twice with the same information, the same message is not retrieved.
580 // When that is the case, legacyProbe() will need to be modified accordingly.
581 for (;;) {
582 _testOrWaitRequestAvailable(is_blocking);
583 auto* req = _getMatchingSendRequest(rank, m_rank, tag);
584 if (req) {
585 // TODO: Verify that the request has a buffer and not an
586 // 'ISerializer'.
587 Int64 send_size = req->sendBufferInfo().memoryBuffer().size();
588 MessageId::SourceInfo si(req->orig(), req->tag(), send_size);
589 return MessageId(si, (size_t)req->id());
590 }
591 if (!is_blocking)
592 // In non-blocking mode, exit the loop even if no request is found.
593 break;
594 }
595 return {};
596}
597
598/*---------------------------------------------------------------------------*/
599/*---------------------------------------------------------------------------*/
600
601MP::MessageSourceInfo SharedMemoryMessageQueue::SubQueue::
602legacyProbe(const MP::PointToPointMessageInfo& message)
603{
604 // Performs a normal probe but does not retain message information.
605 // NOTE: this works because probe() can return the same
606 // message multiple times. When that is no longer the case, this will need to be modified.
607 MP::MessageId message_id = probe(message);
608 if (message_id.isValid())
609 return message_id.sourceInfo();
610 return {};
611}
612
613/*---------------------------------------------------------------------------*/
614/*---------------------------------------------------------------------------*/
615
616/*---------------------------------------------------------------------------*/
617/*---------------------------------------------------------------------------*/
618
619SharedMemoryMessageQueue::
620~SharedMemoryMessageQueue()
621{
622 for (SubQueue* sq : m_sub_queues)
623 delete sq;
624}
625
626/*---------------------------------------------------------------------------*/
627/*---------------------------------------------------------------------------*/
628
629void SharedMemoryMessageQueue::
630init(Integer nb_thread)
631{
632 m_atomic_request_id = 1;
633 m_nb_thread = nb_thread;
634 Integer nb_queue = nb_thread;
635 m_sub_queues.resize(nb_queue);
636 for (Integer i = 0; i < nb_queue; ++i) {
637 m_sub_queues[i] = new SubQueue(this, MessageRank(i));
638 }
639}
640
641/*---------------------------------------------------------------------------*/
642/*---------------------------------------------------------------------------*/
643
644SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
645_getSourceSubQueue(const MP::PointToPointMessageInfo& message)
646{
647 MessageRank orig = message.emiterRank();
648 if (orig.isNull())
649 ARCANE_THROW(ArgumentException, "null message.sourceRank()");
650 return _getSubQueue(orig);
651}
652
653/*---------------------------------------------------------------------------*/
654/*---------------------------------------------------------------------------*/
655
656SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
657_getDestinationSubQueue(const MP::PointToPointMessageInfo& message)
658{
659 MessageRank dest = message.destinationRank();
660 if (dest.isNull())
661 ARCANE_THROW(ArgumentException, "null message.destinationRank()");
662 return _getSubQueue(dest);
663}
664
665/*---------------------------------------------------------------------------*/
666/*---------------------------------------------------------------------------*/
667
668void SharedMemoryMessageQueue::
669waitAll(ArrayView<Request> requests)
670{
671 Integer nb_request = requests.size();
672 // To prevent blocking, the requests must be processed
673 // in ascending order of queues, and 'receive' FIRST!!
674 UniqueArray<SharedMemoryMessageRequest*> sorted_requests(nb_request);
675 for (Integer i = 0; i < nb_request; ++i) {
676 sorted_requests[i] = (SharedMemoryMessageRequest*)requests[i].requestAsVoidPtr();
677 }
678 std::sort(std::begin(sorted_requests), std::end(sorted_requests),
679 SharedMemoryMessageRequest::SortFunctor(m_nb_thread));
680
681#if 0
682 for( Integer i=0; i<nb_request; ++i ){
683 SharedMemoryMessageRequest* tmr = sorted_requests[i];
684 cout << String::format("** WAIT FOR REQUEST tmr={0}\n",tmr);
685 }
686#endif
687
688 for (Integer i = 0; i < nb_request; ++i) {
689 SharedMemoryMessageRequest* tmr = sorted_requests[i];
690 if (tmr) {
691 SubQueue* sub_queue = tmr->queue();
692 sub_queue->wait(tmr);
693 tmr->destroy();
694 }
695 if (requests[i].hasSubRequest())
696 ARCANE_THROW(NotImplementedException, "handling of sub requests");
697 requests[i].reset();
698 }
699}
700
701/*---------------------------------------------------------------------------*/
702/*---------------------------------------------------------------------------*/
703
704void SharedMemoryMessageQueue::
705waitSome(Int32 rank, ArrayView<Parallel::Request> requests,
706 ArrayView<bool> requests_done, bool is_non_blocking)
707{
708 requests_done.fill(false);
709 auto sub_queue = _getSubQueue(MessageRank(rank));
710 sub_queue->waitSome(requests, requests_done, is_non_blocking);
711}
712
713/*---------------------------------------------------------------------------*/
714/*---------------------------------------------------------------------------*/
715
716auto SharedMemoryMessageQueue::
717addReceive(const PointToPointMessageInfo& message, ReceiveBufferInfo buf) -> Request
718{
719 auto* sq = _getSourceSubQueue(message);
720 return _request(sq->addReceive(_getNextRequestId(), message, buf));
721}
722
723/*---------------------------------------------------------------------------*/
724/*---------------------------------------------------------------------------*/
725
726auto SharedMemoryMessageQueue::
727addSend(const PointToPointMessageInfo& message, SendBufferInfo buf) -> Request
728{
729 auto* sq = _getDestinationSubQueue(message);
730 return _request(sq->addSend(_getNextRequestId(), message, buf));
731}
732
733/*---------------------------------------------------------------------------*/
734/*---------------------------------------------------------------------------*/
735
736auto SharedMemoryMessageQueue::
737probe(const MP::PointToPointMessageInfo& message) -> MessageId
738{
739 auto* sq = _getSourceSubQueue(message);
740 return sq->probe(message);
741}
742
743/*---------------------------------------------------------------------------*/
744/*---------------------------------------------------------------------------*/
745
746auto SharedMemoryMessageQueue::
747legacyProbe(const MP::PointToPointMessageInfo& message) -> MessageSourceInfo
748{
749 auto* sq = _getSourceSubQueue(message);
750 return sq->legacyProbe(message);
751}
752
753/*---------------------------------------------------------------------------*/
754/*---------------------------------------------------------------------------*/
755
756void SharedMemoryMessageQueue::
757setTraceMng(Int32 rank, ITraceMng* tm)
758{
759 _getSubQueue(MessageRank(rank))->setTraceMng(tm);
760}
761
762/*---------------------------------------------------------------------------*/
763/*---------------------------------------------------------------------------*/
764
765auto SharedMemoryMessageQueue::
766_request(SharedMemoryMessageRequest* tmr) -> Request
767{
768 return Request(0, this, tmr);
769}
770
771/*---------------------------------------------------------------------------*/
772/*---------------------------------------------------------------------------*/
773
774} // End namespace Arcane::MessagePassing
775
776/*---------------------------------------------------------------------------*/
777/*---------------------------------------------------------------------------*/
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCCORE_FATAL(...)
Macro throwing a FatalErrorException.
Modifiable view of an array of type T.
Base class for 1D data vectors.
Constant view on a contiguous memory region containing fixed-size elements.
Template class for converting a type.
virtual void copy(const ISerializer *from)=0
Copies the data from from into this instance.
static void genericCopy(ConstMemoryView from, MutableMemoryView to)
Generic copy using platform::getDataMemoryRessourceMng().
Asynchronous queue allowing the exchange of information between threads.
Definition IAsyncQueue.h:33
Information for sending/receiving a point-to-point message.
void _cleanupRequestIfDone(SharedMemoryMessageRequest *tmr)
Cleans up the request tmr if it is finished.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, ReceiveBufferInfo buf)
Create a receive request.
void copyFromSender(SharedMemoryMessageRequest *sender)
Copies the sender message information into the receive message.
Mutable view on a contiguous memory region containing fixed-size elements.
1D data vector with value semantics (STL style).
Declarations of types and methods used by message exchange mechanisms.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
std::int32_t Int32
Signed integer type of 32 bits.