14#include "arcane/utils/FatalErrorException.h"
15#include "arcane/utils/NotImplementedException.h"
16#include "arcane/utils/NotSupportedException.h"
17#include "arcane/utils/ArgumentException.h"
18#include "arcane/utils/ITraceMng.h"
19#include "arcane/utils/ValueConvert.h"
21#include "arcane/utils/internal/MemoryResourceMng.h"
23#include "arcane/parallel/thread/SharedMemoryMessageQueue.h"
24#include "arcane/parallel/thread/IAsyncQueue.h"
26#include "arcane/core/ISerializeMessage.h"
29#define TRACE_DEBUG(format_str,...) \
30 if (m_is_debug && m_trace_mng){ \
31 m_trace_mng->info() << String::format(format_str,__VA_ARGS__); \
32 m_trace_mng->flush();\
63 ARCANE_FATAL(
"No send serializer for receive serializer");
73 ARCANE_FATAL(
"Not enough memory for receiving message receive={0} send={1}",
82void SharedMemoryMessageRequest::
87 m_is_destroyed =
true;
102 : m_async_queue(IAsyncQueue::createQueue()){}
105 delete m_async_queue;
110 m_async_queue->
push(v);
154 void waitRequestAvailable();
155 void checkRequestAvailable();
176 bool m_is_debug =
false;
177 bool m_is_allow_null_rank_for_any_source =
true;
188 void _testOrWaitRequestAvailable(
bool is_blocking);
200SharedMemoryMessageQueue::SubQueue::
206 m_is_allow_null_rank_for_any_source = v.value() != 0;
226SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
230 SubQueue* queue = m_master_queue->_getSubQueue(orig);
// Creates a receive request for this sub-queue and appends it to m_recv_requests.
// Two descriptions of 'message' are handled in the visible code: a (rank,tag)
// pair, or a MessageId that designates an already queued send request.
// NOTE(review): braces and several lines of this function are missing from this
// listing; confirm the exact branch structure against the full file.
238SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
239addReceive(
Int64 request_id,
const PointToPointMessageInfo& message,ReceiveBufferInfo buf)
241 SharedMemoryMessageRequest* tmr =
nullptr;
// Case 1: the message carries a (rank,tag) pair; both are passed to the new request.
242 if (message.isRankTag()){
243 MessageTag tag = message.tag();
244 MessageRank dest = message.destinationRank();
245 tmr = _createReceiveRequest(request_id,dest,tag,buf);
246 TRACE_DEBUG(
"** ADD RECV queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
247 this,request_id,m_rank,dest,tag,tmr,buf.memoryBuffer().size(),buf.serializer());
// Case 2: the message carries a MessageId; rank and tag are read from its source info.
249 else if (message.isMessageId()){
250 MessageId message_id = message.messageId();
251 MessageId::SourceInfo si = message_id.
sourceInfo();
252 MessageRank dest = si.rank();
253 MessageTag tag = si.tag();
// The MessageId is converted to an integer and compared with the id() of each
// pending send request to find the one it refers to.
258 Int64 req_id = (size_t)message_id;
259 SharedMemoryMessageRequest* send_request =
nullptr;
260 for( Integer i=0, n=m_send_requests.size(); i<n; ++i )
261 if (m_send_requests[i]->
id()==req_id){
262 send_request = m_send_requests[i];
// Presumably raised when the search above found no send request (the guarding
// condition is not visible here).
266 ARCANE_FATAL(
"Can not find matching send request from MessageId");
268 tmr = _createReceiveRequest(request_id,dest,tag,buf);
269 TRACE_DEBUG(
"** ADD RECV FromMessageId queue={0} id={1} ORIG={2} DEST={3} tag={4} tmr={5} size={6} serializer={7}",
270 this,request_id,m_rank,dest,tag,tmr,buf.memoryBuffer().size(),buf.serializer());
// Attach the send request found above to the new receive request.
271 tmr->setMatchingSendRequest(send_request);
// Presumably the fallback when 'message' is neither of the two forms above.
274 ARCANE_THROW(NotSupportedException,
"Invalid 'MessageInfo'");
// Register the new request in this sub-queue's list of receive requests.
276 m_recv_requests.add(tmr);
// Creates a send request from the tag and emitter rank of 'message' and pushes
// it onto this sub-queue's m_async_message_queue.
// NOTE(review): the return statement and braces are not visible in this listing.
283SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
284addSend(
Int64 request_id,
const PointToPointMessageInfo& message,SendBufferInfo buf)
286 MessageTag tag = message.tag();
289 MessageRank orig = message.emiterRank();
290 auto* tmr = _createSendRequest(request_id,orig,tag,buf);
// Hand the request over through the asynchronous queue of this sub-queue.
291 m_async_message_queue.push(tmr);
// The trace reports this sub-queue's own rank (m_rank) as DEST.
292 TRACE_DEBUG(
"** ADD SEND queue={0} ORIG={1} DEST={2} tag={3} size={4} tmr={5} serializer={6}",
293 this,orig,m_rank,tag,buf.memoryBuffer().size(),tmr,buf.serializer());
300void SharedMemoryMessageQueue::SubQueue::
301_checkRequestDone(SharedMemoryMessageRequest* tmr)
324 _removeRequest(
tmr,m_recv_requests);
326 _removeRequest(
tmr,m_done_requests);
333void SharedMemoryMessageQueue::SubQueue::
338 sq = m_async_message_queue.pop();
340 sq = m_async_message_queue.tryPop();
342 if (
sq->orig()==m_rank)
343 m_done_requests.add(
sq);
345 m_send_requests.add(
sq);
352void SharedMemoryMessageQueue::SubQueue::
353waitRequestAvailable()
356 _testOrWaitRequestAvailable(
true);
362void SharedMemoryMessageQueue::SubQueue::
363checkRequestAvailable()
365 _testOrWaitRequestAvailable(
false);
// Loops until the request 'tmr' reports isDone(), then cleans it up.
// NOTE(review): several lines (including braces) are missing from this listing.
371void SharedMemoryMessageQueue::SubQueue::
372wait(SharedMemoryMessageRequest* tmr)
// Checks whether 'tmr' belongs to this sub-queue; the action taken when it does
// not is not visible here.
374 if (tmr->queue()!=
this)
376 TRACE_DEBUG(
"**** WAIT MESSAGE tmr={0} rank={1} recv?={2} dest={3}"
377 " nb_send={4} nb_done={5}",tmr,m_rank,tmr->isRecv(),
378 tmr->dest(),m_send_requests.size(),m_done_requests.size());
// Re-evaluate the request, then call waitRequestAvailable() before retrying.
// Presumably there is an early exit between the two calls — TODO confirm.
379 while (!tmr->isDone()){
380 _checkRequestDone(tmr);
384 waitRequestAvailable();
388 _cleanupRequestIfDone(tmr);
394void SharedMemoryMessageQueue::SubQueue::
395testRequest(SharedMemoryMessageRequest* tmr)
397 if (tmr->queue()!=
this)
400 _checkRequestDone(tmr);
401 _cleanupRequestIfDone(tmr);
407void SharedMemoryMessageQueue::SubQueue::
408_removeRequest(SharedMemoryMessageRequest* tmr,Array<SharedMemoryMessageRequest*>& requests)
410 for( Integer i=0, n=requests.size(); i<n; ++i ){
411 SharedMemoryMessageRequest* tmr2 = requests[i];
423bool SharedMemoryMessageQueue::SubQueue::
424_checkSendDone(SharedMemoryMessageRequest* tmr_send)
426 for( SharedMemoryMessageRequest* tmr : m_done_requests ){
428 tmr_send->setDone(
true);
// Scans m_send_requests for a send request compatible with a receive described
// by (recv_dest, recv_orig, tag).
// NOTE(review): what is done with a matching request, and the value returned
// when none matches, are not visible in this listing.
450SharedMemoryMessageRequest* SharedMemoryMessageQueue::SubQueue::
451_getMatchingSendRequest(MessageRank recv_dest,MessageRank recv_orig,MessageTag tag)
// A null tag accepts any tag; a null or any-source rank accepts any sender.
453 bool is_any_tag = tag.isNull();
454 bool is_any_dest = recv_dest.isNull() || recv_dest.isAnySource();
// A null rank is rejected unless m_is_allow_null_rank_for_any_source is set.
455 if (recv_dest.isNull() && !m_is_allow_null_rank_for_any_source)
456 ARCANE_FATAL(
"Can not use probe() with null rank. Use MessageRank::anySourceRank() instead");
457 for( Integer j=0, n=m_send_requests.size(); j<n; ++j ){
458 SharedMemoryMessageRequest* tmr_send = m_send_requests[j];
459 TRACE_DEBUG(
"CHECK RECV DONE id={7} tmr_send={0} recv_dest={1}"
460 " recv_orig={2} send_dest={3} send_orig={4} request={5}/{6}\n",
461 tmr_send,recv_dest,recv_orig,
462 tmr_send->dest(),tmr_send->orig(),j,n,m_rank);
// Only sends whose dest() equals recv_orig are candidates.
463 if (recv_orig==tmr_send->dest()){
// Rank matches on an exact sender, or on the wildcard when recv_orig is this
// sub-queue's own rank.
464 bool is_rank_ok = (recv_dest==tmr_send->orig()) || (is_any_dest && m_rank==recv_orig);
465 bool is_tag_ok = (is_any_tag || tmr_send->tag()==tag);
466 if (is_rank_ok && is_tag_ok) {
// Tries to complete the receive request 'tmr_recv' from a matching send request.
// NOTE(review): the conditions guarding these statements and the return values
// are not visible in this listing.
477bool SharedMemoryMessageQueue::SubQueue::
478_checkRecvDone(SharedMemoryMessageRequest* tmr_recv)
// Start from the send request already attached to the receive, if any.
482 auto* tmr_send = tmr_recv->matchingSendRequest();
// Otherwise search by rank and tag (presumably only when none was attached).
484 tmr_send = _getMatchingSendRequest(tmr_recv->dest(),tmr_recv->orig(),tmr_recv->tag());
// Record the sender's rank, copy its data and mark the receive as done.
486 tmr_recv->setSource(tmr_send->orig());
487 tmr_recv->copyFromSender(tmr_send);
488 tmr_recv->setDone(
true);
// Push the send request onto its own sub-queue's async queue and drop it from
// this sub-queue's list of pending sends.
489 tmr_send->queue()->m_async_message_queue.push(tmr_send);
490 _removeRequest(tmr_send,m_send_requests);
// Tests the given requests and flags in 'requests_done' those found done.
// Every entry of 'requests_done' is first reset to false.
// NOTE(review): many lines are missing from this listing; in particular the use
// of 'is_non_blocking' and the loop exits other than 'one_request_done' are not
// visible here.
499void SharedMemoryMessageQueue::SubQueue::
500waitSome(ArrayView<Request> requests,ArrayView<bool> requests_done,
501 bool is_non_blocking)
503 Integer nb_request = requests.size();
504 requests_done.fill(
false);
505 bool one_request_done =
false;
506 TRACE_DEBUG(
"** WAIT SOME REQUEST rank={0} nb_request={1}\n",m_rank,nb_request);
507 while (!one_request_done){
// checkRequestAvailable() is the non-blocking variant: it calls
// _testOrWaitRequestAvailable(false).
509 checkRequestAvailable();
510 bool has_valid_request =
false;
511 for( Integer i=0; i<nb_request; ++i ){
512 Request request = requests[i];
// Requests that are not valid get separate handling (not visible here).
513 if (!request.isValid())
515 SharedMemoryMessageRequest* tmr = requests[i];
518 this->testRequest(tmr);
520 one_request_done =
true;
521 requests_done[i] =
true;
// Sub-requests are not supported by this implementation.
523 if (requests[i].hasSubRequest())
524 ARCANE_THROW(NotImplementedException,
"handling of sub requests");
527 has_valid_request =
true;
530 if (one_request_done)
536 if (!has_valid_request)
// waitRequestAvailable() is the blocking variant: it calls
// _testOrWaitRequestAvailable(true).
543 TRACE_DEBUG(
"** WAIT REQUEST AVAILABLE rank={0}",m_rank);
544 waitRequestAvailable();
554 TRACE_DEBUG(
"Probe rank={0} nb_send={1} nb_receive={2} queue={3} is_valid={4}",
555 m_rank,m_send_requests.size(),m_recv_requests.size(),
this,message.isValid());
556 if (!message.isValid())
561 ARCCORE_FATAL(
"Invalid message_info: message.isRankTag() is false");
564 MessageTag tag = message.
tag();
575 _testOrWaitRequestAvailable(is_blocking);
576 auto* req = _getMatchingSendRequest(rank,m_rank,tag);
580 Int64 send_size = req->sendBufferInfo().memoryBuffer().size();
581 MessageId::SourceInfo si(req->orig(),req->tag(),send_size);
582 return MessageId(si,(
size_t)req->id());
601 if (message_id.isValid())
612SharedMemoryMessageQueue::
613~SharedMemoryMessageQueue()
615 for( SubQueue* sq : m_sub_queues )
622void SharedMemoryMessageQueue::
623init(Integer nb_thread)
625 m_atomic_request_id = 1;
626 m_nb_thread = nb_thread;
628 m_sub_queues.resize(nb_queue);
629 for( Integer i=0; i<nb_queue; ++i ){
630 m_sub_queues[i] =
new SubQueue(
this,MessageRank(i));
637SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
642 ARCANE_THROW(ArgumentException,
"null message.sourceRank()");
643 return _getSubQueue(orig);
649SharedMemoryMessageQueue::SubQueue* SharedMemoryMessageQueue::
654 ARCANE_THROW(ArgumentException,
"null message.destinationRank()");
655 return _getSubQueue(dest);
// Waits for the given requests, each on the sub-queue that owns it.
// NOTE(review): braces and some lines are missing from this listing.
661void SharedMemoryMessageQueue::
662waitAll(ArrayView<Request> requests)
664 Integer nb_request = requests.size();
// Copy the underlying request pointers so they can be sorted without touching 'requests'.
667 UniqueArray<SharedMemoryMessageRequest*> sorted_requests(nb_request);
668 for( Integer i=0; i<nb_request; ++i ){
669 sorted_requests[i] = (SharedMemoryMessageRequest*)requests[i].requestAsVoidPtr();
// Order the requests with SortFunctor, constructed from the thread count.
671 std::sort(std::begin(sorted_requests),std::end(sorted_requests),
672 SharedMemoryMessageRequest::SortFunctor(m_nb_thread));
// Prints the sorted requests on standard output; presumably guarded by a debug
// flag that is not visible here — TODO confirm.
675 for( Integer i=0; i<nb_request; ++i ){
676 SharedMemoryMessageRequest* tmr = sorted_requests[i];
677 cout << String::format(
"** WAIT FOR REQUEST tmr={0}\n",tmr);
// Wait for each request in sorted order on its own sub-queue.
681 for( Integer i=0; i<nb_request; ++i ){
682 SharedMemoryMessageRequest* tmr = sorted_requests[i];
684 SubQueue* sub_queue = tmr->queue();
685 sub_queue->wait(tmr);
// Sub-requests are not supported. Note that this indexes the caller's array,
// not the sorted copy.
688 if (requests[i].hasSubRequest())
689 ARCANE_THROW(NotImplementedException,
"handling of sub requests");
697void SharedMemoryMessageQueue::
698waitSome(
Int32 rank,ArrayView<Parallel::Request> requests,
699 ArrayView<bool> requests_done,
bool is_non_blocking)
701 requests_done.fill(
false);
702 auto sub_queue = _getSubQueue(MessageRank(rank));
703 sub_queue->waitSome(requests,requests_done,is_non_blocking);
709auto SharedMemoryMessageQueue::
710addReceive(
const PointToPointMessageInfo& message,ReceiveBufferInfo buf) -> Request
712 auto* sq = _getSourceSubQueue(message);
713 return _request(sq->addReceive(_getNextRequestId(),message,buf));
719auto SharedMemoryMessageQueue::
720addSend(
const PointToPointMessageInfo& message,SendBufferInfo buf) -> Request
722 auto* sq = _getDestinationSubQueue(message);
723 return _request(sq->addSend(_getNextRequestId(),message,buf));
729auto SharedMemoryMessageQueue::
732 auto* sq = _getSourceSubQueue(message);
733 return sq->probe(message);
739auto SharedMemoryMessageQueue::
742 auto* sq = _getSourceSubQueue(message);
743 return sq->legacyProbe(message);
749void SharedMemoryMessageQueue::
750setTraceMng(
Int32 rank,ITraceMng* tm)
752 _getSubQueue(MessageRank(rank))->setTraceMng(tm);
758auto SharedMemoryMessageQueue::
759_request(SharedMemoryMessageRequest* tmr) -> Request
761 return Request(0,
this,tmr);
#define ARCANE_THROW(exception_class,...)
Macro pour envoyer une exception avec formatage.
#define ARCANE_FATAL(...)
Macro envoyant une exception FatalErrorException.
Lecteur des fichiers de maillage via la bibliothèque LIMA.
static void genericCopy(ConstMemoryView from, MutableMemoryView to)
Copie générique utilisant platform::getDataMemoryRessourceMng()
File asynchrone permettant d'échanger des informations entre threads.
virtual void * pop()=0
Récupère la première valeur de la file et bloque s'il n'y en a pas.
virtual void push(void *v)=0
Ajoute v dans la file.
virtual void * tryPop()=0
Récupère la première valeur s'il y en a. Retourne nullptr sinon.
Informations des buffers de réception.
Informations des buffers d'envoi.
File pour les messages d'un rang en mémoire partagée.
void _cleanupRequestIfDone(SharedMemoryMessageRequest *tmr)
Nettoyage de la requête tmr si elle est finie.
File de messages entre les rangs partagés par un SharedMemoryParallelMng.
Message entre SharedMemoryMessageQueue.
void copyFromSender(SharedMemoryMessageRequest *sender)
Copie dans le message de réception les informations du message d'envoi.
Interface d'un sérialiseur.
Interface du gestionnaire de traces.
MessageSourceInfo sourceInfo() const
Informations sur la source du message.
Informations sur la source d'un message.
Informations pour envoyer/recevoir un message point à point.
MessageRank emiterRank() const
Rang de l'émetteur du message.
MessageRank destinationRank() const
Rang de la destination du message.
bool isBlocking() const
Indique si le message est bloquant.
MessageTag tag() const
Tag du message.
bool isRankTag() const
Vrai si l'instance a été créée avec un couple (rank,tag). Dans ce cas rank() et tag() sont valides.
Déclarations des types et méthodes utilisés par les mécanismes d'échange de messages.
Int32 Integer
Type représentant un entier.