14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/NotImplementedException.h"
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/ArgumentException.h"
18#include "arcane/utils/ITraceMng.h"
19#include "arcane/utils/ValueConvert.h"
21#include "arcane/utils/internal/MemoryResourceMng.h"
23#include "arcane/parallel/thread/SharedMemoryMessageQueue.h"
24#include "arcane/parallel/thread/IAsyncQueue.h"
26#include "arcane/core/ISerializeMessage.h"
29#define TRACE_DEBUG(format_str,...) \
30 if (m_is_debug && m_trace_mng){ \
31 m_trace_mng->info() << String::format(format_str,__VA_ARGS__); \
32 m_trace_mng->flush();\
59 const ISerializer* send_serializer = send_info.serializer();
60 ISerializer* receive_serializer = receive_info.serializer();
61 if (receive_serializer){
63 ARCANE_FATAL(
"No send serializer for receive serializer");
64 receive_serializer->
copy(send_serializer);
68 ByteConstSpan send_span = send_info.memoryBuffer();
69 ByteSpan receive_span = receive_info.memoryBuffer();
70 Int64 send_size = send_span.size();
71 Int64 receive_size = receive_span.size();
72 if (send_size > receive_size)
73 ARCANE_FATAL(
"Not enough memory for receiving message receive={0} send={1}",
74 receive_size,send_size);
82void SharedMemoryMessageRequest::
87 m_is_destroyed =
true;
98class RequestAsyncQueue
102 : m_async_queue(IAsyncQueue::createQueue()){}
105 delete m_async_queue;
110 m_async_queue->push(v);
146 SubQueue(SharedMemoryMessageQueue* master_queue,
MessageRank rank);
//! Sets the trace manager (m_trace_mng) that the TRACE_DEBUG macro writes to.
151 void setTraceMng(
ITraceMng* tm) { m_trace_mng = tm; }
154 void waitRequestAvailable();
155 void checkRequestAvailable();
169 SharedMemoryMessageQueue* m_master_queue =
nullptr;
176 bool m_is_debug =
false;
177 bool m_is_allow_null_rank_for_any_source =
true;
188 void _testOrWaitRequestAvailable(
bool is_blocking);
200SharedMemoryMessageQueue::SubQueue::
201SubQueue(SharedMemoryMessageQueue* master_queue,
MessageRank rank)
202: m_master_queue(master_queue)
206 m_is_allow_null_rank_for_any_source = v.value() != 0;
230 SubQueue* queue = m_master_queue->_getSubQueue(orig);
238SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
239addReceive(Int64 request_id,
const PointToPointMessageInfo& message,ReceiveBufferInfo buf)
241 SharedMemoryMessageRequest* tmr =
nullptr;
242 if (message.isRankTag()){
243 MessageTag tag = message.tag();
244 MessageRank dest = message.destinationRank();
245 tmr = _createReceiveRequest(request_id,dest,tag,buf);
246 TRACE_DEBUG(
"** ADD RECV queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
247 this,request_id,m_rank,dest,tag,tmr,buf.memoryBuffer().size(),buf.serializer());
249 else if (message.isMessageId()){
250 MessageId message_id = message.messageId();
251 MessageId::SourceInfo si = message_id.sourceInfo();
252 MessageRank dest = si.rank();
253 MessageTag tag = si.tag();
258 Int64 req_id = (size_t)message_id;
259 SharedMemoryMessageRequest* send_request =
nullptr;
260 for(
Integer i=0, n=m_send_requests.size(); i<n; ++i )
261 if (m_send_requests[i]->
id()==req_id){
262 send_request = m_send_requests[i];
266 ARCANE_FATAL(
"Can not find matching send request from MessageId");
268 tmr = _createReceiveRequest(request_id,dest,tag,buf);
269 TRACE_DEBUG(
"** ADD RECV FromMessageId queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
270 this,request_id,m_rank,dest,tag,tmr,buf.memoryBuffer().size(),buf.serializer());
271 tmr->setMatchingSendRequest(send_request);
274 ARCANE_THROW(NotSupportedException,
"Invalid 'MessageInfo'");
276 m_recv_requests.add(tmr);
283SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
284addSend(
Int64 request_id,
const PointToPointMessageInfo& message,SendBufferInfo buf)
286 MessageTag tag = message.tag();
289 MessageRank orig = message.emiterRank();
290 auto* tmr = _createSendRequest(request_id,orig,tag,buf);
291 m_async_message_queue.push(tmr);
292 TRACE_DEBUG(
"** ADD SEND queue={0} ORIG={1} DEST={2} tag={3} size={4} tmr={5} serializer={6}",
293 this,orig,m_rank,tag,buf.memoryBuffer().size(),tmr,buf.serializer());
300void SharedMemoryMessageQueue::SubQueue::
301_checkRequestDone(SharedMemoryMessageRequest* tmr)
324 _removeRequest(tmr,m_recv_requests);
326 _removeRequest(tmr,m_done_requests);
333void SharedMemoryMessageQueue::SubQueue::
334_testOrWaitRequestAvailable(
bool is_blocking)
338 sq = m_async_message_queue.pop();
340 sq = m_async_message_queue.tryPop();
342 if (sq->orig()==m_rank)
343 m_done_requests.add(sq);
345 m_send_requests.add(sq);
/*!
 * \brief Blocking entry point over _testOrWaitRequestAvailable().
 *
 * Calls _testOrWaitRequestAvailable() with its 'is_blocking' argument
 * set to true.
 */
352void SharedMemoryMessageQueue::SubQueue::
353waitRequestAvailable()
// 'true' is the value of the 'is_blocking' parameter.
356 _testOrWaitRequestAvailable(
true);
/*!
 * \brief Non-blocking entry point over _testOrWaitRequestAvailable().
 *
 * Calls _testOrWaitRequestAvailable() with its 'is_blocking' argument
 * set to false.
 */
362void SharedMemoryMessageQueue::SubQueue::
363checkRequestAvailable()
// 'false' is the value of the 'is_blocking' parameter.
365 _testOrWaitRequestAvailable(
false);
371void SharedMemoryMessageQueue::SubQueue::
372wait(SharedMemoryMessageRequest* tmr)
374 if (tmr->queue()!=
this)
376 TRACE_DEBUG(
"**** WAIT MESSAGE tmr={0} rank={1} recv?={2} dest={3}"
377 " nb_send={4} nb_done={5}",tmr,m_rank,tmr->isRecv(),
378 tmr->dest(),m_send_requests.size(),m_done_requests.size());
379 while (!tmr->isDone()){
380 _checkRequestDone(tmr);
384 waitRequestAvailable();
388 _cleanupRequestIfDone(tmr);
394void SharedMemoryMessageQueue::SubQueue::
395testRequest(SharedMemoryMessageRequest* tmr)
397 if (tmr->queue()!=
this)
400 _checkRequestDone(tmr);
401 _cleanupRequestIfDone(tmr);
407void SharedMemoryMessageQueue::SubQueue::
408_removeRequest(SharedMemoryMessageRequest* tmr,Array<SharedMemoryMessageRequest*>& requests)
410 for(
Integer i=0, n=requests.size(); i<n; ++i ){
411 SharedMemoryMessageRequest* tmr2 = requests[i];
423bool SharedMemoryMessageQueue::SubQueue::
424_checkSendDone(SharedMemoryMessageRequest* tmr_send)
426 for( SharedMemoryMessageRequest* tmr : m_done_requests ){
428 tmr_send->setDone(
true);
450SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
451_getMatchingSendRequest(MessageRank recv_dest,MessageRank recv_orig,MessageTag tag)
453 bool is_any_tag = tag.isNull();
454 bool is_any_dest = recv_dest.isNull() || recv_dest.isAnySource();
455 if (recv_dest.isNull() && !m_is_allow_null_rank_for_any_source)
456 ARCANE_FATAL(
"Can not use probe() with null rank. Use MessageRank::anySourceRank() instead");
457 for(
Integer j=0, n=m_send_requests.size(); j<n; ++j ){
458 SharedMemoryMessageRequest* tmr_send = m_send_requests[j];
459 TRACE_DEBUG(
"CHECK RECV DONE id={7} tmr_send={0} recv_dest={1}"
460 " recv_orig={2} send_dest={3} send_orig={4} request={5}/{6}\n",
461 tmr_send,recv_dest,recv_orig,
462 tmr_send->dest(),tmr_send->orig(),j,n,m_rank);
463 if (recv_orig==tmr_send->dest()){
464 bool is_rank_ok = (recv_dest==tmr_send->orig()) || (is_any_dest && m_rank==recv_orig);
465 bool is_tag_ok = (is_any_tag || tmr_send->tag()==tag);
466 if (is_rank_ok && is_tag_ok) {
477bool SharedMemoryMessageQueue::SubQueue::
478_checkRecvDone(SharedMemoryMessageRequest* tmr_recv)
482 auto* tmr_send = tmr_recv->matchingSendRequest();
484 tmr_send = _getMatchingSendRequest(tmr_recv->dest(),tmr_recv->orig(),tmr_recv->tag());
486 tmr_recv->setSource(tmr_send->orig());
487 tmr_recv->copyFromSender(tmr_send);
488 tmr_recv->setDone(
true);
489 tmr_send->queue()->m_async_message_queue.push(tmr_send);
490 _removeRequest(tmr_send,m_send_requests);
499void SharedMemoryMessageQueue::SubQueue::
500waitSome(ArrayView<Request> requests,ArrayView<bool> requests_done,
501 bool is_non_blocking)
503 Integer nb_request = requests.size();
504 requests_done.fill(
false);
505 bool one_request_done =
false;
506 TRACE_DEBUG(
"** WAIT SOME REQUEST rank={0} nb_request={1}\n",m_rank,nb_request);
507 while (!one_request_done){
509 checkRequestAvailable();
510 bool has_valid_request =
false;
511 for(
Integer i=0; i<nb_request; ++i ){
512 Request request = requests[i];
513 if (!request.isValid())
515 SharedMemoryMessageRequest* tmr = requests[i];
518 this->testRequest(tmr);
520 one_request_done =
true;
521 requests_done[i] =
true;
523 if (requests[i].hasSubRequest())
524 ARCANE_THROW(NotImplementedException,
"handling of sub requests");
527 has_valid_request =
true;
530 if (one_request_done)
536 if (!has_valid_request)
543 TRACE_DEBUG(
"** WAIT REQUEST AVAILABLE rank={0}",m_rank);
544 waitRequestAvailable();
551MP::MessageId SharedMemoryMessageQueue::SubQueue::
552probe(
const MP::PointToPointMessageInfo& message)
554 TRACE_DEBUG(
"Probe rank={0} nb_send={1} nb_receive={2} queue={3} is_valid={4}",
555 m_rank,m_send_requests.size(),m_recv_requests.size(),
this,message.isValid());
556 if (!message.isValid())
560 if (!message.isRankTag())
561 ARCCORE_FATAL(
"Invalid message_info: message.isRankTag() is false");
563 MessageRank rank = message.destinationRank();
564 MessageTag tag = message.tag();
565 bool is_blocking = message.isBlocking();
575 _testOrWaitRequestAvailable(is_blocking);
576 auto* req = _getMatchingSendRequest(rank,m_rank,tag);
580 Int64 send_size = req->sendBufferInfo().memoryBuffer().size();
581 MessageId::SourceInfo si(req->orig(),req->tag(),send_size);
582 return MessageId(si,(
size_t)req->id());
594MP::MessageSourceInfo SharedMemoryMessageQueue::SubQueue::
595legacyProbe(
const MP::PointToPointMessageInfo& message)
600 MP::MessageId message_id = probe(message);
601 if (message_id.isValid())
602 return message_id.sourceInfo();
612SharedMemoryMessageQueue::
613~SharedMemoryMessageQueue()
622void SharedMemoryMessageQueue::
625 m_atomic_request_id = 1;
626 m_nb_thread = nb_thread;
628 m_sub_queues.resize(nb_queue);
629 for(
Integer i=0; i<nb_queue; ++i ){
630 m_sub_queues[i] =
new SubQueue(
this,MessageRank(i));
637SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
638_getSourceSubQueue(
const MP::PointToPointMessageInfo& message)
640 MessageRank orig = message.emiterRank();
642 ARCANE_THROW(ArgumentException,
"null message.sourceRank()");
643 return _getSubQueue(orig);
649SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
650_getDestinationSubQueue(
const MP::PointToPointMessageInfo& message)
652 MessageRank dest = message.destinationRank();
654 ARCANE_THROW(ArgumentException,
"null message.destinationRank()");
655 return _getSubQueue(dest);
661void SharedMemoryMessageQueue::
662waitAll(ArrayView<Request> requests)
664 Integer nb_request = requests.size();
667 UniqueArray<SharedMemoryMessageRequest*> sorted_requests(nb_request);
668 for(
Integer i=0; i<nb_request; ++i ){
669 sorted_requests[i] = (SharedMemoryMessageRequest*)requests[i].requestAsVoidPtr();
671 std::sort(std::begin(sorted_requests),std::end(sorted_requests),
672 SharedMemoryMessageRequest::SortFunctor(m_nb_thread));
675 for(
Integer i=0; i<nb_request; ++i ){
676 SharedMemoryMessageRequest* tmr = sorted_requests[i];
677 cout << String::format(
"** WAIT FOR REQUEST tmr={0}\n",tmr);
681 for(
Integer i=0; i<nb_request; ++i ){
682 SharedMemoryMessageRequest* tmr = sorted_requests[i];
685 sub_queue->wait(tmr);
688 if (requests[i].hasSubRequest())
689 ARCANE_THROW(NotImplementedException,
"handling of sub requests");
/*!
 * \brief Forwards a 'wait some' operation to the sub-queue of rank \a rank.
 *
 * \param rank rank whose sub-queue performs the wait.
 * \param requests requests to test or wait for.
 * \param requests_done output flags; reset to false here, then set by
 * SubQueue::waitSome() for each request found done.
 * \param is_non_blocking forwarded unchanged to SubQueue::waitSome().
 */
697void SharedMemoryMessageQueue::
698waitSome(
Int32 rank,ArrayView<Parallel::Request> requests,
699 ArrayView<bool> requests_done,
bool is_non_blocking)
// Start with no request reported as done (SubQueue::waitSome() also resets it).
701 requests_done.fill(
false);
// The actual work is delegated to the sub-queue associated with 'rank'.
702 auto sub_queue = _getSubQueue(MessageRank(rank));
703 sub_queue->waitSome(requests,requests_done,is_non_blocking);
/*!
 * \brief Posts a receive request described by \a message.
 *
 * The request is created by the sub-queue returned by _getSourceSubQueue()
 * (selected from message.emiterRank()), using an id obtained from
 * _getNextRequestId(), and is then wrapped by _request().
 *
 * \param message point-to-point message description.
 * \param buf buffer information for the data to receive.
 * \return the Request wrapping the created SharedMemoryMessageRequest.
 */
709auto SharedMemoryMessageQueue::
710addReceive(
const PointToPointMessageInfo& message,ReceiveBufferInfo buf) -> Request
// Receives are registered on the sub-queue of the emitter rank.
712 auto* sq = _getSourceSubQueue(message);
713 return _request(sq->addReceive(_getNextRequestId(),message,buf));
/*!
 * \brief Posts a send request described by \a message.
 *
 * The request is created by the sub-queue returned by
 * _getDestinationSubQueue() (selected from message.destinationRank()),
 * using an id obtained from _getNextRequestId(), and is then wrapped by
 * _request().
 *
 * \param message point-to-point message description.
 * \param buf buffer information for the data to send.
 * \return the Request wrapping the created SharedMemoryMessageRequest.
 */
719auto SharedMemoryMessageQueue::
720addSend(
const PointToPointMessageInfo& message,SendBufferInfo buf) -> Request
// Sends are handed to the sub-queue of the destination rank.
722 auto* sq = _getDestinationSubQueue(message);
723 return _request(sq->addSend(_getNextRequestId(),message,buf));
/*!
 * \brief Probes for a message matching \a message.
 *
 * Forwards the call to SubQueue::probe() on the sub-queue returned by
 * _getSourceSubQueue() (selected from message.emiterRank()).
 *
 * \param message point-to-point message description.
 * \return the MessageId returned by the sub-queue.
 */
729auto SharedMemoryMessageQueue::
730probe(
const MP::PointToPointMessageInfo& message) -> MessageId
732 auto* sq = _getSourceSubQueue(message);
733 return sq->probe(message);
/*!
 * \brief Probes for a message matching \a message and returns its source info.
 *
 * Forwards the call to SubQueue::legacyProbe() on the sub-queue returned by
 * _getSourceSubQueue() (selected from message.emiterRank()).
 *
 * \param message point-to-point message description.
 * \return the MessageSourceInfo returned by the sub-queue.
 */
739auto SharedMemoryMessageQueue::
740legacyProbe(
const MP::PointToPointMessageInfo& message) -> MessageSourceInfo
742 auto* sq = _getSourceSubQueue(message);
743 return sq->legacyProbe(message);
/*!
 * \brief Sets the trace manager of the sub-queue of rank \a rank.
 *
 * \param rank rank identifying the sub-queue to configure.
 * \param tm trace manager forwarded to SubQueue::setTraceMng().
 */
749void SharedMemoryMessageQueue::
750setTraceMng(
Int32 rank,ITraceMng* tm)
752 _getSubQueue(MessageRank(rank))->setTraceMng(tm);
/*!
 * \brief Wraps the internal request \a tmr into a Request.
 *
 * The Request is built with 0 as first argument, this queue as second
 * argument and \a tmr as third argument.
 * NOTE(review): the first argument is presumably a return/status code and
 * the second the creator of the request; confirm against the Request
 * constructor.
 */
758auto SharedMemoryMessageQueue::
759_request(SharedMemoryMessageRequest* tmr) -> Request
761 return Request(0,
this,tmr);
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formatage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Vue modifiable d'un tableau d'un type T.
Tableau d'items de types quelconques.
Interface d'un sérialiseur.
virtual void copy(const ISerializer *from)=0
Copie les données de from dans cette instance.
Interface du gestionnaire de traces.
static void genericCopy(ConstMemoryView from, MutableMemoryView to)
Copie générique utilisant platform::getDataMemoryRessourceMng()
File asynchrone permettant d'échanger des informations entre threads.
Informations pour envoyer/recevoir un message point à point.
Informations des buffers de réception.
Informations des buffers d'envoi.
File pour les messages d'un rang en mémoire partagée.
void _cleanupRequestIfDone(SharedMemoryMessageRequest *tmr)
Nettoyage de la requête tmr si elle est finie.
Message entre SharedMemoryMessageQueue.
SharedMemoryMessageRequest(SubQueue *queue, Int64 request_id, MessageRank orig, MessageRank dest, MessageTag tag, ReceiveBufferInfo buf)
Crée une requête d'envoi.
void copyFromSender(SharedMemoryMessageRequest *sender)
Copie dans le message de réception les informations du message d'envoi.
Vecteur 1D de données avec sémantique par valeur (style STL).
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
std::int64_t Int64
Type entier signé sur 64 bits.
Int32 Integer
Type représentant un entier.
std::int32_t Int32
Type entier signé sur 32 bits.