14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/NotImplementedException.h"
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/ArgumentException.h"
18#include "arcane/utils/ITraceMng.h"
19#include "arcane/utils/ValueConvert.h"
21#include "arccore/common/internal/MemoryResourceMng.h"
23#include "arcane/parallel/thread/SharedMemoryMessageQueue.h"
24#include "arcane/parallel/thread/IAsyncQueue.h"
26#include "arcane/core/ISerializeMessage.h"
29#define TRACE_DEBUG(format_str, ...) \
30 if (m_is_debug && m_trace_mng) { \
31 m_trace_mng->info() << String::format(format_str, __VA_ARGS__); \
32 m_trace_mng->flush(); \
59 const ISerializer* send_serializer = send_info.serializer();
60 ISerializer* receive_serializer = receive_info.serializer();
61 if (receive_serializer) {
63 ARCANE_FATAL(
"No send serializer for receive serializer");
64 receive_serializer->
copy(send_serializer);
68 ByteConstSpan send_span = send_info.memoryBuffer();
69 ByteSpan receive_span = receive_info.memoryBuffer();
70 Int64 send_size = send_span.size();
71 Int64 receive_size = receive_span.size();
72 if (send_size > receive_size)
73 ARCANE_FATAL(
"Not enough memory for receiving message receive={0} send={1}",
74 receive_size, send_size);
82void SharedMemoryMessageRequest::
87 m_is_destroyed =
true;
99class RequestAsyncQueue
104 : m_async_queue(IAsyncQueue::createQueue())
108 delete m_async_queue;
115 m_async_queue->push(v);
153 SubQueue(SharedMemoryMessageQueue* master_queue,
MessageRank rank);
158 void setTraceMng(
ITraceMng* tm) { m_trace_mng = tm; }
161 void waitRequestAvailable();
162 void checkRequestAvailable();
176 SharedMemoryMessageQueue* m_master_queue =
nullptr;
183 bool m_is_debug =
false;
184 bool m_is_allow_null_rank_for_any_source =
true;
195 void _testOrWaitRequestAvailable(
bool is_blocking);
207SharedMemoryMessageQueue::SubQueue::
208SubQueue(SharedMemoryMessageQueue* master_queue,
MessageRank rank)
209: m_master_queue(master_queue)
213 m_is_allow_null_rank_for_any_source = v.value() != 0;
237 SubQueue* queue = m_master_queue->_getSubQueue(orig);
245SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
246addReceive(Int64 request_id,
const PointToPointMessageInfo& message, ReceiveBufferInfo buf)
248 SharedMemoryMessageRequest* tmr =
nullptr;
249 if (message.isRankTag()) {
250 MessageTag tag = message.tag();
251 MessageRank dest = message.destinationRank();
252 tmr = _createReceiveRequest(request_id, dest, tag, buf);
253 TRACE_DEBUG(
"** ADD RECV queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
254 this, request_id, m_rank, dest, tag, tmr, buf.memoryBuffer().size(), buf.serializer());
256 else if (message.isMessageId()) {
257 MessageId message_id = message.messageId();
258 MessageId::SourceInfo si = message_id.sourceInfo();
259 MessageRank dest = si.rank();
260 MessageTag tag = si.tag();
265 Int64 req_id = (size_t)message_id;
266 SharedMemoryMessageRequest* send_request =
nullptr;
267 for (
Integer i = 0, n = m_send_requests.size(); i < n; ++i)
268 if (m_send_requests[i]->
id() == req_id) {
269 send_request = m_send_requests[i];
273 ARCANE_FATAL(
"Can not find matching send request from MessageId");
275 tmr = _createReceiveRequest(request_id, dest, tag, buf);
276 TRACE_DEBUG(
"** ADD RECV FromMessageId queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
277 this, request_id, m_rank, dest, tag, tmr, buf.memoryBuffer().size(), buf.serializer());
278 tmr->setMatchingSendRequest(send_request);
281 ARCANE_THROW(NotSupportedException,
"Invalid 'MessageInfo'");
283 m_recv_requests.add(tmr);
290SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
291addSend(
Int64 request_id,
const PointToPointMessageInfo& message, SendBufferInfo buf)
293 MessageTag tag = message.tag();
296 MessageRank orig = message.emiterRank();
297 auto* tmr = _createSendRequest(request_id, orig, tag, buf);
298 m_async_message_queue.push(tmr);
299 TRACE_DEBUG(
"** ADD SEND queue={0} ORIG={1} DEST={2} tag={3} size={4} tmr={5} serializer={6}",
300 this, orig, m_rank, tag, buf.memoryBuffer().size(), tmr, buf.serializer());
307void SharedMemoryMessageQueue::SubQueue::
308_checkRequestDone(SharedMemoryMessageRequest* tmr)
332 _removeRequest(tmr, m_recv_requests);
334 _removeRequest(tmr, m_done_requests);
341void SharedMemoryMessageQueue::SubQueue::
342_testOrWaitRequestAvailable(
bool is_blocking)
346 sq = m_async_message_queue.pop();
348 sq = m_async_message_queue.tryPop();
350 if (sq->orig() == m_rank)
351 m_done_requests.add(sq);
353 m_send_requests.add(sq);
360void SharedMemoryMessageQueue::SubQueue::
361waitRequestAvailable()
364 _testOrWaitRequestAvailable(
true);
370void SharedMemoryMessageQueue::SubQueue::
371checkRequestAvailable()
373 _testOrWaitRequestAvailable(
false);
379void SharedMemoryMessageQueue::SubQueue::
380wait(SharedMemoryMessageRequest* tmr)
382 if (tmr->queue() !=
this)
384 TRACE_DEBUG(
"**** WAIT MESSAGE tmr={0} rank={1} recv?={2} dest={3}"
385 " nb_send={4} nb_done={5}",
386 tmr, m_rank, tmr->isRecv(),
387 tmr->dest(), m_send_requests.size(), m_done_requests.size());
388 while (!tmr->isDone()) {
389 _checkRequestDone(tmr);
391 if (!tmr->isDone()) {
393 waitRequestAvailable();
397 _cleanupRequestIfDone(tmr);
403void SharedMemoryMessageQueue::SubQueue::
404testRequest(SharedMemoryMessageRequest* tmr)
406 if (tmr->queue() !=
this)
409 _checkRequestDone(tmr);
410 _cleanupRequestIfDone(tmr);
416void SharedMemoryMessageQueue::SubQueue::
417_removeRequest(SharedMemoryMessageRequest* tmr, Array<SharedMemoryMessageRequest*>& requests)
419 for (
Integer i = 0, n = requests.size(); i < n; ++i) {
420 SharedMemoryMessageRequest* tmr2 = requests[i];
432bool SharedMemoryMessageQueue::SubQueue::
433_checkSendDone(SharedMemoryMessageRequest* tmr_send)
435 for (SharedMemoryMessageRequest* tmr : m_done_requests) {
436 if (tmr == tmr_send) {
437 tmr_send->setDone(
true);
459SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
460_getMatchingSendRequest(MessageRank recv_dest, MessageRank recv_orig, MessageTag tag)
462 bool is_any_tag = tag.isNull();
463 bool is_any_dest = recv_dest.isNull() || recv_dest.isAnySource();
464 if (recv_dest.isNull() && !m_is_allow_null_rank_for_any_source)
465 ARCANE_FATAL(
"Can not use probe() with null rank. Use MessageRank::anySourceRank() instead");
466 for (
Integer j = 0, n = m_send_requests.size(); j < n; ++j) {
467 SharedMemoryMessageRequest* tmr_send = m_send_requests[j];
468 TRACE_DEBUG(
"CHECK RECV DONE id={7} tmr_send={0} recv_dest={1}"
469 " recv_orig={2} send_dest={3} send_orig={4} request={5}/{6}\n",
470 tmr_send, recv_dest, recv_orig,
471 tmr_send->dest(), tmr_send->orig(), j, n, m_rank);
472 if (recv_orig == tmr_send->dest()) {
473 bool is_rank_ok = (recv_dest == tmr_send->orig()) || (is_any_dest && m_rank == recv_orig);
474 bool is_tag_ok = (is_any_tag || tmr_send->tag() == tag);
475 if (is_rank_ok && is_tag_ok) {
486bool SharedMemoryMessageQueue::SubQueue::
487_checkRecvDone(SharedMemoryMessageRequest* tmr_recv)
491 auto* tmr_send = tmr_recv->matchingSendRequest();
493 tmr_send = _getMatchingSendRequest(tmr_recv->dest(), tmr_recv->orig(), tmr_recv->tag());
495 tmr_recv->setSource(tmr_send->orig());
496 tmr_recv->copyFromSender(tmr_send);
497 tmr_recv->setDone(
true);
498 tmr_send->queue()->m_async_message_queue.push(tmr_send);
499 _removeRequest(tmr_send, m_send_requests);
508void SharedMemoryMessageQueue::SubQueue::
509waitSome(ArrayView<Request> requests, ArrayView<bool> requests_done,
510 bool is_non_blocking)
512 Integer nb_request = requests.size();
513 requests_done.fill(
false);
514 bool one_request_done =
false;
515 TRACE_DEBUG(
"** WAIT SOME REQUEST rank={0} nb_request={1}\n", m_rank, nb_request);
516 while (!one_request_done) {
518 checkRequestAvailable();
519 bool has_valid_request =
false;
520 for (
Integer i = 0; i < nb_request; ++i) {
521 Request request = requests[i];
522 if (!request.isValid())
524 SharedMemoryMessageRequest* tmr = requests[i];
527 this->testRequest(tmr);
529 one_request_done =
true;
530 requests_done[i] =
true;
532 if (requests[i].hasSubRequest())
533 ARCANE_THROW(NotImplementedException,
"handling of sub requests");
536 has_valid_request =
true;
539 if (one_request_done)
544 if (!has_valid_request)
551 TRACE_DEBUG(
"** WAIT REQUEST AVAILABLE rank={0}", m_rank);
552 waitRequestAvailable();
559MP::MessageId SharedMemoryMessageQueue::SubQueue::
560probe(
const MP::PointToPointMessageInfo& message)
562 TRACE_DEBUG(
"Probe rank={0} nb_send={1} nb_receive={2} queue={3} is_valid={4}",
563 m_rank, m_send_requests.size(), m_recv_requests.size(),
this, message.isValid());
564 if (!message.isValid())
568 if (!message.isRankTag())
569 ARCCORE_FATAL(
"Invalid message_info: message.isRankTag() is false");
571 MessageRank rank = message.destinationRank();
572 MessageTag tag = message.tag();
573 bool is_blocking = message.isBlocking();
575 ARCANE_THROW(NotImplementedException,
"blocking probe");
582 _testOrWaitRequestAvailable(is_blocking);
583 auto* req = _getMatchingSendRequest(rank, m_rank, tag);
587 Int64 send_size = req->sendBufferInfo().memoryBuffer().size();
588 MessageId::SourceInfo si(req->orig(), req->tag(), send_size);
589 return MessageId(si, (
size_t)req->id());
601MP::MessageSourceInfo SharedMemoryMessageQueue::SubQueue::
602legacyProbe(
const MP::PointToPointMessageInfo& message)
607 MP::MessageId message_id = probe(message);
608 if (message_id.isValid())
609 return message_id.sourceInfo();
619SharedMemoryMessageQueue::
620~SharedMemoryMessageQueue()
629void SharedMemoryMessageQueue::
632 m_atomic_request_id = 1;
633 m_nb_thread = nb_thread;
635 m_sub_queues.resize(nb_queue);
636 for (
Integer i = 0; i < nb_queue; ++i) {
637 m_sub_queues[i] =
new SubQueue(
this, MessageRank(i));
644SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
645_getSourceSubQueue(
const MP::PointToPointMessageInfo& message)
647 MessageRank orig = message.emiterRank();
649 ARCANE_THROW(ArgumentException,
"null message.sourceRank()");
650 return _getSubQueue(orig);
656SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
657_getDestinationSubQueue(
const MP::PointToPointMessageInfo& message)
659 MessageRank dest = message.destinationRank();
661 ARCANE_THROW(ArgumentException,
"null message.destinationRank()");
662 return _getSubQueue(dest);
668void SharedMemoryMessageQueue::
669waitAll(ArrayView<Request> requests)
671 Integer nb_request = requests.size();
674 UniqueArray<SharedMemoryMessageRequest*> sorted_requests(nb_request);
675 for (
Integer i = 0; i < nb_request; ++i) {
676 sorted_requests[i] = (SharedMemoryMessageRequest*)requests[i].requestAsVoidPtr();
678 std::sort(std::begin(sorted_requests), std::end(sorted_requests),
679 SharedMemoryMessageRequest::SortFunctor(m_nb_thread));
682 for(
Integer i=0; i<nb_request; ++i ){
683 SharedMemoryMessageRequest* tmr = sorted_requests[i];
684 cout << String::format(
"** WAIT FOR REQUEST tmr={0}\n",tmr);
688 for (
Integer i = 0; i < nb_request; ++i) {
689 SharedMemoryMessageRequest* tmr = sorted_requests[i];
692 sub_queue->wait(tmr);
695 if (requests[i].hasSubRequest())
696 ARCANE_THROW(NotImplementedException,
"handling of sub requests");
704void SharedMemoryMessageQueue::
705waitSome(
Int32 rank, ArrayView<Parallel::Request> requests,
706 ArrayView<bool> requests_done,
bool is_non_blocking)
708 requests_done.fill(
false);
709 auto sub_queue = _getSubQueue(MessageRank(rank));
710 sub_queue->waitSome(requests, requests_done, is_non_blocking);
716auto SharedMemoryMessageQueue::
717addReceive(
const PointToPointMessageInfo& message, ReceiveBufferInfo buf) -> Request
719 auto* sq = _getSourceSubQueue(message);
720 return _request(sq->addReceive(_getNextRequestId(), message, buf));
726auto SharedMemoryMessageQueue::
727addSend(
const PointToPointMessageInfo& message, SendBufferInfo buf) -> Request
729 auto* sq = _getDestinationSubQueue(message);
730 return _request(sq->addSend(_getNextRequestId(), message, buf));
736auto SharedMemoryMessageQueue::
737probe(
const MP::PointToPointMessageInfo& message) -> MessageId
739 auto* sq = _getSourceSubQueue(message);
740 return sq->probe(message);
746auto SharedMemoryMessageQueue::
747legacyProbe(
const MP::PointToPointMessageInfo& message) -> MessageSourceInfo
749 auto* sq = _getSourceSubQueue(message);
750 return sq->legacyProbe(message);
756void SharedMemoryMessageQueue::
757setTraceMng(
Int32 rank, ITraceMng* tm)
759 _getSubQueue(MessageRank(rank))->setTraceMng(tm);
765auto SharedMemoryMessageQueue::
766_request(SharedMemoryMessageRequest* tmr) -> Request
768 return Request(0,
this, tmr);
#define ARCANE_THROW(exception_class,...)
Macro for throwing an exception with formatting.
#define ARCANE_FATAL(...)
Macro throwing a FatalErrorException.
#define ARCCORE_FATAL(...)
Macro throwing a FatalErrorException.
Modifiable view of an array of type T.
Base class for 1D data vectors.
Constant view on a contiguous memory region containing fixed-size elements.
Template class for converting a type.
virtual void copy(const ISerializer *from)=0
Copies the data of the serializer `from` into this instance.
static void genericCopy(ConstMemoryView from, MutableMemoryView to)
Generic copy using platform::getDataMemoryRessourceMng().
Asynchronous queue allowing the exchange of information between threads.
Information for sending/receiving a point-to-point message.
Receive buffer information.
Message queue for a rank in shared memory.
void _cleanupRequestIfDone(SharedMemoryMessageRequest *tmr)
Cleans up the request tmr if it is finished.
Message within SharedMemoryMessageQueue.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, ReceiveBufferInfo buf)
Creates a receive request.
void copyFromSender(SharedMemoryMessageRequest *sender)
Copies the sender message information into the receive message.
Mutable view on a contiguous memory region containing fixed-size elements.
1D data vector with value semantics (STL style).
Declarations of types and methods used by message exchange mechanisms.
std::int64_t Int64
Signed integer type of 64 bits.
Int32 Integer
Type representing an integer.
std::int32_t Int32
Signed integer type of 32 bits.